"""Unit tests for contextlib.py, and other context managers.""" import sys import tempfile import unittest from contextlib import * # Tests __all__ from test import support try: import threading except ImportError: threading = None class ContextManagerTestCase(unittest.TestCase): def test_contextmanager_plain(self): state = [] @contextmanager def woohoo(): state.append(1) yield 42 state.append(999) with woohoo() as x: self.assertEqual(state, [1]) self.assertEqual(x, 42) state.append(x) self.assertEqual(state, [1, 42, 999]) def test_contextmanager_finally(self): state = [] @contextmanager def woohoo(): state.append(1) try: yield 42 finally: state.append(999) with self.assertRaises(ZeroDivisionError): with woohoo() as x: self.assertEqual(state, [1]) self.assertEqual(x, 42) state.append(x) raise ZeroDivisionError() self.assertEqual(state, [1, 42, 999]) def test_contextmanager_no_reraise(self): @contextmanager def whee(): yield ctx = whee() ctx.__enter__() # Calling __exit__ should not result in an exception self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None)) def test_contextmanager_trap_yield_after_throw(self): @contextmanager def whoo(): try: yield except: yield ctx = whoo() ctx.__enter__() self.assertRaises( RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None ) def test_contextmanager_except(self): state = [] @contextmanager def woohoo(): state.append(1) try: yield 42 except ZeroDivisionError as e: state.append(e.args[0]) self.assertEqual(state, [1, 42, 999]) with woohoo() as x: self.assertEqual(state, [1]) self.assertEqual(x, 42) state.append(x) raise ZeroDivisionError(999) self.assertEqual(state, [1, 42, 999]) def _create_contextmanager_attribs(self): def attribs(**kw): def decorate(func): for k,v in kw.items(): setattr(func,k,v) return func return decorate @contextmanager @attribs(foo='bar') def baz(spam): """Whee!""" return baz def test_contextmanager_attribs(self): baz = self._create_contextmanager_attribs() self.assertEqual(baz.__name__,'baz') self.assertEqual(baz.foo, 'bar') @unittest.skipIf(sys.flags.optimize >= 2, "Docstrings are omitted with -O2 and above") def test_contextmanager_doc_attrib(self): baz = self._create_contextmanager_attribs() self.assertEqual(baz.__doc__, "Whee!") class ClosingTestCase(unittest.TestCase): # XXX This needs more work def test_closing(self): state = [] class C: def close(self): state.append(1) x = C() self.assertEqual(state, []) with closing(x) as y: self.assertEqual(x, y) self.assertEqual(state, [1]) def test_closing_error(self): state = [] class C: def close(self): state.append(1) x = C() self.assertEqual(state, []) with self.assertRaises(ZeroDivisionError): with closing(x) as y: self.assertEqual(x, y) 1 / 0 self.assertEqual(state, [1]) class FileContextTestCase(unittest.TestCase): def testWithOpen(self): tfn = tempfile.mktemp() try: f = None with open(tfn, "w") as f: self.assertFalse(f.closed) f.write("Booh\n") self.assertTrue(f.closed) f = None with self.assertRaises(ZeroDivisionError): with open(tfn, "r") as f: self.assertFalse(f.closed) self.assertEqual(f.read(), "Booh\n") 1 / 0 self.assertTrue(f.closed) finally: support.unlink(tfn) @unittest.skipUnless(threading, 'Threading required for this test.') class LockContextTestCase(unittest.TestCase): def boilerPlate(self, lock, locked): self.assertFalse(locked()) with lock: self.assertTrue(locked()) self.assertFalse(locked()) with self.assertRaises(ZeroDivisionError): with lock: self.assertTrue(locked()) 1 / 0 self.assertFalse(locked()) def testWithLock(self): lock = threading.Lock() self.boilerPlate(lock, lock.locked) def testWithRLock(self): lock = threading.RLock() self.boilerPlate(lock, lock._is_owned) def testWithCondition(self): lock = threading.Condition() def locked(): return lock._is_owned() self.boilerPlate(lock, locked) def testWithSemaphore(self): lock = threading.Semaphore() def locked(): if lock.acquire(False): lock.release() return False else: return True self.boilerPlate(lock, locked) def testWithBoundedSemaphore(self): lock = threading.BoundedSemaphore() def locked(): if lock.acquire(False): lock.release() return False else: return True self.boilerPlate(lock, locked) class mycontext(ContextDecorator): started = False exc = None catch = False def __enter__(self): self.started = True return self def __exit__(self, *exc): self.exc = exc return self.catch class TestContextDecorator(unittest.TestCase): def test_contextdecorator(self): context = mycontext() with context as result: self.assertIs(result, context) self.assertTrue(context.started) self.assertEqual(context.exc, (None, None, None)) def test_contextdecorator_with_exception(self): context = mycontext() with self.assertRaisesRegex(NameError, 'foo'): with context: raise NameError('foo') self.assertIsNotNone(context.exc) self.assertIs(context.exc[0], NameError) context = mycontext() context.catch = True with context: raise NameError('foo') self.assertIsNotNone(context.exc) self.assertIs(context.exc[0], NameError) def test_decorator(self): context = mycontext() @context def test(): self.assertIsNone(context.exc) self.assertTrue(context.started) test() self.assertEqual(context.exc, (None, None, None)) def test_decorator_with_exception(self): context = mycontext() @context def test(): self.assertIsNone(context.exc) self.assertTrue(context.started) raise NameError('foo') with self.assertRaisesRegex(NameError, 'foo'): test() self.assertIsNotNone(context.exc) self.assertIs(context.exc[0], NameError) def test_decorating_method(self): context = mycontext() class Test(object): @context def method(self, a, b, c=None): self.a = a self.b = b self.c = c # these tests are for argument passing when used as a decorator test = Test() test.method(1, 2) self.assertEqual(test.a, 1) self.assertEqual(test.b, 2) self.assertEqual(test.c, None) test = Test() test.method('a', 'b', 'c') self.assertEqual(test.a, 'a') self.assertEqual(test.b, 'b') self.assertEqual(test.c, 'c') test = Test() test.method(a=1, b=2) self.assertEqual(test.a, 1) self.assertEqual(test.b, 2) def test_typo_enter(self): class mycontext(ContextDecorator): def __unter__(self): pass def __exit__(self, *exc): pass with self.assertRaises(AttributeError): with mycontext(): pass def test_typo_exit(self): class mycontext(ContextDecorator): def __enter__(self): pass def __uxit__(self, *exc): pass with self.assertRaises(AttributeError): with mycontext(): pass def test_contextdecorator_as_mixin(self): class somecontext(object): started = False exc = None def __enter__(self): self.started = True return self def __exit__(self, *exc): self.exc = exc class mycontext(somecontext, ContextDecorator): pass context = mycontext() @context def test(): self.assertIsNone(context.exc) self.assertTrue(context.started) test() self.assertEqual(context.exc, (None, None, None)) def test_contextmanager_as_decorator(self): @contextmanager def woohoo(y): state.append(y) yield state.append(999) state = [] @woohoo(1) def test(x): self.assertEqual(state, [1]) state.append(x) test('something') self.assertEqual(state, [1, 'something', 999]) # Issue #11647: Ensure the decorated function is 'reusable' state = [] test('something else') self.assertEqual(state, [1, 'something else', 999]) class TestExitStack(unittest.TestCase): def test_no_resources(self): with ExitStack(): pass def test_callback(self): expected = [ ((), {}), ((1,), {}), ((1,2), {}), ((), dict(example=1)), ((1,), dict(example=1)), ((1,2), dict(example=1)), ] result = [] def _exit(*args, **kwds): """Test metadata propagation""" result.append((args, kwds)) with ExitStack() as stack: for args, kwds in reversed(expected): if args and kwds: f = stack.callback(_exit, *args, **kwds) elif args: f = stack.callback(_exit, *args) elif kwds: f = stack.callback(_exit, **kwds) else: f = stack.callback(_exit) self.assertIs(f, _exit) for wrapper in stack._exit_callbacks: self.assertIs(wrapper.__wrapped__, _exit) self.assertNotEqual(wrapper.__name__, _exit.__name__) self.assertIsNone(wrapper.__doc__, _exit.__doc__) self.assertEqual(result, expected) def test_push(self): exc_raised = ZeroDivisionError def _expect_exc(exc_type, exc, exc_tb): self.assertIs(exc_type, exc_raised) def _suppress_exc(*exc_details): return True def _expect_ok(exc_type, exc, exc_tb): self.assertIsNone(exc_type) self.assertIsNone(exc) self.assertIsNone(exc_tb) class ExitCM(object): def __init__(self, check_exc): self.check_exc = check_exc def __enter__(self): self.fail("Should not be called!") def __exit__(self, *exc_details): self.check_exc(*exc_details) with ExitStack() as stack: stack.push(_expect_ok) self.assertIs(stack._exit_callbacks[-1], _expect_ok) cm = ExitCM(_expect_ok) stack.push(cm) self.assertIs(stack._exit_callbacks[-1].__self__, cm) stack.push(_suppress_exc) self.assertIs(stack._exit_callbacks[-1], _suppress_exc) cm = ExitCM(_expect_exc) stack.push(cm) self.assertIs(stack._exit_callbacks[-1].__self__, cm) stack.push(_expect_exc) self.assertIs(stack._exit_callbacks[-1], _expect_exc) stack.push(_expect_exc) self.assertIs(stack._exit_callbacks[-1], _expect_exc) 1/0 def test_enter_context(self): class TestCM(object): def __enter__(self): result.append(1) def __exit__(self, *exc_details): result.append(3) result = [] cm = TestCM() with ExitStack() as stack: @stack.callback # Registered first => cleaned up last def _exit(): result.append(4) self.assertIsNotNone(_exit) stack.enter_context(cm) self.assertIs(stack._exit_callbacks[-1].__self__, cm) result.append(2) self.assertEqual(result, [1, 2, 3, 4]) def test_close(self): result = [] with ExitStack() as stack: @stack.callback def _exit(): result.append(1) self.assertIsNotNone(_exit) stack.close() result.append(2) self.assertEqual(result, [1, 2]) def test_pop_all(self): result = [] with ExitStack() as stack: @stack.callback def _exit(): result.append(3) self.assertIsNotNone(_exit) new_stack = stack.pop_all() result.append(1) result.append(2) new_stack.close() self.assertEqual(result, [1, 2, 3]) def test_exit_raise(self): with self.assertRaises(ZeroDivisionError): with ExitStack() as stack: stack.push(lambda *exc: False) 1/0 def test_exit_suppress(self): with ExitStack() as stack: stack.push(lambda *exc: True) 1/0 def test_exit_exception_chaining_reference(self): # Sanity check to make sure that ExitStack chaining matches # actual nested with statements class RaiseExc: def __init__(self, exc): self.exc = exc def __enter__(self): return self def __exit__(self, *exc_details): raise self.exc class RaiseExcWithContext: def __init__(self, outer, inner): self.outer = outer self.inner = inner def __enter__(self): return self def __exit__(self, *exc_details): try: raise self.inner except: raise self.outer class SuppressExc: def __enter__(self): return self def __exit__(self, *exc_details): type(self).saved_details = exc_details return True try: with RaiseExc(IndexError): with RaiseExcWithContext(KeyError, AttributeError): with SuppressExc(): with RaiseExc(ValueError): 1 / 0 except IndexError as exc: self.assertIsInstance(exc.__context__, KeyError) self.assertIsInstance(exc.__context__.__context__, AttributeError) # Inner exceptions were suppressed self.assertIsNone(exc.__context__.__context__.__context__) else: self.fail("Expected IndexError, but no exception was raised") # Check the inner exceptions inner_exc = SuppressExc.saved_details[1] self.assertIsInstance(inner_exc, ValueError) self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) def test_exit_exception_chaining(self): # Ensure exception chaining matches the reference behaviour def raise_exc(exc): raise exc saved_details = None def suppress_exc(*exc_details): nonlocal saved_details saved_details = exc_details return True try: with ExitStack() as stack: stack.callback(raise_exc, IndexError) stack.callback(raise_exc, KeyError) stack.callback(raise_exc, AttributeError) stack.push(suppress_exc) stack.callback(raise_exc, ValueError) 1 / 0 except IndexError as exc: self.assertIsInstance(exc.__context__, KeyError) self.assertIsInstance(exc.__context__.__context__, AttributeError) # Inner exceptions were suppressed self.assertIsNone(exc.__context__.__context__.__context__) else: self.fail("Expected IndexError, but no exception was raised") # Check the inner exceptions inner_exc = saved_details[1] self.assertIsInstance(inner_exc, ValueError) self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) def test_exit_exception_non_suppressing(self): # http://bugs.python.org/issue19092 def raise_exc(exc): raise exc def suppress_exc(*exc_details): return True try: with ExitStack() as stack: stack.callback(lambda: None) stack.callback(raise_exc, IndexError) except Exception as exc: self.assertIsInstance(exc, IndexError) else: self.fail("Expected IndexError, but no exception was raised") try: with ExitStack() as stack: stack.callback(raise_exc, KeyError) stack.push(suppress_exc) stack.callback(raise_exc, IndexError) except Exception as exc: self.assertIsInstance(exc, KeyError) else: self.fail("Expected KeyError, but no exception was raised") def test_exit_exception_with_correct_context(self): # http://bugs.python.org/issue20317 @contextmanager def gets_the_context_right(exc): try: yield finally: raise exc exc1 = Exception(1) exc2 = Exception(2) exc3 = Exception(3) exc4 = Exception(4) # The contextmanager already fixes the context, so prior to the # fix, ExitStack would try to fix it *again* and get into an # infinite self-referential loop try: with ExitStack() as stack: stack.enter_context(gets_the_context_right(exc4)) stack.enter_context(gets_the_context_right(exc3)) stack.enter_context(gets_the_context_right(exc2)) raise exc1 except Exception as exc: self.assertIs(exc, exc4) self.assertIs(exc.__context__, exc3) self.assertIs(exc.__context__.__context__, exc2) self.assertIs(exc.__context__.__context__.__context__, exc1) self.assertIsNone( exc.__context__.__context__.__context__.__context__) def test_exit_exception_with_existing_context(self): # Addresses a lack of test coverage discovered after checking in a # fix for issue 20317 that still contained debugging code. def raise_nested(inner_exc, outer_exc): try: raise inner_exc finally: raise outer_exc exc1 = Exception(1) exc2 = Exception(2) exc3 = Exception(3) exc4 = Exception(4) exc5 = Exception(5) try: with ExitStack() as stack: stack.callback(raise_nested, exc4, exc5) stack.callback(raise_nested, exc2, exc3) raise exc1 except Exception as exc: self.assertIs(exc, exc5) self.assertIs(exc.__context__, exc4) self.assertIs(exc.__context__.__context__, exc3) self.assertIs(exc.__context__.__context__.__context__, exc2) self.assertIs( exc.__context__.__context__.__context__.__context__, exc1) self.assertIsNone( exc.__context__.__context__.__context__.__context__.__context__) def test_body_exception_suppress(self): def suppress_exc(*exc_details): return True try: with ExitStack() as stack: stack.push(suppress_exc) 1/0 except IndexError as exc: self.fail("Expected no exception, got IndexError") def test_exit_exception_chaining_suppress(self): with ExitStack() as stack: stack.push(lambda *exc: True) stack.push(lambda *exc: 1/0) stack.push(lambda *exc: {}[1]) def test_excessive_nesting(self): # The original implementation would die with RecursionError here with ExitStack() as stack: for i in range(10000): stack.callback(int) def test_instance_bypass(self): class Example(object): pass cm = Example() cm.__exit__ = object() stack = ExitStack() self.assertRaises(AttributeError, stack.enter_context, cm) stack.push(cm) self.assertIs(stack._exit_callbacks[-1], cm) # This is needed to make the test actually run under regrtest.py! def test_main(): support.run_unittest(__name__) if __name__ == "__main__": test_main()