From 5a48b68245df0b3326c7b980026c767f40bc2fae Mon Sep 17 00:00:00 2001 From: Jason Madden Date: Tue, 24 Oct 2023 11:54:01 -0500 Subject: [PATCH] Refactor layers; add postgres support. --- README.rst | 5 +- setup.py | 3 + src/nti/testing/__init__.py | 2 + src/nti/testing/layers.py | 244 ------- src/nti/testing/layers/__init__.py | 234 +----- src/nti/testing/layers/cleanup.py | 154 +--- src/nti/testing/layers/configuration.py | 244 ------- src/nti/testing/layers/postgres.py | 673 ++++++++++++++++++ src/nti/testing/layers/tests/__init__.py | 0 src/nti/testing/layers/tests/test_postgres.py | 17 + src/nti/testing/layers/zope.py | 134 +--- src/nti/testing/matchers.py | 39 +- 12 files changed, 753 insertions(+), 996 deletions(-) delete mode 100644 src/nti/testing/layers.py delete mode 100644 src/nti/testing/layers/configuration.py create mode 100644 src/nti/testing/layers/postgres.py create mode 100644 src/nti/testing/layers/tests/__init__.py create mode 100644 src/nti/testing/layers/tests/test_postgres.py diff --git a/README.rst b/README.rst index 96755cb..ef94cd4 100644 --- a/README.rst +++ b/README.rst @@ -32,8 +32,9 @@ Installation nti.testing can be installed using pip, either from the git repository or from PyPI:: - pip install nti.testing + pip install nti.testing[testgres] +Use the ``testgres`` extra to be able to use `nti.testing.layers.postgres`. PyHamcrest ========== @@ -179,7 +180,7 @@ Test Fixtures Support for test fixtures can be found in `nti.testing.base` and `nti.testing.layers`. The ``base`` package includes fully-fleshed -out base classes for direct use, while the ``layers`` package includes +out base classes for direct use, while the ``layers`` package mostly includes mixins that can be used to construct your own test layers. The ``base`` package makes a distinction between "normal" and "shared" diff --git a/setup.py b/setup.py index 6038d36..7d6cf58 100755 --- a/setup.py +++ b/setup.py @@ -13,6 +13,7 @@ 'Acquisition', 'zope.site', 'zope.testrunner', + 'testgres', ] def _read(fname): @@ -75,6 +76,8 @@ def _read(fname): 'Sphinx', 'sphinx_rtd_theme', ], + 'testgres': [ + ], }, python_requires=">=3.8", ) diff --git a/src/nti/testing/__init__.py b/src/nti/testing/__init__.py index 0cb0451..57733f0 100644 --- a/src/nti/testing/__init__.py +++ b/src/nti/testing/__init__.py @@ -52,6 +52,7 @@ from .matchers import implements from .matchers import verifiably_provides from .matchers import validly_provides +from .matchers import provides from .matchers import validated_by from .matchers import not_validated_by from .matchers import aq_inContextOf @@ -95,6 +96,7 @@ def transactionCleanUp(): 'implements', 'verifiably_provides', 'validly_provides', + 'provides', 'validated_by', 'not_validated_by', 'aq_inContextOf', diff --git a/src/nti/testing/layers.py b/src/nti/testing/layers.py deleted file mode 100644 index 6ea0cfe..0000000 --- a/src/nti/testing/layers.py +++ /dev/null @@ -1,244 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -Test layer support. - -""" - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -# stdlib imports -import gc -import sys -import unittest - - -from zope import component -from zope.component import eventtesting -from zope.component.hooks import setHooks -import zope.testing.cleanup - -from . import transactionCleanUp -from .base import AbstractConfiguringObject -from .base import sharedCleanup - -from hamcrest import assert_that -from hamcrest import is_ - -__docformat__ = "restructuredtext en" - -logger = __import__('logging').getLogger(__name__) - -class GCLayerMixin(object): - """ - Mixin this layer class and call :meth:`setUpGC` from - your layer `setUp` method and likewise for the teardowns - when you want to do GC. - """ - - @classmethod - def setUp(cls): - pass - - @classmethod - def tearDown(cls): - pass - - @classmethod - def testSetUp(cls): - pass - - @classmethod - def testTearDown(cls): - # Must implement - pass - - @classmethod - def setUpGC(cls): - """ - This method disables GC until :meth:`tearDownGC` is called. - You should call it from your layer ``setUp`` method. - - It also cleans up the zope.testing state. - """ - zope.testing.cleanup.cleanUp() - cls.__isenabled = gc.isenabled() - gc.disable() - - @classmethod - def tearDownGC(cls): - """ - This method executes zope.testing's cleanup and then renables - GC. You should call if from your layer ``tearDown`` method. - """ - zope.testing.cleanup.cleanUp() - - if cls.__isenabled: - gc.enable() - - gc.collect(0) # collect one generation now to clean up weak refs - assert_that(gc.garbage, is_([])) - -class SharedCleanupLayer(object): - """ - Mixin this layer when you need cleanup functions - that run for every test. - """ - - @classmethod - def setUp(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - zope.testing.cleanup.cleanUp() - - @classmethod - def tearDown(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - zope.testing.cleanup.cleanUp() - - @classmethod - def testSetUp(cls): - """ - Calls :func:`~.sharedCleanup` for every test. - """ - sharedCleanup() - - @classmethod - def testTearDown(cls): - """ - Calls :func:`~.sharedCleanup` for every test. - """ - sharedCleanup() - - - -class ZopeComponentLayer(SharedCleanupLayer): - """ - Test layer that can be subclassed when zope.component will be used. - - This does nothing but set up the hooks and the event handlers. - """ - - @classmethod - def setUp(cls): - setHooks() # zope.component.hooks registers a zope.testing.cleanup to reset these - - - @classmethod - def tearDown(cls): - # always safe to clear events - eventtesting.clearEvents() # redundant with zope.testing.cleanup - # we never actually want to do this, it's not needed and can mess up other fixtures - # resetHooks() - - @classmethod - def testSetUp(cls): - setHooks() # ensure these are still here; cheap and easy - - @classmethod - def testTearDown(cls): - # Some tear down needs to happen always - eventtesting.clearEvents() - transactionCleanUp() - -_marker = object() - -class ConfiguringLayerMixin(AbstractConfiguringObject): - """ - Inherit from this layer *at the leaf level* to perform configuration. - You should have already inherited from :class:`ZopeComponentLayer`. - - To use this layer, subclass it and define a set of packages. This - should be done EXACTLY ONCE for each set of packages; things that - add to the set of packages should generally extend that layer - class. You must call :meth:`setUpPackages` and :meth:`tearDownPackages` - from your ``setUp`` and ``tearDown`` methods. - - See :class:`~.AbstractConfiguringObject` for documentation on - the class attributes to configure. - """ - - @classmethod - def setUp(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - pass - - @classmethod - def tearDown(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - pass - - @classmethod - def testSetUp(cls): - pass - - @classmethod - def testTearDown(cls): - # Must implement - pass - - #: .. seealso:: :meth:`~.AbstractConfiguringObject.get_configuration_package_for_class` - #: .. versionadded:: 2.1.0 - get_configuration_package = classmethod( - AbstractConfiguringObject.get_configuration_package_for_class - ) - - @classmethod - def setUpPackages(cls): - """ - Set up the configured packages. - """ - logger.info('Setting up packages %s for layer %s', cls.set_up_packages, cls) - gc.collect() - cls.configuration_context = cls.configure_packages( - set_up_packages=cls.set_up_packages, - features=cls.features, - context=cls.configuration_context, - package=cls.get_configuration_package()) - component.provideHandler(eventtesting.events.append, (None,)) - gc.collect() - - configure_packages = classmethod(AbstractConfiguringObject._do_configure_packages) - - @classmethod - def tearDownPackages(cls): - """ - Tear down all configured packages in the global site manager. - """ - # This is a duplicate of zope.component.globalregistry - logger.info('Tearing down packages %s for layer %s', cls.set_up_packages, cls) - gc.collect() - component.getGlobalSiteManager().__init__('base') - gc.collect() - cls.configuration_context = None - - -def find_test(): - """ - The layer support in :class:`nose2.plugins.layers.Layers` - optionally supplies the test case object to ``testSetUp`` - and ``testTearDown``, but ``zope.testrunner`` does not do - this. If you need access to the test, you can use an idiom like this:: - - @classmethod - def testSetUp(cls, test=None): - test = test or find_test() - """ - - i = 2 - while True: - try: - frame = sys._getframe(i) # pylint:disable=protected-access - i = i + 1 - except ValueError: # pragma: no cover - return None - else: - if isinstance(frame.f_locals.get('self'), unittest.TestCase): - return frame.f_locals['self'] - if isinstance(frame.f_locals.get('test'), unittest.TestCase): - return frame.f_locals['test'] diff --git a/src/nti/testing/layers/__init__.py b/src/nti/testing/layers/__init__.py index 6ea0cfe..cf6d011 100644 --- a/src/nti/testing/layers/__init__.py +++ b/src/nti/testing/layers/__init__.py @@ -3,219 +3,21 @@ """ Test layer support. -""" +.. versionchanged:: NEXT + + This is now a package with sub-modules. Existing imports continue + to work. -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function +""" -# stdlib imports -import gc import sys import unittest -from zope import component -from zope.component import eventtesting -from zope.component.hooks import setHooks -import zope.testing.cleanup - -from . import transactionCleanUp -from .base import AbstractConfiguringObject -from .base import sharedCleanup - -from hamcrest import assert_that -from hamcrest import is_ - -__docformat__ = "restructuredtext en" - -logger = __import__('logging').getLogger(__name__) - -class GCLayerMixin(object): - """ - Mixin this layer class and call :meth:`setUpGC` from - your layer `setUp` method and likewise for the teardowns - when you want to do GC. - """ - - @classmethod - def setUp(cls): - pass - - @classmethod - def tearDown(cls): - pass - - @classmethod - def testSetUp(cls): - pass - - @classmethod - def testTearDown(cls): - # Must implement - pass - - @classmethod - def setUpGC(cls): - """ - This method disables GC until :meth:`tearDownGC` is called. - You should call it from your layer ``setUp`` method. - - It also cleans up the zope.testing state. - """ - zope.testing.cleanup.cleanUp() - cls.__isenabled = gc.isenabled() - gc.disable() - - @classmethod - def tearDownGC(cls): - """ - This method executes zope.testing's cleanup and then renables - GC. You should call if from your layer ``tearDown`` method. - """ - zope.testing.cleanup.cleanUp() - - if cls.__isenabled: - gc.enable() - - gc.collect(0) # collect one generation now to clean up weak refs - assert_that(gc.garbage, is_([])) - -class SharedCleanupLayer(object): - """ - Mixin this layer when you need cleanup functions - that run for every test. - """ - - @classmethod - def setUp(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - zope.testing.cleanup.cleanUp() - - @classmethod - def tearDown(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - zope.testing.cleanup.cleanUp() - - @classmethod - def testSetUp(cls): - """ - Calls :func:`~.sharedCleanup` for every test. - """ - sharedCleanup() - - @classmethod - def testTearDown(cls): - """ - Calls :func:`~.sharedCleanup` for every test. - """ - sharedCleanup() - - - -class ZopeComponentLayer(SharedCleanupLayer): - """ - Test layer that can be subclassed when zope.component will be used. - - This does nothing but set up the hooks and the event handlers. - """ - - @classmethod - def setUp(cls): - setHooks() # zope.component.hooks registers a zope.testing.cleanup to reset these - - - @classmethod - def tearDown(cls): - # always safe to clear events - eventtesting.clearEvents() # redundant with zope.testing.cleanup - # we never actually want to do this, it's not needed and can mess up other fixtures - # resetHooks() - - @classmethod - def testSetUp(cls): - setHooks() # ensure these are still here; cheap and easy - - @classmethod - def testTearDown(cls): - # Some tear down needs to happen always - eventtesting.clearEvents() - transactionCleanUp() - -_marker = object() - -class ConfiguringLayerMixin(AbstractConfiguringObject): - """ - Inherit from this layer *at the leaf level* to perform configuration. - You should have already inherited from :class:`ZopeComponentLayer`. - - To use this layer, subclass it and define a set of packages. This - should be done EXACTLY ONCE for each set of packages; things that - add to the set of packages should generally extend that layer - class. You must call :meth:`setUpPackages` and :meth:`tearDownPackages` - from your ``setUp`` and ``tearDown`` methods. - - See :class:`~.AbstractConfiguringObject` for documentation on - the class attributes to configure. - """ - - @classmethod - def setUp(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - pass - - @classmethod - def tearDown(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - pass - - @classmethod - def testSetUp(cls): - pass - - @classmethod - def testTearDown(cls): - # Must implement - pass - - #: .. seealso:: :meth:`~.AbstractConfiguringObject.get_configuration_package_for_class` - #: .. versionadded:: 2.1.0 - get_configuration_package = classmethod( - AbstractConfiguringObject.get_configuration_package_for_class - ) - - @classmethod - def setUpPackages(cls): - """ - Set up the configured packages. - """ - logger.info('Setting up packages %s for layer %s', cls.set_up_packages, cls) - gc.collect() - cls.configuration_context = cls.configure_packages( - set_up_packages=cls.set_up_packages, - features=cls.features, - context=cls.configuration_context, - package=cls.get_configuration_package()) - component.provideHandler(eventtesting.events.append, (None,)) - gc.collect() - - configure_packages = classmethod(AbstractConfiguringObject._do_configure_packages) - - @classmethod - def tearDownPackages(cls): - """ - Tear down all configured packages in the global site manager. - """ - # This is a duplicate of zope.component.globalregistry - logger.info('Tearing down packages %s for layer %s', cls.set_up_packages, cls) - gc.collect() - component.getGlobalSiteManager().__init__('base') - gc.collect() - cls.configuration_context = None +from .cleanup import GCLayerMixin +from .cleanup import SharedCleanupLayer +from .zope import ZopeComponentLayer +from .zope import ConfiguringLayerMixin def find_test(): @@ -237,8 +39,16 @@ def testSetUp(cls, test=None): i = i + 1 except ValueError: # pragma: no cover return None - else: - if isinstance(frame.f_locals.get('self'), unittest.TestCase): - return frame.f_locals['self'] - if isinstance(frame.f_locals.get('test'), unittest.TestCase): - return frame.f_locals['test'] + + if isinstance(frame.f_locals.get('self'), unittest.TestCase): + return frame.f_locals['self'] + if isinstance(frame.f_locals.get('test'), unittest.TestCase): + return frame.f_locals['test'] + +__all__ = [ + 'GCLayerMixin', + 'SharedCleanupLayer', + 'ZopeComponentLayer', + 'ConfiguringLayerMixin', + 'find_test', +] diff --git a/src/nti/testing/layers/cleanup.py b/src/nti/testing/layers/cleanup.py index 6ea0cfe..a912acd 100644 --- a/src/nti/testing/layers/cleanup.py +++ b/src/nti/testing/layers/cleanup.py @@ -1,33 +1,24 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- """ -Test layer support. +Support for cleaning up: -""" +- Garbage collection +- Registered cleanups. -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function +.. versionadded:: NEXT -# stdlib imports -import gc -import sys -import unittest +""" +import gc -from zope import component -from zope.component import eventtesting -from zope.component.hooks import setHooks import zope.testing.cleanup -from . import transactionCleanUp -from .base import AbstractConfiguringObject -from .base import sharedCleanup +from ..base import sharedCleanup from hamcrest import assert_that from hamcrest import is_ -__docformat__ = "restructuredtext en" logger = __import__('logging').getLogger(__name__) @@ -81,6 +72,7 @@ def tearDownGC(cls): gc.collect(0) # collect one generation now to clean up weak refs assert_that(gc.garbage, is_([])) + class SharedCleanupLayer(object): """ Mixin this layer when you need cleanup functions @@ -112,133 +104,3 @@ def testTearDown(cls): Calls :func:`~.sharedCleanup` for every test. """ sharedCleanup() - - - -class ZopeComponentLayer(SharedCleanupLayer): - """ - Test layer that can be subclassed when zope.component will be used. - - This does nothing but set up the hooks and the event handlers. - """ - - @classmethod - def setUp(cls): - setHooks() # zope.component.hooks registers a zope.testing.cleanup to reset these - - - @classmethod - def tearDown(cls): - # always safe to clear events - eventtesting.clearEvents() # redundant with zope.testing.cleanup - # we never actually want to do this, it's not needed and can mess up other fixtures - # resetHooks() - - @classmethod - def testSetUp(cls): - setHooks() # ensure these are still here; cheap and easy - - @classmethod - def testTearDown(cls): - # Some tear down needs to happen always - eventtesting.clearEvents() - transactionCleanUp() - -_marker = object() - -class ConfiguringLayerMixin(AbstractConfiguringObject): - """ - Inherit from this layer *at the leaf level* to perform configuration. - You should have already inherited from :class:`ZopeComponentLayer`. - - To use this layer, subclass it and define a set of packages. This - should be done EXACTLY ONCE for each set of packages; things that - add to the set of packages should generally extend that layer - class. You must call :meth:`setUpPackages` and :meth:`tearDownPackages` - from your ``setUp`` and ``tearDown`` methods. - - See :class:`~.AbstractConfiguringObject` for documentation on - the class attributes to configure. - """ - - @classmethod - def setUp(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - pass - - @classmethod - def tearDown(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - pass - - @classmethod - def testSetUp(cls): - pass - - @classmethod - def testTearDown(cls): - # Must implement - pass - - #: .. seealso:: :meth:`~.AbstractConfiguringObject.get_configuration_package_for_class` - #: .. versionadded:: 2.1.0 - get_configuration_package = classmethod( - AbstractConfiguringObject.get_configuration_package_for_class - ) - - @classmethod - def setUpPackages(cls): - """ - Set up the configured packages. - """ - logger.info('Setting up packages %s for layer %s', cls.set_up_packages, cls) - gc.collect() - cls.configuration_context = cls.configure_packages( - set_up_packages=cls.set_up_packages, - features=cls.features, - context=cls.configuration_context, - package=cls.get_configuration_package()) - component.provideHandler(eventtesting.events.append, (None,)) - gc.collect() - - configure_packages = classmethod(AbstractConfiguringObject._do_configure_packages) - - @classmethod - def tearDownPackages(cls): - """ - Tear down all configured packages in the global site manager. - """ - # This is a duplicate of zope.component.globalregistry - logger.info('Tearing down packages %s for layer %s', cls.set_up_packages, cls) - gc.collect() - component.getGlobalSiteManager().__init__('base') - gc.collect() - cls.configuration_context = None - - -def find_test(): - """ - The layer support in :class:`nose2.plugins.layers.Layers` - optionally supplies the test case object to ``testSetUp`` - and ``testTearDown``, but ``zope.testrunner`` does not do - this. If you need access to the test, you can use an idiom like this:: - - @classmethod - def testSetUp(cls, test=None): - test = test or find_test() - """ - - i = 2 - while True: - try: - frame = sys._getframe(i) # pylint:disable=protected-access - i = i + 1 - except ValueError: # pragma: no cover - return None - else: - if isinstance(frame.f_locals.get('self'), unittest.TestCase): - return frame.f_locals['self'] - if isinstance(frame.f_locals.get('test'), unittest.TestCase): - return frame.f_locals['test'] diff --git a/src/nti/testing/layers/configuration.py b/src/nti/testing/layers/configuration.py deleted file mode 100644 index 6ea0cfe..0000000 --- a/src/nti/testing/layers/configuration.py +++ /dev/null @@ -1,244 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -Test layer support. - -""" - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -# stdlib imports -import gc -import sys -import unittest - - -from zope import component -from zope.component import eventtesting -from zope.component.hooks import setHooks -import zope.testing.cleanup - -from . import transactionCleanUp -from .base import AbstractConfiguringObject -from .base import sharedCleanup - -from hamcrest import assert_that -from hamcrest import is_ - -__docformat__ = "restructuredtext en" - -logger = __import__('logging').getLogger(__name__) - -class GCLayerMixin(object): - """ - Mixin this layer class and call :meth:`setUpGC` from - your layer `setUp` method and likewise for the teardowns - when you want to do GC. - """ - - @classmethod - def setUp(cls): - pass - - @classmethod - def tearDown(cls): - pass - - @classmethod - def testSetUp(cls): - pass - - @classmethod - def testTearDown(cls): - # Must implement - pass - - @classmethod - def setUpGC(cls): - """ - This method disables GC until :meth:`tearDownGC` is called. - You should call it from your layer ``setUp`` method. - - It also cleans up the zope.testing state. - """ - zope.testing.cleanup.cleanUp() - cls.__isenabled = gc.isenabled() - gc.disable() - - @classmethod - def tearDownGC(cls): - """ - This method executes zope.testing's cleanup and then renables - GC. You should call if from your layer ``tearDown`` method. - """ - zope.testing.cleanup.cleanUp() - - if cls.__isenabled: - gc.enable() - - gc.collect(0) # collect one generation now to clean up weak refs - assert_that(gc.garbage, is_([])) - -class SharedCleanupLayer(object): - """ - Mixin this layer when you need cleanup functions - that run for every test. - """ - - @classmethod - def setUp(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - zope.testing.cleanup.cleanUp() - - @classmethod - def tearDown(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - zope.testing.cleanup.cleanUp() - - @classmethod - def testSetUp(cls): - """ - Calls :func:`~.sharedCleanup` for every test. - """ - sharedCleanup() - - @classmethod - def testTearDown(cls): - """ - Calls :func:`~.sharedCleanup` for every test. - """ - sharedCleanup() - - - -class ZopeComponentLayer(SharedCleanupLayer): - """ - Test layer that can be subclassed when zope.component will be used. - - This does nothing but set up the hooks and the event handlers. - """ - - @classmethod - def setUp(cls): - setHooks() # zope.component.hooks registers a zope.testing.cleanup to reset these - - - @classmethod - def tearDown(cls): - # always safe to clear events - eventtesting.clearEvents() # redundant with zope.testing.cleanup - # we never actually want to do this, it's not needed and can mess up other fixtures - # resetHooks() - - @classmethod - def testSetUp(cls): - setHooks() # ensure these are still here; cheap and easy - - @classmethod - def testTearDown(cls): - # Some tear down needs to happen always - eventtesting.clearEvents() - transactionCleanUp() - -_marker = object() - -class ConfiguringLayerMixin(AbstractConfiguringObject): - """ - Inherit from this layer *at the leaf level* to perform configuration. - You should have already inherited from :class:`ZopeComponentLayer`. - - To use this layer, subclass it and define a set of packages. This - should be done EXACTLY ONCE for each set of packages; things that - add to the set of packages should generally extend that layer - class. You must call :meth:`setUpPackages` and :meth:`tearDownPackages` - from your ``setUp`` and ``tearDown`` methods. - - See :class:`~.AbstractConfiguringObject` for documentation on - the class attributes to configure. - """ - - @classmethod - def setUp(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - pass - - @classmethod - def tearDown(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - pass - - @classmethod - def testSetUp(cls): - pass - - @classmethod - def testTearDown(cls): - # Must implement - pass - - #: .. seealso:: :meth:`~.AbstractConfiguringObject.get_configuration_package_for_class` - #: .. versionadded:: 2.1.0 - get_configuration_package = classmethod( - AbstractConfiguringObject.get_configuration_package_for_class - ) - - @classmethod - def setUpPackages(cls): - """ - Set up the configured packages. - """ - logger.info('Setting up packages %s for layer %s', cls.set_up_packages, cls) - gc.collect() - cls.configuration_context = cls.configure_packages( - set_up_packages=cls.set_up_packages, - features=cls.features, - context=cls.configuration_context, - package=cls.get_configuration_package()) - component.provideHandler(eventtesting.events.append, (None,)) - gc.collect() - - configure_packages = classmethod(AbstractConfiguringObject._do_configure_packages) - - @classmethod - def tearDownPackages(cls): - """ - Tear down all configured packages in the global site manager. - """ - # This is a duplicate of zope.component.globalregistry - logger.info('Tearing down packages %s for layer %s', cls.set_up_packages, cls) - gc.collect() - component.getGlobalSiteManager().__init__('base') - gc.collect() - cls.configuration_context = None - - -def find_test(): - """ - The layer support in :class:`nose2.plugins.layers.Layers` - optionally supplies the test case object to ``testSetUp`` - and ``testTearDown``, but ``zope.testrunner`` does not do - this. If you need access to the test, you can use an idiom like this:: - - @classmethod - def testSetUp(cls, test=None): - test = test or find_test() - """ - - i = 2 - while True: - try: - frame = sys._getframe(i) # pylint:disable=protected-access - i = i + 1 - except ValueError: # pragma: no cover - return None - else: - if isinstance(frame.f_locals.get('self'), unittest.TestCase): - return frame.f_locals['self'] - if isinstance(frame.f_locals.get('test'), unittest.TestCase): - return frame.f_locals['test'] diff --git a/src/nti/testing/layers/postgres.py b/src/nti/testing/layers/postgres.py new file mode 100644 index 0000000..717c83d --- /dev/null +++ b/src/nti/testing/layers/postgres.py @@ -0,0 +1,673 @@ +# -*- coding: utf-8 -*- +""" +Support for using :mod:`testgres` to create and use a Postgres +instance as a layer. + +There is also support for benchmarking and saving databeses for +later examination. + +.. versionadded:: NEXT + +The APIs are preliminary and may change. + +""" +from contextlib import contextmanager +import functools +import os +import sys +import unittest +from unittest.mock import patch + +import psycopg2 +import psycopg2.extras +import psycopg2.pool + +import testgres + + +if 'PG_CONFIG' not in os.environ: + # Set up for macports and fedora, using files that exist. + # Otherwise, don't set it, assume things are on the path. + for option in ( + '/opt/local/lib/postgresql11/bin/pg_config', + '/usr/pgsql-11/bin/pg_config', + ): + if os.path.isfile(option): + # TODO: Check exec bit + os.environ['PG_CONFIG'] = option + break + +# If True, save the database to a pg_dump +# file on teardown. The file name will be printed.'' +SAVE_DATABASE_ON_TEARDOWN = False +SAVE_DATABASE_FILENAME = None + +# If the path to a database dump file that exists, the database +# will be restored from this file on setUp. +LOAD_DATABASE_ON_SETUP = None + +if 'NTI_SAVE_DB' in os.environ: + # NTI_SAVE_DB is either 1/on/true (case-insensitive) + # or a file name. + val = os.environ['NTI_SAVE_DB'] + if val.lower() in ('0', 'off', 'false', 'no'): + SAVE_DATABASE_ON_TEARDOWN = False + else: + SAVE_DATABASE_ON_TEARDOWN = True + if val.lower() not in ('1', 'on', 'true', 'yes'): + SAVE_DATABASE_FILENAME = val + + +if 'NTI_LOAD_DB_FILE' in os.environ: + LOAD_DATABASE_ON_SETUP = os.environ['NTI_LOAD_DB_FILE'] + + +def patched_get_pg_version(): + from testgres.utils import get_pg_version + from testgres.node import PgVer + from packaging.version import InvalidVersion + + # Some installs of postgres return + # strings that the version parser doesn't like; + # notably the Python images get a debian build that says + # "15.3-0+deb12u1" which get_pg_version() chops down to + # "15.3-0+". If it can't be parsed, then return a fake. + + try: + version = get_pg_version() + PgVer(version) + except InvalidVersion: + print('testgres: Got invalid postgres version', version) + # The actual version string looks like "postgres (PostgreSQL) 15.4", + # and get_pg_version() processes that down to this + version = "15.4" + print('testgres: Substituting version', version) + + return version + +class DatabaseLayer(object): + """ + A test layer that creates the database, and sets each + test up in its own connection, aborting the transaction when + done. + """ + + #: The name of the database within the node. We only create + #: the default databases, so this should be 'postgres' + DATABASE_NAME = 'postgres' + + #: A `testgres.node.PostgresNode`, created for the layer. + #: A psycopg2 connection to it is located in the :attr:`connection` + #: attribute (similarly for :attr:`connection_pool`), while + #: a DSN connection string is in :attr:`postgres_dsn` + postgres_node = None + + #: A string you can use to connect to Postgres. + postgres_dsn = None + + #: A string you can pass to SQLAlchemy + postgres_uri = None + + #: Set for each test. + connection = None + + #: Set for each test. + cursor = None + + connection_pool = None + + + connection_pool_klass = psycopg2.pool.ThreadedConnectionPool + connection_pool_minconn = 1 + connection_pool_maxconn = 51 + + @classmethod + def setUp(cls): + testgres.configure_testgres() + + with patch('testgres.node.get_pg_version', new=patched_get_pg_version): + node = cls.postgres_node = testgres.get_new_node() + + # init takes about about 2 -- 3 seconds + node.init( + # Use the encoding as UTF-8. Set the locale as POSIX + # instead of inheriting it (in JAM's environment, the locale + # and thus collation and ctype is en_US.UTF8; this turns out to be + # up to 40% slower than POSIX). + # We could explicitly specify 'en-x-icu' on each column, if we required + # ICU support, but it cannot be used as a default collation. + initdb_params=[ + "-E", "UTF8", + '--locale', 'POSIX', + # Don't force to disk; this may save some minor init time. + '--no-sync', + ], + log_statement='none', + # Disable unix sockets. Some platforms might try to put this + # in a directory we can't write to + unix_sockets=False + ) + # Speed up bulk inserts + # These settings appeared to make no difference for the + # 2 million security insert or the 500K security mapping insert; + # likely because the final table sizes are < 300MB, so the default max size of + # 1GB is more than enough. + node.append_conf('fsync = off') + node.append_conf('full_page_writes = off') + node.append_conf('min_wal_size = 500MB') + node.append_conf('max_wal_size = 2GB') + # 'replica' is the default. If we use 'minimal' we could be + # a bit faster, but that's not exactly realistic. Plus, + # using 'minimal' disables the WAL backup functionality. + # If we set to 'minimal', we must also set 'max_wal_senders' to 0. + node.append_conf('wal_level = replica') + node.append_conf('wal_compression = on') + node.append_conf('wal_writer_delay = 10000ms') + node.append_conf('wal_writer_flush_after = 10MB') + + node.append_conf('temp_buffers = 500MB') + node.append_conf('work_mem = 500MB') + node.append_conf('maintenance_work_mem = 500MB') + node.append_conf('shared_buffers = 500MB') + + node.append_conf('max_connections = 100') + + # auto-explain for slow queries + if 'benchmark' in ' '.join(sys.argv): + print("Enabling BENCHMARK SETTINGS") + node.append_conf('shared_preload_libraries = auto_explain') + node.append_conf('auto_explain.log_min_duration = 40ms') + node.append_conf('auto_explain.log_nested_statements = on') + node.append_conf('auto_explain.log_analyze = on') + node.append_conf('auto_explain.log_timing = on') + node.append_conf('auto_explain.log_triggers = on') + + # PG 11 only, when --with-llvm was used to compile. + # It seems if it can't be used, it's ignored? It errors on 10 though, + # but we only support 11 + node.append_conf('jit = on') + + + node.start() + cls.connection_pool = cls.connection_pool_klass( + cls.connection_pool_minconn, + cls.connection_pool_maxconn, + dbname=cls.DATABASE_NAME, + host='localhost', + port=cls.postgres_node.port, + cursor_factory=psycopg2.extras.DictCursor + ) + + cls.postgres_dsn = "host=%s dbname=%s port=%s" % ( + node.host, cls.DATABASE_NAME, node.port + ) + cls.postgres_uri = "postgresql://%s:%s/%s" % ( + cls.postgres_node.host, + cls.postgres_node.port, + cls.DATABASE_NAME + ) + + with cls.borrowed_connection() as conn: + with conn.cursor() as cur: + i = cls.__get_db_info(cur) + print(f"({i['version']} {i['current_database']}/{i['current_schema']} " + f"{i['Encoding']}-{i['Collate']}) ", end="") + + @classmethod + def tearDown(cls): + cls.connection_pool.closeall() + cls.connection_pool = None + + cls.postgres_node.__exit__(None, None, None) + cls.postgres_node = None + + @classmethod + def testSetUp(cls): + # XXX: Errors here cause the tearDown method to not get called. + cls.connection = cls.connection_pool.getconn() + cls.cursor = cls.connection.cursor() + + @classmethod + def testTearDown(cls): + cls.connection.rollback() # Make sure we're able to execute + cls.cursor.execute('UNLISTEN *') + cls.cursor.close() + cls.cursor = None + cls.connection_pool.putconn(cls.connection) + cls.connection = None + + @classmethod + def __get_db_info(cls, cur): + query = """ + SELECT version() as version, + d.datname as "Name", + pg_catalog.pg_get_userbyid(d.datdba) as "Owner", + pg_catalog.pg_encoding_to_char(d.encoding) as "Encoding", + d.datcollate as "Collate", + d.datctype as "Ctype", + pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges", + current_database() as "current_database", + current_schema() as "current_schema" + FROM pg_catalog.pg_database d + WHERE d.datname = %s + """ + cur.execute(query, (cls.DATABASE_NAME,)) + row = cur.fetchone() + return dict(row) + + @classmethod + @contextmanager + def borrowed_connection(cls): + """ + Context manager that returns a connection from the connection + pool. + """ + conn = cls.connection_pool.getconn() + try: + yield conn + finally: + cls.connection_pool.putconn(conn) + + + @classmethod + def truncate_table(cls, conn, table_name): + "Transactionally truncate the given *table_name* using *conn*" + try: + + with conn.cursor() as cur: + cur.execute( + 'TRUNCATE TABLE ' + table_name + ' CASCADE' + ) + except (psycopg2.ProgrammingError, psycopg2.InternalError): + # Table doesn't exist, not a full schema, + # ignore. + # OR: + # Already aborted + import traceback + traceback.print_exc() + conn.rollback() + else: + # Is PostgreSQL, TRUNCATE is transactional! + # Awesome! + conn.commit() + + @classmethod + def drop_relation(cls, relation, kind='TABLE', idempotent=False): + "Drops the *relation* of type *kind* (default table), in new transaction." + with cls.borrowed_connection() as conn: + with conn.cursor() as cur: + if idempotent: + cur.execute(f"DROP {kind} IF EXISTS {relation}") + else: + cur.execute(f"DROP {kind} {relation}") + conn.commit() + + @classmethod + def vacuum(cls, *tables, **kwargs): + verbose = '' + if kwargs.pop('verbose', False): + verbose = ', VERBOSE' + + with cls.borrowed_connection() as conn: + conn.autocommit = True + # FULL rewrites all tables and takes forever. + # FREEZE is simpler and compacts tables + stmt = f'VACUUM (FREEZE, ANALYZE {verbose}) ' + tables = tables or ('',) + with conn.cursor() as cur: + # VACUUM cannot run inside a transaction block... + for t in tables: + cur.execute(stmt + t) + conn.autocommit = False + if verbose: + for n in conn.notices: + print(n) + del conn.notices[:] + if kwargs.pop('size_report', True): + cls.print_size_report() + + ONLY_PRINT_SIZE_OF_TABLES = None + + @classmethod + def print_size_report(cls): + extra_query = '' + if cls.ONLY_PRINT_SIZE_OF_TABLES: + t = cls.ONLY_PRINT_SIZE_OF_TABLES + extra_query = f"AND table_name = '{t}'" + query = f""" + SELECT table_name, + pg_size_pretty(total_bytes) AS total, + pg_size_pretty(index_bytes) AS INDEX, + pg_size_pretty(toast_bytes) AS toast, + pg_size_pretty(table_bytes) AS TABLE + FROM ( + SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM ( + SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME + , c.reltuples AS row_estimate + , pg_total_relation_size(c.oid) AS total_bytes + , pg_indexes_size(c.oid) AS index_bytes + , pg_total_relation_size(reltoastrelid) AS toast_bytes + FROM pg_class c + LEFT JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE relkind = 'r' + ) a + ) a + WHERE (table_name NOT LIKE 'pg_%' and table_name not like 'abstract_%' + {extra_query} + ) + AND table_schema <> 'pg_catalog' and table_schema <> 'information_schema' + ORDER BY total_bytes DESC + """ + + with cls.borrowed_connection() as conn: + with conn.cursor() as cur: + cur.execute(query) + + rows = [dict(row) for row in cur] + + keys = ['table_name', 'total', 'index', 'toast', 'table'] + + rows.insert(0, {k: k for k in keys}) + print() + fmt = "| {table_name:35s} | {total:10s} | {index:10s} | {toast:10s} | {table:10s}" + for row in rows: + if not extra_query and row['total'] in ( + '72 kB', '32 kB', '24 kB', '16 kB', '8192 bytes' + ): + continue + print(fmt.format( + **{k: v if v else '' for k, v in row.items()} + )) + + +class SchemaDatabaseLayer(DatabaseLayer): + """ + A test layer that adds our schema. + """ + + SCHEMA_FILE = os.path.abspath(os.path.join( + os.path.dirname(os.path.abspath(__file__)), + '..', '..', '..', '..', '..', + 'full_schema.sql' + )) + + @classmethod + def run_files(cls, *files): + for fname in files: + code, stdout, stderr = cls.postgres_node.psql( + filename=fname, + ON_ERROR_STOP=1 + ) + if code: + break + + if code: + import subprocess + stdout = stdout.decode("utf-8") + stderr = stderr.decode('utf-8') + print(stdout) + print(stderr) + raise subprocess.CalledProcessError( + code, + 'psql', + stdout, + stderr + ) + + @classmethod + def _tangle_schema_if_needed(cls): + # If the schema files do not exist, or db.org is newer + # than they are, run emacs to weave the files together. + # This requires a working emacs with org-mode available. + from pathlib import Path + import subprocess + + cwd = Path(".") + # Each org file tangles to at least one sql file. + org_to_sql = { + org: org.with_suffix('.sql') + for org in cwd.glob("*.org") + } + org_to_sql[Path("db.org")] = Path("full_schema.sql") + + for org, sql in org_to_sql.items(): + if not org.exists(): + continue + if sql.exists() and sql.stat().st_mtime >= org.stat().st_mtime: + continue + print(f"\nDatabase schema files outdated; tangling {org}") + ex = None + try: + output = subprocess.check_output([ + "emacs", + "--batch", + "--eval", + f'''(progn + (package-initialize) + (require 'org) + (org-babel-tangle-file "{org}") + )''' + ], stderr=subprocess.STDOUT) + except FileNotFoundError as e: + output = str(e).encode('utf-8') + ex = e + except subprocess.CalledProcessError as e: + output = ex.output + ex = e + + output = output.decode('utf-8') + + if ex is not None or 'Tangled 0' in output: + print("Failed to tangle database schema; " + "(check file paths):\n", + output, + file=sys.stderr) + sys.exit(1) + + @classmethod + def setUp(cls): + cwd = os.getcwd() + try: + os.chdir(os.path.dirname(cls.SCHEMA_FILE)) + # XXX: Do exceptions here prevent the super tearDown() + # from being called? + cls._tangle_schema_if_needed() + + to_run = [cls.SCHEMA_FILE] + if os.path.exists("prereq.sql"): + to_run.insert(0, "prereq.sql") + cls.run_files(*to_run) + finally: + os.chdir(cwd) + + @classmethod + def tearDown(cls): + pass + + @classmethod + def testSetUp(cls): + pass + + @classmethod + def testTearDown(cls): + pass + +class DatabaseBackupLayerHelper: + """ + A layer helper that works with another layer to + + * create a backup of the current database on `push`; + * make that backup active; + * switch the connection pool to that backup + * reverse all of that on layer `pop` + + Note that this consists of modifying values in the `DatabaseLayer`, + so the *layer* parameter must extend that. + """ + + _nodes = [] + _pools = [] + + @classmethod + def push(cls, layer): + current_node = DatabaseLayer.postgres_node + cls._nodes.append(current_node) + cls._pools.append(DatabaseLayer.connection_pool) + + with layer.borrowed_connection() as conn: + with conn.cursor() as cur: + # If we don't checkpoint here, then the backup waits + # for the next WAL checkpoint to happen. We may not have + # written much to the WAL, so we could wait until a time limit + # expires, which is ofter 30+ seconds. We don't want to wait. + cur.execute('CHECKPOINT') + + # A streaming backup uses a replication slot, but it + # does the copy in parallel. + backup = current_node.backup(xlog_method='stream') + DatabaseLayer.postgres_node = new_node = backup.spawn_primary() + new_node.start() + DatabaseLayer.connection_pool = layer.connection_pool_klass( + layer.connection_pool_minconn, + layer.connection_pool_maxconn, + dbname=layer.DATABASE_NAME, + host='localhost', + port=new_node.port, + cursor_factory=psycopg2.extras.DictCursor + ) + + @classmethod + def pop(cls, layer): # pylint:disable=unused-argument + DatabaseLayer.tearDown() # Closes the current node, and the connection pool + DatabaseLayer.postgres_node = cls._nodes.pop() + DatabaseLayer.connection_pool = cls._pools.pop() + + +_persistent_base = ( + # If we're loading a file, it has the schema + # info. + + SchemaDatabaseLayer + if not LOAD_DATABASE_ON_SETUP + else DatabaseLayer +) + +class PersistentDatabaseLayer(_persistent_base): + """ + A layer that establishes persistent data visible to + all of its tests (and all of its sub-layers). + + Sub-layers need to check whether they should + clean up or not, because we may be saving the database file. + + It's important to have a fairly linear layer + setup, or layers that don't interfere with each other. + """ + + @classmethod + def setUp(cls): + if LOAD_DATABASE_ON_SETUP: + print(f" (Loading database from {LOAD_DATABASE_ON_SETUP}) ", + end='', + flush=True) + cls.postgres_node.restore(LOAD_DATABASE_ON_SETUP) + cls.vacuum() + + @classmethod + def testSetUp(cls): + pass + + @classmethod + def testTearDown(cls): + pass + + @classmethod + def persistent_layer_skip_teardown(cls): + """ + Should persistent layers, that write data intended to be + visible between tests (and in sub-layers) tear down that data + when the layer is torn down? If we're saving the database, we + don't want to do that. + + Raising NotImplementedError causes the testrunner to assume + it's python resources that are the problem and continue in a new + subprocess, which doesn't help (and may hurt?). So you must check this as a + boolean. + """ + return SAVE_DATABASE_ON_TEARDOWN + + @classmethod + def persistent_layer_skip_setup(cls): + """ + Should persistent layers skip their setup because + we loaded a save file? + """ + return LOAD_DATABASE_ON_SETUP + + @classmethod + def tearDown(cls): + if SAVE_DATABASE_ON_TEARDOWN: + tmp_fname = cls.postgres_node.dump(format='custom') + result_fname = tmp_fname + if SAVE_DATABASE_FILENAME: + import shutil + result_fname = SAVE_DATABASE_FILENAME + while os.path.exists(result_fname): + result_fname += '.1' + shutil.move(tmp_fname, result_fname) + print(f" (Database dumped to {result_fname}) ", end='') + +def persistent_skip_setup(func): + + @functools.wraps(func) + def maybe_skip_setup(cls): + if cls.persistent_layer_skip_setup(): + return + func(cls) + return maybe_skip_setup + +def persistent_skip_teardown(func): + @functools.wraps(func) + def f(cls): + if cls.persistent_layer_skip_teardown(): + return + func(cls) + return f + +class DatabaseTestCase(unittest.TestCase): + """ + A helper test base containing some functions useful for both + benchmarking and unit testing. + """ + # pylint:disable=no-member + + @contextmanager + def assertRaisesIntegrityError(self, match=None): + if match: + with self.assertRaisesRegex(psycopg2.IntegrityError, match) as exc: + yield exc + else: + with self.assertRaises(psycopg2.IntegrityError) as exc: + yield exc + + # We can't do any queries after an error is raised + # until we rollback. + self.layer.connection.rollback() + return exc + + def assert_row_count_in_query(self, expected_count, query): + cur = self.layer.cursor + + cur.execute('SELECT COUNT(*) FROM ' + query) + row = cur.fetchone() + count = row[0] + + self.assertEqual(expected_count, count, query) + + def assert_row_count_in_table(self, expected_count, table_name): + __traceback_info__ = table_name + self.assert_row_count_in_query(expected_count, table_name) + + def assert_row_count_in_cursor(self, rowcount, cursor=None): + cur = cursor if cursor is not None else self.layer.cursor + self.assertEqual(cur.rowcount, rowcount) diff --git a/src/nti/testing/layers/tests/__init__.py b/src/nti/testing/layers/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/nti/testing/layers/tests/test_postgres.py b/src/nti/testing/layers/tests/test_postgres.py new file mode 100644 index 0000000..0050b56 --- /dev/null +++ b/src/nti/testing/layers/tests/test_postgres.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +""" +Tests for postgres.py + +""" + +import unittest + + +class TestBasic(unittest.TestCase): + + def test_imports(self): + from .. import postgres + self.assertIsNotNone(postgres) + +if __name__ == '__main__': + unittest.main() diff --git a/src/nti/testing/layers/zope.py b/src/nti/testing/layers/zope.py index 6ea0cfe..bfe2588 100644 --- a/src/nti/testing/layers/zope.py +++ b/src/nti/testing/layers/zope.py @@ -1,119 +1,25 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- """ -Test layer support. +Test layers for working with Zope libraries. """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -# stdlib imports import gc import sys import unittest +import logging from zope import component from zope.component import eventtesting from zope.component.hooks import setHooks -import zope.testing.cleanup - -from . import transactionCleanUp -from .base import AbstractConfiguringObject -from .base import sharedCleanup - -from hamcrest import assert_that -from hamcrest import is_ - -__docformat__ = "restructuredtext en" - -logger = __import__('logging').getLogger(__name__) - -class GCLayerMixin(object): - """ - Mixin this layer class and call :meth:`setUpGC` from - your layer `setUp` method and likewise for the teardowns - when you want to do GC. - """ - - @classmethod - def setUp(cls): - pass - - @classmethod - def tearDown(cls): - pass - - @classmethod - def testSetUp(cls): - pass - - @classmethod - def testTearDown(cls): - # Must implement - pass - - @classmethod - def setUpGC(cls): - """ - This method disables GC until :meth:`tearDownGC` is called. - You should call it from your layer ``setUp`` method. - - It also cleans up the zope.testing state. - """ - zope.testing.cleanup.cleanUp() - cls.__isenabled = gc.isenabled() - gc.disable() - - @classmethod - def tearDownGC(cls): - """ - This method executes zope.testing's cleanup and then renables - GC. You should call if from your layer ``tearDown`` method. - """ - zope.testing.cleanup.cleanUp() - - if cls.__isenabled: - gc.enable() - - gc.collect(0) # collect one generation now to clean up weak refs - assert_that(gc.garbage, is_([])) - -class SharedCleanupLayer(object): - """ - Mixin this layer when you need cleanup functions - that run for every test. - """ - - @classmethod - def setUp(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - zope.testing.cleanup.cleanUp() - - @classmethod - def tearDown(cls): - # You MUST implement this, otherwise zope.testrunner - # will call the super-class again - zope.testing.cleanup.cleanUp() - - @classmethod - def testSetUp(cls): - """ - Calls :func:`~.sharedCleanup` for every test. - """ - sharedCleanup() - - @classmethod - def testTearDown(cls): - """ - Calls :func:`~.sharedCleanup` for every test. - """ - sharedCleanup() +from .. import transactionCleanUp +from ..base import AbstractConfiguringObject +from .cleanup import SharedCleanupLayer +logger = logging.getLogger(__name__) class ZopeComponentLayer(SharedCleanupLayer): """ @@ -213,32 +119,6 @@ def tearDownPackages(cls): # This is a duplicate of zope.component.globalregistry logger.info('Tearing down packages %s for layer %s', cls.set_up_packages, cls) gc.collect() - component.getGlobalSiteManager().__init__('base') + component.getGlobalSiteManager().__init__('base') # pylint:disable=unnecessary-dunder-call gc.collect() cls.configuration_context = None - - -def find_test(): - """ - The layer support in :class:`nose2.plugins.layers.Layers` - optionally supplies the test case object to ``testSetUp`` - and ``testTearDown``, but ``zope.testrunner`` does not do - this. If you need access to the test, you can use an idiom like this:: - - @classmethod - def testSetUp(cls, test=None): - test = test or find_test() - """ - - i = 2 - while True: - try: - frame = sys._getframe(i) # pylint:disable=protected-access - i = i + 1 - except ValueError: # pragma: no cover - return None - else: - if isinstance(frame.f_locals.get('self'), unittest.TestCase): - return frame.f_locals['self'] - if isinstance(frame.f_locals.get('test'), unittest.TestCase): - return frame.f_locals['test'] diff --git a/src/nti/testing/matchers.py b/src/nti/testing/matchers.py index d213fbc..206d7b3 100644 --- a/src/nti/testing/matchers.py +++ b/src/nti/testing/matchers.py @@ -4,16 +4,8 @@ Hamcrest matchers for testing. """ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function +from collections.abc import Sequence, Mapping -# stdlib imports -try: - from collections.abc import Sequence, Mapping -except ImportError: # pragma: no cover - # Python 2 - from collections import Sequence, Mapping # pylint:disable=deprecated-class import pprint import six @@ -55,7 +47,7 @@ class BoolMatcher(BaseMatcher): def __init__(self, value): - super(BoolMatcher, self).__init__() + super().__init__() self.value = value def _matches(self, item): @@ -82,7 +74,7 @@ def is_false(): class Provides(BaseMatcher): def __init__(self, iface): - super(Provides, self).__init__() + super().__init__() self.iface = iface def _matches(self, item): @@ -95,6 +87,7 @@ def describe_to(self, description): def __repr__(self): return 'object providing' + str(self.iface) + def provides(iface): """ Matches an object that provides the given interface. @@ -105,7 +98,7 @@ def provides(iface): class VerifyProvides(BaseMatcher): def __init__(self, iface): - super(VerifyProvides, self).__init__() + super().__init__() self.iface = iface def _matches(self, item): @@ -113,8 +106,7 @@ def _matches(self, item): verifyObject(self.iface, item) except Invalid: return False - else: - return True + return True def describe_to(self, description): description.append_text('object verifiably providing ').append_description_of(self.iface) @@ -148,15 +140,17 @@ def verifiably_provides(*ifaces): .. note:: This does **not** test schema compliance. For that (stricter) test, see :func:`validly_provides`. + """ if len(ifaces) == 1: return VerifyProvides(ifaces[0]) return hamcrest.all_of(*[VerifyProvides(x) for x in ifaces]) + class VerifyValidSchema(BaseMatcher): def __init__(self, iface): - super(VerifyValidSchema, self).__init__() + super().__init__() self.iface = iface def _matches(self, item): @@ -185,6 +179,7 @@ def describe_mismatch(self, item, mismatch_description): except Invalid as x: # pragma: no cover md.append_text(str(x)) + def validly_provides(*ifaces): """ Matches if the object verifiably and validly provides the given @@ -203,10 +198,12 @@ def validly_provides(*ifaces): return hamcrest.all_of(prov, *valid) + + class Implements(BaseMatcher): def __init__(self, iface): - super(Implements, self).__init__() + super().__init__() self.iface = iface def _matches(self, item): @@ -229,7 +226,7 @@ def implements(iface): class ValidatedBy(BaseMatcher): def __init__(self, field, invalid=Invalid): - super(ValidatedBy, self).__init__() + super().__init__() self.field = field self.invalid = invalid @@ -238,8 +235,8 @@ def _matches(self, item): self.field.validate(item) except self.invalid: return False - else: - return True + + return True def describe_to(self, description): description.append_text('data validated by').append_description_of(self.field) @@ -283,7 +280,7 @@ def not_validated_by(field, invalid=Invalid): """ return is_not(validated_by(field, invalid=invalid)) -def _aq_inContextOf_NotImplemented(child, parent): +def _aq_inContextOf_NotImplemented(child, parent): # pylint:disable=unused-argument return False try: @@ -295,7 +292,7 @@ def _aq_inContextOf_NotImplemented(child, parent): class AqInContextOf(BaseMatcher): def __init__(self, parent): - super(AqInContextOf, self).__init__() + super().__init__() self.parent = parent def _matches(self, item):