Index: mailbox.py =================================================================== RCS file: /cvsroot/python/python/dist/src/Lib/mailbox.py,v retrieving revision 1.40 diff -u -u -r1.40 mailbox.py --- mailbox.py 12 Sep 2002 05:08:00 -0000 1.40 +++ mailbox.py 14 Feb 2003 02:28:11 -0000 @@ -3,8 +3,10 @@ """Classes to handle Unix style, MMDF style, and MH style mailboxes.""" -import rfc822 import os +import time, socket +import rfc822 +import glob __all__ = ["UnixMailbox","MmdfMailbox","MHMailbox","Maildir","BabylMailbox", "PortableUnixMailbox"] @@ -246,6 +248,130 @@ fp = open(fn) return self.factory(fp) + # Counter used when creating multiple mail messages per second. + count = 1 + # Time of creation of last message + last_time = None + + # XXX should this method be private or not? I can't think of + # a reason an external caller would need to generate a filename. + def _new_filename (self): + """new_filename() : string + + Generate a unique filename, following the rules specified at + + """ + t = int(time.time()) + if t == self.last_time: + filename = '%s.%s_%s.%s' % (t, os.getpid(), self.count, + socket.gethostname()) + self.count += 1 + else: + filename = '%s.%s.%s' % (t, os.getpid(), socket.gethostname()) + self.last_time = t + + return filename + + + def add_message (self, msg, dir='new'): + """add_message(msg: rfc822.Message, dir:string) : string + + Add the message to the mailbox to the specified subdirectory, + returning the newly-generated filename + (e.g. "103706621.26498_3.nyman"). The message is first + written into the tmp/ directory and then moved into the + specified directory. There is no way to specify the message + status; call .add_message() and then call .move_message(). + """ + filename = self._new_filename() + tmp_dir = os.path.join(self.dirname, 'tmp') + tmp_path = os.path.join(tmp_dir, filename) + new_dir = os.path.join(self.dirname, dir) + new_path = os.path.join(new_dir, filename) + + # Write the message into the temporary directory + output = open(tmp_path, 'wt') + output.write(str(msg)) # Write the headers + output.write('\n') + # Copy the body of the message. + while 1: + data = msg.fp.read(4096) + if data == "": + break + output.write(data) + + output.close() + + # Move the message into the new/ directory. + os.rename(tmp_path, new_path) + self.boxes.append(new_path) + return filename + + + def move_message (self, filename, src_folder, dest_folder, + status=None, flags=None): + """move_message(string, string, string, string, string) + + Move the message with the given filename from the source to + the destination folder. Optionally, the status and flags + can be set to the strings you provide; otherwise they're left + unchanged. + """ + + src_dir = os.path.join(self.dirname, src_folder) + dest_dir = os.path.join(self.dirname, dest_folder) + + # Find matching filename; the '*' is used in case there's + # an information field (e.g. ":2,FRS") appended to the filename. + pattern = os.path.join(src_dir, filename + '*') + L = glob.glob(pattern) + if len(L) > 1: + raise IOError, ('Only a single file should match the filename %r' + % filename) + if not L: + # XXX what exception to use here? + raise IOError, 'No message with filename %r' % filename + + # Figure out the destination filename. If there's a ':' in + # the filename, we have to split off the status field and + # flags. + actual_filename = os.path.basename(L[0]) + if ':' in actual_filename: + index = actual_filename.index(':') + info = actual_filename[index:].split(',', 1) + + if status is None: + status = info[0] + if flags is None: + flags = info[1] + + actual_filename = actual_filename[:index] + + # Sort the flags, turning 'SRF' to 'FRS', for example. + if flags: + flags = map(None, flags) + flags.sort() + flags = ''.join(flags) + + # Ensure both status and flags are supplied. + if dest_folder == 'cur' and (status is None or flags is None): + raise ValueError, ("Both status and flags must be supplied " + "to move a message into the cur/ folder") + elif (status is None) ^ (flags is None): + raise ValueError, "status and flags must both be supplied" + + # Generate destination filename + if status is not None and flags is not None: + dest_filename = '%s:%s,%s' % (actual_filename, status, flags) + else: + dest_filename = actual_filename + + # Rename the file (finally!) + dest_path = os.path.join(dest_dir, dest_filename) + os.rename(L[0], dest_path) + self.boxes.remove(L[0]) + self.boxes.append(dest_path) + class BabylMailbox(_Mailbox): Index: test/test_mailbox.py =================================================================== RCS file: /cvsroot/python/python/dist/src/Lib/test/test_mailbox.py,v retrieving revision 1.9 diff -u -u -r1.9 test_mailbox.py --- test/test_mailbox.py 23 Jul 2002 19:03:56 -0000 1.9 +++ test/test_mailbox.py 14 Feb 2003 02:28:11 -0000 @@ -1,5 +1,5 @@ -import mailbox -import os +import mailbox, rfc822, StringIO +import os, shutil import time import unittest from test import test_support @@ -29,69 +29,78 @@ os.mkdir(os.path.join(self._dir, "tmp")) os.mkdir(os.path.join(self._dir, "new")) self._counter = 1 - self._msgfiles = [] def tearDown(self): - map(os.unlink, self._msgfiles) - os.rmdir(os.path.join(self._dir, "cur")) - os.rmdir(os.path.join(self._dir, "tmp")) - os.rmdir(os.path.join(self._dir, "new")) - os.rmdir(self._dir) + shutil.rmtree(self._dir) def createMessage(self, dir): - t = int(time.time() % 1000000) - pid = self._counter - self._counter += 1 - filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain")) - tmpname = os.path.join(self._dir, "tmp", filename) - newname = os.path.join(self._dir, dir, filename) - fp = open(tmpname, "w") - self._msgfiles.append(tmpname) - fp.write(DUMMY_MESSAGE) - fp.close() - if hasattr(os, "link"): - os.link(tmpname, newname) - else: - fp = open(newname, "w") - fp.write(DUMMY_MESSAGE) - fp.close() - self._msgfiles.append(newname) - + msg = rfc822.Message(StringIO.StringIO(DUMMY_MESSAGE)) + filename = self.mbox.add_message(msg, dir) + return filename + def test_empty_maildir(self): """Test an empty maildir mailbox""" + self.mbox = mailbox.Maildir(test_support.TESTFN) # Test for regression on bug #117490: # Make sure the boxes attribute actually gets set. - self.mbox = mailbox.Maildir(test_support.TESTFN) self.assert_(hasattr(self.mbox, "boxes")) self.assert_(len(self.mbox.boxes) == 0) self.assert_(self.mbox.next() is None) self.assert_(self.mbox.next() is None) def test_nonempty_maildir_cur(self): - self.createMessage("cur") self.mbox = mailbox.Maildir(test_support.TESTFN) + self.createMessage("cur") self.assert_(len(self.mbox.boxes) == 1) self.assert_(self.mbox.next() is not None) self.assert_(self.mbox.next() is None) self.assert_(self.mbox.next() is None) def test_nonempty_maildir_new(self): - self.createMessage("new") self.mbox = mailbox.Maildir(test_support.TESTFN) + self.createMessage("new") self.assert_(len(self.mbox.boxes) == 1) self.assert_(self.mbox.next() is not None) self.assert_(self.mbox.next() is None) self.assert_(self.mbox.next() is None) def test_nonempty_maildir_both(self): + self.mbox = mailbox.Maildir(test_support.TESTFN) self.createMessage("cur") self.createMessage("new") - self.mbox = mailbox.Maildir(test_support.TESTFN) self.assert_(len(self.mbox.boxes) == 2) self.assert_(self.mbox.next() is not None) self.assert_(self.mbox.next() is not None) self.assert_(self.mbox.next() is None) self.assert_(self.mbox.next() is None) + + def test_nonempty_maildir_move(self): + self.mbox = mailbox.Maildir(test_support.TESTFN) + fn1 = self.createMessage("cur") + fn2 = self.createMessage("new") + fn3 = self.createMessage("new") + + # Test for expected error when an empty filename is specified + self.failUnlessRaises(IOError, self.mbox.move_message, + '', 'new', 'cur', status=2) + # Test for expected error when a non-existent filename is specified + self.failUnlessRaises(IOError, self.mbox.move_message, + 'asdfghjkl', 'new', 'cur', status=2) + + # Test moving when only one of status/flags is supplied + self.failUnlessRaises(ValueError, self.mbox.move_message, + fn2, 'new', 'cur', status=2) + self.failUnlessRaises(ValueError, self.mbox.move_message, + fn2, 'new', 'cur', flags='R') + # Try moving a real message from a folder that doesn't contain it + self.failUnlessRaises(IOError, self.mbox.move_message, + fn1, 'new', 'cur', flags='R') + + # Finally, we'll try some moves that should be successful. + self.mbox.move_message(fn1, 'cur', 'new') + self.mbox.move_message(fn2, 'new', 'cur', status=2, flags='SRF') + self.mbox.move_message(fn2, 'cur', 'new') + self.mbox.move_message(fn2, 'new', 'cur') # XXX We still need more tests!