import os import sys import time import stat import socket import email import email.message import re import io import shutil import tempfile from test import support import unittest import textwrap import mailbox import glob try: import fcntl except ImportError: pass class TestBase: all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage, mailbox.mboxMessage, mailbox.MHMessage, mailbox.BabylMessage, mailbox.MMDFMessage) def _check_sample(self, msg): # Inspect a mailbox.Message representation of the sample message self.assertIsInstance(msg, email.message.Message) self.assertIsInstance(msg, mailbox.Message) for key, value in _sample_headers.items(): self.assertIn(value, msg.get_all(key)) self.assertTrue(msg.is_multipart()) self.assertEqual(len(msg.get_payload()), len(_sample_payloads)) for i, payload in enumerate(_sample_payloads): part = msg.get_payload(i) self.assertIsInstance(part, email.message.Message) self.assertNotIsInstance(part, mailbox.Message) self.assertEqual(part.get_payload(), payload) def _delete_recursively(self, target): # Delete a file or delete a directory recursively if os.path.isdir(target): shutil.rmtree(target) elif os.path.exists(target): os.remove(target) class TestMailbox(TestBase): maxDiff = None _factory = None # Overridden by subclasses to reuse tests _template = 'From: foo\n\n%s' def setUp(self): self._path = support.TESTFN self._delete_recursively(self._path) self._box = self._factory(self._path) def tearDown(self): self._box.close() self._delete_recursively(self._path) def test_add(self): # Add copies of a sample message keys = [] keys.append(self._box.add(self._template % 0)) self.assertEqual(len(self._box), 1) keys.append(self._box.add(mailbox.Message(_sample_message))) self.assertEqual(len(self._box), 2) keys.append(self._box.add(email.message_from_string(_sample_message))) self.assertEqual(len(self._box), 3) keys.append(self._box.add(io.BytesIO(_bytes_sample_message))) self.assertEqual(len(self._box), 4) keys.append(self._box.add(_sample_message)) self.assertEqual(len(self._box), 5) keys.append(self._box.add(_bytes_sample_message)) self.assertEqual(len(self._box), 6) with self.assertWarns(DeprecationWarning): keys.append(self._box.add( io.TextIOWrapper(io.BytesIO(_bytes_sample_message)))) self.assertEqual(len(self._box), 7) self.assertEqual(self._box.get_string(keys[0]), self._template % 0) for i in (1, 2, 3, 4, 5, 6): self._check_sample(self._box[keys[i]]) _nonascii_msg = textwrap.dedent("""\ From: foo Subject: Falinaptár házhozszállítással. Már rendeltél? 0 """) def test_add_invalid_8bit_bytes_header(self): key = self._box.add(self._nonascii_msg.encode('latin-1')) self.assertEqual(len(self._box), 1) self.assertEqual(self._box.get_bytes(key), self._nonascii_msg.encode('latin-1')) def test_invalid_nonascii_header_as_string(self): subj = self._nonascii_msg.splitlines()[1] key = self._box.add(subj.encode('latin-1')) self.assertEqual(self._box.get_string(key), 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz' 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n') def test_add_nonascii_string_header_raises(self): with self.assertRaisesRegex(ValueError, "ASCII-only"): self._box.add(self._nonascii_msg) self._box.flush() self.assertEqual(len(self._box), 0) self.assertMailboxEmpty() def test_add_that_raises_leaves_mailbox_empty(self): def raiser(*args, **kw): raise Exception("a fake error") support.patch(self, email.generator.BytesGenerator, 'flatten', raiser) with self.assertRaises(Exception): self._box.add(email.message_from_string("From: Alphöso")) self.assertEqual(len(self._box), 0) self._box.close() self.assertMailboxEmpty() _non_latin_bin_msg = textwrap.dedent("""\ From: foo@bar.com To: báz Subject: Maintenant je vous présente mon collègue, le pouf célèbre \tJean de Baddie Mime-Version: 1.0 Content-Type: text/plain; charset="utf-8" Content-Transfer-Encoding: 8bit Да, они летят. """).encode('utf-8') def test_add_8bit_body(self): key = self._box.add(self._non_latin_bin_msg) self.assertEqual(self._box.get_bytes(key), self._non_latin_bin_msg) with self._box.get_file(key) as f: self.assertEqual(f.read(), self._non_latin_bin_msg.replace(b'\n', os.linesep.encode())) self.assertEqual(self._box[key].get_payload(), "Да, они летят.\n") def test_add_binary_file(self): with tempfile.TemporaryFile('wb+') as f: f.write(_bytes_sample_message) f.seek(0) key = self._box.add(f) self.assertEqual(self._box.get_bytes(key).split(b'\n'), _bytes_sample_message.split(b'\n')) def test_add_binary_nonascii_file(self): with tempfile.TemporaryFile('wb+') as f: f.write(self._non_latin_bin_msg) f.seek(0) key = self._box.add(f) self.assertEqual(self._box.get_bytes(key).split(b'\n'), self._non_latin_bin_msg.split(b'\n')) def test_add_text_file_warns(self): with tempfile.TemporaryFile('w+') as f: f.write(_sample_message) f.seek(0) with self.assertWarns(DeprecationWarning): key = self._box.add(f) self.assertEqual(self._box.get_bytes(key).split(b'\n'), _bytes_sample_message.split(b'\n')) def test_add_StringIO_warns(self): with self.assertWarns(DeprecationWarning): key = self._box.add(io.StringIO(self._template % "0")) self.assertEqual(self._box.get_string(key), self._template % "0") def test_add_nonascii_StringIO_raises(self): with self.assertWarns(DeprecationWarning): with self.assertRaisesRegex(ValueError, "ASCII-only"): self._box.add(io.StringIO(self._nonascii_msg)) self.assertEqual(len(self._box), 0) self._box.close() self.assertMailboxEmpty() def test_remove(self): # Remove messages using remove() self._test_remove_or_delitem(self._box.remove) def test_delitem(self): # Remove messages using __delitem__() self._test_remove_or_delitem(self._box.__delitem__) def _test_remove_or_delitem(self, method): # (Used by test_remove() and test_delitem().) key0 = self._box.add(self._template % 0) key1 = self._box.add(self._template % 1) self.assertEqual(len(self._box), 2) method(key0) self.assertEqual(len(self._box), 1) self.assertRaises(KeyError, lambda: self._box[key0]) self.assertRaises(KeyError, lambda: method(key0)) self.assertEqual(self._box.get_string(key1), self._template % 1) key2 = self._box.add(self._template % 2) self.assertEqual(len(self._box), 2) method(key2) self.assertEqual(len(self._box), 1) self.assertRaises(KeyError, lambda: self._box[key2]) self.assertRaises(KeyError, lambda: method(key2)) self.assertEqual(self._box.get_string(key1), self._template % 1) method(key1) self.assertEqual(len(self._box), 0) self.assertRaises(KeyError, lambda: self._box[key1]) self.assertRaises(KeyError, lambda: method(key1)) def test_discard(self, repetitions=10): # Discard messages key0 = self._box.add(self._template % 0) key1 = self._box.add(self._template % 1) self.assertEqual(len(self._box), 2) self._box.discard(key0) self.assertEqual(len(self._box), 1) self.assertRaises(KeyError, lambda: self._box[key0]) self._box.discard(key0) self.assertEqual(len(self._box), 1) self.assertRaises(KeyError, lambda: self._box[key0]) def test_get(self): # Retrieve messages using get() key0 = self._box.add(self._template % 0) msg = self._box.get(key0) self.assertEqual(msg['from'], 'foo') self.assertEqual(msg.get_payload(), '0') self.assertIs(self._box.get('foo'), None) self.assertIs(self._box.get('foo', False), False) self._box.close() self._box = self._factory(self._path) key1 = self._box.add(self._template % 1) msg = self._box.get(key1) self.assertEqual(msg['from'], 'foo') self.assertEqual(msg.get_payload(), '1') def test_getitem(self): # Retrieve message using __getitem__() key0 = self._box.add(self._template % 0) msg = self._box[key0] self.assertEqual(msg['from'], 'foo') self.assertEqual(msg.get_payload(), '0') self.assertRaises(KeyError, lambda: self._box['foo']) self._box.discard(key0) self.assertRaises(KeyError, lambda: self._box[key0]) def test_get_message(self): # Get Message representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) msg0 = self._box.get_message(key0) self.assertIsInstance(msg0, mailbox.Message) self.assertEqual(msg0['from'], 'foo') self.assertEqual(msg0.get_payload(), '0') self._check_sample(self._box.get_message(key1)) def test_get_bytes(self): # Get bytes representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) self.assertEqual(self._box.get_bytes(key0), (self._template % 0).encode('ascii')) self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message) def test_get_string(self): # Get string representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) self.assertEqual(self._box.get_string(key0), self._template % 0) self.assertEqual(self._box.get_string(key1).split('\n'), _sample_message.split('\n')) def test_get_file(self): # Get file representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) with self._box.get_file(key0) as file: data0 = file.read() with self._box.get_file(key1) as file: data1 = file.read() self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'), self._template % 0) self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'), _sample_message) def test_get_file_can_be_closed_twice(self): # Issue 11700 key = self._box.add(_sample_message) f = self._box.get_file(key) f.close() f.close() def test_iterkeys(self): # Get keys using iterkeys() self._check_iteration(self._box.keys, do_keys=True, do_values=False) def test_keys(self): # Get keys using keys() self._check_iteration(self._box.keys, do_keys=True, do_values=False) def test_itervalues(self): # Get values using itervalues() self._check_iteration(self._box.values, do_keys=False, do_values=True) def test_iter(self): # Get values using __iter__() self._check_iteration(self._box.__iter__, do_keys=False, do_values=True) def test_values(self): # Get values using values() self._check_iteration(self._box.values, do_keys=False, do_values=True) def test_iteritems(self): # Get keys and values using iteritems() self._check_iteration(self._box.items, do_keys=True, do_values=True) def test_items(self): # Get keys and values using items() self._check_iteration(self._box.items, do_keys=True, do_values=True) def _check_iteration(self, method, do_keys, do_values, repetitions=10): for value in method(): self.fail("Not empty") keys, values = [], [] for i in range(repetitions): keys.append(self._box.add(self._template % i)) values.append(self._template % i) if do_keys and not do_values: returned_keys = list(method()) elif do_values and not do_keys: returned_values = list(method()) else: returned_keys, returned_values = [], [] for key, value in method(): returned_keys.append(key) returned_values.append(value) if do_keys: self.assertEqual(len(keys), len(returned_keys)) self.assertEqual(set(keys), set(returned_keys)) if do_values: count = 0 for value in returned_values: self.assertEqual(value['from'], 'foo') self.assertLess(int(value.get_payload()), repetitions) count += 1 self.assertEqual(len(values), count) def test_contains(self): # Check existence of keys using __contains__() self.assertNotIn('foo', self._box) key0 = self._box.add(self._template % 0) self.assertIn(key0, self._box) self.assertNotIn('foo', self._box) key1 = self._box.add(self._template % 1) self.assertIn(key1, self._box) self.assertIn(key0, self._box) self.assertNotIn('foo', self._box) self._box.remove(key0) self.assertNotIn(key0, self._box) self.assertIn(key1, self._box) self.assertNotIn('foo', self._box) self._box.remove(key1) self.assertNotIn(key1, self._box) self.assertNotIn(key0, self._box) self.assertNotIn('foo', self._box) def test_len(self, repetitions=10): # Get message count keys = [] for i in range(repetitions): self.assertEqual(len(self._box), i) keys.append(self._box.add(self._template % i)) self.assertEqual(len(self._box), i + 1) for i in range(repetitions): self.assertEqual(len(self._box), repetitions - i) self._box.remove(keys[i]) self.assertEqual(len(self._box), repetitions - i - 1) def test_set_item(self): # Modify messages using __setitem__() key0 = self._box.add(self._template % 'original 0') self.assertEqual(self._box.get_string(key0), self._template % 'original 0') key1 = self._box.add(self._template % 'original 1') self.assertEqual(self._box.get_string(key1), self._template % 'original 1') self._box[key0] = self._template % 'changed 0' self.assertEqual(self._box.get_string(key0), self._template % 'changed 0') self._box[key1] = self._template % 'changed 1' self.assertEqual(self._box.get_string(key1), self._template % 'changed 1') self._box[key0] = _sample_message self._check_sample(self._box[key0]) self._box[key1] = self._box[key0] self._check_sample(self._box[key1]) self._box[key0] = self._template % 'original 0' self.assertEqual(self._box.get_string(key0), self._template % 'original 0') self._check_sample(self._box[key1]) self.assertRaises(KeyError, lambda: self._box.__setitem__('foo', 'bar')) self.assertRaises(KeyError, lambda: self._box['foo']) self.assertEqual(len(self._box), 2) def test_clear(self, iterations=10): # Remove all messages using clear() keys = [] for i in range(iterations): self._box.add(self._template % i) for i, key in enumerate(keys): self.assertEqual(self._box.get_string(key), self._template % i) self._box.clear() self.assertEqual(len(self._box), 0) for i, key in enumerate(keys): self.assertRaises(KeyError, lambda: self._box.get_string(key)) def test_pop(self): # Get and remove a message using pop() key0 = self._box.add(self._template % 0) self.assertIn(key0, self._box) key1 = self._box.add(self._template % 1) self.assertIn(key1, self._box) self.assertEqual(self._box.pop(key0).get_payload(), '0') self.assertNotIn(key0, self._box) self.assertIn(key1, self._box) key2 = self._box.add(self._template % 2) self.assertIn(key2, self._box) self.assertEqual(self._box.pop(key2).get_payload(), '2') self.assertNotIn(key2, self._box) self.assertIn(key1, self._box) self.assertEqual(self._box.pop(key1).get_payload(), '1') self.assertNotIn(key1, self._box) self.assertEqual(len(self._box), 0) def test_popitem(self, iterations=10): # Get and remove an arbitrary (key, message) using popitem() keys = [] for i in range(10): keys.append(self._box.add(self._template % i)) seen = [] for i in range(10): key, msg = self._box.popitem() self.assertIn(key, keys) self.assertNotIn(key, seen) seen.append(key) self.assertEqual(int(msg.get_payload()), keys.index(key)) self.assertEqual(len(self._box), 0) for key in keys: self.assertRaises(KeyError, lambda: self._box[key]) def test_update(self): # Modify multiple messages using update() key0 = self._box.add(self._template % 'original 0') key1 = self._box.add(self._template % 'original 1') key2 = self._box.add(self._template % 'original 2') self._box.update({key0: self._template % 'changed 0', key2: _sample_message}) self.assertEqual(len(self._box), 3) self.assertEqual(self._box.get_string(key0), self._template % 'changed 0') self.assertEqual(self._box.get_string(key1), self._template % 'original 1') self._check_sample(self._box[key2]) self._box.update([(key2, self._template % 'changed 2'), (key1, self._template % 'changed 1'), (key0, self._template % 'original 0')]) self.assertEqual(len(self._box), 3) self.assertEqual(self._box.get_string(key0), self._template % 'original 0') self.assertEqual(self._box.get_string(key1), self._template % 'changed 1') self.assertEqual(self._box.get_string(key2), self._template % 'changed 2') self.assertRaises(KeyError, lambda: self._box.update({'foo': 'bar', key0: self._template % "changed 0"})) self.assertEqual(len(self._box), 3) self.assertEqual(self._box.get_string(key0), self._template % "changed 0") self.assertEqual(self._box.get_string(key1), self._template % "changed 1") self.assertEqual(self._box.get_string(key2), self._template % "changed 2") def test_flush(self): # Write changes to disk self._test_flush_or_close(self._box.flush, True) def test_popitem_and_flush_twice(self): # See #15036. self._box.add(self._template % 0) self._box.add(self._template % 1) self._box.flush() self._box.popitem() self._box.flush() self._box.popitem() self._box.flush() def test_lock_unlock(self): # Lock and unlock the mailbox self.assertFalse(os.path.exists(self._get_lock_path())) self._box.lock() self.assertTrue(os.path.exists(self._get_lock_path())) self._box.unlock() self.assertFalse(os.path.exists(self._get_lock_path())) def test_close(self): # Close mailbox and flush changes to disk self._test_flush_or_close(self._box.close, False) def _test_flush_or_close(self, method, should_call_close): contents = [self._template % i for i in range(3)] self._box.add(contents[0]) self._box.add(contents[1]) self._box.add(contents[2]) oldbox = self._box method() if should_call_close: self._box.close() self._box = self._factory(self._path) keys = self._box.keys() self.assertEqual(len(keys), 3) for key in keys: self.assertIn(self._box.get_string(key), contents) oldbox.close() def test_dump_message(self): # Write message representations to disk for input in (email.message_from_string(_sample_message), _sample_message, io.BytesIO(_bytes_sample_message)): output = io.BytesIO() self._box._dump_message(input, output) self.assertEqual(output.getvalue(), _bytes_sample_message.replace(b'\n', os.linesep.encode())) output = io.BytesIO() self.assertRaises(TypeError, lambda: self._box._dump_message(None, output)) def _get_lock_path(self): # Return the path of the dot lock file. May be overridden. return self._path + '.lock' class TestMailboxSuperclass(TestBase, unittest.TestCase): def test_notimplemented(self): # Test that all Mailbox methods raise NotImplementedException. box = mailbox.Mailbox('path') self.assertRaises(NotImplementedError, lambda: box.add('')) self.assertRaises(NotImplementedError, lambda: box.remove('')) self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) self.assertRaises(NotImplementedError, lambda: box.discard('')) self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) self.assertRaises(NotImplementedError, lambda: box.keys()) self.assertRaises(NotImplementedError, lambda: box.keys()) self.assertRaises(NotImplementedError, lambda: box.values().__next__()) self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__()) self.assertRaises(NotImplementedError, lambda: box.values()) self.assertRaises(NotImplementedError, lambda: box.items().next()) self.assertRaises(NotImplementedError, lambda: box.items()) self.assertRaises(NotImplementedError, lambda: box.get('')) self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) self.assertRaises(NotImplementedError, lambda: box.get_message('')) self.assertRaises(NotImplementedError, lambda: box.get_string('')) self.assertRaises(NotImplementedError, lambda: box.get_bytes('')) self.assertRaises(NotImplementedError, lambda: box.get_file('')) self.assertRaises(NotImplementedError, lambda: '' in box) self.assertRaises(NotImplementedError, lambda: box.__contains__('')) self.assertRaises(NotImplementedError, lambda: box.__len__()) self.assertRaises(NotImplementedError, lambda: box.clear()) self.assertRaises(NotImplementedError, lambda: box.pop('')) self.assertRaises(NotImplementedError, lambda: box.popitem()) self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) self.assertRaises(NotImplementedError, lambda: box.flush()) self.assertRaises(NotImplementedError, lambda: box.lock()) self.assertRaises(NotImplementedError, lambda: box.unlock()) self.assertRaises(NotImplementedError, lambda: box.close()) class TestMaildir(TestMailbox, unittest.TestCase): _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) def setUp(self): TestMailbox.setUp(self) if os.name in ('nt', 'os2') or sys.platform == 'cygwin': self._box.colon = '!' def assertMailboxEmpty(self): self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), []) def test_add_MM(self): # Add a MaildirMessage instance msg = mailbox.MaildirMessage(self._template % 0) msg.set_subdir('cur') msg.set_info('foo') key = self._box.add(msg) self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % (key, self._box.colon)))) def test_get_MM(self): # Get a MaildirMessage instance msg = mailbox.MaildirMessage(self._template % 0) msg.set_subdir('cur') msg.set_flags('RF') key = self._box.add(msg) msg_returned = self._box.get_message(key) self.assertIsInstance(msg_returned, mailbox.MaildirMessage) self.assertEqual(msg_returned.get_subdir(), 'cur') self.assertEqual(msg_returned.get_flags(), 'FR') def test_set_MM(self): # Set with a MaildirMessage instance msg0 = mailbox.MaildirMessage(self._template % 0) msg0.set_flags('TP') key = self._box.add(msg0) msg_returned = self._box.get_message(key) self.assertEqual(msg_returned.get_subdir(), 'new') self.assertEqual(msg_returned.get_flags(), 'PT') msg1 = mailbox.MaildirMessage(self._template % 1) self._box[key] = msg1 msg_returned = self._box.get_message(key) self.assertEqual(msg_returned.get_subdir(), 'new') self.assertEqual(msg_returned.get_flags(), '') self.assertEqual(msg_returned.get_payload(), '1') msg2 = mailbox.MaildirMessage(self._template % 2) msg2.set_info('2,S') self._box[key] = msg2 self._box[key] = self._template % 3 msg_returned = self._box.get_message(key) self.assertEqual(msg_returned.get_subdir(), 'new') self.assertEqual(msg_returned.get_flags(), 'S') self.assertEqual(msg_returned.get_payload(), '3') def test_consistent_factory(self): # Add a message. msg = mailbox.MaildirMessage(self._template % 0) msg.set_subdir('cur') msg.set_flags('RF') key = self._box.add(msg) # Create new mailbox with class FakeMessage(mailbox.MaildirMessage): pass box = mailbox.Maildir(self._path, factory=FakeMessage) box.colon = self._box.colon msg2 = box.get_message(key) self.assertIsInstance(msg2, FakeMessage) def test_initialize_new(self): # Initialize a non-existent mailbox self.tearDown() self._box = mailbox.Maildir(self._path) self._check_basics() self._delete_recursively(self._path) self._box = self._factory(self._path, factory=None) self._check_basics() def test_initialize_existing(self): # Initialize an existing mailbox self.tearDown() for subdir in '', 'tmp', 'new', 'cur': os.mkdir(os.path.normpath(os.path.join(self._path, subdir))) self._box = mailbox.Maildir(self._path) self._check_basics() def _check_basics(self, factory=None): # (Used by test_open_new() and test_open_existing().) self.assertEqual(self._box._path, os.path.abspath(self._path)) self.assertEqual(self._box._factory, factory) for subdir in '', 'tmp', 'new', 'cur': path = os.path.join(self._path, subdir) mode = os.stat(path)[stat.ST_MODE] self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) def test_list_folders(self): # List folders self._box.add_folder('one') self._box.add_folder('two') self._box.add_folder('three') self.assertEqual(len(self._box.list_folders()), 3) self.assertEqual(set(self._box.list_folders()), set(('one', 'two', 'three'))) def test_get_folder(self): # Open folders self._box.add_folder('foo.bar') folder0 = self._box.get_folder('foo.bar') folder0.add(self._template % 'bar') self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar'))) folder1 = self._box.get_folder('foo.bar') self.assertEqual(folder1.get_string(folder1.keys()[0]), self._template % 'bar') def test_add_and_remove_folders(self): # Delete folders self._box.add_folder('one') self._box.add_folder('two') self.assertEqual(len(self._box.list_folders()), 2) self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) self._box.remove_folder('one') self.assertEqual(len(self._box.list_folders()), 1) self.assertEqual(set(self._box.list_folders()), set(('two',))) self._box.add_folder('three') self.assertEqual(len(self._box.list_folders()), 2) self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) self._box.remove_folder('three') self.assertEqual(len(self._box.list_folders()), 1) self.assertEqual(set(self._box.list_folders()), set(('two',))) self._box.remove_folder('two') self.assertEqual(len(self._box.list_folders()), 0) self.assertEqual(self._box.list_folders(), []) def test_clean(self): # Remove old files from 'tmp' foo_path = os.path.join(self._path, 'tmp', 'foo') bar_path = os.path.join(self._path, 'tmp', 'bar') with open(foo_path, 'w') as f: f.write("@") with open(bar_path, 'w') as f: f.write("@") self._box.clean() self.assertTrue(os.path.exists(foo_path)) self.assertTrue(os.path.exists(bar_path)) foo_stat = os.stat(foo_path) os.utime(foo_path, (time.time() - 129600 - 2, foo_stat.st_mtime)) self._box.clean() self.assertFalse(os.path.exists(foo_path)) self.assertTrue(os.path.exists(bar_path)) def test_create_tmp(self, repetitions=10): # Create files in tmp directory hostname = socket.gethostname() if '/' in hostname: hostname = hostname.replace('/', r'\057') if ':' in hostname: hostname = hostname.replace(':', r'\072') pid = os.getpid() pattern = re.compile(r"(?P