I just stumbled upon the need to move data chunks between subprocesses in a non-linear way, with some logic in between, so tee(1) and fifo(7) weren't a good option. Inspired by 440554, but rewritten from scratch to remove the unnecessary delays caused by sleep(3) calls and suboptimal try/sleep-based polling.
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from subprocess import Popen, PIPE
import errno, fcntl
from time import time
import os, sys
# Exit conditions (states)
class Time:
	'''Exit state: the time limit was hit.'''
class Size:
	'''Exit state: the size limit was hit.'''
class End:
	'''Exit state: the end of the stream was hit.'''
class AWrapper(object):
	'''Async I/O objects' wrapper.

	Wraps a pipe-like object (or a raw file descriptor) and adds read/write
	calls that give up after a timeout. Readiness is awaited with epoll, and
	the descriptor is switched to non-blocking mode only for the duration of
	a timed call.

	read() and write() can report why they returned through the module-level
	marker classes Time (timeout), Size (requested amount transferred) and
	End (other side hung up / end of data).
	'''
	bs_default = 8192 # size of a single read when no byte limit was given
	bs_max = 65536 # upper bound for a single read when a byte limit was given
	def __init__(self, pipe):
		'''Wrap "pipe": an object with fileno/read/write/close, or an int fd.'''
		if isinstance(pipe, int):
			fd = self._fd = pipe
			# os.fdopen() is the call that wraps a raw descriptor; open it
			#  unbuffered and in the direction the descriptor was opened with
			access = fcntl.fcntl(fd, fcntl.F_GETFL) & (os.O_WRONLY | os.O_RDWR)
			if access == os.O_WRONLY: mode = 'wb'
			elif access == os.O_RDWR: mode = 'r+b'
			else: mode = 'rb'
			pipe = os.fdopen(fd, mode, 0)
		else: fd = self._fd = pipe.fileno()
		self._poll_in, self._poll_out = epoll(), epoll()
		self._poll_in.register(fd, EPOLLIN | EPOLLERR | EPOLLHUP)
		self._poll_out.register(fd, EPOLLOUT | EPOLLERR | EPOLLHUP)
		self.close = pipe.close
		self.reads = pipe.read
		self.writes = pipe.write
	def __del__(self):
		# __init__ may have failed half-way, so only release what exists
		for name in ('_poll_in', '_poll_out'):
			poll = getattr(self, name, None)
			if poll is not None: poll.close()
		close = getattr(self, 'close', None)
		if close is not None: close()
	@staticmethod
	def _tail(data, offset):
		'''Return a view of data[offset:] without copying the payload.'''
		if not offset: return data
		try: return buffer(data, offset) # Python 2
		except NameError: return memoryview(data)[offset:] # interpreters without buffer()
	def read(self, bs=-1, to=-1, state=False): # read until timeout
		'''Read up to "bs" bytes (bs < 0: as much as arrives), waiting at most "to" seconds.

		to < 0 means plain blocking I/O through the wrapped object's read().
		Returns the data, or a (data, reason) tuple if "state" is true, where
		reason is Size (got bs bytes), Time (timed out) or End (hangup / end
		of data, once everything queued has been read).
		'''
		if to < 0: # use regular sync I/O
			buff = self.reads(bs)
			if state: return (buff, Size) if len(buff) == bs else (buff, End) # "Size" might mean "Size | End" here
			else: return buff
		flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
		chunks = []
		try:
			fcntl.fcntl(self._fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
			deadline = time() + to
			while bs:
				events = self._poll_in.poll(to, 1) # at most one event, fd should be eq self._fd
				if not events:
					if state: state = Time
					break
				event = events[0][1]
				ext = None
				if event != EPOLLHUP: # some data or error present
					ext = self.reads(min(bs, self.bs_max) if bs > 0 else self.bs_default) # min() for G+ reads
					if ext:
						chunks.append(ext)
						bs -= len(ext)
				# Stop on a hangup only after the queued data has been drained
				#  (a hangup can be reported together with readable data), and
				#  treat an empty read as end of data instead of polling again.
				if (ext is not None and not ext) or (event & EPOLLHUP and not ext):
					if state: state = End
					break
				to = deadline - time()
				if bs and to < 0: # a completed read counts as Size, not Time
					if state: state = Time
					break
			else:
				if state: state = Size # got bs bytes
		finally:
			try: fcntl.fcntl(self._fd, fcntl.F_SETFL, flags) # restore blocking state
			except (IOError, OSError): pass # wrong/closed fd: don't mask the original error
		buff = b''.join(chunks)
		return buff if not state else (buff, state)
	def write(self, buff, to=-1, state=False): # mostly similar (in reverse) to read
		'''Write "buff", waiting at most "to" seconds for the pipe to accept it.

		to < 0 means plain blocking I/O through the wrapped object's write().
		Returns the number of bytes written, or a (count, reason) tuple if
		"state" is true, where reason is Size (everything written), Time
		(timed out) or End (hangup reported on the descriptor).
		'''
		if to < 0:
			bs = self.writes(buff)
			if bs is None: bs = len(buff) # write() of Python 2 file objects returns None
			if state: return (bs, Size) if len(buff) == bs else (bs, End) # "Size" might mean "Size | End" here
			else: return bs
		flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
		bs, total = 0, len(buff)
		try:
			fcntl.fcntl(self._fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
			deadline = time() + to
			while bs < total:
				events = self._poll_out.poll(to, 1)
				if not events:
					if state: state = Time
					break
				fd, event = events[0]
				if event != EPOLLHUP:
					bs += os.write(fd, self._tail(buff, bs)) # may be a partial write
				if event & EPOLLHUP:
					if state: state = End
					break
				to = deadline - time()
				if bs < total and to < 0: # a completed write counts as Size, not Time
					if state: state = Time
					break
			else:
				if state: state = Size # whole buffer written
		finally:
			try: fcntl.fcntl(self._fd, fcntl.F_SETFL, flags) # restore blocking state
			except (IOError, OSError): pass
		return bs if not state else (bs, state)
import signal
class AExec(Popen):
	'''Popen with convenience read/write shortcuts and timed wait/close.

	With sync=False every pipe that was requested is wrapped into AWrapper,
	so its read/write calls accept timeouts.
	'''
	def __init__(self, *argz, **kwz):
		'''Start the process.

		The command is given either as separate positional arguments or as a
		single argument (a string, or a sequence passed to Popen as is).
		Keywords are handed to Popen, except "sync" (default True), which
		selects plain pipes (True) or AWrapper-wrapped ones (False).
		'''
		if len(argz) == 1:
			try: strings = (str, unicode, buffer)
			except NameError: strings = (str, bytes) # interpreters without unicode/buffer
			argz = (argz[0],) if isinstance(argz[0], strings) else argz[0]
		sync = kwz.pop('sync', True)
		super(AExec, self).__init__(argz, **kwz)
		if self.stdin:
			if not sync: self.stdin = AWrapper(self.stdin)
			self.write = self.stdin.write
		if self.stdout:
			if not sync: self.stdout = AWrapper(self.stdout)
			self.read = self.stdout.read
		if self.stderr:
			if not sync: self.stderr = AWrapper(self.stderr)
			self.read_err = self.stderr.read
	def wait(self, to=-1):
		'''Wait for the process to exit, for at most "to" seconds if to > 0.

		Returns the exit status, or Time if the timeout expired first.
		The timeout is implemented with SIGALRM; the handler and any timer
		that was pending before the call are put back afterwards, on both
		the normal and the timeout path.
		'''
		if not to > 0: return super(AExec, self).wait()
		def quit(s,f): raise StopIteration
		ts = time()
		# install the handler before arming the timer, so it cannot fire unhandled
		action = signal.signal(signal.SIGALRM, quit)
		fuse, interval = signal.setitimer(signal.ITIMER_REAL, to) # previously pending timer, if any
		try: return super(AExec, self).wait()
		except StopIteration: return Time
		finally:
			signal.setitimer(signal.ITIMER_REAL, 0)
			# a handler that was not installed from Python is reported as None
			#  and cannot be reinstalled; fall back to the default action
			signal.signal(signal.SIGALRM, action if action is not None else signal.SIG_DFL)
			if fuse:
				fuse -= time() - ts # what is left of the interrupted timer
				if fuse > 0: signal.setitimer(signal.ITIMER_REAL, fuse, interval)
				else: # it would have fired already - trigger it immediately
					if interval: signal.setitimer(signal.ITIMER_REAL, interval, interval)
					os.kill(os.getpid(), signal.SIGALRM)
	def close(self, to=-1, to_sever=3):
		'''Shut the process down, escalating from closing stdin to SIGKILL.

		If stdin is piped it is closed first and, when to_sever is non-zero
		and to > to_sever, the process gets to_sever seconds to exit on its
		own. Then it is terminated and waited for; if that wait times out it
		is killed.
		Returns the exit status, Time if the process had to be killed, or
		None if anything failed along the way.
		'''
		try:
			if self.stdin: # try to strangle it
				try: self.stdin.close()
				except Exception: pass # best effort: the pipe may be closed or broken already
				if to_sever and to > to_sever: # wait for process to die on its own
					status = self.wait(to_sever)
					if status is not Time: return status
					to -= to_sever
			self.terminate() # soft-kill
			status = self.wait(to)
			if status is Time: self.kill() # hard-kill
			return status
		except Exception: return None # already taken care of
	__del__ = close # dropping the last reference shuts the process down as well
 | 
Linux epoll(7) reactor can be easily replaced by kqueue or select for cross-platform compatibility.
Example usage:
## Simple tar to secure remote system and insecure one thru gpg
##  (simple case, can also be implemented by tee & fifos)
# Note that only one pipe is actually asynchronous
import logging
log = logging.getLogger('backup') # the error reports below need a logger

# Every end that is read from or written to below has to be requested as a
# pipe, otherwise the process objects get no read/write methods at all.
source = AExec('tar', '-czf', '-', '/', stdout=PIPE)
target1 = AExec('ssh', 'backup@somehost', 'cat > /mnt/backups/backup.tar.gz', stdin=PIPE)
filter = AExec('gpg', '--encrypt', '--recipient', 'admin@somehost', stdin=PIPE, stdout=PIPE, sync=False)
target2 = AExec('ssh', 'backup@publichost', 'cat > /public/my_backup.tar.gz.gpg', stdin=PIPE)
max_timeout = 10
to = 0 # filter timeout, raised in case of jam
while True:
    chunk = source.read(8192) # 8 KiB chunks, shouldn't be larger than pipe read/write buffers
    if chunk and not to: # got data chunk and filter isn't jammed
        target1.write(chunk)
        while chunk:
            sent = filter.write(chunk, to) # try feeding the filter (could be overflown)
            if sent: chunk = buffer(chunk, sent) # unsent leftover (should be zero, if chunk < buffers)
            buff = filter.read(-1, 0) # return all we can grab from the filter at the moment
            if buff:
                target2.write(buff)
                to = 0
            elif not sent: # nothing went in and nothing came out
                if not to: to = max_timeout # try harder, one last time
                else:
                    log.error('Filter looks jammed (not responsible to I/O) for %ds'%max_timeout)
                    break
            else: to = 0
        else: to = 0 # also indicates chunk sending success
    else: # either there's no more data to read or filter is jammed - cleanup & break
        if not to: # it's not a jam
            filter.stdin.close() # sever data intake, so filter would flush the output buffer
            chunk, state = filter.read(-1, max_timeout, state=True) # squeeze the last bits
            if state is Time: log.error("Filter stdout wasn't closed properly on the other end")
            target2.write(chunk)
        if filter.wait(max_timeout) is Time:
            log.error("Filter process hasn't died off, will be terminated")
            if filter.close(max_timeout, 0) is Time: log.error("Filter had to be -KILL'ed")
        source.close(max_timeout)
        target1.close(max_timeout)
        target2.close(max_timeout)
        break
      
Download
Copy to clipboard
Класс!!! Хочу добавить поддержку винды