/* Python Module "psem" : ChiScan Mobility multi-processor Job Controller * * Presented under terms of the GNU / FSF GPL - Copyright Jason Vas Dias 2006+ * * Provides Python module methods: * psem.fork() : * Replacement for os.fork() that ONLY allows N processes globally * for all python modules that use it, where N == { number of CPUs }. * Returns: -1 : fork disallowed (too many processes) or fork() error * 0 : fork succeeded, receiver is the child process * >0 : pid: fork succeeded, receiver is the parent process * psem.controller(): * Returns the pid of the "process group controlling process", the * first process to successfully fork a child. * psem.n_jobs(): * Returns current value of the semaphore. * psem.n_processors(): * Returns initial value of semaphore (number of CPUs on machine). * psem.n_kids(): * Returns current number of children of this process. * psem.wait([optional pid], [optional non-block boolean]) : * If this process has psem.fork()ed jobs, does a waitpid(-1,0,WNOHANG) * Accepts optional ( {pid to wait for}, {block : 1|0} ) parameters. * Returns what waitpid(2) returns. * * This Python module is implemented in "C" because of lack of support for SysV-IPC * semaphores by any Python module shipped by default (it also simplifies the * implementation!). * * A single semaphore in the set mapped to the "PSEM_SEM_KEY" is used; * it is initialized ONCE to the number of processors on the system. * A successfully fork()-ed child process atomically decrements the * semaphore with the "SEM_UNDO" option set; thus, even if the process * is killed with -KILL (9), the semaphore is automatically incremented * by the kernel when the process exits; this avoids extreme complications * about how/when to re-initialize the semaphore . * * Jason Vas Dias August 2008 */ #include #include #include #include #include #include #include #include #include #include #include #ifndef PSEM_SEM_KEY #define PSEM_SEM_KEY 0x434d4f42 /* hex of ascii for "PSEM" - if this hard-coded default presents * problems, compile with -DPSEM_SEM_KEY={new key} */ #endif /* NOTE: Every object defined herein EXCEPT the initpsem Python initialization routine MUST have static linkage !! */ #ifdef PSEM_DEBUG static char *timestamp(void) { static char buf[512]; struct timeval tv; struct tm tm; gettimeofday(&tv,0); localtime_r( &(tv.tv_sec), &tm ); snprintf(buf, 512, "%0.4u-%0.2u-%0.2u_%0.2u:%0.2u:%0.2u.%0.9lu", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, tv.tv_usec ); return buf; } #define Log( fmt, ... ) fprintf(stderr, "%s: " fmt "\n", timestamp(), ## __VA_ARGS__) #else #define Log( fmt, ... ) #endif static int n_processors = -1, psem_semid = -1, original_parent = -1, n_kids = 0, job = 0, parent = -1; static void psem_initialize(void) { if( n_processors == -1 ) n_processors = sysconf(_SC_NPROCESSORS_ONLN); if( original_parent == -1 ) original_parent = getpid(); if( parent == -1 ) parent = getpid(); else parent = getppid(); job=0; } #define BLOCK 1 #define ALL 2 #define ANY -1 static void psem_wait(int pid, unsigned block) { int s,r, p =getpid(); do { errno = 0; r = waitpid(pid, &s, block ? 0 : WNOHANG ); }while(((r == -1) && ( (errno == EINTR) || ( (block == ALL) && (errno != ECHILD) ) ) )|| ((r > 0) && (block == ALL) ) ); Log("wait for %d by %d - r:%d b:%d e:%d", pid, p, r, block, errno); if( r > 0 ) { n_kids = n_kids ? (n_kids - 1) : 0; Log("pid %d of parent %d exited: %x", r, getpid(), s ); } } static PyObject* psem_fork ( void ) { struct sembuf sop; int r = 0, pid = -1, semval = 0; char pid_buf[16] = ""; psem_initialize(); if ( psem_semid == -1 ) return Py_BuildValue("i", pid); if( n_kids > 0 ) psem_wait(-1,0); if( getpid() == original_parent ) snprintf(pid_buf, 16, "%d", original_parent); else snprintf(pid_buf, 16, "%d", job); semval = semctl(psem_semid, 0, GETVAL); Log("process %d new job %d %s",getpid(), semval, pid_buf); if( semval > 0 ) { pid = fork(); if ( pid == 0 ) { PyOS_AfterFork(); memset(&sop,'\0',sizeof(sop)); sop.sem_op = -1; sop.sem_flg= IPC_NOWAIT | SEM_UNDO; do r = semop( psem_semid, &sop, 1 ) ; while( (r == -1) && (errno == EINTR)); do r = semctl(psem_semid, 0, GETVAL); while( (r == -1) && (errno == EINTR)); job = n_processors - r; parent = getppid(); n_kids = 0; Log("new process %d %s %s", getpid(), parent == original_parent ? "of controller" : "of job", pid_buf ); }else if( pid == -1 ) Log("fork() failed: %d %s", errno, strerror(errno)); else { if( n_kids > 0 ) psem_wait(-1,0); n_kids += 1; } } return Py_BuildValue("i", pid); } static PyObject* psem_py_wait( PyObject *unused_self, PyObject *args ) { int pid=-1, blocking=0, ok=0; if( args ) ok = PyArg_ParseTuple(args, "|ii:psem_wait", &pid, &blocking); if( !ok ) { pid = -1; blocking = 0; } psem_wait(pid, blocking); Py_RETURN_NONE; } static PyObject* psem_job(void) { return Py_BuildValue("i", job); } static PyObject* psem_n_jobs(void) { return Py_BuildValue("i", (psem_semid > -1) ? (n_processors - semctl(psem_semid, 0, GETVAL)) : 0 ); } static PyObject* psem_n_kids(void) { return Py_BuildValue("i", n_kids); } static PyObject* psem_controller(void) { return Py_BuildValue("i", original_parent); } static PyMethodDef psem_methods[] = { {"fork", (void*)psem_fork, METH_NOARGS, "A fork() which only succeeds when the number of processors have not been exceeded.\n" "Returns: >0 in the parent or 0 in the child on successful fork(), -1 otherwise ." }, {"wait", (void*)psem_py_wait, METH_VARARGS, "Parameters: (pid, blocking): optional pid to wait for, optional boolean block: don't use WNOHANG" }, {"job",(void*)psem_job, METH_NOARGS, "Returns Job number of this process." }, {"n_jobs",(void*)psem_n_jobs, METH_NOARGS, "Returns current value of semaphore." }, {"n_kids",(void*)psem_n_kids, METH_NOARGS, "Returns number of processes this process has forked with psem.fork()." }, {"controller",(void*)psem_controller, METH_NOARGS, "Returns process group controller pid." }, {NULL, NULL, 0, NULL} /* Sentinel */ }; void initpsem(void) { PyObject *m = Py_InitModule("psem", psem_methods); int r = 0; union { int val; void *ptr; } s_un; if ((m==0) || (PyErr_Occurred())) Py_FatalError("can't initialize module psem"); psem_initialize(); PyModule_AddIntConstant(m, "BLOCK", 1); PyModule_AddIntConstant(m, "ALL", 2); PyModule_AddIntConstant(m, "ANY", -1); errno = 0; do psem_semid = semget(PSEM_SEM_KEY, 1, 0660); while ( (psem_semid == -1) && (errno == EINTR)); Log("psem initialization - semid: %d\n",psem_semid); if( ( psem_semid == -1 ) && ( errno == ENOENT ) ) { /* semaphore does NOT exist; create and initialize */ psem_semid = semget(PSEM_SEM_KEY, 1, IPC_CREAT | 0660); if( psem_semid != -1 ) { /* initialize to N_PROCESSORS */ s_un.val = n_processors; errno = 0; do r = semctl(psem_semid, 0, SETVAL, s_un); while((r == -1) && (errno == EINTR)); } } if( psem_semid > -1 ) Log("semaphore value: %d\n", semctl(psem_semid, 0, GETVAL)); else { fprintf(stderr, "psem module unable to obtain semid - %d %s\n",errno,strerror(errno)); } }