# HG changeset patch # Parent a87930571eb6fe3ba650c9b02432e10d15602d9a diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py --- a/Lib/test/test_mailbox.py +++ b/Lib/test/test_mailbox.py @@ -13,6 +13,7 @@ from test import test_support import unittest import mailbox import glob +from contextlib import contextmanager try: import fcntl except ImportError: @@ -21,6 +22,27 @@ except ImportError: # Silence Py3k warning rfc822 = test_support.import_module('rfc822', deprecated=True) +try: + import multiprocessing +except ImportError: + multiprocessing = None +else: + @contextmanager + def child_process(func, *args, **kwargs): + """Context manager to run a function concurrently in a child process. + + Runs func(*args, **kwargs) in a subprocess using + multiprocessing and waits for it to terminate. + + """ + process = multiprocessing.Process(target=func, args=args, kwargs=kwargs) + try: + process.start() + yield + finally: + process.join() + + class TestBase: def _check_sample(self, msg): @@ -45,6 +67,64 @@ class TestBase: test_support.unlink(target) +def random_message(): + # Generate a random message body + import random + body = "" + for i in range(random.randint(1, 10)): + line = "a" * random.randint(0, 75) + '\n' + body += line + + return body + +def add_25_messages(factory, path): + "Helper function to add 25 messages to a mailbox." + mbox = factory(path) + try: + for i in range(25): + msg = """Subject: %i, pid %i +From: sender@example.com + +Content goes here. +%s""" % (i, os.getpid(), random_message()) + while True: + try: + mbox.lock() + except mailbox.ExternalClashError: + # In case of conflict, wait a bit and try again. + time.sleep(0.01) + else: + break + mbox.add(msg) + mbox.flush() + mbox.unlock() + finally: + mbox.close() + +def add_message(factory, path, msg): + # Add "msg" to mailbox at "path", using mailbox instance returned + # by "factory". + mbox = factory(path) + mbox.add(msg) + mbox.close() + +def only_yield(): + yield + +def child_func(to_child, from_parent, child, child_args): + # Used by _subprocess method below. Waits for Connection object + # "from_parent" to receive EOF, and then calls "child" with + # arguments "child_args". + to_child.close() + try: + from_parent.recv() + except EOFError: + pass + else: + raise AssertionError("Unexpectedly received data from parent process.") + from_parent.close() + child(*child_args) + class TestMailbox(TestBase): _factory = None # Overridden by subclasses to reuse tests @@ -59,6 +139,47 @@ class TestMailbox(TestBase): self._box.close() self._delete_recursively(self._path) + def _acquire_lock(self, mbox=None): + # Keep trying to acquire lock on self._box (or mbox if given) + # until we get it. + if mbox is None: + mbox = self._box + while True: + try: + mbox.lock() + break + except mailbox.ExternalClashError: + time.sleep(0.01) + + @contextmanager + def _locked(self, mbox=None): + # Context manager to lock and unlock self._box, or mbox if given. + if mbox is None: + mbox = self._box + try: + self._acquire_lock(mbox) + yield + finally: + mbox.unlock() + + def _compare_mailbox(self, mapping, other=(), mbox=None): + # Check that .as_string() values of mbox contents match + # strings in "mapping" and "other". Messages in "mapping" + # must be present under their respective keys, while messages + # in "other" may have any key. No other messages may be + # present in mbox. + if mbox is None: + mbox = self._box + self.assertEqual(len(mbox), len(mapping) + len(other)) + other = list(other) + for key in mbox.iterkeys(): + msgstr = mbox[key].as_string() + if key in mapping: + self.assertEqual(mapping[key], msgstr) + else: + self.assertIn(msgstr, other) + del other[other.index(msgstr)] + def test_add(self): # Add copies of a sample message keys = [] @@ -132,6 +253,38 @@ class TestMailbox(TestBase): self.assertEqual(len(self._box), 1) self.assertRaises(KeyError, lambda: self._box[key0]) + def test_double_shorten(self): + # Check that flush() can shorten the mailbox twice + self._test_remove_two_of_three(broken_locking=False) + + def test_remove_with_broken_locking(self): + # Check that a (broken) application releasing the lock and + # then removing messages using the existing keys does not + # delete the wrong messages. + self._test_remove_two_of_three(broken_locking=True) + + def _test_remove_two_of_three(self, broken_locking=False): + self._box.lock() + key0 = self._box.add(self._template % 0) + key1 = self._box.add(self._template % 1) + key2 = self._box.add(self._template % 2) + self._box.flush() + self._box.remove(key0) + self._box.flush() + if broken_locking: + # As the name suggests, code that does this is broken + # (releasing the lock invalidates the keys, in general), + # but ideally mailbox.py should not break it further. + self._box.unlock() + self._box.lock() + self._box.remove(key1) + self._box.flush() + self._box.unlock() + self._box.close() + self._box = self._factory(self._path) + self.assertEqual(len(self._box), 1) + self.assertEqual(self._box.itervalues().next().get_payload(), '2\n') + def test_get(self): # Retrieve messages using get() key0 = self._box.add(self._template % 0) @@ -454,6 +607,216 @@ class TestMailbox(TestBase): self.assertRaises(TypeError, lambda: self._box._dump_message(None, output)) + @unittest.skipIf(multiprocessing is None, "requires multiprocessing") + def test_concurrent_add(self): + # Simple test of concurrent addition to a mailbox. + # This exercises the add() and flush() methods, based on bug #1599254. + # This bug affected only the classes based on _singlefileMailbox + # (mbox, MMDF, Babyl), but this test can apply to any mailbox type. + + self._box.close() + + # Fire off a subprocess that will add 25 messages to a mailbox + # file, locking and unlocking it each time. The parent process + # will do the same. The resulting mailbox should contain 50 messages. + with child_process(add_25_messages, self._factory, self._path): + add_25_messages(self._factory, self._path) + + # We expect the mailbox to contain 50 messages. + self._box = self._factory(self._path) + self._box.lock() + self.assertEqual(len(self._box), 50) + self._box.unlock() + + def _subprocess(self, parent, child, child_args, inspect=None, path=None, + lock1=False, lock2=False, flush=False): + # Method to run code in parent and child processes under + # various conditions. The function "child" is run in the + # child process with arguments "child_args", while "parent" + # should be a generator function which yields when it wants to + # allow the child to run; once the child has returned, the + # generator will be resumed. Finally, the function "inspect" + # will be run. Both "parent" and "inspect" are called with no + # arguments, and separate mailbox instances on self._box. + # + # If "lock1" is true, self._box will be locked when the first + # step of the parent generator is run, and unlocked when it + # yields. If "flush" is true, self._box.flush() will be + # called when the generator first yields, before releasing the + # lock (if set) and allowing the child to run. If "lock2" is + # true, self._box will be locked during the second step. + if multiprocessing is None: + self.skipTest("requires multiprocessing") + if path is None: + path = self._path + @contextmanager + def nullcm(*args, **kwargs): + yield + lock1cm = self._locked if lock1 else nullcm + lock2cm = self._locked if lock2 else nullcm + self._box.close() + self._delete_recursively(self._path) + from_parent, to_child = multiprocessing.Pipe(duplex=False) + with child_process(child_func, to_child, from_parent, + child, child_args): + from_parent.close() + try: + self._box = self._factory(path) + parent_iter = parent() + with lock1cm(): + parent_iter.next() + if flush: + self._box.flush() + finally: + to_child.close() # Allow child to continue + with lock2cm(): + try: + parent_iter.next() + except StopIteration: + pass + self._box.close() + if inspect is not None: + self._box = self._factory(path) + inspect() + + def _subprocess_correct(self, parent, child, child_args, + inspect=None, path=None): + # Run with proper locking and flushing in parent. + self._subprocess(parent, child, child_args, inspect, path, + lock1=True, lock2=True, flush=True) + + def _subprocess_modify_unlocked_flush(self, parent, child, child_args, + inspect=None, path=None): + # Run first step unlocked, but flush before yielding to child. + self._subprocess(parent, child, child_args, inspect, path, + lock1=False, lock2=True, flush=True) + + def _subprocess_modify_unlocked(self, parent, child, child_args, + inspect=None, path=None): + # Run first step without locks, and yield to child without flushing. + self._subprocess(parent, child, child_args, inspect, path, + lock1=False, lock2=True, flush=False) + + def _subprocess_tests(self, parent, child, child_args, + inspect=None, path=None): + # Run with some particular conditions we want to test for. + self._subprocess_correct(parent, child, child_args, inspect, path) + self._subprocess_modify_unlocked_flush(parent, child, child_args, + inspect, path) + self._subprocess_modify_unlocked(parent, child, child_args, + inspect, path) + + def test_subprocess(self): + # Check that self._subprocess runs OK with various options. + for n in range(8): + self._subprocess(only_yield, only_yield, (), lambda: None, + lock1=(n & 4), lock2=(n & 2), flush=(n & 1)) + + def test_add_by_other(self): + # Check that other process can add a message and we can read it. + msg = self._template % 0 + def parent(): + yield + self._compare_mailbox({}, [msg]) + self._subprocess_tests(parent, add_message, + (self._factory, self._path, msg)) + + def test_add_by_other_reread(self): + # Check we can read other process' message after writing our own. + msgp = self._template % 0 + msgc = self._template % 1 + def parent(): + key = self._box.add(msgp) + yield + self._compare_mailbox({key: msgp}, [msgc]) + self._subprocess_tests(parent, add_message, + (self._factory, self._path, msgc)) + + def test_interleave(self): + # Check that other process can add a message in between our messages. + p1 = self._template % "p1" + p2 = self._template % "p2" + c1 = self._template % "c1" + def parent(): + k1 = self._box.add(p1) + yield + k2 = self._box.add(p2) + self._compare_mailbox({k1: p1, k2: p2}, [c1]) + def inspect(): + self._compare_mailbox({}, [p1, c1, p2]) + self._subprocess_tests(parent, add_message, + (self._factory, self._path, c1), inspect) + + def test_delete_reread(self): + # Have other process add a message after we've deleted one. + p1 = self._template % "p1" + c1 = self._template % "c1" + def parent(): + k1 = self._box.add(p1) + del self._box[k1] + yield + self._compare_mailbox({}, [c1]) + def inspect(): + self._compare_mailbox({}, [c1]) + self._subprocess_tests(parent, add_message, + (self._factory, self._path, c1), inspect) + + def test_delete_reread2(self): + # As above, but have parent add more messages before and after. + p1 = self._template % "p1" + p2 = self._template % "p2" + p3 = self._template % "p3" + p4 = self._template % "p4" + c1 = self._template % "c1" + def parent(): + k1 = self._box.add(p1) + k2 = self._box.add(p2) + del self._box[k2] + k3 = self._box.add(p3) + yield + k4 = self._box.add(p4) + self._compare_mailbox({k1: p1, k3: p3, k4: p4}, [c1]) + def inspect(): + self._compare_mailbox({}, [p1, p3, c1, p4]) + self._subprocess_tests(parent, add_message, + (self._factory, self._path, c1), inspect) + + def test_replace_reread(self): + # Have other process add a message after we've replaced one. + p1 = self._template % "p1" + p2 = self._template % "p2" + c1 = self._template % "c1" + def parent(): + k1 = self._box.add(p1) + self._box[k1] = p2 + yield + self._compare_mailbox({k1: p2}, [c1]) + def inspect(): + self._compare_mailbox({}, [p2, c1]) + self._subprocess_tests(parent, add_message, + (self._factory, self._path, c1), inspect) + + def test_replace_reread2(self): + # As above, but have parent add more messages before and after. + p1 = self._template % "p1" + p2 = self._template % "p2" + p3 = self._template % "p3" + p4 = self._template % "p4" + p5 = self._template % "p5" + c1 = self._template % "c1" + def parent(): + k1 = self._box.add(p1) + k2 = self._box.add(p2) + self._box[k2] = p3 + k4 = self._box.add(p4) + yield + k5 = self._box.add(p5) + self._compare_mailbox({k1: p1, k2: p3, k4: p4, k5: p5}, [c1]) + def inspect(): + self._compare_mailbox({}, [p1, p3, p4, c1, p5]) + self._subprocess_tests(parent, add_message, + (self._factory, self._path, c1), inspect) + def _get_lock_path(self): # Return the path of the dot lock file. May be overridden. return self._path + '.lock' @@ -494,11 +857,13 @@ class TestMailboxSuperclass(TestBase, un self.assertRaises(NotImplementedError, lambda: box.close()) +def factory_Maildir(path, factory=None): + return mailbox.Maildir(path, factory) + class TestMaildir(TestMailbox, unittest.TestCase): - _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) - def setUp(self): + self._factory = factory_Maildir TestMailbox.setUp(self) if os.name in ('nt', 'os2') or sys.platform == 'cygwin': self._box.colon = '!' @@ -985,9 +1350,14 @@ class _TestMboxMMDF(_TestSingleFile): self._box.close() +def factory_mbox(path, factory=None): + return mailbox.mbox(path, factory) + class TestMbox(_TestMboxMMDF, unittest.TestCase): - _factory = lambda self, path, factory=None: mailbox.mbox(path, factory) + def setUp(self): + self._factory = factory_mbox + _TestMboxMMDF.setUp(self) @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()') @@ -1032,14 +1402,24 @@ class TestMbox(_TestMboxMMDF, unittest.T self.assertEqual(data[-3:], '0\n\n') +def factory_MMDF(path, factory=None): + return mailbox.MMDF(path, factory) + class TestMMDF(_TestMboxMMDF, unittest.TestCase): - _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory) + def setUp(self): + self._factory = factory_MMDF + _TestMboxMMDF.setUp(self) +def factory_MH(path, factory=None): + return mailbox.MH(path, factory) + class TestMH(TestMailbox, unittest.TestCase): - _factory = lambda self, path, factory=None: mailbox.MH(path, factory) + def setUp(self): + self._factory = factory_MH + TestMailbox.setUp(self) def test_list_folders(self): # List folders @@ -1169,9 +1549,14 @@ class TestMH(TestMailbox, unittest.TestC return os.path.join(self._path, '.mh_sequences.lock') +def factory_Babyl(path, factory=None): + return mailbox.Babyl(path, factory) + class TestBabyl(_TestSingleFile, unittest.TestCase): - _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory) + def setUp(self): + self._factory = factory_Babyl + _TestSingleFile.setUp(self) def tearDown(self): self._box.close()