| Left: | ||
| Right: |
| OLD | NEW |
|---|---|
| 1 import gc | 1 import gc |
| 2 import sys | 2 import sys |
| 3 import unittest | 3 import unittest |
| 4 import collections | 4 import collections |
| 5 import weakref | 5 import weakref |
| 6 import operator | 6 import operator |
| 7 import contextlib | 7 import contextlib |
| 8 import copy | 8 import copy |
| 9 import subprocess | |
| 9 | 10 |
| 10 from test import support | 11 from test import support |
| 11 | 12 |
| 12 # Used in ReferencesTestCase.test_ref_created_during_del() . | 13 # Used in ReferencesTestCase.test_ref_created_during_del() . |
| 13 ref_from_del = None | 14 ref_from_del = None |
| 14 | 15 |
| 16 # Used by FinalizeTestCase as a global that may be replaced by None | |
| 17 # when the interpreter shuts down. | |
| 18 _global_var = 'foobar' | |
| 19 | |
| 15 class C: | 20 class C: |
| 16 def method(self): | 21 def method(self): |
| 17 pass | 22 pass |
| 18 | 23 |
| 19 | 24 |
| 20 class Callable: | 25 class Callable: |
| 21 bar = None | 26 bar = None |
| 22 | 27 |
| 23 def __call__(self, x): | 28 def __call__(self, x): |
| 24 self.bar = x | 29 self.bar = x |
| (...skipping 1271 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1296 type2test = weakref.WeakValueDictionary | 1301 type2test = weakref.WeakValueDictionary |
| 1297 def _reference(self): | 1302 def _reference(self): |
| 1298 return self.__ref.copy() | 1303 return self.__ref.copy() |
| 1299 | 1304 |
| 1300 class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): | 1305 class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): |
| 1301 """Check that WeakKeyDictionary conforms to the mapping protocol""" | 1306 """Check that WeakKeyDictionary conforms to the mapping protocol""" |
| 1302 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} | 1307 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} |
| 1303 type2test = weakref.WeakKeyDictionary | 1308 type2test = weakref.WeakKeyDictionary |
| 1304 def _reference(self): | 1309 def _reference(self): |
| 1305 return self.__ref.copy() | 1310 return self.__ref.copy() |
| 1311 | |
| 1312 | |
| 1313 class FinalizeTestCase(unittest.TestCase): | |
| 1314 | |
| 1315 class A: | |
| 1316 pass | |
| 1317 | |
| 1318 def _collect_if_necessary(self): | |
| 1319 # we create no ref-cycles so in CPython no gc should be needed | |
| 1320 if sys.implementation.name != 'cpython': | |
| 1321 support.gc_collect() | |
| 1322 | |
| 1323 def test_finalize(self): | |
| 1324 def add(x,y,z): | |
| 1325 res.append(x + y + z) | |
| 1326 return x + y + z | |
| 1327 | |
| 1328 a = self.A() | |
| 1329 | |
| 1330 res = [] | |
| 1331 f = weakref.finalize(a, add, 67, 43, z=89) | |
| 1332 wr, func, args, kwds = f.get() | |
| 1333 self.assertEqual([wr(), func, args, kwds], [a, add, (67,43), {'z':89}]) | |
| 1334 self.assertEqual(f(), 199) # deregisters callback | |
| 1335 self.assertEqual(f(), None) | |
| 1336 self.assertEqual(f(), None) | |
| 1337 self.assertEqual(f.get(), None) | |
| 1338 self.assertEqual(f.pop(), None) | |
| 1339 self.assertEqual(res, [199]) | |
| 1340 | |
| 1341 res = [] | |
| 1342 f = weakref.finalize(a, add, 67, 43, z=89) | |
| 1343 wr, func, args, kwds = f.pop() # deregisters callback | |
| 1344 self.assertEqual([wr(), func, args, kwds], [a, add, (67,43), {'z':89}]) | |
| 1345 self.assertEqual(f(), None) | |
| 1346 self.assertEqual(f(), None) | |
| 1347 self.assertEqual(f.get(), None) | |
| 1348 self.assertEqual(f.pop(), None) | |
| 1349 self.assertEqual(res, []) | |
| 1350 | |
| 1351 res = [] | |
| 1352 f = weakref.finalize(a, add, 67, 43, z=89) | |
| 1353 del a # deregisters callback | |
| 1354 self._collect_if_necessary() | |
| 1355 self.assertEqual(f(), None) | |
| 1356 self.assertEqual(f(), None) | |
| 1357 self.assertEqual(f.get(), None) | |
| 1358 self.assertEqual(f.pop(), None) | |
| 1359 self.assertEqual(res, [199]) | |
| 1360 | |
| 1361 def test_order(self): | |
| 1362 a = self.A() | |
| 1363 res = [] | |
| 1364 | |
| 1365 f1 = weakref.finalize(a, res.append, 'f1') | |
| 1366 f2 = weakref.finalize(a, res.append, 'f2') | |
| 1367 f3 = weakref.finalize(a, res.append, 'f3') | |
| 1368 f4 = weakref.finalize(a, res.append, 'f4') | |
| 1369 f5 = weakref.finalize(a, res.append, 'f5') | |
| 1370 | |
| 1371 # make sure finalizers can keep themselves alive | |
| 1372 del f1, f4 | |
| 1373 | |
| 1374 self.assertTrue(f2.get()) | |
| 1375 self.assertTrue(f3.get()) | |
| 1376 self.assertTrue(f5.get()) | |
| 1377 | |
| 1378 self.assertTrue(f5.pop()) | |
| 1379 self.assertFalse(f5.get()) | |
| 1380 | |
| 1381 f5() # nothing because previously unregistered | |
| 1382 res.append('A') | |
| 1383 f3() # => res.append('f3') | |
| 1384 self.assertFalse(f3.get()) | |
| 1385 res.append('B') | |
| 1386 f3() # nothing because previously called | |
| 1387 res.append('C') | |
| 1388 del a | |
| 1389 self._collect_if_necessary() | |
| 1390 # => res.append('f4') | |
| 1391 # => res.append('f2') | |
| 1392 # => res.append('f1') | |
| 1393 self.assertFalse(f2.get()) | |
| 1394 res.append('D') | |
| 1395 f2() # nothing because previously called by gc | |
| 1396 | |
| 1397 expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D'] | |
| 1398 self.assertEqual(res, expected) | |
| 1399 | |
| 1400 def test_all_freed(self): | |
| 1401 # we want a weakrefable subclass of weakref.finalize | |
| 1402 class myfinalize(weakref.finalize): | |
| 1403 pass | |
| 1404 | |
| 1405 a = self.A() | |
| 1406 res = [] | |
| 1407 def callback(): | |
| 1408 res.append(123) | |
| 1409 f = myfinalize(a, callback) | |
| 1410 | |
| 1411 wr_callback = weakref.ref(callback) | |
| 1412 wr_f = weakref.ref(f) | |
| 1413 del callback, f | |
| 1414 | |
| 1415 self.assertIsNotNone(wr_callback()) | |
| 1416 self.assertIsNotNone(wr_f()) | |
| 1417 | |
| 1418 del a | |
| 1419 self._collect_if_necessary() | |
| 1420 | |
| 1421 self.assertIsNone(wr_callback()) | |
| 1422 self.assertIsNone(wr_f()) | |
| 1423 self.assertEqual(res, [123]) | |
| 1424 | |
| 1425 def test_traceback(self): | |
| 1426 def f(): | |
| 1427 g() | |
| 1428 | |
| 1429 def g(): | |
| 1430 1/0 | |
| 1431 | |
| 1432 with support.captured_stderr() as stderr: | |
| 1433 a = self.A() | |
| 1434 weakref.finalize(a, f) | |
| 1435 del a | |
| 1436 self._collect_if_necessary() | |
| 1437 | |
| 1438 def process(tb): | |
| 1439 lines = [line.strip() for line in tb.split('\n')] | |
| 1440 lines[1] = '...' + lines[1][-6:] | |
| 1441 lines[3] = '...' + lines[3][-6:] | |
| 1442 return lines | |
| 1443 | |
| 1444 expected = '''\ | |
| 1445 Ignored exception in finalizer - Traceback (most recent call last): | |
| 1446 File ..., line ..., in f | |
| 1447 g() | |
| 1448 File ..., line ..., in g | |
| 1449 1/0 | |
| 1450 ZeroDivisionError: division by zero | |
| 1451 ''' | |
| 1452 self.assertEqual(process(stderr.getvalue()), process(expected)) | |
| 1453 | |
| 1454 @classmethod | |
| 1455 def run_in_child(cls): | |
| 1456 # cls should stay alive till atexit callbacks run | |
| 1457 f1 = weakref.finalize(cls, print, 'f1', _global_var) | |
| 1458 f2 = weakref.finalize(cls, print, 'f2', _global_var) | |
| 1459 f3 = weakref.finalize(cls, print, 'f3', _global_var) | |
| 1460 f1.atexit = True | |
| 1461 assert f2.atexit == False | |
| 1462 f3.atexit = True | |
| 1463 | |
| 1464 def test_atexit(self): | |
| 1465 prog = ('from test.test_weakref import FinalizeTestCase;'+ | |
| 1466 'FinalizeTestCase.run_in_child()') | |
| 1467 output = subprocess.check_output([sys.executable, '-c', prog]) | |
|
AntoinePitrou
2012/08/02 23:19:01
You should use the helpers from test.script_helper
| |
| 1468 output = output.decode('ascii') | |
| 1469 output = output.strip().split('\n') | |
| 1470 self.assertEqual(output, ['f3 foobar', 'f1 foobar']) | |
| 1471 | |
| 1306 | 1472 |
| 1307 libreftest = """ Doctest for examples in the library reference: weakref.rst | 1473 libreftest = """ Doctest for examples in the library reference: weakref.rst |
| 1308 | 1474 |
| 1309 >>> import weakref | 1475 >>> import weakref |
| 1310 >>> class Dict(dict): | 1476 >>> class Dict(dict): |
| 1311 ... pass | 1477 ... pass |
| 1312 ... | 1478 ... |
| 1313 >>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable | 1479 >>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable |
| 1314 >>> r = weakref.ref(obj) | 1480 >>> r = weakref.ref(obj) |
| 1315 >>> print(r() is obj) | 1481 >>> print(r() is obj) |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1377 >>> id2obj(a_id) is a | 1543 >>> id2obj(a_id) is a |
| 1378 True | 1544 True |
| 1379 >>> del a | 1545 >>> del a |
| 1380 >>> try: | 1546 >>> try: |
| 1381 ... id2obj(a_id) | 1547 ... id2obj(a_id) |
| 1382 ... except KeyError: | 1548 ... except KeyError: |
| 1383 ... print('OK') | 1549 ... print('OK') |
| 1384 ... else: | 1550 ... else: |
| 1385 ... print('WeakValueDictionary error') | 1551 ... print('WeakValueDictionary error') |
| 1386 OK | 1552 OK |
| 1553 | |
| 1554 | |
| 1555 >>> import weakref | |
|
AntoinePitrou
2012/08/02 23:19:01
My personal opinion about doctests is that we shou
sbt
2012/08/03 01:02:14
I did wonder whether it would be better to make th
| |
| 1556 >>> class Object: pass | |
| 1557 ... | |
| 1558 >>> kenny = Object() | |
| 1559 >>> weakref.finalize(kenny, print, "you killed kenny!") #doctest:+ELLIPSIS | |
| 1560 <weakref.finalize object at ...> | |
| 1561 >>> del kenny | |
| 1562 you killed kenny! | |
| 1563 | |
| 1564 | |
| 1565 >>> def callback(x, y, z): | |
| 1566 ... print("CALLBACK") | |
| 1567 ... return x + y + z | |
| 1568 ... | |
| 1569 >>> obj = Object() | |
| 1570 >>> f = weakref.finalize(obj, callback, 1, 2, z=3) | |
| 1571 >>> f.get() #doctest:+ELLIPSIS | |
| 1572 (<weakref ... to 'Object' ...>, <function callback ...>, (1, 2), {'z': 3}) | |
| 1573 >>> wr, func, args, kwds = _ | |
| 1574 >>> assert wr() is obj | |
| 1575 >>> assert func is callback | |
| 1576 | |
| 1577 | |
| 1578 >>> f() | |
| 1579 CALLBACK | |
| 1580 6 | |
| 1581 >>> f(), f.get(), f.pop() | |
| 1582 (None, None, None) | |
| 1583 >>> del obj | |
| 1584 | |
| 1585 | |
| 1586 >>> obj = Object() | |
| 1587 >>> f = weakref.finalize(obj, callback, 1, 2, z=3) | |
| 1588 >>> wr, func, args, kwds = f.pop() | |
| 1589 >>> f(), f.get(), f.pop() | |
| 1590 (None, None, None) | |
| 1591 >>> del obj | |
| 1592 >>> func(*args, **kwds) # explicitly call the popped callback | |
| 1593 CALLBACK | |
| 1594 6 | |
| 1595 | |
| 1596 | |
| 1597 >>> kenny = Object() | |
| 1598 >>> f = weakref.finalize(kenny, print, "kenny must die!") | |
| 1599 >>> f.atexit = True | |
| 1600 >>> exit() #doctest:+SKIP | |
| 1601 kenny must die! | |
| 1387 | 1602 |
| 1388 """ | 1603 """ |
| 1389 | 1604 |
| 1390 __test__ = {'libreftest' : libreftest} | 1605 __test__ = {'libreftest' : libreftest} |
| 1391 | 1606 |
| 1392 def test_main(): | 1607 def test_main(): |
| 1393 support.run_unittest( | 1608 support.run_unittest( |
| 1394 ReferencesTestCase, | 1609 ReferencesTestCase, |
| 1395 MappingTestCase, | 1610 MappingTestCase, |
| 1396 WeakValueDictionaryTestCase, | 1611 WeakValueDictionaryTestCase, |
| 1397 WeakKeyDictionaryTestCase, | 1612 WeakKeyDictionaryTestCase, |
| 1398 SubclassableWeakrefTestCase, | 1613 SubclassableWeakrefTestCase, |
| 1614 FinalizeTestCase, | |
| 1399 ) | 1615 ) |
| 1400 support.run_doctest(sys.modules[__name__]) | 1616 support.run_doctest(sys.modules[__name__]) |
| 1401 | 1617 |
| 1402 | 1618 |
| 1403 if __name__ == "__main__": | 1619 if __name__ == "__main__": |
| 1404 test_main() | 1620 test_main() |
| OLD | NEW |