diff options
author | Igor Pashev <pashev.igor@gmail.com> | 2012-06-24 22:28:35 +0000 |
---|---|---|
committer | Igor Pashev <pashev.igor@gmail.com> | 2012-06-24 22:28:35 +0000 |
commit | 3950ffe2a485479f6561c27364d3d7df5a21d124 (patch) | |
tree | 468c6e14449d1b1e279222ec32f676b0311917d2 /src/lib/libcoshell/cowait.c | |
download | ksh-upstream.tar.gz |
Imported Upstream version 93u+upstream
Diffstat (limited to 'src/lib/libcoshell/cowait.c')
-rw-r--r-- | src/lib/libcoshell/cowait.c | 411 |
1 files changed, 411 insertions, 0 deletions
diff --git a/src/lib/libcoshell/cowait.c b/src/lib/libcoshell/cowait.c new file mode 100644 index 0000000..ccc98d3 --- /dev/null +++ b/src/lib/libcoshell/cowait.c @@ -0,0 +1,411 @@ +/*********************************************************************** +* * +* This software is part of the ast package * +* Copyright (c) 1990-2011 AT&T Intellectual Property * +* and is licensed under the * +* Eclipse Public License, Version 1.0 * +* by AT&T Intellectual Property * +* * +* A copy of the License is available at * +* http://www.eclipse.org/org/documents/epl-v10.html * +* (with md5 checksum b35adb5213ca9657e911e9befb180842) * +* * +* Information and Software Systems Research * +* AT&T Research * +* Florham Park NJ * +* * +* Glenn Fowler <gsf@research.att.com> * +* * +***********************************************************************/ +#pragma prototyped +/* + * Glenn Fowler + * AT&T Research + * + * wait for and return status of job or the next coshell job that completes + * job==co for non-blocking wait + */ + +#include "colib.h" + +#include <ctype.h> + +/* + * cat and remove fd {1,2} serialized output + */ + +static void +cat(Cojob_t* job, char** path, Sfio_t* op) +{ + Sfio_t* sp; + + if (sp = sfopen(NiL, *path, "r")) + { + sfmove(sp, op, SF_UNBOUND, -1); + sfclose(sp); + } + else + errormsg(state.lib, ERROR_LIBRARY|2, "%s: cannot open job %d serialized output", *path, job->id); + remove(*path); + free(*path); + *path = 0; +} + +/* + * the number of running+zombie jobs + * these would count against --jobs or NPROC + */ + +int +cojobs(Coshell_t* co) +{ + int any; + int n; + + if (co) + any = 0; + else if (!(co = state.coshells)) + return -1; + else + any = 1; + n = 0; + do + { + n += co->outstanding; + } while (any && (co = co->next)); + return n; +} + +/* + * the number of pending cowait()'s + */ + +int +copending(Coshell_t* co) +{ + int any; + int n; + + if (co) + any = 0; + else if (!(co = state.coshells)) + return -1; + else + any = 1; + n = 0; + do + { + n += co->outstanding + co->svc_outstanding; + } while (any && (co = co->next)); + return n; +} + +/* + * the number of completed jobs not cowait()'d for + * cowait() always reaps the zombies first + */ + +int +cozombie(Coshell_t* co) +{ + int any; + int n; + + if (co) + any = 0; + else if (!(co = state.coshells)) + return -1; + else + any = 1; + n = 0; + do + { + n += (co->outstanding + co->svc_outstanding) - (co->running + co->svc_running); + } while (any && (co = co->next)); + return n; +} + +Cojob_t* +cowait(register Coshell_t* co, Cojob_t* job, int timeout) +{ + register char* s; + register Cojob_t* cj; + register Coservice_t* cs; + register ssize_t n; + char* b; + char* e; + unsigned long user; + unsigned long sys; + int active; + int any; + int id; + int loop; + int to; + int type; + char buf[128]; + + static unsigned long serial = 0; + + serial++; + if (co || job && (co = job->coshell)) + any = 0; + else if (!(co = state.coshells)) + goto echild; + else + any = 1; + + /* + * first drain the zombies + */ + + active = 0; + to = timeout >= 0 ? timeout : 60 * 1000; + zombies: + do + { +#if 0 + errormsg(state.lib, 2, "coshell %d zombie wait %lu timeout=%d outstanding=<%d,%d> running=<%d,%d>", co->index, serial, timeout, co->outstanding, co->svc_outstanding, co->running, co->svc_running); +#endif + if ((co->outstanding + co->svc_outstanding) > (co->running + co->svc_running)) + for (cj = co->jobs; cj; cj = cj->next) + if (cj->pid == CO_PID_ZOMBIE && (!job || cj == job)) + { + cj->pid = CO_PID_FREE; + if (cj->service) + co->svc_outstanding--; + else + co->outstanding--; +#if 0 + errormsg(state.lib, 2, "coshell %d zombie wait %lu timeout=%d outstanding=<%d,%d> running=<%d,%d> reap job %d", co->index, serial, timeout, co->outstanding, co->svc_outstanding, co->running, co->svc_running, cj->id); +#endif + return cj; + } + else if (cj->service && !cj->service->pid) + { + cj->pid = CO_PID_ZOMBIE; + cj->status = 2; + cj->service = 0; + co->svc_running--; + } + if (co->running > 0) + active = 1; + else if (co->svc_running > 0) + { + n = 0; + for (cs = co->service; cs; cs = cs->next) + if (cs->pid && kill(cs->pid, 0)) + { + cs->pid = 0; + close(cs->fd); + cs->fd = -1; + n = 1; + } + if (n) + goto zombies; + active = 1; + } + } while (any && (co = co->next)); + + /* + * reap the active jobs + */ + + if (!active) + goto echild; + if (any) + co = state.coshells; + do + { + loop = 0; + for (;;) + { + if (co->flags & CO_DEBUG) + { + loop++; + errormsg(state.lib, 2, "coshell %d wait %lu.%d timeout=%d outstanding=<%d,%d> running=<%d,%d>", co->index, serial, loop, timeout, co->outstanding, co->svc_outstanding, co->running, co->svc_running); + for (cj = co->jobs; cj; cj = cj->next) + if (cj->pid != CO_PID_FREE) + errormsg(state.lib, 2, "\tjob %d pid=%d status=%d", cj->id, cj->pid, cj->status); + } + if (co->running <= 0) + break; + while ((n = sfpoll(&co->msgfp, 1, to)) < 1) + { + if (n < 0) + { + if (errno == EINTR) + return 0; + break; + } + if (timeout >= 0) + break; + + /* + * check for a killed job with no status + */ + + for (cj = co->jobs; cj; cj = cj->next) + if (cj->pid > 0) + { + n = sfsprintf(buf, sizeof(buf), "kill -0 %d 2>/dev/null || echo k %d `wait %d 2>/dev/null; echo $?` >&$%s\n", cj->pid, cj->id, cj->pid, CO_ENV_MSGFD); + write(co->cmdfd, buf, n); + break; + } + } + + /* + * get one coshell message + */ + + if (!(s = b = sfgetr(co->msgfp, '\n', 1))) + break; +#if 0 + errormsg(state.lib, 2, "coshell %d active wait %lu timeout=%d outstanding=<%d,%d> running=<%d,%d>", co->index, serial, timeout, co->outstanding, co->svc_outstanding, co->running, co->svc_running); +#endif + + /* + * read and parse a coshell message packet of the form + * + * <type> <id> <args> <newline> + * %c %d %s %c + */ + + while (isspace(*s)) + s++; + if (!(type = *s) || type != 'a' && type != 'j' && type != 'k' && type != 'x') + goto invalid; + while (*++s && !isspace(*s)); + id = strtol(s, &e, 10); + if (*e && !isspace(*e)) + goto invalid; + for (s = e; isspace(*s); s++); + + /* + * locate id in the job list + */ + + for (cj = co->jobs; cj; cj = cj->next) + if (id == cj->id) + break; + if ((co->flags | (cj ? cj->flags : 0)) & CO_DEBUG) + errormsg(state.lib, 2, "coshell %d message \"%c %d %s\"", co->index, type, id, s); + if (!cj) + { + if (type == 'k') + continue; + errormsg(state.lib, 2, "coshell %d job id %d not found [%s]", co->index, id, b); + errno = ESRCH; + return 0; + } + + /* + * now interpret the message + */ + + switch (type) + { + + case 'a': + /* + * coexec() ack + */ + + if (cj == job) + return cj; + break; + + case 'j': + /* + * <s> is the job pid + */ + + n = cj->pid; + cj->pid = strtol(s, NiL, 10); + if (n == CO_PID_WARPED) + goto nuke; + break; + + case 'k': + /* + * <s> is a synthesized killed status + */ + + if (cj->pid < 0) + continue; + /*FALLTHROUGH*/ + + case 'x': + /* + * <s> is the job exit code and user,sys times + */ + + cj->status = strtol(s, &e, 10); + user = sys = 0; + for (;;) + { + if (e <= s) + break; + for (s = e; isalpha(*s) || isspace(*s); s++); + user += strelapsed(s, &e, CO_QUANT); + if (e <= s) + break; + for (s = e; isalpha(*s) || isspace(*s); s++); + sys += strelapsed(s, &e, CO_QUANT); + } + cj->user += user; + cj->sys += sys; + co->user += user; + co->sys += sys; + if (cj->out) + cat(cj, &cj->out, sfstdout); + if (cj->err) + cat(cj, &cj->err, sfstderr); + if (cj->pid > 0 || cj->service || (co->flags & (CO_INIT|CO_SERVER))) + { + nuke: + if (cj->pid > 0 && type != 'k') + { + /* + * nuke the zombies + */ + + n = sfsprintf(buf, sizeof(buf), "wait %d\n", cj->pid); + write(co->cmdfd, buf, n); + } + if (cj->service) + co->svc_running--; + else + co->running--; + if (!job || cj == job) + { + cj->pid = CO_PID_FREE; + if (cj->service) + co->svc_outstanding--; + else + co->outstanding--; +#if 0 + errormsg(state.lib, 2, "coshell %d active wait %lu timeout=%d outstanding=<%d,%d> running=<%d,%d> reap job %d", co->index, serial, timeout, co->outstanding, co->svc_outstanding, co->running, co->svc_running, cj->id); +#endif + return cj; + } + cj->pid = CO_PID_ZOMBIE; + } + else + cj->pid = CO_PID_WARPED; + break; + + } + } + } while (any && (co = co->next)); + return 0; + echild: +#if 0 + errormsg(state.lib, 2, "coshell wait ECHILD"); +#endif + errno = ECHILD; + return 0; + invalid: + errormsg(state.lib, 2, "coshell %d invalid message \"%-.*s>>>%s<<<\"", co->index, s - b, b, s); + errno = EINVAL; + return 0; +} |