We are hiring ! See our job offers.
Revision 3d21cbc109aa07b0381edc505de0ce86c017a1ef authored by Simon Urbanek on 08 August 1977, 00:00:00 UTC, committed by Gabor Csardi on 08 August 1977, 00:00:00 UTC
1 parent a37f1e7
Raw File
fork.c
/* multicore R package

   fork.c
   interface to system-level tools for spawning copies of the current
   process and for inter-process communication (IPC)

   (C)Copyright 2008 Simon Urbanek

   see package DESCRIPTION for licensing terms */

#include <sys/types.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/wait.h>
#include <signal.h>

#include <R.h>
#include <Rinternals.h>

/* Debug output helper; in this file it is only used inside
   #ifdef MC_DEBUG sections.
   Use printf instead of Rprintf for debugging to avoid forked console
   interactions. */
#define Dprintf printf

/* Bookkeeping record the master keeps for one forked child.
   Records are chained into a singly-linked list through `next`. */
typedef struct child_info {
	pid_t pid;               /* process id of the child */
	int pfd;                 /* descriptor the master reads the child's data from */
	int sifd;                /* descriptor the master writes to in order to feed the child's stdin */
	struct child_info *next; /* following record, or 0 for the last one */
} child_info_t;

/* `children` is the statically allocated first record of the child list;
   mc_fork treats children.pid == 0 as "the list is empty". */
static child_info_t children;
/* record after which mc_fork appends the next child */
static child_info_t *last_child = &children;

/* in a forked child: descriptor used to send data to the master
   (set by mc_fork); -1 when there is no such pipe */
static int master_fd = -1;
/* 1 in the master process; mc_fork sets it to 0 in a forked child */
static int is_master = 1;

/* Remove the child with process id `pid` from the list of children.
 *
 * Descriptors still recorded as open for the child are closed, its record
 * is unlinked (and freed unless it is the static list head) and the child
 * is sent SIGUSR1 to make sure it exits.
 *
 * pid  process id of the child to remove; values <= 0 never match
 *
 * Returns 1 if the child was found and removed, 0 otherwise.
 */
static int rm_child_(int pid) {
	child_info_t *ci = &children, *prev = 0;
#ifdef MC_DEBUG
	Dprintf("removing child %d\n", pid);
#endif
	/* pid 0 is the marker of the unused list head and kill(0, ...) would
	   signal our whole process group (the master included), so such a
	   pid must never be treated as a child */
	if (pid <= 0) ci = 0;
	while (ci) {
		if (ci->pid == pid) {
			/* make sure we close all descriptors */
			if (ci->pfd > 0) { close(ci->pfd); ci->pfd = -1; }
			if (ci->sifd > 0) { close(ci->sifd); ci->sifd = -1; }
			/* now remove it from the list */
			if (!prev) { /* ci is actually children */
				if (ci->next) { /* there is a next? copy it into children */
					child_info_t *next0 = ci->next;
					children = *next0;
					/* the tail only moves if the record that was folded into
					   the head was the tail itself; otherwise records that
					   follow it must stay reachable through last_child */
					if (last_child == next0) last_child = &children;
					free(next0);
				} else { /* there is no next? reset everything */
					children.pid = 0;
					children.sifd = -1;
					children.pfd = -1;
					children.next = 0;
					last_child = &children;
				}
			} else {
				prev->next = ci->next;
				if (last_child == ci) last_child = prev;
				free(ci);
			}
			kill(pid, SIGUSR1); /* send USR1 to the child to make sure it exits */
			return 1;
		}
		prev = ci;
		ci = ci->next;
	}
#ifdef MC_DEBUG
	Dprintf("WARNING: child %d was to be removed but it doesn't exist\n", pid);
#endif
	return 0;
}

/* Fallback values for the standard descriptor numbers, used only when the
   headers included above did not define them. */
#ifndef STDIN_FILENO
#define STDIN_FILENO 0
#endif
#ifndef STDOUT_FILENO
#define STDOUT_FILENO 1
#endif
#ifndef STDERR_FILENO
#define STDERR_FILENO 2
#endif

/* State shared between the SIGUSR1 handler and the rest of the child code.
   Both objects are written/read from a signal handler, hence
   volatile sig_atomic_t.
     child_can_exit    - becomes 1 once SIGUSR1 has been received
     child_exit_status - if >= 0 when SIGUSR1 arrives, the handler exits
                         the process with this status */
static volatile sig_atomic_t child_can_exit = 0, child_exit_status = -1;

/* Signal handler for SIGUSR1 (mc_fork installs it in the child).
   Records that the child is allowed to terminate and, if an exit status
   has been stored in child_exit_status, terminates right away.
   Signals other than SIGUSR1 are ignored.
   NOTE(review): exit() is not async-signal-safe; it is kept here so that
   the termination behaviour is unchanged. */
static void child_sig_handler(int sig) {
	if (sig == SIGUSR1) {
#ifdef MC_DEBUG
		Dprintf("child process %d got SIGUSR1; child_exit_status=%d\n", getpid(), (int) child_exit_status);
#endif
		child_can_exit = 1;
		if (child_exit_status >= 0)
			exit(child_exit_status);
	}
}

SEXP mc_fork() {
	int pipefd[2];
	int sipfd[2];
	pid_t pid;
	SEXP res = allocVector(INTSXP, 3);
	int *res_i = INTEGER(res);
	if (pipe(pipefd)) error("Unable to create a pipe.");
	if (pipe(sipfd)) {
		close(pipefd[0]); close(pipefd[1]);
		error("Unable to create a pipe.");
	}
	pid = fork();
	if (pid == -1) {
		close(pipefd[0]); close(pipefd[1]);
		close(sipfd[0]); close(sipfd[1]);
		error("Unable to fork.");
	}
	res_i[0] = (int) pid;
	if (pid == 0) { /* child */
		close(pipefd[0]); /* close read end */
		master_fd = res_i[1] = pipefd[1];
		is_master = 0;
		/* re-map stdin */
		dup2(sipfd[0], STDIN_FILENO);
		close(sipfd[0]);
		/* master uses USR1 to signal that the child process can terminate */
		child_exit_status = -1;
		child_can_exit = 0;
		signal(SIGUSR1, child_sig_handler);
#ifdef MC_DEBUG
		Dprintf("child process %d started\n", getpid());
#endif
		
	} else { /* master process */
		child_info_t *ci;
		close(pipefd[1]); /* close write end of the data pipe */
		close(sipfd[0]);  /* close read end of the child-stdin pipe */
		res_i[1] = pipefd[0];
		res_i[2] = sipfd[1];
#ifdef MC_DEBUG
		Dprintf("parent registers new child %d\n", pid);
#endif
		/* register the new child and its pipes */
		if (children.pid == 0)
			ci = &children;
		else
			last_child->next = ci = (child_info_t*) malloc(sizeof(child_info_t));
		if (!ci) error("Memory allocation error.");
		ci->pid = pid;
		ci->pfd = pipefd[0];
		ci->sifd= sipfd[1];
		ci->next = 0;
		last_child = ci;
	}
	return res;
}

/* Close the standard output descriptor of the calling process.
   The result of close() is not inspected; always returns R NULL. */
SEXP close_stdout() {
	(void) close(STDOUT_FILENO);
	return R_NilValue;
}

/* Close the standard error descriptor of the calling process.
   The result of close() is not inspected; always returns R NULL. */
SEXP close_stderr() {
	(void) close(STDERR_FILENO);
	return R_NilValue;
}

/* Close every descriptor listed in sFDS.
 *
 * sFDS  integer vector of file descriptors; anything else raises an R error
 *
 * Results of the individual close() calls are not checked.
 * Returns TRUE.
 */
SEXP close_fds(SEXP sFDS) {
	int k, count, *fdv;
	if (TYPEOF(sFDS) != INTSXP) error("descriptors must be integers");
	fdv = INTEGER(sFDS);
	count = LENGTH(sFDS);
	for (k = 0; k < count; k++)
		close(fdv[k]);
	return ScalarLogical(1);
}

/* Send a raw vector from a child to the master process.
 *
 * what  raw vector holding the payload
 *
 * The payload length is written first as an unsigned int, followed by the
 * payload bytes. On any write failure the pipe to the master is closed,
 * master_fd is reset to -1 and an R error is raised. R errors are also
 * raised when called in the master, when there is no pipe to the master
 * or when `what` is not a raw vector.
 *
 * Returns TRUE on success.
 */
SEXP send_master(SEXP what) {
	unsigned char *payload;
	unsigned int total = 0, sent = 0;
	int failed;
	if (is_master) error("only children can send data to the master process");
	if (master_fd == -1) error("there is no pipe to the master process");
	if (TYPEOF(what) != RAWSXP) error("content to send must be RAW, use serialize if needed");
	total = LENGTH(what);
	payload = RAW(what);
#ifdef MC_DEBUG
	Dprintf("child %d: send_master (%d bytes)\n", getpid(), total);
#endif
	/* header first: the number of payload bytes that follow */
	failed = (write(master_fd, &total, sizeof(total)) != sizeof(total));
	/* then the payload itself; write() may accept it in several pieces */
	while (!failed && sent < total) {
		int n = write(master_fd, payload + sent, total - sent);
		if (n < 1)
			failed = 1;
		else
			sent += n;
	}
	if (failed) {
		close(master_fd);
		master_fd = -1;
		error("write error, closing pipe to the master");
	}
	return ScalarLogical(1);
}

/* Write a raw vector to the stdin pipe of a child.
 *
 * sPid  process id of the child (coerced with asInteger)
 * what  raw vector with the bytes to send
 *
 * Raises an R error when called outside the master process, when `what`
 * is not a raw vector, when no child with the given pid is registered or
 * when a write fails.
 *
 * Returns TRUE on success.
 */
SEXP send_child_stdin(SEXP sPid, SEXP what) {
	unsigned char *b;
	unsigned int len = 0, i = 0;
	int fd; /* descriptors are signed - an unset one is stored as -1 */
	int pid = asInteger(sPid);
	child_info_t *ci;
	if (!is_master) error("only master (parent) process can send data to a child process");
	if (TYPEOF(what) != RAWSXP) error("what must be a raw vector");
	/* pid 0 is the marker of the unused list head, not a child - without
	   this check it would match the head and we would write to whatever
	   descriptor number happens to be stored there */
	ci = (pid > 0) ? &children : 0;
	while (ci) {
		if (ci->pid == pid) break;
		ci = ci -> next;
	}
	if (!ci) error("child %d doesn't exist", pid);
	len = LENGTH(what);
	b = RAW(what);
	fd = ci -> sifd;
	/* write() may accept the data in several pieces */
	while (i < len) {
		ssize_t n = write(fd, b + i, len - i);
		if (n < 1)
			error("write error");
		i += (unsigned int) n;
	}
	return ScalarLogical(1);
}

/* Wait until at least one child has data available on its data pipe.
 *
 * sTimeout  real scalar, timeout in seconds (fractions allowed); a negative
 *           value makes select() wait without a timeout. Any other kind of
 *           value leaves the timeout at zero, i.e. select() only polls.
 * sWhich    non-empty integer vector of child pids to restrict the wait to;
 *           any other value means all registered children are considered.
 *
 * Returns
 *   NULL   if there are no children to tend to
 *   FALSE  if select() failed (the error is reported with perror)
 *   TRUE   on timeout
 *   otherwise an integer vector with the pids of the children whose data
 *   descriptor is readable.
 *
 * As a side effect terminated child processes are reaped with waitpid()
 * and list records whose data descriptor is -1 are removed.
 */
SEXP select_children(SEXP sTimeout, SEXP sWhich) {
	int maxfd = 0, sr, wstat, zombies = 0;
	unsigned int wlen = 0, wcount = 0;
	SEXP res;
	int *res_i, *which = 0;
	child_info_t *ci = &children;
	fd_set fs;
	struct timeval tv = { 0, 0 }, *tvp = &tv;
	if (isReal(sTimeout) && LENGTH(sTimeout) == 1) {
		double tov = asReal(sTimeout);
		if (tov < 0.0) tvp = 0; /* Note: I'm not sure we really should allow this .. */
		else {
			/* split into whole seconds and microseconds */
			tv.tv_sec = (int) tov;
			tv.tv_usec = (int) ((tov - ((double) tv.tv_sec)) * 1000000.0);
		}
	}
	if (TYPEOF(sWhich) == INTSXP && LENGTH(sWhich)) {
		which = INTEGER(sWhich);
		wlen = LENGTH(sWhich);
	}
	while (waitpid(-1, &wstat, WNOHANG) > 0) {}; /* check for zombies */
	FD_ZERO(&fs);
	/* collect the descriptors to watch; maxfd is taken over all children,
	   even those that are not on the `which` list */
	while (ci && ci->pid) {
		if (ci->pfd == -1) zombies++;
		if (ci->pfd > maxfd) maxfd = ci->pfd;
		if (ci->pfd > 0) {
			if (which) { /* check for the FD only if it's on the list */
				unsigned int k = 0;
				while (k < wlen) if (which[k++] == ci->pid) { FD_SET(ci->pfd, &fs); wcount++; break; }
			} else
				FD_SET(ci->pfd, &fs);
		}
		ci = ci -> next;
	}
#ifdef MC_DEBUG
	Dprintf("select_children: maxfd=%d, wlen=%d, wcount=%d, zombies=%d, timeout=%d:%d\n", maxfd, wlen, wcount, zombies, tv.tv_sec, tv.tv_usec);
#endif
	if (zombies) { /* oops, this should never really happen - it did while we had a bug in rm_child_ but hopefully not anymore */
		while (zombies) { /* this is rather more complicated than it should be if we used pointers to delete, but well ... */
			/* rm_child_ modifies the list, so restart from the head after every removal */
			ci = &children;
			while (ci) {
				if (ci->pfd == -1) {
#ifdef MC_DEBUG
					Dprintf("detected zombie: pid=%d, pfd=%d, sifd=%d\n", ci->pid, ci->pfd, ci->sifd);
#endif
					rm_child_(ci->pid);
					zombies--;
					break;
				}
				ci = ci->next;
			}
			/* the whole list was scanned without finding another record to remove */
			if (!ci) break;
		}
	}
	if (maxfd == 0 || (wlen && !wcount)) return R_NilValue; /* NULL signifies no children to tend to */
	sr = select(maxfd + 1, &fs, 0, 0, tvp);
#ifdef MC_DEBUG
	Dprintf("  sr = %d\n", sr);
#endif
	if (sr < 0) {
		perror("select");
		return ScalarLogical(0); /* FALSE on select error */
	}
	if (sr < 1) return ScalarLogical(1); /* TRUE on timeout */
	ci = &children;
	/* maxfd is re-used from here on as the number of readable children */
	maxfd = 0;
	while (ci && ci->pid) { /* pass 1 - count the FDs (in theory not necessary since that's what select should have returned)  */
		if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) maxfd++;
		ci = ci -> next;
	}
	ci = &children;
#ifdef MC_DEBUG
	Dprintf(" - read select %d children: ", maxfd);
#endif
	res = allocVector(INTSXP, maxfd);
	res_i = INTEGER(res);
	while (ci && ci->pid) { /* pass 2 - fill the array */
		if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) (res_i++)[0] = ci->pid;
#ifdef MC_DEBUG
		if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) Dprintf("%d ", ci->pid);
#endif
		ci = ci -> next;
	}
#ifdef MC_DEBUG
	Dprintf("\n");
#endif
	return res;
}

/* Read one message from the data pipe of the child described by `ci`.
 *
 * A message is an unsigned int length followed by that many bytes.
 *
 * Returns
 *   - a raw vector with the payload, carrying the child's pid in the
 *     attribute "pid", when a complete message was read
 *   - an integer scalar with the child's pid when the length could not be
 *     read, the length is 0 (the child announces that it is exiting) or
 *     reading the payload failed. In these cases the descriptor is closed
 *     and the child is removed with rm_child_, so `ci` must not be used
 *     by the caller afterwards.
 */
static SEXP read_child_ci(child_info_t *ci) {
	unsigned int len = 0;
	int fd = ci->pfd;
	int pid = ci->pid; /* remembered up front: rm_child_ may free or overwrite *ci */
	int n = read(fd, &len, sizeof(len));
#ifdef MC_DEBUG
	Dprintf(" read_child_ci(%d) - read length returned %d\n", ci->pid, n);
#endif
	if (n != sizeof(len) || len == 0) { /* error or child is exiting */
		close(fd);
		ci->pfd = -1;
		rm_child_(pid);
		return ScalarInteger(pid);
	} else {
		SEXP rv, pa, pid_sym;
		unsigned char *rvb;
		unsigned int i = 0;
		/* protect the buffer as soon as it exists */
		PROTECT(rv = allocVector(RAWSXP, len));
		rvb = RAW(rv);
		while (i < len) {
			n = read(fd, rvb + i, len - i);
#ifdef MC_DEBUG
			Dprintf(" read_child_ci(%d) - read %d at %d returned %d\n", ci->pid, len-i, i, n);
#endif
			if (n < 1) {
				close(fd);
				ci->pfd = -1;
				rm_child_(pid);
				UNPROTECT(1); /* rv is dropped */
				return ScalarInteger(pid);
			}
			i += n;
		}
		/* install() may allocate, so look the symbol up before creating
		   the attribute value and keep that value protected until it
		   has been attached to rv */
		pid_sym = install("pid");
		PROTECT(pa = allocVector(INTSXP, 1));
		INTEGER(pa)[0] = pid;
		setAttrib(rv, pid_sym, pa);
		UNPROTECT(2);
		return rv;
	}
}

/* Read one message from the child with the given pid.
 *
 * sPid  process id of the child (coerced with asInteger)
 *
 * Returns NULL if no such child is registered, otherwise whatever
 * read_child_ci returns for that child.
 */
SEXP read_child(SEXP sPid) {
	int pid = asInteger(sPid);
	/* pid 0 is the marker of the unused list head, not a child: matching
	   it would make us read from the descriptor number stored in the
	   head and then call rm_child_ with pid 0 */
	child_info_t *ci = (pid > 0) ? &children : 0;
	while (ci) {
		if (ci->pid == pid) break;
		ci = ci->next;
	}
#ifdef MC_DEBUG
	if (!ci) Dprintf("read_child(%d) - pid is not in the list of children\n", pid);
#endif
	if (!ci) return R_NilValue; /* if the child doesn't exist anymore, returns NULL */
	return read_child_ci(ci);
}

/* Wait for data from any child and read one message from the first child
 * (in list order) that has some.
 *
 * sTimeout  real scalar, timeout in seconds (fractions allowed); a negative
 *           value makes select() wait without a timeout. Any other kind of
 *           value leaves the timeout at zero, i.e. select() only polls.
 *
 * Returns
 *   NULL   if there are no children to tend to
 *   FALSE  if select() failed (the error is reported with perror)
 *   TRUE   on timeout
 *   otherwise the result of read_child_ci for the selected child.
 *
 * Terminated child processes are reaped with waitpid() first.
 */
SEXP read_children(SEXP sTimeout) {
	int highest = 0, ready, status;
	child_info_t *node;
	fd_set readable;
	struct timeval tv = { 0, 0 }, *tvp = &tv;

	if (isReal(sTimeout) && LENGTH(sTimeout) == 1) {
		double secs = asReal(sTimeout);
		if (secs < 0.0)
			tvp = 0; /* negative timeout: wait without a time limit */
		else {
			/* whole seconds and the remaining microseconds */
			tv.tv_sec = (int) secs;
			tv.tv_usec = (int) ((secs - ((double) tv.tv_sec)) * 1000000.0);
		}
	}

	/* collect any children that have already terminated */
	while (waitpid(-1, &status, WNOHANG) > 0) {}

	/* watch the data descriptor of every registered child */
	FD_ZERO(&readable);
	for (node = &children; node && node->pid; node = node->next) {
		if (node->pfd > highest) highest = node->pfd;
		if (node->pfd > 0) FD_SET(node->pfd, &readable);
	}
#ifdef MC_DEBUG
	Dprintf("read_children: maxfd=%d, timeout=%d:%d\n", highest, tv.tv_sec, tv.tv_usec);
#endif
	if (highest == 0) return R_NilValue; /* nothing to wait for */

	ready = select(highest + 1, &readable, 0, 0, tvp);
#ifdef MC_DEBUG
	Dprintf("sr = %d\n", ready);
#endif
	if (ready < 0) {
		perror("select");
		return ScalarLogical(0); /* select failed */
	}
	if (ready < 1) return ScalarLogical(1); /* timed out */

	/* find the first child whose descriptor was flagged by select */
	for (node = &children; node && node->pid; node = node->next)
		if (node->pfd > 0 && FD_ISSET(node->pfd, &readable)) break;
#ifdef MC_DEBUG
	Dprintf("set ci=%p (%d, %d)\n", node, node?node->pid:0, node?node->pfd:0);
#endif
	/* select reported a readable descriptor but none of ours is set -
	   should not really happen; handled like a timeout */
	return node ? read_child_ci(node) : ScalarLogical(1);
}

/* R-level entry point for rm_child_.
 *
 * sPid  process id of the child to remove (coerced with asInteger)
 *
 * Returns TRUE if the child was found and removed, FALSE otherwise.
 */
SEXP rm_child(SEXP sPid) {
	int removed = rm_child_(asInteger(sPid));
	return ScalarLogical(removed);
}

/* Return the pids of all registered children as an integer vector
   (of length zero if there are none), in list order. */
SEXP mc_children() {
	unsigned int total = 0;
	child_info_t *node;
	SEXP res;

	/* first pass: count the records in use */
	for (node = &children; node && node->pid > 0; node = node->next)
		total++;

	res = allocVector(INTSXP, total);
	if (total) {
		/* second pass: copy the pids */
		int *out = INTEGER(res);
		for (node = &children; node && node->pid > 0; node = node->next)
			*out++ = node->pid;
	}
	return res;
}

/* Return one descriptor per registered child as an integer vector,
 * in list order.
 *
 * sFdi  selector (coerced with asInteger): 0 yields the descriptors the
 *       master reads child data from (pfd), any other value those it
 *       writes child stdin to (sifd)
 */
SEXP mc_fds(SEXP sFdi) {
	int selector = asInteger(sFdi);
	unsigned int total = 0;
	child_info_t *node;
	SEXP res;

	/* first pass: count the records in use */
	for (node = &children; node && node->pid > 0; node = node->next)
		total++;

	res = allocVector(INTSXP, total);
	if (total) {
		/* second pass: copy the requested descriptor of each child */
		int *out = INTEGER(res);
		for (node = &children; node && node->pid > 0; node = node->next) {
			if (selector == 0)
				*out++ = node->pfd;
			else
				*out++ = node->sifd;
		}
	}
	return res;
}


/* Return the current value of master_fd (the descriptor a child uses to
   send data to the master, -1 if there is none) as an integer scalar. */
SEXP mc_master_fd() {
	int fd = master_fd;
	return ScalarInteger(fd);
}

/* Return TRUE when running in a forked child (is_master is 0),
   FALSE in the master process. */
SEXP mc_is_child() {
	if (is_master)
		return ScalarLogical(0);
	return ScalarLogical(1);
}

/* Send a signal to a process.
 *
 * sPid  process id passed to kill() (coerced with asInteger)
 * sSig  signal number (coerced with asInteger)
 *
 * Raises an R error if kill() fails, otherwise returns TRUE.
 */
SEXP mc_kill(SEXP sPid, SEXP sSig) {
	pid_t target = (pid_t) asInteger(sPid);
	int signo = asInteger(sSig);
	if (kill(target, signo) != 0)
		error("Kill failed.");
	return ScalarLogical(1);
}

/* Terminate a child process.
 *
 * sRes  exit status (coerced with asInteger)
 *
 * If a pipe to the master is still open, a zero length is written to it
 * to announce that the child is leaving, and the pipe is closed. The child
 * then waits, checking once per second, until child_can_exit has been set
 * (the SIGUSR1 handler does that) and finally calls exit().
 *
 * Raises an R error when called in the master process. Does not return
 * otherwise.
 */
SEXP mc_exit(SEXP sRes) {
	int status = asInteger(sRes);
#ifdef MC_DEBUG
	Dprintf("child %d: exit called\n", getpid());
#endif
	if (is_master) error("exit can only be used in a child process");

	if (master_fd != -1) {
		/* a length of 0 tells the master that we are leaving */
		unsigned int farewell = 0;
		write(master_fd, &farewell, sizeof(farewell));
		/* the pipe has to be closed before we start waiting */
		close(master_fd);
		master_fd = -1;
	}

	if (!child_can_exit) {
#ifdef MC_DEBUG
		Dprintf("child %d is waiting for permission to exit\n", getpid());
#endif
		/* wait for the master's go-ahead */
		while (!child_can_exit)
			sleep(1);
	}

#ifdef MC_DEBUG
	Dprintf("child %d: exiting\n", getpid());
#endif
	exit(status);
	/* not reached unless exit() returns */
	error("exit failed");
	return R_NilValue;
}
back to top