# HG changeset patch # Parent a293c01337a62718938d9639653cf1b8dfffa054 diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -13,6 +13,7 @@ from test import test_support import unittest import mailbox import glob +from contextlib import contextmanager try: import fcntl except ImportError: @@ -21,6 +22,27 @@ except ImportError: # Silence Py3k warning rfc822 = test_support.import_module('rfc822', deprecated=True) +try: + import multiprocessing +except ImportError: + multiprocessing = None +else: + @contextmanager + def child_process(func, *args, **kwargs): + """Context manager to run a function concurrently in a child process. + + Runs func(*args, **kwargs) in a subprocess using + multiprocessing and waits for it to terminate. + + """ + process = multiprocessing.Process(target=func, args=args, kwargs=kwargs) + try: + process.start() + yield + finally: + process.join() + + class TestBase: def _check_sample(self, msg): @@ -45,6 +67,53 @@ class TestBase: test_support.unlink(target) +def add_message(factory, path, msg): + # Add "msg" to mailbox at "path", using mailbox instance returned + # by "factory". + mbox = factory(path) + try: + mbox.lock() + mbox.add(msg) + finally: + mbox.close() + +def add_two_delete_one(factory, path, msg1, msg2): + # Add "msg1" and "msg2" to mailbox at "path", then delete "msg1". + # Uses mailbox instance returned by "factory". + mbox = factory(path) + try: + mbox.lock() + key = mbox.add(msg1) + mbox.add(msg2) + # Flushing out two messages then deleting one ensures that for + # the single-file mailbox formats, the subsequent flush has to + # rewrite the mailbox. + mbox.flush() + del mbox[key] + mbox.flush() + finally: + mbox.close() + +def only_yield(): + yield + +def child_func(to_child, from_parent, child, child_args): + # Used by _subprocess method below. Waits for Connection object + # "from_parent" to receive EOF, and then calls "child" with + # arguments "child_args". + try: + to_child.close() + try: + from_parent.recv() + except EOFError: + pass + else: + raise AssertionError("Unexpectedly received data from parent " + "process.") + finally: + from_parent.close() + child(*child_args) + class TestMailbox(TestBase): _factory = None # Overridden by subclasses to reuse tests @@ -59,6 +128,166 @@ class TestMailbox(TestBase): self._box.close() self._delete_recursively(self._path) + def _acquire_lock(self, mbox=None): + # Keep trying to acquire lock on self._box (or mbox if given) + # until we get it. + if mbox is None: + mbox = self._box + while True: + try: + mbox.lock() + break + except mailbox.ExternalClashError: + time.sleep(0.01) + + @contextmanager + def _locked(self, mbox=None): + # Context manager to lock and unlock self._box, or mbox if given. + if mbox is None: + mbox = self._box + try: + self._acquire_lock(mbox) + yield + finally: + mbox.unlock() + + def _compare_mailbox(self, mapping, other=(), mbox=None): + # Check that .as_string() values of mbox contents match + # strings in "mapping" and "other". Messages in "mapping" + # must be present under their respective keys, while messages + # in "other" may have any key. No other messages may be + # present in mbox. + if mbox is None: + mbox = self._box + self.assertEqual(len(mbox), len(mapping) + len(other)) + other = list(other) + for key in mbox.iterkeys(): + msgstr = mbox[key].as_string() + if key in mapping: + self.assertEqual(mapping[key], msgstr) + else: + self.assertIn(msgstr, other) + del other[other.index(msgstr)] + + def _subprocess(self, parent, child, child_args, inspect=None, path=None, + lock1=False, lock2=False, flush=False): + # Method to run code in parent and child processes under + # various conditions. The function "child" is run in the + # child process with arguments "child_args", while "parent" + # should be a generator function which yields when it wants to + # allow the child to run; once the child has returned, the + # generator will be resumed. Finally, the function "inspect" + # will be run. Both "parent" and "inspect" are called with no + # arguments, and separate mailbox instances on self._box. + # + # If "lock1" is true, self._box will be locked when the first + # step of the parent generator is run, and unlocked when it + # yields. If "flush" is true, self._box.flush() will be + # called when the generator first yields, before releasing the + # lock (if set) and allowing the child to run. If "lock2" is + # true, self._box will be locked during the second step. + if multiprocessing is None: + self.skipTest("requires multiprocessing") + if path is None: + path = self._path + @contextmanager + def nullcm(*args, **kwargs): + yield + lock1cm = self._locked if lock1 else nullcm + lock2cm = self._locked if lock2 else nullcm + self._box.close() + self._delete_recursively(self._path) + from_parent, to_child = multiprocessing.Pipe(duplex=False) + with child_process(child_func, to_child, from_parent, + child, child_args): + try: + from_parent.close() + self._box = self._factory(path) + parent_iter = parent() + with lock1cm(): + parent_iter.next() + if flush: + self._box.flush() + finally: + to_child.close() # Allow child to continue + with lock2cm(): + try: + parent_iter.next() + except StopIteration: + pass + self._box.close() + if inspect is not None: + self._box = self._factory(path) + inspect() + + def _subprocess_correct(self, parent, child, child_args, + inspect=None, path=None): + # Run with proper locking and flushing in parent. + self._subprocess(parent, child, child_args, inspect, path, + lock1=True, lock2=True, flush=True) + + def _subprocess_modify_unlocked_flush(self, parent, child, child_args, + inspect=None, path=None): + # Run first step unlocked, but flush before yielding to child. + self._subprocess(parent, child, child_args, inspect, path, + lock1=False, lock2=True, flush=True) + + def _subprocess_modify_unlocked(self, parent, child, child_args, + inspect=None, path=None): + # Run first step without locks, and yield to child without flushing. + self._subprocess(parent, child, child_args, inspect, path, + lock1=False, lock2=True, flush=False) + + def _subprocess_tests(self, parent, child, child_args, + inspect=None, path=None): + # Run with some particular conditions we want to test for. + # (Note that since the actions of parent and child are + # explicitly serialized, the mailbox should not be corrupted + # whether they use locking or not.) + self._subprocess_correct(parent, child, child_args, inspect, path) + self._subprocess_modify_unlocked_flush(parent, child, child_args, + inspect, path) + self._subprocess_modify_unlocked(parent, child, child_args, + inspect, path) + + def test_subprocess(self): + # Check that self._subprocess runs OK with various options. + for n in range(8): + self._subprocess(only_yield, only_yield, (), lambda: None, + lock1=(n & 4), lock2=(n & 2), flush=(n & 1)) + + def test_add_by_other(self): + # Check that other process can add a message and we can read it. + msg = self._template % 0 + def parent(): + yield + self._compare_mailbox({}, [msg]) + self._subprocess_tests(parent, add_message, + (self._factory, self._path, msg)) + + def test_child_add_and_delete(self): + # (Issue #1599254) Check that when one process (in this case + # the child) makes changes which require rewriting a + # single-file mailbox, it does not interfere with another + # process which already has the mailbox open and is about to + # modify it, but hasn't (yet) acquired a lock on it, or + # written anything. Previously the rewritten mailbox was + # renamed over the original, causing the other process to + # write its own changes (e.g. new mail) into a deleted file. + msgp = self._template % "p" + msgc1 = self._template % "c1" + msgc2 = self._template % "c2" + def parent(): + # self._box has already been created at this point, and + # the .add() below will use the same instance. + yield + self._box.add(msgp) + def inspect(): + self._compare_mailbox({}, [msgc2, msgp]) + self._subprocess_tests(parent, add_two_delete_one, + (self._factory, self._path, msgc1, msgc2), + inspect) + def test_add(self): # Add copies of a sample message keys = [] @@ -494,11 +723,13 @@ class TestMailboxSuperclass(TestBase, un self.assertRaises(NotImplementedError, lambda: box.close()) +def factory_Maildir(path, factory=None): + return mailbox.Maildir(path, factory) + class TestMaildir(TestMailbox, unittest.TestCase): - _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) - def setUp(self): + self._factory = factory_Maildir TestMailbox.setUp(self) if os.name in ('nt', 'os2') or sys.platform == 'cygwin': self._box.colon = '!' @@ -985,9 +1216,14 @@ class _TestMboxMMDF(_TestSingleFile): self._box.close() +def factory_mbox(path, factory=None): + return mailbox.mbox(path, factory) + class TestMbox(_TestMboxMMDF, unittest.TestCase): - _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) + def setUp(self): + self._factory = factory_mbox + _TestMboxMMDF.setUp(self) @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') @@ -1032,14 +1268,24 @@ class TestMbox(_TestMboxMMDF, unittest.T self.assertEqual(data[-3:], '0\n\n') +def factory_MMDF(path, factory=None): + return mailbox.MMDF(path, factory) + class TestMMDF(_TestMboxMMDF, unittest.TestCase): - _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory) + def setUp(self): + self._factory = factory_MMDF + _TestMboxMMDF.setUp(self) +def factory_MH(path, factory=None): + return mailbox.MH(path, factory) + class TestMH(TestMailbox, unittest.TestCase): - _factory = lambda self, path, factory=None: mailbox.MH(path, factory) + def setUp(self): + self._factory = factory_MH + TestMailbox.setUp(self) def test_list_folders(self): # List folders @@ -1169,9 +1415,14 @@ class TestMH(TestMailbox, unittest.TestC return os.path.join(self._path, '.mh_sequences.lock') +def factory_Babyl(path, factory=None): + return mailbox.Babyl(path, factory) + class TestBabyl(_TestSingleFile, unittest.TestCase): - _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) + def setUp(self): + self._factory = factory_Babyl + _TestSingleFile.setUp(self) def tearDown(self): self._box.close()