/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
/* vim: set sts=2 sw=2 et tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";

/* exported Process */
/* globals BaseProcess, BasePipe */

importScripts("resource://gre/modules/subprocess/subprocess_shared.js",
              "resource://gre/modules/subprocess/subprocess_shared_unix.js",
              "resource://gre/modules/subprocess/subprocess_worker_common.js");

// Timeout passed to poll() when `io.messageCount` is zero. When messages are
// pending, poll() is called with a timeout of 0 instead, so it never blocks.
const POLL_TIMEOUT = 5000;

// The IO manager singleton. Declared here so that the classes below can
// reference it; it is assigned at the bottom of this file.
let io;

// Source of the unique `id` assigned to each Pipe instance.
let nextPipeId = 0;

/**
 * Common base for the parent-side end of a pipe connected to a spawned
 * process. Subclasses supply `pollEvents` and `onReady`.
 *
 * The `pending`, `closing`, `closed`, `closedPromise`, `resolveClosed` and
 * `shiftPending` members used below are not defined in this file; they are
 * presumably provided by BasePipe.
 */
class Pipe extends BasePipe {
  /**
   * @param {Process} process
   *        The process this pipe belongs to.
   * @param {unix.Fd} fd
   *        Our end of the pipe.
   */
  constructor(process, fd) {
    super();

    this.process = process;
    this.fd = fd;
    this.id = nextPipeId++;
  }

  /**
   * A bit mask of poll() events which we currently wish to be notified of on
   * this file descriptor. Must be overridden by subclasses.
   */
  get pollEvents() {
    throw new Error("Not implemented");
  }

  /**
   * Closes the file descriptor.
   *
   * @param {boolean} [force=false]
   *        If true, the file descriptor is closed immediately. If false, the
   *        file descriptor is closed after all current pending IO operations
   *        have completed.
   *
   * @returns {Promise}
   *          Resolves when the file descriptor has been closed.
   */
  close(force = false) {
    if (!force && this.pending.length) {
      // Only flag the pipe as closing here. The actual close is expected to
      // happen once the pending queue drains; that logic is not in this file.
      this.closing = true;
      return this.closedPromise;
    }

    // Any operations still queued at this point can never complete.
    for (let {reject} of this.pending) {
      let error = new Error("File closed");
      error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
      reject(error);
    }
    this.pending.length = 0;

    if (!this.closed) {
      this.fd.dispose();

      this.closed = true;
      this.resolveClosed();

      io.pipes.delete(this.id);
      io.updatePollFds();
    }
    return this.closedPromise;
  }

  /**
   * Called when an error occurred while polling our file descriptor.
   */
  onError() {
    this.close(true);
    this.process.wait();
  }
}

/**
 * A pipe from which the parent reads data written by the spawned process.
 */
class InputPipe extends Pipe {
  /**
   * A bit mask of poll() events which we currently wish to be notified of on
   * this file descriptor.
   */
  get pollEvents() {
    if (this.pending.length) {
      return LIBC.POLLIN;
    }
    return 0;
  }

  /**
   * Asynchronously reads at most `length` bytes of binary data from the file
   * descriptor into an ArrayBuffer of the same size. Returns a promise which
   * resolves when the operation is complete.
   *
   * @param {integer} length
   *        The number of bytes to read.
   *
   * @returns {Promise}
   *
   * @throws {Error}
   *         If the pipe is closed or in the process of closing.
   */
  read(length) {
    if (this.closing || this.closed) {
      throw new Error("Attempt to read from closed pipe");
    }

    return new Promise((resolve, reject) => {
      this.pending.push({resolve, reject, length});
      // We now have a pending read, so `pollEvents` has changed.
      io.updatePollFds();
    });
  }

  /**
   * Synchronously reads at most `count` bytes of binary data into an
   * ArrayBuffer, and returns that buffer. If no data can be read without
   * blocking, returns null instead.
   *
   * Invokes the error handler (which closes the pipe) if the read fails with
   * anything other than EAGAIN.
   *
   * @param {integer} count
   *        The number of bytes to read.
   *
   * @returns {ArrayBuffer|null}
   */
  readBuffer(count) {
    let buffer = new ArrayBuffer(count);

    // Unary plus coerces the ctypes return value to a plain number.
    let read = +libc.read(this.fd, buffer, buffer.byteLength);
    if (read < 0 && ctypes.errno != LIBC.EAGAIN) {
      this.onError();
    }

    if (read <= 0) {
      return null;
    }

    if (read < buffer.byteLength) {
      // Trim the buffer to the number of bytes actually read. The standard
      // `slice` is used rather than the non-standard static
      // `ArrayBuffer.transfer`, which is not available everywhere.
      return buffer.slice(0, read);
    }

    return buffer;
  }

  /**
   * Called when one of the IO operations matching the `pollEvents` mask may be
   * performed without blocking.
   *
   * @returns {boolean}
   *          True if any data was successfully read.
   */
  onReady() {
    let result = false;
    let reads = this.pending;
    while (reads.length) {
      let {resolve, length} = reads[0];

      let buffer = this.readBuffer(length);
      if (buffer) {
        result = true;
        this.shiftPending();
        resolve(buffer);
      } else {
        // Nothing more can be read without blocking.
        break;
      }
    }

    if (reads.length == 0) {
      // No pending reads remain, so we no longer need to poll for input.
      io.updatePollFds();
    }
    return result;
  }
}

/**
 * A pipe to which the parent writes data to be read by the spawned process.
 */
class OutputPipe extends Pipe {
  /**
   * A bit mask of poll() events which we currently wish to be notified of on
   * this file descriptor.
   */
  get pollEvents() {
    if (this.pending.length) {
      return LIBC.POLLOUT;
    }
    return 0;
  }

  /**
   * Asynchronously writes the given buffer to our file descriptor, and returns
   * a promise which resolves when the operation is complete.
   *
   * @param {ArrayBuffer} buffer
   *        The buffer to write.
   *
   * @returns {Promise}
   *          Resolves to the number of bytes written when the operation is
   *          complete.
   *
   * @throws {Error}
   *         If the pipe is closed or in the process of closing.
   */
  write(buffer) {
    if (this.closing || this.closed) {
      throw new Error("Attempt to write to closed pipe");
    }

    return new Promise((resolve, reject) => {
      // `length` records the size of the complete buffer, since `buffer`
      // itself is replaced by its unwritten remainder after a partial write.
      this.pending.push({resolve, reject, buffer, length: buffer.byteLength});
      io.updatePollFds();
    });
  }

  /**
   * Attempts to synchronously write the given buffer to our file descriptor.
   * Writes only as many bytes as can be written without blocking, and returns
   * the number of bytes successfully written.
   *
   * Closes the file descriptor if an IO error occurs.
   *
   * @param {ArrayBuffer} buffer
   *        The buffer to write.
   *
   * @returns {integer}
   *          The number of bytes successfully written, or a negative value
   *          if the write failed.
   */
  writeBuffer(buffer) {
    // Unary plus coerces the ctypes return value to a plain number, matching
    // `InputPipe.readBuffer` and the documented integer return type.
    let bytesWritten = +libc.write(this.fd, buffer, buffer.byteLength);

    if (bytesWritten < 0 && ctypes.errno != LIBC.EAGAIN) {
      this.onError();
    }

    return bytesWritten;
  }

  /**
   * Called when one of the IO operations matching the `pollEvents` mask may be
   * performed without blocking.
   */
  onReady() {
    let writes = this.pending;
    while (writes.length) {
      let {buffer, resolve, length} = writes[0];

      let written = this.writeBuffer(buffer);

      if (written == buffer.byteLength) {
        resolve(length);
        this.shiftPending();
      } else if (written > 0) {
        // Partial write: keep only the unwritten tail queued, and try again.
        writes[0].buffer = buffer.slice(written);
      } else {
        // Nothing more can be written without blocking.
        break;
      }
    }

    if (writes.length == 0) {
      // No pending writes remain, so we no longer need to poll for output.
      io.updatePollFds();
    }
  }
}

/**
 * Poll handler for the signal file descriptor passed to `io.init`. Bytes
 * read from this descriptor are counted in `io.messageCount`.
 */
class Signal {
  /**
   * @param {integer} fd
   *        The raw file descriptor to poll. NOTE(review): this is closed with
   *        `libc.close` rather than `dispose()`, so it is presumably a plain
   *        descriptor number rather than a `unix.Fd` - confirm against the
   *        caller of `io.init`.
   */
  constructor(fd) {
    this.fd = fd;
  }

  /**
   * Closes the signal file descriptor.
   */
  cleanup() {
    libc.close(this.fd);
    this.fd = null;
  }

  /**
   * A bit mask of poll() events which we wish to be notified of on this file
   * descriptor. We always poll for input.
   */
  get pollEvents() {
    return LIBC.POLLIN;
  }

  /**
   * Called when an error occurred while polling our file descriptor.
   */
  onError() {
    io.shutdown();
  }

  /**
   * Called when one of the IO operations matching the `pollEvents` mask may be
   * performed without blocking.
   *
   * Reads up to 16 bytes and adds the number of bytes read to
   * `io.messageCount`. Presumably each byte signals one pending message, and
   * the count is decremented by code outside of this file - TODO confirm.
   */
  onReady() {
    let buffer = new ArrayBuffer(16);
    let count = +libc.read(this.fd, buffer, buffer.byteLength);
    if (count > 0) {
      io.messageCount += count;
    }
  }
}

/**
 * A process spawned via posix_spawn, along with the parent-side ends of its
 * standard IO pipes.
 *
 * The `exitCode`, `id`, `resolveExit`, `stringArray` and `stringArrays`
 * members used below are not defined in this file; they are presumably
 * provided by BaseProcess.
 */
class Process extends BaseProcess {
  /**
   * Each Process object opens an additional pipe from the target object, which
   * will be automatically closed when the process exits, but otherwise
   * carries no data.
   *
   * This property contains a bit mask of poll() events which we wish to be
   * notified of on this descriptor. We're not expecting any input from this
   * pipe, but we need to poll for input until the process exits in order to be
   * notified when the pipe closes.
   */
  get pollEvents() {
    if (this.exitCode === null) {
      return LIBC.POLLIN;
    }
    return 0;
  }

  /**
   * Kills the process with the given signal.
   *
   * Does nothing if the process's exit status has already been collected,
   * since at that point the process ID may have been reassigned to an
   * unrelated process.
   *
   * @param {integer} signal
   */
  kill(signal) {
    if (this.exitCode !== null) {
      return;
    }

    libc.kill(this.pid, signal);
    this.wait();
  }

  /**
   * Initializes the IO pipes for use as standard input, output, and error
   * descriptors in the spawned process.
   *
   * As side effects, sets `this.pipes` to the parent-side Pipe objects for
   * the standard descriptors, and `this.fd` to the parent-side end of the
   * exit-monitoring pipe.
   *
   * @param {object} options
   *        The Subprocess options object for this process.
   * @returns {Map<integer, unix.Fd>}
   *          A map from descriptor numbers in the spawned process to the
   *          file descriptors which should be duplicated onto them.
   */
  initPipes(options) {
    let stderr = options.stderr;

    let our_pipes = [];
    let their_pipes = new Map();

    // Creates a pipe, keeps our end in `our_pipes`, and returns the child's
    // end. If `input` is true, we hold the read end; otherwise the write end.
    let pipe = input => {
      let fds = ctypes.int.array(2)();

      let res = libc.pipe(fds);
      if (res == -1) {
        throw new Error("Unable to create pipe");
      }

      fds = Array.from(fds, unix.Fd);

      // pipe() yields [read end, write end]. After the optional reversal,
      // fds[1] is always our end, and fds[0] is always the child's end.
      if (input) {
        fds.reverse();
        our_pipes.push(new InputPipe(this, fds[1]));
      } else {
        our_pipes.push(new OutputPipe(this, fds[1]));
      }

      libc.fcntl(fds[0], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
      libc.fcntl(fds[1], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
      // Only our end is made non-blocking.
      libc.fcntl(fds[1], LIBC.F_SETFL, LIBC.O_NONBLOCK);

      return fds[0];
    };

    their_pipes.set(0, pipe(false));
    their_pipes.set(1, pipe(true));

    if (stderr == "pipe") {
      their_pipes.set(2, pipe(true));
    } else if (stderr == "stdout") {
      // Share the stdout pipe rather than creating a new one.
      their_pipes.set(2, their_pipes.get(1));
    }

    // Create an additional pipe that we can use to monitor for process exit.
    their_pipes.set(3, pipe(true));
    this.fd = our_pipes.pop().fd;

    this.pipes = our_pipes;

    return their_pipes;
  }

  /**
   * Spawns the process described by `options`, and stores its process ID in
   * `this.pid`.
   *
   * @param {object} options
   *        The Subprocess options object for this process. The `command`,
   *        `arguments`, `environment`, `workdir` and `stderr` properties are
   *        read.
   *
   * @throws {Error}
   *         If the working directory cannot be changed, or the command
   *         cannot be executed.
   */
  spawn(options) {
    let {command, arguments: args} = options;

    let argv = this.stringArray(args);
    let envp = this.stringArray(options.environment);

    let actions = unix.posix_spawn_file_actions_t();
    let actionsp = actions.address();

    let fds = this.initPipes(options);

    let cwd;
    // Tracks whether `actions` was initialized, so that the cleanup code
    // never destroys an uninitialized object.
    let actionsInitialized = false;
    try {
      if (options.workdir) {
        // Remember the current directory so it can be restored afterwards.
        cwd = ctypes.char.array(LIBC.PATH_MAX)();
        libc.getcwd(cwd, cwd.length);

        if (libc.chdir(options.workdir) < 0) {
          throw new Error(`Unable to change working directory to ${options.workdir}`);
        }
      }

      libc.posix_spawn_file_actions_init(actionsp);
      actionsInitialized = true;

      for (let [i, fd] of fds.entries()) {
        libc.posix_spawn_file_actions_adddup2(actionsp, fd, i);
      }

      let pid = unix.pid_t();
      let rv = libc.posix_spawn(pid.address(), command, actionsp, null,
                                argv, envp);

      if (rv != 0) {
        throw new Error(`Failed to execute command "${command}"`);
      }

      this.pid = pid.value;
    } catch (e) {
      // No process will ever use the other ends of our pipes, so close our
      // ends rather than leaking them, whichever step failed.
      for (let pipe of this.pipes) {
        pipe.close();
      }
      throw e;
    } finally {
      if (actionsInitialized) {
        libc.posix_spawn_file_actions_destroy(actionsp);
      }

      this.stringArrays.length = 0;

      if (cwd) {
        libc.chdir(cwd);
      }

      // The child's ends are never needed in this process. The Set removes
      // the duplicate entry present when stderr shares the stdout pipe.
      for (let fd of new Set(fds.values())) {
        fd.dispose();
      }
    }
  }

  /**
   * Called when input is available on our sentinel file descriptor.
   *
   * @see pollEvents
   */
  onReady() {
    // We're not actually expecting any input on this pipe. If we get any, we
    // can't poll the pipe any further without reading it.
    if (this.wait() == undefined) {
      this.kill(9);
    }
  }

  /**
   * Called when an error occurred while polling our sentinel file descriptor.
   *
   * @see pollEvents
   */
  onError() {
    this.wait();
  }

  /**
   * Attempts to wait for the process's exit status, without blocking. If
   * successful, resolves the `exitPromise` to the process's exit value.
   *
   * @returns {integer|undefined}
   *          The process's exit status, if it has already exited. This is
   *          the negated signal number if the process was terminated by a
   *          signal. Undefined if the process has not yet exited.
   */
  wait() {
    if (this.exitCode !== null) {
      return this.exitCode;
    }

    let status = ctypes.int();

    let res = libc.waitpid(this.pid, status.address(), LIBC.WNOHANG);
    if (res == this.pid) {
      let sig = unix.WTERMSIG(status.value);
      if (sig) {
        this.exitCode = -sig;
      } else {
        this.exitCode = unix.WEXITSTATUS(status.value);
      }

      // The sentinel descriptor is no longer needed once we have a status.
      this.fd.dispose();
      io.updatePollFds();
      this.resolveExit(this.exitCode);
      return this.exitCode;
    }

    return undefined;
  }
}

/**
 * The IO manager. Tracks every pipe and process, and runs the poll() loop
 * which dispatches readiness and error notifications to them.
 */
io = {
  // The ctypes pollfd array passed to poll(), and the handlers which
  // correspond to its entries, index for index.
  pollFds: null,
  pollHandlers: null,

  // Pipes, keyed by pipe ID, and processes, keyed by process ID.
  pipes: new Map(),
  processes: new Map(),

  // Incremented by the Signal handler. While non-zero, poll() is called with
  // a zero timeout.
  messageCount: 0,

  // Cleared by shutdown() to stop the poll loop.
  running: true,

  /**
   * Sets up the signal handler and starts the poll loop.
   *
   * @param {object} details
   *        Must contain a `signalFd` property with the descriptor to poll
   *        for signals.
   */
  init(details) {
    this.signal = new Signal(details.signalFd);
    this.updatePollFds();

    setTimeout(this.loop.bind(this), 0);
  },

  /**
   * Stops the poll loop, closes the signal descriptor, notifies the owner of
   * this worker, and closes the worker. Has no effect if already shut down.
   */
  shutdown() {
    if (this.running) {
      this.running = false;

      this.signal.cleanup();
      this.signal = null;

      self.postMessage({msg: "close"});
      self.close();
    }
  },

  /**
   * Returns the pipe with the given ID.
   *
   * @param {integer} pipeId
   * @returns {Pipe}
   * @throws {Error}
   *         An error with an `errorCode` of ERROR_END_OF_FILE if there is no
   *         such pipe.
   */
  getPipe(pipeId) {
    let pipe = this.pipes.get(pipeId);

    if (!pipe) {
      let error = new Error("File closed");
      error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
      throw error;
    }
    return pipe;
  },

  /**
   * Returns the process with the given ID.
   *
   * @param {integer} processId
   * @returns {Process}
   * @throws {Error}
   *         If there is no such process.
   */
  getProcess(processId) {
    let process = this.processes.get(processId);

    if (!process) {
      throw new Error(`Invalid process ID: ${processId}`);
    }
    return process;
  },

  /**
   * Rebuilds the pollfd array and its matching handler list. Must be called
   * whenever any handler's `pollEvents` value may have changed.
   */
  updatePollFds() {
    let handlers = [this.signal,
                    ...this.pipes.values(),
                    ...this.processes.values()];

    // Handlers which aren't interested in any events are not polled at all.
    handlers = handlers.filter(handler => handler.pollEvents);

    let pollfds = unix.pollfd.array(handlers.length)();

    for (let [i, handler] of handlers.entries()) {
      let pollfd = pollfds[i];

      pollfd.fd = handler.fd;
      pollfd.events = handler.pollEvents;
      pollfd.revents = 0;
    }

    this.pollFds = pollfds;
    this.pollHandlers = handlers;
  },

  /**
   * Runs one poll iteration, then schedules the next one unless the manager
   * has been shut down.
   */
  loop() {
    this.poll();
    if (this.running) {
      setTimeout(this.loop.bind(this), 0);
    }
  },

  /**
   * Calls poll() on all registered descriptors, and dispatches any resulting
   * events to the corresponding handlers.
   */
  poll() {
    // Use local references: handlers may call updatePollFds() while we
    // iterate, which replaces both of these properties.
    let handlers = this.pollHandlers;
    let pollfds = this.pollFds;

    // Don't block if there are messages waiting.
    let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
    let count = libc.poll(pollfds, pollfds.length, timeout);

    // `count` is decremented per handled entry, so iteration stops early
    // once every reported descriptor has been handled.
    for (let i = 0; count && i < pollfds.length; i++) {
      let pollfd = pollfds[i];
      if (pollfd.revents) {
        count--;

        let handler = handlers[i];
        try {
          let success = false;
          if (pollfd.revents & handler.pollEvents) {
            success = handler.onReady();
          }
          // Only call the error handler in this iteration if we didn't also
          // have a success. This is necessary because Linux systems set POLLHUP
          // on a pipe when it's closed but there's still buffered data to be
          // read, and Darwin sets POLLIN and POLLHUP on a closed pipe, even
          // when there's no data to be read.
          if (!success && (pollfd.revents & (LIBC.POLLERR | LIBC.POLLHUP | LIBC.POLLNVAL))) {
            handler.onError();
          }
        } catch (e) {
          console.error(e);
          debug(`Worker error: ${e} :: ${e.stack}`);
          handler.onError();
        }

        pollfd.revents = 0;
      }
    }
  },

  /**
   * Registers a process, and all of its pipes, with the manager.
   *
   * @param {Process} process
   */
  addProcess(process) {
    this.processes.set(process.id, process);

    for (let pipe of process.pipes) {
      this.pipes.set(pipe.id, pipe);
    }
  },

  /**
   * Unregisters a process. Its pipes are not touched; they unregister
   * themselves when they are closed.
   *
   * @param {Process} process
   */
  cleanupProcess(process) {
    this.processes.delete(process.id);
  },
};