Index: Lib/test/test_mailbox.py =================================================================== --- Lib/test/test_mailbox.py (revision 54661) +++ Lib/test/test_mailbox.py (working copy) @@ -1,3 +1,5 @@ +from __future__ import with_statement + import os import sys import time @@ -12,6 +14,7 @@ import unittest import mailbox import glob +from contextlib import contextmanager try: import fcntl except ImportError: @@ -61,6 +64,46 @@ self._box.close() self._delete_recursively(self._path) + def _acquire_lock(self, mbox=None): + # Keep trying to acquire lock on self._box (or mbox if given) + # until we get it. + if mbox is None: + mbox = self._box + while True: + try: + mbox.lock() + break + except mailbox.ExternalClashError: + time.sleep(0.01) + + @contextmanager + def _locked(self, mbox=None): + # Context manager to lock and unlock self._box, or mbox if given. + if mbox is None: + mbox = self._box + try: + self._acquire_lock(mbox) + yield + finally: + mbox.unlock() + + def _compare_mailbox(self, mapping, other=(), mbox=None): + # Check that .as_string() values of mbox contents match + # strings in "mapping" and "other". Messages in "mapping" + # must be present under their respective keys, while messages + # in "other" may have any key. No other messages may be + # present in mbox. + if mbox is None: + mbox = self._box + self.assert_(len(mbox) == len(mapping) + len(other)) + other = list(other) + for key in mbox.iterkeys(): + msgstr = mbox[key].as_string() + if key in mapping: + self.assert_(mapping[key] == msgstr) + else: + del other[other.index(msgstr)] # ValueError if not there + def test_add(self): # Add copies of a sample message keys = [] @@ -510,6 +553,207 @@ self.assert_(len(self._box) == 50) self._box.unlock() + def _subprocess(self, parent, child, inspect=None, path=None, + lock1=False, lock2=False, flush=False): + # Method to run code in parent and child processes under + # various conditions. The function "child" is run in the + # child process, while "parent" should be a generator function + # which yields when it wants to allow the child to run; once + # the child has returned, the generator will be resumed. + # Finally, the function "inspect" will be run. Each of + # "parent", "child" and "inspect" is called with no arguments, + # and a separate mailbox instance on self._box. + # + # If "lock1" is true, self._box will be locked when the first + # step of the parent generator is run, and unlocked when it + # yields. If "flush" is true, self._box.flush() will be + # called when the generator first yields, before releasing the + # lock (if set) and allowing the child to run. If "lock2" is + # true, self._box will be locked during the second step. + for attr in 'fork', 'waitpid', 'pipe', '_exit': + if not hasattr(os, attr): + return + if path is None: + path = self._path + @contextmanager + def nullcm(*args, **kwargs): + yield + lock1cm = self._locked if lock1 else nullcm + lock2cm = self._locked if lock2 else nullcm + self._box.close() + self._delete_recursively(self._path) + from_parent, to_child = os.pipe() + def child_func(): + os.close(to_child) + os.read(from_parent, 1) # Wait for other end to be closed + self._box = self._factory(path) + child() + self._box.close() + with child_process(child_func): + os.close(from_parent) + try: + self._box = self._factory(path) + parent_iter = parent() + with lock1cm(): + parent_iter.next() + if flush: + self._box.flush() + finally: + os.close(to_child) # Allow child to continue + with lock2cm(): + try: + parent_iter.next() + except StopIteration: + pass + self._box.close() + if inspect is not None: + self._box = self._factory(path) + inspect() + + def _subprocess_correct(self, parent, child, inspect=None, path=None): + # Run with proper locking and flushing in parent. + self._subprocess(parent, child, inspect, path, + lock1=True, lock2=True, flush=True) + + def _subprocess_modify_unlocked_flush(self, parent, child, inspect=None, + path=None): + # Run first step unlocked, but flush before yielding to child. + self._subprocess(parent, child, inspect, path, + lock1=False, lock2=True, flush=True) + + def _subprocess_modify_unlocked(self, parent, child, inspect=None, + path=None): + # Run first step without locks, and yield to child without flushing. + self._subprocess(parent, child, inspect, path, + lock1=False, lock2=True, flush=False) + + def _subprocess_tests(self, parent, child, inspect=None, path=None): + # Run with some particular conditions we want to test for. + self._subprocess_correct(parent, child, inspect, path) + self._subprocess_modify_unlocked_flush(parent, child, inspect, path) + self._subprocess_modify_unlocked(parent, child, inspect, path) + + def test_subprocess(self): + # Check that self._subprocess runs OK with various options. + def parent(): + yield + child = inspect = lambda: None + for n in range(8): + self._subprocess(parent, child, inspect, + lock1=(n & 4), lock2=(n & 2), flush=(n & 1)) + + def test_add_by_other(self): + # Check that other process can add a message and we can read it. + msg = self._template % 0 + def parent(): + yield + self._compare_mailbox({}, [msg]) + def child(): + self._box.add(msg) + self._subprocess_tests(parent, child) + + def test_add_by_other_reread(self): + # Check we can read other process' message after writing our own. + msgp = self._template % 0 + msgc = self._template % 1 + def parent(): + key = self._box.add(msgp) + yield + self._compare_mailbox({key: msgp}, [msgc]) + def child(): + self._box.add(msgc) + self._subprocess_tests(parent, child) + + def test_interleave(self): + # Check that other process can add a message in between our messages. + p1 = self._template % "p1" + p2 = self._template % "p2" + c1 = self._template % "c1" + def parent(): + k1 = self._box.add(p1) + yield + k2 = self._box.add(p2) + self._compare_mailbox({k1: p1, k2: p2}, [c1]) + def child(): + self._box.add(c1) + def inspect(): + self._compare_mailbox({}, [p1, c1, p2]) + self._subprocess_tests(parent, child, inspect) + + def test_delete_reread(self): + # Have other process add a message after we've deleted one. + p1 = self._template % "p1" + c1 = self._template % "c1" + def parent(): + k1 = self._box.add(p1) + del self._box[k1] + yield + self._compare_mailbox({}, [c1]) + def child(): + self._box.add(c1) + def inspect(): + self._compare_mailbox({}, [c1]) + self._subprocess_tests(parent, child, inspect) + + def test_delete_reread2(self): + # As above, but have parent add more messages before and after. + p1 = self._template % "p1" + p2 = self._template % "p2" + p3 = self._template % "p3" + p4 = self._template % "p4" + c1 = self._template % "c1" + def parent(): + k1 = self._box.add(p1) + k2 = self._box.add(p2) + del self._box[k2] + k3 = self._box.add(p3) + yield + k4 = self._box.add(p4) + self._compare_mailbox({k1: p1, k3: p3, k4: p4}, [c1]) + def child(): + self._box.add(c1) + def inspect(): + self._compare_mailbox({}, [p1, p3, c1, p4]) + self._subprocess_tests(parent, child, inspect) + + def test_replace_reread(self): + # Have other process add a message after we've replaced one. + p1 = self._template % "p1" + p2 = self._template % "p2" + c1 = self._template % "c1" + def parent(): + k1 = self._box.add(p1) + self._box[k1] = p2 + yield + self._compare_mailbox({k1: p2}, [c1]) + def child(): + self._box.add(c1) + def inspect(): + self._compare_mailbox({}, [p2, c1]) + self._subprocess_tests(parent, child, inspect) + + def test_replace_reread2(self): + # As above, but have parent add more messages before and after. + p1 = self._template % "p1" + p2 = self._template % "p2" + p3 = self._template % "p3" + p4 = self._template % "p4" + p5 = self._template % "p5" + c1 = self._template % "c1" + def parent(): + k1 = self._box.add(p1) + k2 = self._box.add(p2) + self._box[k2] = p3 + k4 = self._box.add(p4) + yield + k5 = self._box.add(p5) + self._compare_mailbox({k1: p1, k2: p3, k4: p4, k5: p5}, [c1]) + def child(): + self._box.add(c1) + def inspect(): + self._compare_mailbox({}, [p1, p3, p4, c1, p5]) + self._subprocess_tests(parent, child, inspect) + def _get_lock_path(self): # Return the path of the dot lock file. May be overridden. return self._path + '.lock' @@ -1846,6 +2090,31 @@ ## End: classes from the original module (for backward compatibility). +@contextmanager +def child_process(func, *args, **kwargs): + """Context manager to run a function concurrently in a child process. + + Runs func(*args, **kwargs) in a child process, returns process ID, and + waits for child afterwards. + + Caveat: child might escape if it gets a signal at the wrong point. + + """ + pid = None + try: + pid = os.fork() + if pid == 0: + try: + func(*args, **kwargs) + os._exit(0) + except: + os._exit(1) + yield pid + finally: + if pid is not None: + os.waitpid(pid, 0) + + _sample_message = """\ Return-Path: X-Original-To: gkj+person@localhost