From a2ec341258e4f7588eb2632217f16f1abce9babe Mon Sep 17 00:00:00 2001 From: Denis Otkidach Date: Mon, 23 Oct 2023 12:45:23 +0300 Subject: [PATCH] Replace future and final clean-up --- aiokafka/cluster.py | 4 +- aiokafka/coordinator/base.py | 3 +- aiokafka/coordinator/consumer.py | 5 +- docs/examples/manual_commit.rst | 2 +- docs/examples/serialize_and_compress.rst | 2 +- docs/examples/ssl_consume_produce.rst | 2 +- kafka/__init__.py | 17 - kafka/future.py | 83 --- kafka/vendor/__init__.py | 0 kafka/vendor/enum34.py | 841 --------------------- kafka/vendor/selectors34.py | 637 ---------------- kafka/vendor/six.py | 897 ----------------------- kafka/vendor/socketpair.py | 58 -- setup.py | 2 +- tests/kafka/__init__.py | 8 - tests/kafka/conftest.py | 140 ---- tests/kafka/fixtures.py | 651 ---------------- tests/kafka/service.py | 133 ---- tests/kafka/testutil.py | 46 -- 19 files changed, 9 insertions(+), 3522 deletions(-) delete mode 100644 kafka/__init__.py delete mode 100644 kafka/future.py delete mode 100644 kafka/vendor/__init__.py delete mode 100644 kafka/vendor/enum34.py delete mode 100644 kafka/vendor/selectors34.py delete mode 100644 kafka/vendor/six.py delete mode 100644 kafka/vendor/socketpair.py delete mode 100644 tests/kafka/__init__.py delete mode 100644 tests/kafka/conftest.py delete mode 100644 tests/kafka/fixtures.py delete mode 100644 tests/kafka/service.py delete mode 100644 tests/kafka/testutil.py diff --git a/aiokafka/cluster.py b/aiokafka/cluster.py index fd565422..061db638 100644 --- a/aiokafka/cluster.py +++ b/aiokafka/cluster.py @@ -4,7 +4,7 @@ import threading import time -from kafka.future import Future +from concurrent.futures import Future from aiokafka import errors as Errors from aiokafka.conn import collect_hosts @@ -189,7 +189,7 @@ def request_update(self): change the reported ttl() Returns: - kafka.future.Future (value will be the cluster object after update) + Future (value will be the cluster object after update) """ with self._lock: self._need_update = True diff --git a/aiokafka/coordinator/base.py b/aiokafka/coordinator/base.py index ea6b4ccd..f1de92de 100644 --- a/aiokafka/coordinator/base.py +++ b/aiokafka/coordinator/base.py @@ -4,8 +4,7 @@ import threading import time import weakref - -from kafka.future import Future +from concurrent.futures import Future from aiokafka import errors as Errors from aiokafka.metrics import AnonMeasurable diff --git a/aiokafka/coordinator/consumer.py b/aiokafka/coordinator/consumer.py index 7604e051..dade9fcd 100644 --- a/aiokafka/coordinator/consumer.py +++ b/aiokafka/coordinator/consumer.py @@ -3,8 +3,7 @@ import functools import logging import time - -from kafka.future import Future +from concurrent.futures import Future import aiokafka.errors as Errors from aiokafka.metrics import AnonMeasurable @@ -503,7 +502,7 @@ def commit_offsets_async(self, offsets, callback=None): a commit request completes. Returns: - kafka.future.Future + Future """ self._invoke_completed_offset_commit_callbacks() if not self.coordinator_unknown(): diff --git a/docs/examples/manual_commit.rst b/docs/examples/manual_commit.rst index 30eca170..416f5ed5 100644 --- a/docs/examples/manual_commit.rst +++ b/docs/examples/manual_commit.rst @@ -22,7 +22,7 @@ Consumer: import json import asyncio - from kafka.common import KafkaError + from aiokafka.errors import KafkaError from aiokafka import AIOKafkaConsumer async def consume(): diff --git a/docs/examples/serialize_and_compress.rst b/docs/examples/serialize_and_compress.rst index 02c8dbdf..55d5a48d 100644 --- a/docs/examples/serialize_and_compress.rst +++ b/docs/examples/serialize_and_compress.rst @@ -49,7 +49,7 @@ Consumer import json import asyncio - from kafka.common import KafkaError + from aiokafka.errors import KafkaError from aiokafka import AIOKafkaConsumer def deserializer(serialized): diff --git a/docs/examples/ssl_consume_produce.rst b/docs/examples/ssl_consume_produce.rst index b3c0808f..b99d5e51 100644 --- a/docs/examples/ssl_consume_produce.rst +++ b/docs/examples/ssl_consume_produce.rst @@ -11,7 +11,7 @@ information. import asyncio from aiokafka import AIOKafkaProducer, AIOKafkaConsumer from aiokafka.helpers import create_ssl_context - from kafka.common import TopicPartition + from aiokafka.errors import TopicPartition context = create_ssl_context( cafile="./ca-cert", # CA used to sign certificate. diff --git a/kafka/__init__.py b/kafka/__init__.py deleted file mode 100644 index a40686e6..00000000 --- a/kafka/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -from __future__ import absolute_import - -__title__ = 'kafka' -__author__ = 'Dana Powers' -__license__ = 'Apache License 2.0' -__copyright__ = 'Copyright 2016 Dana Powers, David Arthur, and Contributors' - -# Set default logging handler to avoid "No handler found" warnings. -import logging -try: # Python 2.7+ - from logging import NullHandler -except ImportError: - class NullHandler(logging.Handler): - def emit(self, record): - pass - -logging.getLogger(__name__).addHandler(NullHandler()) diff --git a/kafka/future.py b/kafka/future.py deleted file mode 100644 index d0f3c665..00000000 --- a/kafka/future.py +++ /dev/null @@ -1,83 +0,0 @@ -from __future__ import absolute_import - -import functools -import logging - -log = logging.getLogger(__name__) - - -class Future(object): - error_on_callbacks = False # and errbacks - - def __init__(self): - self.is_done = False - self.value = None - self.exception = None - self._callbacks = [] - self._errbacks = [] - - def succeeded(self): - return self.is_done and not bool(self.exception) - - def failed(self): - return self.is_done and bool(self.exception) - - def retriable(self): - try: - return self.exception.retriable - except AttributeError: - return False - - def success(self, value): - assert not self.is_done, 'Future is already complete' - self.value = value - self.is_done = True - if self._callbacks: - self._call_backs('callback', self._callbacks, self.value) - return self - - def failure(self, e): - assert not self.is_done, 'Future is already complete' - self.exception = e if type(e) is not type else e() - assert isinstance(self.exception, BaseException), ( - 'future failed without an exception') - self.is_done = True - self._call_backs('errback', self._errbacks, self.exception) - return self - - def add_callback(self, f, *args, **kwargs): - if args or kwargs: - f = functools.partial(f, *args, **kwargs) - if self.is_done and not self.exception: - self._call_backs('callback', [f], self.value) - else: - self._callbacks.append(f) - return self - - def add_errback(self, f, *args, **kwargs): - if args or kwargs: - f = functools.partial(f, *args, **kwargs) - if self.is_done and self.exception: - self._call_backs('errback', [f], self.exception) - else: - self._errbacks.append(f) - return self - - def add_both(self, f, *args, **kwargs): - self.add_callback(f, *args, **kwargs) - self.add_errback(f, *args, **kwargs) - return self - - def chain(self, future): - self.add_callback(future.success) - self.add_errback(future.failure) - return self - - def _call_backs(self, back_type, backs, value): - for f in backs: - try: - f(value) - except Exception as e: - log.exception('Error processing %s', back_type) - if self.error_on_callbacks: - raise e diff --git a/kafka/vendor/__init__.py b/kafka/vendor/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/kafka/vendor/enum34.py b/kafka/vendor/enum34.py deleted file mode 100644 index 5f64bd2d..00000000 --- a/kafka/vendor/enum34.py +++ /dev/null @@ -1,841 +0,0 @@ -# pylint: skip-file -# vendored from: -# https://bitbucket.org/stoneleaf/enum34/src/58c4cd7174ca35f164304c8a6f0a4d47b779c2a7/enum/__init__.py?at=1.1.6 - -"""Python Enumerations""" - -import sys as _sys - -__all__ = ['Enum', 'IntEnum', 'unique'] - -version = 1, 1, 6 - -pyver = float('%s.%s' % _sys.version_info[:2]) - -try: - any -except NameError: - def any(iterable): - for element in iterable: - if element: - return True - return False - -try: - from collections import OrderedDict -except ImportError: - OrderedDict = None - -try: - basestring -except NameError: - # In Python 2 basestring is the ancestor of both str and unicode - # in Python 3 it's just str, but was missing in 3.1 - basestring = str - -try: - unicode -except NameError: - # In Python 3 unicode no longer exists (it's just str) - unicode = str - -class _RouteClassAttributeToGetattr(object): - """Route attribute access on a class to __getattr__. - - This is a descriptor, used to define attributes that act differently when - accessed through an instance and through a class. Instance access remains - normal, but access to an attribute through a class will be routed to the - class's __getattr__ method; this is done by raising AttributeError. - - """ - def __init__(self, fget=None): - self.fget = fget - - def __get__(self, instance, ownerclass=None): - if instance is None: - raise AttributeError() - return self.fget(instance) - - def __set__(self, instance, value): - raise AttributeError("can't set attribute") - - def __delete__(self, instance): - raise AttributeError("can't delete attribute") - - -def _is_descriptor(obj): - """Returns True if obj is a descriptor, False otherwise.""" - return ( - hasattr(obj, '__get__') or - hasattr(obj, '__set__') or - hasattr(obj, '__delete__')) - - -def _is_dunder(name): - """Returns True if a __dunder__ name, False otherwise.""" - return (name[:2] == name[-2:] == '__' and - name[2:3] != '_' and - name[-3:-2] != '_' and - len(name) > 4) - - -def _is_sunder(name): - """Returns True if a _sunder_ name, False otherwise.""" - return (name[0] == name[-1] == '_' and - name[1:2] != '_' and - name[-2:-1] != '_' and - len(name) > 2) - - -def _make_class_unpicklable(cls): - """Make the given class un-picklable.""" - def _break_on_call_reduce(self, protocol=None): - raise TypeError('%r cannot be pickled' % self) - cls.__reduce_ex__ = _break_on_call_reduce - cls.__module__ = '' - - -class _EnumDict(dict): - """Track enum member order and ensure member names are not reused. - - EnumMeta will use the names found in self._member_names as the - enumeration member names. - - """ - def __init__(self): - super(_EnumDict, self).__init__() - self._member_names = [] - - def __setitem__(self, key, value): - """Changes anything not dundered or not a descriptor. - - If a descriptor is added with the same name as an enum member, the name - is removed from _member_names (this may leave a hole in the numerical - sequence of values). - - If an enum member name is used twice, an error is raised; duplicate - values are not checked for. - - Single underscore (sunder) names are reserved. - - Note: in 3.x __order__ is simply discarded as a not necessary piece - leftover from 2.x - - """ - if pyver >= 3.0 and key in ('_order_', '__order__'): - return - elif key == '__order__': - key = '_order_' - if _is_sunder(key): - if key != '_order_': - raise ValueError('_names_ are reserved for future Enum use') - elif _is_dunder(key): - pass - elif key in self._member_names: - # descriptor overwriting an enum? - raise TypeError('Attempted to reuse key: %r' % key) - elif not _is_descriptor(value): - if key in self: - # enum overwriting a descriptor? - raise TypeError('Key already defined as: %r' % self[key]) - self._member_names.append(key) - super(_EnumDict, self).__setitem__(key, value) - - -# Dummy value for Enum as EnumMeta explicity checks for it, but of course until -# EnumMeta finishes running the first time the Enum class doesn't exist. This -# is also why there are checks in EnumMeta like `if Enum is not None` -Enum = None - - -class EnumMeta(type): - """Metaclass for Enum""" - @classmethod - def __prepare__(metacls, cls, bases): - return _EnumDict() - - def __new__(metacls, cls, bases, classdict): - # an Enum class is final once enumeration items have been defined; it - # cannot be mixed with other types (int, float, etc.) if it has an - # inherited __new__ unless a new __new__ is defined (or the resulting - # class will fail). - if type(classdict) is dict: - original_dict = classdict - classdict = _EnumDict() - for k, v in original_dict.items(): - classdict[k] = v - - member_type, first_enum = metacls._get_mixins_(bases) - __new__, save_new, use_args = metacls._find_new_(classdict, member_type, - first_enum) - # save enum items into separate mapping so they don't get baked into - # the new class - members = dict((k, classdict[k]) for k in classdict._member_names) - for name in classdict._member_names: - del classdict[name] - - # py2 support for definition order - _order_ = classdict.get('_order_') - if _order_ is None: - if pyver < 3.0: - try: - _order_ = [name for (name, value) in sorted(members.items(), key=lambda item: item[1])] - except TypeError: - _order_ = [name for name in sorted(members.keys())] - else: - _order_ = classdict._member_names - else: - del classdict['_order_'] - if pyver < 3.0: - _order_ = _order_.replace(',', ' ').split() - aliases = [name for name in members if name not in _order_] - _order_ += aliases - - # check for illegal enum names (any others?) - invalid_names = set(members) & set(['mro']) - if invalid_names: - raise ValueError('Invalid enum member name(s): %s' % ( - ', '.join(invalid_names), )) - - # save attributes from super classes so we know if we can take - # the shortcut of storing members in the class dict - base_attributes = set([a for b in bases for a in b.__dict__]) - # create our new Enum type - enum_class = super(EnumMeta, metacls).__new__(metacls, cls, bases, classdict) - enum_class._member_names_ = [] # names in random order - if OrderedDict is not None: - enum_class._member_map_ = OrderedDict() - else: - enum_class._member_map_ = {} # name->value map - enum_class._member_type_ = member_type - - # Reverse value->name map for hashable values. - enum_class._value2member_map_ = {} - - # instantiate them, checking for duplicates as we go - # we instantiate first instead of checking for duplicates first in case - # a custom __new__ is doing something funky with the values -- such as - # auto-numbering ;) - if __new__ is None: - __new__ = enum_class.__new__ - for member_name in _order_: - value = members[member_name] - if not isinstance(value, tuple): - args = (value, ) - else: - args = value - if member_type is tuple: # special case for tuple enums - args = (args, ) # wrap it one more time - if not use_args or not args: - enum_member = __new__(enum_class) - if not hasattr(enum_member, '_value_'): - enum_member._value_ = value - else: - enum_member = __new__(enum_class, *args) - if not hasattr(enum_member, '_value_'): - enum_member._value_ = member_type(*args) - value = enum_member._value_ - enum_member._name_ = member_name - enum_member.__objclass__ = enum_class - enum_member.__init__(*args) - # If another member with the same value was already defined, the - # new member becomes an alias to the existing one. - for name, canonical_member in enum_class._member_map_.items(): - if canonical_member.value == enum_member._value_: - enum_member = canonical_member - break - else: - # Aliases don't appear in member names (only in __members__). - enum_class._member_names_.append(member_name) - # performance boost for any member that would not shadow - # a DynamicClassAttribute (aka _RouteClassAttributeToGetattr) - if member_name not in base_attributes: - setattr(enum_class, member_name, enum_member) - # now add to _member_map_ - enum_class._member_map_[member_name] = enum_member - try: - # This may fail if value is not hashable. We can't add the value - # to the map, and by-value lookups for this value will be - # linear. - enum_class._value2member_map_[value] = enum_member - except TypeError: - pass - - - # If a custom type is mixed into the Enum, and it does not know how - # to pickle itself, pickle.dumps will succeed but pickle.loads will - # fail. Rather than have the error show up later and possibly far - # from the source, sabotage the pickle protocol for this class so - # that pickle.dumps also fails. - # - # However, if the new class implements its own __reduce_ex__, do not - # sabotage -- it's on them to make sure it works correctly. We use - # __reduce_ex__ instead of any of the others as it is preferred by - # pickle over __reduce__, and it handles all pickle protocols. - unpicklable = False - if '__reduce_ex__' not in classdict: - if member_type is not object: - methods = ('__getnewargs_ex__', '__getnewargs__', - '__reduce_ex__', '__reduce__') - if not any(m in member_type.__dict__ for m in methods): - _make_class_unpicklable(enum_class) - unpicklable = True - - - # double check that repr and friends are not the mixin's or various - # things break (such as pickle) - for name in ('__repr__', '__str__', '__format__', '__reduce_ex__'): - class_method = getattr(enum_class, name) - obj_method = getattr(member_type, name, None) - enum_method = getattr(first_enum, name, None) - if name not in classdict and class_method is not enum_method: - if name == '__reduce_ex__' and unpicklable: - continue - setattr(enum_class, name, enum_method) - - # method resolution and int's are not playing nice - # Python's less than 2.6 use __cmp__ - - if pyver < 2.6: - - if issubclass(enum_class, int): - setattr(enum_class, '__cmp__', getattr(int, '__cmp__')) - - elif pyver < 3.0: - - if issubclass(enum_class, int): - for method in ( - '__le__', - '__lt__', - '__gt__', - '__ge__', - '__eq__', - '__ne__', - '__hash__', - ): - setattr(enum_class, method, getattr(int, method)) - - # replace any other __new__ with our own (as long as Enum is not None, - # anyway) -- again, this is to support pickle - if Enum is not None: - # if the user defined their own __new__, save it before it gets - # clobbered in case they subclass later - if save_new: - setattr(enum_class, '__member_new__', enum_class.__dict__['__new__']) - setattr(enum_class, '__new__', Enum.__dict__['__new__']) - return enum_class - - def __bool__(cls): - """ - classes/types should always be True. - """ - return True - - def __call__(cls, value, names=None, module=None, type=None, start=1): - """Either returns an existing member, or creates a new enum class. - - This method is used both when an enum class is given a value to match - to an enumeration member (i.e. Color(3)) and for the functional API - (i.e. Color = Enum('Color', names='red green blue')). - - When used for the functional API: `module`, if set, will be stored in - the new class' __module__ attribute; `type`, if set, will be mixed in - as the first base class. - - Note: if `module` is not set this routine will attempt to discover the - calling module by walking the frame stack; if this is unsuccessful - the resulting class will not be pickleable. - - """ - if names is None: # simple value lookup - return cls.__new__(cls, value) - # otherwise, functional API: we're creating a new Enum type - return cls._create_(value, names, module=module, type=type, start=start) - - def __contains__(cls, member): - return isinstance(member, cls) and member.name in cls._member_map_ - - def __delattr__(cls, attr): - # nicer error message when someone tries to delete an attribute - # (see issue19025). - if attr in cls._member_map_: - raise AttributeError( - "%s: cannot delete Enum member." % cls.__name__) - super(EnumMeta, cls).__delattr__(attr) - - def __dir__(self): - return (['__class__', '__doc__', '__members__', '__module__'] + - self._member_names_) - - @property - def __members__(cls): - """Returns a mapping of member name->value. - - This mapping lists all enum members, including aliases. Note that this - is a copy of the internal mapping. - - """ - return cls._member_map_.copy() - - def __getattr__(cls, name): - """Return the enum member matching `name` - - We use __getattr__ instead of descriptors or inserting into the enum - class' __dict__ in order to support `name` and `value` being both - properties for enum members (which live in the class' __dict__) and - enum members themselves. - - """ - if _is_dunder(name): - raise AttributeError(name) - try: - return cls._member_map_[name] - except KeyError: - raise AttributeError(name) - - def __getitem__(cls, name): - return cls._member_map_[name] - - def __iter__(cls): - return (cls._member_map_[name] for name in cls._member_names_) - - def __reversed__(cls): - return (cls._member_map_[name] for name in reversed(cls._member_names_)) - - def __len__(cls): - return len(cls._member_names_) - - __nonzero__ = __bool__ - - def __repr__(cls): - return "" % cls.__name__ - - def __setattr__(cls, name, value): - """Block attempts to reassign Enum members. - - A simple assignment to the class namespace only changes one of the - several possible ways to get an Enum member from the Enum class, - resulting in an inconsistent Enumeration. - - """ - member_map = cls.__dict__.get('_member_map_', {}) - if name in member_map: - raise AttributeError('Cannot reassign members.') - super(EnumMeta, cls).__setattr__(name, value) - - def _create_(cls, class_name, names=None, module=None, type=None, start=1): - """Convenience method to create a new Enum class. - - `names` can be: - - * A string containing member names, separated either with spaces or - commas. Values are auto-numbered from 1. - * An iterable of member names. Values are auto-numbered from 1. - * An iterable of (member name, value) pairs. - * A mapping of member name -> value. - - """ - if pyver < 3.0: - # if class_name is unicode, attempt a conversion to ASCII - if isinstance(class_name, unicode): - try: - class_name = class_name.encode('ascii') - except UnicodeEncodeError: - raise TypeError('%r is not representable in ASCII' % class_name) - metacls = cls.__class__ - if type is None: - bases = (cls, ) - else: - bases = (type, cls) - classdict = metacls.__prepare__(class_name, bases) - _order_ = [] - - # special processing needed for names? - if isinstance(names, basestring): - names = names.replace(',', ' ').split() - if isinstance(names, (tuple, list)) and isinstance(names[0], basestring): - names = [(e, i+start) for (i, e) in enumerate(names)] - - # Here, names is either an iterable of (name, value) or a mapping. - item = None # in case names is empty - for item in names: - if isinstance(item, basestring): - member_name, member_value = item, names[item] - else: - member_name, member_value = item - classdict[member_name] = member_value - _order_.append(member_name) - # only set _order_ in classdict if name/value was not from a mapping - if not isinstance(item, basestring): - classdict['_order_'] = ' '.join(_order_) - enum_class = metacls.__new__(metacls, class_name, bases, classdict) - - # TODO: replace the frame hack if a blessed way to know the calling - # module is ever developed - if module is None: - try: - module = _sys._getframe(2).f_globals['__name__'] - except (AttributeError, ValueError): - pass - if module is None: - _make_class_unpicklable(enum_class) - else: - enum_class.__module__ = module - - return enum_class - - @staticmethod - def _get_mixins_(bases): - """Returns the type for creating enum members, and the first inherited - enum class. - - bases: the tuple of bases that was given to __new__ - - """ - if not bases or Enum is None: - return object, Enum - - - # double check that we are not subclassing a class with existing - # enumeration members; while we're at it, see if any other data - # type has been mixed in so we can use the correct __new__ - member_type = first_enum = None - for base in bases: - if (base is not Enum and - issubclass(base, Enum) and - base._member_names_): - raise TypeError("Cannot extend enumerations") - # base is now the last base in bases - if not issubclass(base, Enum): - raise TypeError("new enumerations must be created as " - "`ClassName([mixin_type,] enum_type)`") - - # get correct mix-in type (either mix-in type of Enum subclass, or - # first base if last base is Enum) - if not issubclass(bases[0], Enum): - member_type = bases[0] # first data type - first_enum = bases[-1] # enum type - else: - for base in bases[0].__mro__: - # most common: (IntEnum, int, Enum, object) - # possible: (, , - # , , - # ) - if issubclass(base, Enum): - if first_enum is None: - first_enum = base - else: - if member_type is None: - member_type = base - - return member_type, first_enum - - if pyver < 3.0: - @staticmethod - def _find_new_(classdict, member_type, first_enum): - """Returns the __new__ to be used for creating the enum members. - - classdict: the class dictionary given to __new__ - member_type: the data type whose __new__ will be used by default - first_enum: enumeration to check for an overriding __new__ - - """ - # now find the correct __new__, checking to see of one was defined - # by the user; also check earlier enum classes in case a __new__ was - # saved as __member_new__ - __new__ = classdict.get('__new__', None) - if __new__: - return None, True, True # __new__, save_new, use_args - - N__new__ = getattr(None, '__new__') - O__new__ = getattr(object, '__new__') - if Enum is None: - E__new__ = N__new__ - else: - E__new__ = Enum.__dict__['__new__'] - # check all possibles for __member_new__ before falling back to - # __new__ - for method in ('__member_new__', '__new__'): - for possible in (member_type, first_enum): - try: - target = possible.__dict__[method] - except (AttributeError, KeyError): - target = getattr(possible, method, None) - if target not in [ - None, - N__new__, - O__new__, - E__new__, - ]: - if method == '__member_new__': - classdict['__new__'] = target - return None, False, True - if isinstance(target, staticmethod): - target = target.__get__(member_type) - __new__ = target - break - if __new__ is not None: - break - else: - __new__ = object.__new__ - - # if a non-object.__new__ is used then whatever value/tuple was - # assigned to the enum member name will be passed to __new__ and to the - # new enum member's __init__ - if __new__ is object.__new__: - use_args = False - else: - use_args = True - - return __new__, False, use_args - else: - @staticmethod - def _find_new_(classdict, member_type, first_enum): - """Returns the __new__ to be used for creating the enum members. - - classdict: the class dictionary given to __new__ - member_type: the data type whose __new__ will be used by default - first_enum: enumeration to check for an overriding __new__ - - """ - # now find the correct __new__, checking to see of one was defined - # by the user; also check earlier enum classes in case a __new__ was - # saved as __member_new__ - __new__ = classdict.get('__new__', None) - - # should __new__ be saved as __member_new__ later? - save_new = __new__ is not None - - if __new__ is None: - # check all possibles for __member_new__ before falling back to - # __new__ - for method in ('__member_new__', '__new__'): - for possible in (member_type, first_enum): - target = getattr(possible, method, None) - if target not in ( - None, - None.__new__, - object.__new__, - Enum.__new__, - ): - __new__ = target - break - if __new__ is not None: - break - else: - __new__ = object.__new__ - - # if a non-object.__new__ is used then whatever value/tuple was - # assigned to the enum member name will be passed to __new__ and to the - # new enum member's __init__ - if __new__ is object.__new__: - use_args = False - else: - use_args = True - - return __new__, save_new, use_args - - -######################################################## -# In order to support Python 2 and 3 with a single -# codebase we have to create the Enum methods separately -# and then use the `type(name, bases, dict)` method to -# create the class. -######################################################## -temp_enum_dict = {} -temp_enum_dict['__doc__'] = "Generic enumeration.\n\n Derive from this class to define new enumerations.\n\n" - -def __new__(cls, value): - # all enum instances are actually created during class construction - # without calling this method; this method is called by the metaclass' - # __call__ (i.e. Color(3) ), and by pickle - if type(value) is cls: - # For lookups like Color(Color.red) - value = value.value - #return value - # by-value search for a matching enum member - # see if it's in the reverse mapping (for hashable values) - try: - if value in cls._value2member_map_: - return cls._value2member_map_[value] - except TypeError: - # not there, now do long search -- O(n) behavior - for member in cls._member_map_.values(): - if member.value == value: - return member - raise ValueError("%s is not a valid %s" % (value, cls.__name__)) -temp_enum_dict['__new__'] = __new__ -del __new__ - -def __repr__(self): - return "<%s.%s: %r>" % ( - self.__class__.__name__, self._name_, self._value_) -temp_enum_dict['__repr__'] = __repr__ -del __repr__ - -def __str__(self): - return "%s.%s" % (self.__class__.__name__, self._name_) -temp_enum_dict['__str__'] = __str__ -del __str__ - -if pyver >= 3.0: - def __dir__(self): - added_behavior = [ - m - for cls in self.__class__.mro() - for m in cls.__dict__ - if m[0] != '_' and m not in self._member_map_ - ] - return (['__class__', '__doc__', '__module__', ] + added_behavior) - temp_enum_dict['__dir__'] = __dir__ - del __dir__ - -def __format__(self, format_spec): - # mixed-in Enums should use the mixed-in type's __format__, otherwise - # we can get strange results with the Enum name showing up instead of - # the value - - # pure Enum branch - if self._member_type_ is object: - cls = str - val = str(self) - # mix-in branch - else: - cls = self._member_type_ - val = self.value - return cls.__format__(val, format_spec) -temp_enum_dict['__format__'] = __format__ -del __format__ - - -#################################### -# Python's less than 2.6 use __cmp__ - -if pyver < 2.6: - - def __cmp__(self, other): - if type(other) is self.__class__: - if self is other: - return 0 - return -1 - return NotImplemented - raise TypeError("unorderable types: %s() and %s()" % (self.__class__.__name__, other.__class__.__name__)) - temp_enum_dict['__cmp__'] = __cmp__ - del __cmp__ - -else: - - def __le__(self, other): - raise TypeError("unorderable types: %s() <= %s()" % (self.__class__.__name__, other.__class__.__name__)) - temp_enum_dict['__le__'] = __le__ - del __le__ - - def __lt__(self, other): - raise TypeError("unorderable types: %s() < %s()" % (self.__class__.__name__, other.__class__.__name__)) - temp_enum_dict['__lt__'] = __lt__ - del __lt__ - - def __ge__(self, other): - raise TypeError("unorderable types: %s() >= %s()" % (self.__class__.__name__, other.__class__.__name__)) - temp_enum_dict['__ge__'] = __ge__ - del __ge__ - - def __gt__(self, other): - raise TypeError("unorderable types: %s() > %s()" % (self.__class__.__name__, other.__class__.__name__)) - temp_enum_dict['__gt__'] = __gt__ - del __gt__ - - -def __eq__(self, other): - if type(other) is self.__class__: - return self is other - return NotImplemented -temp_enum_dict['__eq__'] = __eq__ -del __eq__ - -def __ne__(self, other): - if type(other) is self.__class__: - return self is not other - return NotImplemented -temp_enum_dict['__ne__'] = __ne__ -del __ne__ - -def __hash__(self): - return hash(self._name_) -temp_enum_dict['__hash__'] = __hash__ -del __hash__ - -def __reduce_ex__(self, proto): - return self.__class__, (self._value_, ) -temp_enum_dict['__reduce_ex__'] = __reduce_ex__ -del __reduce_ex__ - -# _RouteClassAttributeToGetattr is used to provide access to the `name` -# and `value` properties of enum members while keeping some measure of -# protection from modification, while still allowing for an enumeration -# to have members named `name` and `value`. This works because enumeration -# members are not set directly on the enum class -- __getattr__ is -# used to look them up. - -@_RouteClassAttributeToGetattr -def name(self): - return self._name_ -temp_enum_dict['name'] = name -del name - -@_RouteClassAttributeToGetattr -def value(self): - return self._value_ -temp_enum_dict['value'] = value -del value - -@classmethod -def _convert(cls, name, module, filter, source=None): - """ - Create a new Enum subclass that replaces a collection of global constants - """ - # convert all constants from source (or module) that pass filter() to - # a new Enum called name, and export the enum and its members back to - # module; - # also, replace the __reduce_ex__ method so unpickling works in - # previous Python versions - module_globals = vars(_sys.modules[module]) - if source: - source = vars(source) - else: - source = module_globals - members = dict((name, value) for name, value in source.items() if filter(name)) - cls = cls(name, members, module=module) - cls.__reduce_ex__ = _reduce_ex_by_name - module_globals.update(cls.__members__) - module_globals[name] = cls - return cls -temp_enum_dict['_convert'] = _convert -del _convert - -Enum = EnumMeta('Enum', (object, ), temp_enum_dict) -del temp_enum_dict - -# Enum has now been created -########################### - -class IntEnum(int, Enum): - """Enum where members are also (and must be) ints""" - -def _reduce_ex_by_name(self, proto): - return self.name - -def unique(enumeration): - """Class decorator that ensures only unique members exist in an enumeration.""" - duplicates = [] - for name, member in enumeration.__members__.items(): - if name != member.name: - duplicates.append((name, member.name)) - if duplicates: - duplicate_names = ', '.join( - ["%s -> %s" % (alias, name) for (alias, name) in duplicates] - ) - raise ValueError('duplicate names found in %r: %s' % - (enumeration, duplicate_names) - ) - return enumeration diff --git a/kafka/vendor/selectors34.py b/kafka/vendor/selectors34.py deleted file mode 100644 index ebf5d515..00000000 --- a/kafka/vendor/selectors34.py +++ /dev/null @@ -1,637 +0,0 @@ -# pylint: skip-file -# vendored from https://github.com/berkerpeksag/selectors34 -# at commit ff61b82168d2cc9c4922ae08e2a8bf94aab61ea2 (unreleased, ~1.2) -# -# Original author: Charles-Francois Natali (c.f.natali[at]gmail.com) -# Maintainer: Berker Peksag (berker.peksag[at]gmail.com) -# Also see https://pypi.python.org/pypi/selectors34 -"""Selectors module. - -This module allows high-level and efficient I/O multiplexing, built upon the -`select` module primitives. - -The following code adapted from trollius.selectors. -""" -from __future__ import absolute_import - -from abc import ABCMeta, abstractmethod -from collections import namedtuple, Mapping -from errno import EINTR -import math -import select -import sys - -from kafka.vendor import six - - -def _wrap_error(exc, mapping, key): - if key not in mapping: - return - new_err_cls = mapping[key] - new_err = new_err_cls(*exc.args) - - # raise a new exception with the original traceback - if hasattr(exc, '__traceback__'): - traceback = exc.__traceback__ - else: - traceback = sys.exc_info()[2] - six.reraise(new_err_cls, new_err, traceback) - - -# generic events, that must be mapped to implementation-specific ones -EVENT_READ = (1 << 0) -EVENT_WRITE = (1 << 1) - - -def _fileobj_to_fd(fileobj): - """Return a file descriptor from a file object. - - Parameters: - fileobj -- file object or file descriptor - - Returns: - corresponding file descriptor - - Raises: - ValueError if the object is invalid - """ - if isinstance(fileobj, six.integer_types): - fd = fileobj - else: - try: - fd = int(fileobj.fileno()) - except (AttributeError, TypeError, ValueError): - raise ValueError("Invalid file object: " - "{0!r}".format(fileobj)) - if fd < 0: - raise ValueError("Invalid file descriptor: {0}".format(fd)) - return fd - - -SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) -"""Object used to associate a file object to its backing file descriptor, -selected event mask and attached data.""" - - -class _SelectorMapping(Mapping): - """Mapping of file objects to selector keys.""" - - def __init__(self, selector): - self._selector = selector - - def __len__(self): - return len(self._selector._fd_to_key) - - def __getitem__(self, fileobj): - try: - fd = self._selector._fileobj_lookup(fileobj) - return self._selector._fd_to_key[fd] - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - - def __iter__(self): - return iter(self._selector._fd_to_key) - -# Using six.add_metaclass() decorator instead of six.with_metaclass() because -# the latter leaks temporary_class to garbage with gc disabled -@six.add_metaclass(ABCMeta) -class BaseSelector(object): - """Selector abstract base class. - - A selector supports registering file objects to be monitored for specific - I/O events. - - A file object is a file descriptor or any object with a `fileno()` method. - An arbitrary object can be attached to the file object, which can be used - for example to store context information, a callback, etc. - - A selector can use various implementations (select(), poll(), epoll()...) - depending on the platform. The default `Selector` class uses the most - efficient implementation on the current platform. - """ - - @abstractmethod - def register(self, fileobj, events, data=None): - """Register a file object. - - Parameters: - fileobj -- file object or file descriptor - events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) - data -- attached data - - Returns: - SelectorKey instance - - Raises: - ValueError if events is invalid - KeyError if fileobj is already registered - OSError if fileobj is closed or otherwise is unacceptable to - the underlying system call (if a system call is made) - - Note: - OSError may or may not be raised - """ - raise NotImplementedError - - @abstractmethod - def unregister(self, fileobj): - """Unregister a file object. - - Parameters: - fileobj -- file object or file descriptor - - Returns: - SelectorKey instance - - Raises: - KeyError if fileobj is not registered - - Note: - If fileobj is registered but has since been closed this does - *not* raise OSError (even if the wrapped syscall does) - """ - raise NotImplementedError - - def modify(self, fileobj, events, data=None): - """Change a registered file object monitored events or attached data. - - Parameters: - fileobj -- file object or file descriptor - events -- events to monitor (bitwise mask of EVENT_READ|EVENT_WRITE) - data -- attached data - - Returns: - SelectorKey instance - - Raises: - Anything that unregister() or register() raises - """ - self.unregister(fileobj) - return self.register(fileobj, events, data) - - @abstractmethod - def select(self, timeout=None): - """Perform the actual selection, until some monitored file objects are - ready or a timeout expires. - - Parameters: - timeout -- if timeout > 0, this specifies the maximum wait time, in - seconds - if timeout <= 0, the select() call won't block, and will - report the currently ready file objects - if timeout is None, select() will block until a monitored - file object becomes ready - - Returns: - list of (key, events) for ready file objects - `events` is a bitwise mask of EVENT_READ|EVENT_WRITE - """ - raise NotImplementedError - - def close(self): - """Close the selector. - - This must be called to make sure that any underlying resource is freed. - """ - pass - - def get_key(self, fileobj): - """Return the key associated to a registered file object. - - Returns: - SelectorKey for this file object - """ - mapping = self.get_map() - if mapping is None: - raise RuntimeError('Selector is closed') - try: - return mapping[fileobj] - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - - @abstractmethod - def get_map(self): - """Return a mapping of file objects to selector keys.""" - raise NotImplementedError - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() - - -class _BaseSelectorImpl(BaseSelector): - """Base selector implementation.""" - - def __init__(self): - # this maps file descriptors to keys - self._fd_to_key = {} - # read-only mapping returned by get_map() - self._map = _SelectorMapping(self) - - def _fileobj_lookup(self, fileobj): - """Return a file descriptor from a file object. - - This wraps _fileobj_to_fd() to do an exhaustive search in case - the object is invalid but we still have it in our map. This - is used by unregister() so we can unregister an object that - was previously registered even if it is closed. It is also - used by _SelectorMapping. - """ - try: - return _fileobj_to_fd(fileobj) - except ValueError: - # Do an exhaustive search. - for key in self._fd_to_key.values(): - if key.fileobj is fileobj: - return key.fd - # Raise ValueError after all. - raise - - def register(self, fileobj, events, data=None): - if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): - raise ValueError("Invalid events: {0!r}".format(events)) - - key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) - - if key.fd in self._fd_to_key: - raise KeyError("{0!r} (FD {1}) is already registered" - .format(fileobj, key.fd)) - - self._fd_to_key[key.fd] = key - return key - - def unregister(self, fileobj): - try: - key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - return key - - def modify(self, fileobj, events, data=None): - # TODO: Subclasses can probably optimize this even further. - try: - key = self._fd_to_key[self._fileobj_lookup(fileobj)] - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - if events != key.events: - self.unregister(fileobj) - key = self.register(fileobj, events, data) - elif data != key.data: - # Use a shortcut to update the data. - key = key._replace(data=data) - self._fd_to_key[key.fd] = key - return key - - def close(self): - self._fd_to_key.clear() - self._map = None - - def get_map(self): - return self._map - - def _key_from_fd(self, fd): - """Return the key associated to a given file descriptor. - - Parameters: - fd -- file descriptor - - Returns: - corresponding key, or None if not found - """ - try: - return self._fd_to_key[fd] - except KeyError: - return None - - -class SelectSelector(_BaseSelectorImpl): - """Select-based selector.""" - - def __init__(self): - super(SelectSelector, self).__init__() - self._readers = set() - self._writers = set() - - def register(self, fileobj, events, data=None): - key = super(SelectSelector, self).register(fileobj, events, data) - if events & EVENT_READ: - self._readers.add(key.fd) - if events & EVENT_WRITE: - self._writers.add(key.fd) - return key - - def unregister(self, fileobj): - key = super(SelectSelector, self).unregister(fileobj) - self._readers.discard(key.fd) - self._writers.discard(key.fd) - return key - - if sys.platform == 'win32': - def _select(self, r, w, _, timeout=None): - r, w, x = select.select(r, w, w, timeout) - return r, w + x, [] - else: - _select = staticmethod(select.select) - - def select(self, timeout=None): - timeout = None if timeout is None else max(timeout, 0) - ready = [] - try: - r, w, _ = self._select(self._readers, self._writers, [], timeout) - except select.error as exc: - if exc.args[0] == EINTR: - return ready - else: - raise - r = set(r) - w = set(w) - for fd in r | w: - events = 0 - if fd in r: - events |= EVENT_READ - if fd in w: - events |= EVENT_WRITE - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - -if hasattr(select, 'poll'): - - class PollSelector(_BaseSelectorImpl): - """Poll-based selector.""" - - def __init__(self): - super(PollSelector, self).__init__() - self._poll = select.poll() - - def register(self, fileobj, events, data=None): - key = super(PollSelector, self).register(fileobj, events, data) - poll_events = 0 - if events & EVENT_READ: - poll_events |= select.POLLIN - if events & EVENT_WRITE: - poll_events |= select.POLLOUT - self._poll.register(key.fd, poll_events) - return key - - def unregister(self, fileobj): - key = super(PollSelector, self).unregister(fileobj) - self._poll.unregister(key.fd) - return key - - def select(self, timeout=None): - if timeout is None: - timeout = None - elif timeout <= 0: - timeout = 0 - else: - # poll() has a resolution of 1 millisecond, round away from - # zero to wait *at least* timeout seconds. - timeout = int(math.ceil(timeout * 1e3)) - ready = [] - try: - fd_event_list = self._poll.poll(timeout) - except select.error as exc: - if exc.args[0] == EINTR: - return ready - else: - raise - for fd, event in fd_event_list: - events = 0 - if event & ~select.POLLIN: - events |= EVENT_WRITE - if event & ~select.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - -if hasattr(select, 'epoll'): - - class EpollSelector(_BaseSelectorImpl): - """Epoll-based selector.""" - - def __init__(self): - super(EpollSelector, self).__init__() - self._epoll = select.epoll() - - def fileno(self): - return self._epoll.fileno() - - def register(self, fileobj, events, data=None): - key = super(EpollSelector, self).register(fileobj, events, data) - epoll_events = 0 - if events & EVENT_READ: - epoll_events |= select.EPOLLIN - if events & EVENT_WRITE: - epoll_events |= select.EPOLLOUT - self._epoll.register(key.fd, epoll_events) - return key - - def unregister(self, fileobj): - key = super(EpollSelector, self).unregister(fileobj) - try: - self._epoll.unregister(key.fd) - except IOError: - # This can happen if the FD was closed since it - # was registered. - pass - return key - - def select(self, timeout=None): - if timeout is None: - timeout = -1 - elif timeout <= 0: - timeout = 0 - else: - # epoll_wait() has a resolution of 1 millisecond, round away - # from zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1e3) * 1e-3 - - # epoll_wait() expects `maxevents` to be greater than zero; - # we want to make sure that `select()` can be called when no - # FD is registered. - max_ev = max(len(self._fd_to_key), 1) - - ready = [] - try: - fd_event_list = self._epoll.poll(timeout, max_ev) - except IOError as exc: - if exc.errno == EINTR: - return ready - else: - raise - for fd, event in fd_event_list: - events = 0 - if event & ~select.EPOLLIN: - events |= EVENT_WRITE - if event & ~select.EPOLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._epoll.close() - super(EpollSelector, self).close() - - -if hasattr(select, 'devpoll'): - - class DevpollSelector(_BaseSelectorImpl): - """Solaris /dev/poll selector.""" - - def __init__(self): - super(DevpollSelector, self).__init__() - self._devpoll = select.devpoll() - - def fileno(self): - return self._devpoll.fileno() - - def register(self, fileobj, events, data=None): - key = super(DevpollSelector, self).register(fileobj, events, data) - poll_events = 0 - if events & EVENT_READ: - poll_events |= select.POLLIN - if events & EVENT_WRITE: - poll_events |= select.POLLOUT - self._devpoll.register(key.fd, poll_events) - return key - - def unregister(self, fileobj): - key = super(DevpollSelector, self).unregister(fileobj) - self._devpoll.unregister(key.fd) - return key - - def select(self, timeout=None): - if timeout is None: - timeout = None - elif timeout <= 0: - timeout = 0 - else: - # devpoll() has a resolution of 1 millisecond, round away from - # zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1e3) - ready = [] - try: - fd_event_list = self._devpoll.poll(timeout) - except OSError as exc: - if exc.errno == EINTR: - return ready - else: - raise - for fd, event in fd_event_list: - events = 0 - if event & ~select.POLLIN: - events |= EVENT_WRITE - if event & ~select.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._devpoll.close() - super(DevpollSelector, self).close() - - -if hasattr(select, 'kqueue'): - - class KqueueSelector(_BaseSelectorImpl): - """Kqueue-based selector.""" - - def __init__(self): - super(KqueueSelector, self).__init__() - self._kqueue = select.kqueue() - - def fileno(self): - return self._kqueue.fileno() - - def register(self, fileobj, events, data=None): - key = super(KqueueSelector, self).register(fileobj, events, data) - if events & EVENT_READ: - kev = select.kevent(key.fd, select.KQ_FILTER_READ, - select.KQ_EV_ADD) - self._kqueue.control([kev], 0, 0) - if events & EVENT_WRITE: - kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, - select.KQ_EV_ADD) - self._kqueue.control([kev], 0, 0) - return key - - def unregister(self, fileobj): - key = super(KqueueSelector, self).unregister(fileobj) - if key.events & EVENT_READ: - kev = select.kevent(key.fd, select.KQ_FILTER_READ, - select.KQ_EV_DELETE) - try: - self._kqueue.control([kev], 0, 0) - except OSError: - # This can happen if the FD was closed since it - # was registered. - pass - if key.events & EVENT_WRITE: - kev = select.kevent(key.fd, select.KQ_FILTER_WRITE, - select.KQ_EV_DELETE) - try: - self._kqueue.control([kev], 0, 0) - except OSError: - # See comment above. - pass - return key - - def select(self, timeout=None): - timeout = None if timeout is None else max(timeout, 0) - max_ev = len(self._fd_to_key) - ready = [] - try: - kev_list = self._kqueue.control(None, max_ev, timeout) - except OSError as exc: - if exc.errno == EINTR: - return ready - else: - raise - for kev in kev_list: - fd = kev.ident - flag = kev.filter - events = 0 - if flag == select.KQ_FILTER_READ: - events |= EVENT_READ - if flag == select.KQ_FILTER_WRITE: - events |= EVENT_WRITE - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._kqueue.close() - super(KqueueSelector, self).close() - - -# Choose the best implementation, roughly: -# epoll|kqueue|devpoll > poll > select. -# select() also can't accept a FD > FD_SETSIZE (usually around 1024) -if 'KqueueSelector' in globals(): - DefaultSelector = KqueueSelector -elif 'EpollSelector' in globals(): - DefaultSelector = EpollSelector -elif 'DevpollSelector' in globals(): - DefaultSelector = DevpollSelector -elif 'PollSelector' in globals(): - DefaultSelector = PollSelector -else: - DefaultSelector = SelectSelector diff --git a/kafka/vendor/six.py b/kafka/vendor/six.py deleted file mode 100644 index 3621a0ab..00000000 --- a/kafka/vendor/six.py +++ /dev/null @@ -1,897 +0,0 @@ -# pylint: skip-file - -# Copyright (c) 2010-2017 Benjamin Peterson -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -"""Utilities for writing code that runs on Python 2 and 3""" - -from __future__ import absolute_import - -import functools -import itertools -import operator -import sys -import types - -__author__ = "Benjamin Peterson " -__version__ = "1.11.0" - - -# Useful for very coarse version differentiation. -PY2 = sys.version_info[0] == 2 -PY3 = sys.version_info[0] == 3 -PY34 = sys.version_info[0:2] >= (3, 4) - -if PY3: - string_types = str, - integer_types = int, - class_types = type, - text_type = str - binary_type = bytes - - MAXSIZE = sys.maxsize -else: - string_types = basestring, - integer_types = (int, long) - class_types = (type, types.ClassType) - text_type = unicode - binary_type = str - - if sys.platform.startswith("java"): - # Jython always uses 32 bits. - MAXSIZE = int((1 << 31) - 1) - else: - # It's possible to have sizeof(long) != sizeof(Py_ssize_t). - class X(object): - - def __len__(self): - return 1 << 31 - try: - len(X()) - except OverflowError: - # 32-bit - MAXSIZE = int((1 << 31) - 1) - else: - # 64-bit - MAXSIZE = int((1 << 63) - 1) - - # Don't del it here, cause with gc disabled this "leaks" to garbage. - # Note: This is a kafka-python customization, details at: - # https://github.com/dpkp/kafka-python/pull/979#discussion_r100403389 - # del X - - -def _add_doc(func, doc): - """Add documentation to a function.""" - func.__doc__ = doc - - -def _import_module(name): - """Import module, returning the module after the last dot.""" - __import__(name) - return sys.modules[name] - - -class _LazyDescr(object): - - def __init__(self, name): - self.name = name - - def __get__(self, obj, tp): - result = self._resolve() - setattr(obj, self.name, result) # Invokes __set__. - try: - # This is a bit ugly, but it avoids running this again by - # removing this descriptor. - delattr(obj.__class__, self.name) - except AttributeError: - pass - return result - - -class MovedModule(_LazyDescr): - - def __init__(self, name, old, new=None): - super(MovedModule, self).__init__(name) - if PY3: - if new is None: - new = name - self.mod = new - else: - self.mod = old - - def _resolve(self): - return _import_module(self.mod) - - def __getattr__(self, attr): - _module = self._resolve() - value = getattr(_module, attr) - setattr(self, attr, value) - return value - - -class _LazyModule(types.ModuleType): - - def __init__(self, name): - super(_LazyModule, self).__init__(name) - self.__doc__ = self.__class__.__doc__ - - def __dir__(self): - attrs = ["__doc__", "__name__"] - attrs += [attr.name for attr in self._moved_attributes] - return attrs - - # Subclasses should override this - _moved_attributes = [] - - -class MovedAttribute(_LazyDescr): - - def __init__(self, name, old_mod, new_mod, old_attr=None, new_attr=None): - super(MovedAttribute, self).__init__(name) - if PY3: - if new_mod is None: - new_mod = name - self.mod = new_mod - if new_attr is None: - if old_attr is None: - new_attr = name - else: - new_attr = old_attr - self.attr = new_attr - else: - self.mod = old_mod - if old_attr is None: - old_attr = name - self.attr = old_attr - - def _resolve(self): - module = _import_module(self.mod) - return getattr(module, self.attr) - - -class _SixMetaPathImporter(object): - - """ - A meta path importer to import six.moves and its submodules. - - This class implements a PEP302 finder and loader. It should be compatible - with Python 2.5 and all existing versions of Python3 - """ - - def __init__(self, six_module_name): - self.name = six_module_name - self.known_modules = {} - - def _add_module(self, mod, *fullnames): - for fullname in fullnames: - self.known_modules[self.name + "." + fullname] = mod - - def _get_module(self, fullname): - return self.known_modules[self.name + "." + fullname] - - def find_module(self, fullname, path=None): - if fullname in self.known_modules: - return self - return None - - def __get_module(self, fullname): - try: - return self.known_modules[fullname] - except KeyError: - raise ImportError("This loader does not know module " + fullname) - - def load_module(self, fullname): - try: - # in case of a reload - return sys.modules[fullname] - except KeyError: - pass - mod = self.__get_module(fullname) - if isinstance(mod, MovedModule): - mod = mod._resolve() - else: - mod.__loader__ = self - sys.modules[fullname] = mod - return mod - - def is_package(self, fullname): - """ - Return true, if the named module is a package. - - We need this method to get correct spec objects with - Python 3.4 (see PEP451) - """ - return hasattr(self.__get_module(fullname), "__path__") - - def get_code(self, fullname): - """Return None - - Required, if is_package is implemented""" - self.__get_module(fullname) # eventually raises ImportError - return None - get_source = get_code # same as get_code - -_importer = _SixMetaPathImporter(__name__) - - -class _MovedItems(_LazyModule): - - """Lazy loading of moved objects""" - __path__ = [] # mark as package - - -_moved_attributes = [ - MovedAttribute("cStringIO", "cStringIO", "io", "StringIO"), - MovedAttribute("filter", "itertools", "builtins", "ifilter", "filter"), - MovedAttribute("filterfalse", "itertools", "itertools", "ifilterfalse", "filterfalse"), - MovedAttribute("input", "__builtin__", "builtins", "raw_input", "input"), - MovedAttribute("intern", "__builtin__", "sys"), - MovedAttribute("map", "itertools", "builtins", "imap", "map"), - MovedAttribute("getcwd", "os", "os", "getcwdu", "getcwd"), - MovedAttribute("getcwdb", "os", "os", "getcwd", "getcwdb"), - MovedAttribute("getoutput", "commands", "subprocess"), - MovedAttribute("range", "__builtin__", "builtins", "xrange", "range"), - MovedAttribute("reload_module", "__builtin__", "importlib" if PY34 else "imp", "reload"), - MovedAttribute("reduce", "__builtin__", "functools"), - MovedAttribute("shlex_quote", "pipes", "shlex", "quote"), - MovedAttribute("StringIO", "StringIO", "io"), - MovedAttribute("UserDict", "UserDict", "collections"), - MovedAttribute("UserList", "UserList", "collections"), - MovedAttribute("UserString", "UserString", "collections"), - MovedAttribute("xrange", "__builtin__", "builtins", "xrange", "range"), - MovedAttribute("zip", "itertools", "builtins", "izip", "zip"), - MovedAttribute("zip_longest", "itertools", "itertools", "izip_longest", "zip_longest"), - MovedModule("builtins", "__builtin__"), - MovedModule("configparser", "ConfigParser"), - MovedModule("copyreg", "copy_reg"), - MovedModule("dbm_gnu", "gdbm", "dbm.gnu"), - MovedModule("_dummy_thread", "dummy_thread", "_dummy_thread"), - MovedModule("http_cookiejar", "cookielib", "http.cookiejar"), - MovedModule("http_cookies", "Cookie", "http.cookies"), - MovedModule("html_entities", "htmlentitydefs", "html.entities"), - MovedModule("html_parser", "HTMLParser", "html.parser"), - MovedModule("http_client", "httplib", "http.client"), - MovedModule("email_mime_base", "email.MIMEBase", "email.mime.base"), - MovedModule("email_mime_image", "email.MIMEImage", "email.mime.image"), - MovedModule("email_mime_multipart", "email.MIMEMultipart", "email.mime.multipart"), - MovedModule("email_mime_nonmultipart", "email.MIMENonMultipart", "email.mime.nonmultipart"), - MovedModule("email_mime_text", "email.MIMEText", "email.mime.text"), - MovedModule("BaseHTTPServer", "BaseHTTPServer", "http.server"), - MovedModule("CGIHTTPServer", "CGIHTTPServer", "http.server"), - MovedModule("SimpleHTTPServer", "SimpleHTTPServer", "http.server"), - MovedModule("cPickle", "cPickle", "pickle"), - MovedModule("queue", "Queue"), - MovedModule("reprlib", "repr"), - MovedModule("socketserver", "SocketServer"), - MovedModule("_thread", "thread", "_thread"), - MovedModule("tkinter", "Tkinter"), - MovedModule("tkinter_dialog", "Dialog", "tkinter.dialog"), - MovedModule("tkinter_filedialog", "FileDialog", "tkinter.filedialog"), - MovedModule("tkinter_scrolledtext", "ScrolledText", "tkinter.scrolledtext"), - MovedModule("tkinter_simpledialog", "SimpleDialog", "tkinter.simpledialog"), - MovedModule("tkinter_tix", "Tix", "tkinter.tix"), - MovedModule("tkinter_ttk", "ttk", "tkinter.ttk"), - MovedModule("tkinter_constants", "Tkconstants", "tkinter.constants"), - MovedModule("tkinter_dnd", "Tkdnd", "tkinter.dnd"), - MovedModule("tkinter_colorchooser", "tkColorChooser", - "tkinter.colorchooser"), - MovedModule("tkinter_commondialog", "tkCommonDialog", - "tkinter.commondialog"), - MovedModule("tkinter_tkfiledialog", "tkFileDialog", "tkinter.filedialog"), - MovedModule("tkinter_font", "tkFont", "tkinter.font"), - MovedModule("tkinter_messagebox", "tkMessageBox", "tkinter.messagebox"), - MovedModule("tkinter_tksimpledialog", "tkSimpleDialog", - "tkinter.simpledialog"), - MovedModule("urllib_parse", __name__ + ".moves.urllib_parse", "urllib.parse"), - MovedModule("urllib_error", __name__ + ".moves.urllib_error", "urllib.error"), - MovedModule("urllib", __name__ + ".moves.urllib", __name__ + ".moves.urllib"), - MovedModule("urllib_robotparser", "robotparser", "urllib.robotparser"), - MovedModule("xmlrpc_client", "xmlrpclib", "xmlrpc.client"), - MovedModule("xmlrpc_server", "SimpleXMLRPCServer", "xmlrpc.server"), -] -# Add windows specific modules. -if sys.platform == "win32": - _moved_attributes += [ - MovedModule("winreg", "_winreg"), - ] - -for attr in _moved_attributes: - setattr(_MovedItems, attr.name, attr) - if isinstance(attr, MovedModule): - _importer._add_module(attr, "moves." + attr.name) -del attr - -_MovedItems._moved_attributes = _moved_attributes - -moves = _MovedItems(__name__ + ".moves") -_importer._add_module(moves, "moves") - - -class Module_six_moves_urllib_parse(_LazyModule): - - """Lazy loading of moved objects in six.moves.urllib_parse""" - - -_urllib_parse_moved_attributes = [ - MovedAttribute("ParseResult", "urlparse", "urllib.parse"), - MovedAttribute("SplitResult", "urlparse", "urllib.parse"), - MovedAttribute("parse_qs", "urlparse", "urllib.parse"), - MovedAttribute("parse_qsl", "urlparse", "urllib.parse"), - MovedAttribute("urldefrag", "urlparse", "urllib.parse"), - MovedAttribute("urljoin", "urlparse", "urllib.parse"), - MovedAttribute("urlparse", "urlparse", "urllib.parse"), - MovedAttribute("urlsplit", "urlparse", "urllib.parse"), - MovedAttribute("urlunparse", "urlparse", "urllib.parse"), - MovedAttribute("urlunsplit", "urlparse", "urllib.parse"), - MovedAttribute("quote", "urllib", "urllib.parse"), - MovedAttribute("quote_plus", "urllib", "urllib.parse"), - MovedAttribute("unquote", "urllib", "urllib.parse"), - MovedAttribute("unquote_plus", "urllib", "urllib.parse"), - MovedAttribute("unquote_to_bytes", "urllib", "urllib.parse", "unquote", "unquote_to_bytes"), - MovedAttribute("urlencode", "urllib", "urllib.parse"), - MovedAttribute("splitquery", "urllib", "urllib.parse"), - MovedAttribute("splittag", "urllib", "urllib.parse"), - MovedAttribute("splituser", "urllib", "urllib.parse"), - MovedAttribute("splitvalue", "urllib", "urllib.parse"), - MovedAttribute("uses_fragment", "urlparse", "urllib.parse"), - MovedAttribute("uses_netloc", "urlparse", "urllib.parse"), - MovedAttribute("uses_params", "urlparse", "urllib.parse"), - MovedAttribute("uses_query", "urlparse", "urllib.parse"), - MovedAttribute("uses_relative", "urlparse", "urllib.parse"), -] -for attr in _urllib_parse_moved_attributes: - setattr(Module_six_moves_urllib_parse, attr.name, attr) -del attr - -Module_six_moves_urllib_parse._moved_attributes = _urllib_parse_moved_attributes - -_importer._add_module(Module_six_moves_urllib_parse(__name__ + ".moves.urllib_parse"), - "moves.urllib_parse", "moves.urllib.parse") - - -class Module_six_moves_urllib_error(_LazyModule): - - """Lazy loading of moved objects in six.moves.urllib_error""" - - -_urllib_error_moved_attributes = [ - MovedAttribute("URLError", "urllib2", "urllib.error"), - MovedAttribute("HTTPError", "urllib2", "urllib.error"), - MovedAttribute("ContentTooShortError", "urllib", "urllib.error"), -] -for attr in _urllib_error_moved_attributes: - setattr(Module_six_moves_urllib_error, attr.name, attr) -del attr - -Module_six_moves_urllib_error._moved_attributes = _urllib_error_moved_attributes - -_importer._add_module(Module_six_moves_urllib_error(__name__ + ".moves.urllib.error"), - "moves.urllib_error", "moves.urllib.error") - - -class Module_six_moves_urllib_request(_LazyModule): - - """Lazy loading of moved objects in six.moves.urllib_request""" - - -_urllib_request_moved_attributes = [ - MovedAttribute("urlopen", "urllib2", "urllib.request"), - MovedAttribute("install_opener", "urllib2", "urllib.request"), - MovedAttribute("build_opener", "urllib2", "urllib.request"), - MovedAttribute("pathname2url", "urllib", "urllib.request"), - MovedAttribute("url2pathname", "urllib", "urllib.request"), - MovedAttribute("getproxies", "urllib", "urllib.request"), - MovedAttribute("Request", "urllib2", "urllib.request"), - MovedAttribute("OpenerDirector", "urllib2", "urllib.request"), - MovedAttribute("HTTPDefaultErrorHandler", "urllib2", "urllib.request"), - MovedAttribute("HTTPRedirectHandler", "urllib2", "urllib.request"), - MovedAttribute("HTTPCookieProcessor", "urllib2", "urllib.request"), - MovedAttribute("ProxyHandler", "urllib2", "urllib.request"), - MovedAttribute("BaseHandler", "urllib2", "urllib.request"), - MovedAttribute("HTTPPasswordMgr", "urllib2", "urllib.request"), - MovedAttribute("HTTPPasswordMgrWithDefaultRealm", "urllib2", "urllib.request"), - MovedAttribute("AbstractBasicAuthHandler", "urllib2", "urllib.request"), - MovedAttribute("HTTPBasicAuthHandler", "urllib2", "urllib.request"), - MovedAttribute("ProxyBasicAuthHandler", "urllib2", "urllib.request"), - MovedAttribute("AbstractDigestAuthHandler", "urllib2", "urllib.request"), - MovedAttribute("HTTPDigestAuthHandler", "urllib2", "urllib.request"), - MovedAttribute("ProxyDigestAuthHandler", "urllib2", "urllib.request"), - MovedAttribute("HTTPHandler", "urllib2", "urllib.request"), - MovedAttribute("HTTPSHandler", "urllib2", "urllib.request"), - MovedAttribute("FileHandler", "urllib2", "urllib.request"), - MovedAttribute("FTPHandler", "urllib2", "urllib.request"), - MovedAttribute("CacheFTPHandler", "urllib2", "urllib.request"), - MovedAttribute("UnknownHandler", "urllib2", "urllib.request"), - MovedAttribute("HTTPErrorProcessor", "urllib2", "urllib.request"), - MovedAttribute("urlretrieve", "urllib", "urllib.request"), - MovedAttribute("urlcleanup", "urllib", "urllib.request"), - MovedAttribute("URLopener", "urllib", "urllib.request"), - MovedAttribute("FancyURLopener", "urllib", "urllib.request"), - MovedAttribute("proxy_bypass", "urllib", "urllib.request"), - MovedAttribute("parse_http_list", "urllib2", "urllib.request"), - MovedAttribute("parse_keqv_list", "urllib2", "urllib.request"), -] -for attr in _urllib_request_moved_attributes: - setattr(Module_six_moves_urllib_request, attr.name, attr) -del attr - -Module_six_moves_urllib_request._moved_attributes = _urllib_request_moved_attributes - -_importer._add_module(Module_six_moves_urllib_request(__name__ + ".moves.urllib.request"), - "moves.urllib_request", "moves.urllib.request") - - -class Module_six_moves_urllib_response(_LazyModule): - - """Lazy loading of moved objects in six.moves.urllib_response""" - - -_urllib_response_moved_attributes = [ - MovedAttribute("addbase", "urllib", "urllib.response"), - MovedAttribute("addclosehook", "urllib", "urllib.response"), - MovedAttribute("addinfo", "urllib", "urllib.response"), - MovedAttribute("addinfourl", "urllib", "urllib.response"), -] -for attr in _urllib_response_moved_attributes: - setattr(Module_six_moves_urllib_response, attr.name, attr) -del attr - -Module_six_moves_urllib_response._moved_attributes = _urllib_response_moved_attributes - -_importer._add_module(Module_six_moves_urllib_response(__name__ + ".moves.urllib.response"), - "moves.urllib_response", "moves.urllib.response") - - -class Module_six_moves_urllib_robotparser(_LazyModule): - - """Lazy loading of moved objects in six.moves.urllib_robotparser""" - - -_urllib_robotparser_moved_attributes = [ - MovedAttribute("RobotFileParser", "robotparser", "urllib.robotparser"), -] -for attr in _urllib_robotparser_moved_attributes: - setattr(Module_six_moves_urllib_robotparser, attr.name, attr) -del attr - -Module_six_moves_urllib_robotparser._moved_attributes = _urllib_robotparser_moved_attributes - -_importer._add_module(Module_six_moves_urllib_robotparser(__name__ + ".moves.urllib.robotparser"), - "moves.urllib_robotparser", "moves.urllib.robotparser") - - -class Module_six_moves_urllib(types.ModuleType): - - """Create a six.moves.urllib namespace that resembles the Python 3 namespace""" - __path__ = [] # mark as package - parse = _importer._get_module("moves.urllib_parse") - error = _importer._get_module("moves.urllib_error") - request = _importer._get_module("moves.urllib_request") - response = _importer._get_module("moves.urllib_response") - robotparser = _importer._get_module("moves.urllib_robotparser") - - def __dir__(self): - return ['parse', 'error', 'request', 'response', 'robotparser'] - -_importer._add_module(Module_six_moves_urllib(__name__ + ".moves.urllib"), - "moves.urllib") - - -def add_move(move): - """Add an item to six.moves.""" - setattr(_MovedItems, move.name, move) - - -def remove_move(name): - """Remove item from six.moves.""" - try: - delattr(_MovedItems, name) - except AttributeError: - try: - del moves.__dict__[name] - except KeyError: - raise AttributeError("no such move, %r" % (name,)) - - -if PY3: - _meth_func = "__func__" - _meth_self = "__self__" - - _func_closure = "__closure__" - _func_code = "__code__" - _func_defaults = "__defaults__" - _func_globals = "__globals__" -else: - _meth_func = "im_func" - _meth_self = "im_self" - - _func_closure = "func_closure" - _func_code = "func_code" - _func_defaults = "func_defaults" - _func_globals = "func_globals" - - -try: - advance_iterator = next -except NameError: - def advance_iterator(it): - return it.next() -next = advance_iterator - - -try: - callable = callable -except NameError: - def callable(obj): - return any("__call__" in klass.__dict__ for klass in type(obj).__mro__) - - -if PY3: - def get_unbound_function(unbound): - return unbound - - create_bound_method = types.MethodType - - def create_unbound_method(func, cls): - return func - - Iterator = object -else: - def get_unbound_function(unbound): - return unbound.im_func - - def create_bound_method(func, obj): - return types.MethodType(func, obj, obj.__class__) - - def create_unbound_method(func, cls): - return types.MethodType(func, None, cls) - - class Iterator(object): - - def next(self): - return type(self).__next__(self) - - callable = callable -_add_doc(get_unbound_function, - """Get the function out of a possibly unbound function""") - - -get_method_function = operator.attrgetter(_meth_func) -get_method_self = operator.attrgetter(_meth_self) -get_function_closure = operator.attrgetter(_func_closure) -get_function_code = operator.attrgetter(_func_code) -get_function_defaults = operator.attrgetter(_func_defaults) -get_function_globals = operator.attrgetter(_func_globals) - - -if PY3: - def iterkeys(d, **kw): - return iter(d.keys(**kw)) - - def itervalues(d, **kw): - return iter(d.values(**kw)) - - def iteritems(d, **kw): - return iter(d.items(**kw)) - - def iterlists(d, **kw): - return iter(d.lists(**kw)) - - viewkeys = operator.methodcaller("keys") - - viewvalues = operator.methodcaller("values") - - viewitems = operator.methodcaller("items") -else: - def iterkeys(d, **kw): - return d.iterkeys(**kw) - - def itervalues(d, **kw): - return d.itervalues(**kw) - - def iteritems(d, **kw): - return d.iteritems(**kw) - - def iterlists(d, **kw): - return d.iterlists(**kw) - - viewkeys = operator.methodcaller("viewkeys") - - viewvalues = operator.methodcaller("viewvalues") - - viewitems = operator.methodcaller("viewitems") - -_add_doc(iterkeys, "Return an iterator over the keys of a dictionary.") -_add_doc(itervalues, "Return an iterator over the values of a dictionary.") -_add_doc(iteritems, - "Return an iterator over the (key, value) pairs of a dictionary.") -_add_doc(iterlists, - "Return an iterator over the (key, [values]) pairs of a dictionary.") - - -if PY3: - def b(s): - return s.encode("latin-1") - - def u(s): - return s - unichr = chr - import struct - int2byte = struct.Struct(">B").pack - del struct - byte2int = operator.itemgetter(0) - indexbytes = operator.getitem - iterbytes = iter - import io - StringIO = io.StringIO - BytesIO = io.BytesIO - _assertCountEqual = "assertCountEqual" - if sys.version_info[1] <= 1: - _assertRaisesRegex = "assertRaisesRegexp" - _assertRegex = "assertRegexpMatches" - else: - _assertRaisesRegex = "assertRaisesRegex" - _assertRegex = "assertRegex" -else: - def b(s): - return s - # Workaround for standalone backslash - - def u(s): - return unicode(s.replace(r'\\', r'\\\\'), "unicode_escape") - unichr = unichr - int2byte = chr - - def byte2int(bs): - return ord(bs[0]) - - def indexbytes(buf, i): - return ord(buf[i]) - iterbytes = functools.partial(itertools.imap, ord) - import StringIO - StringIO = BytesIO = StringIO.StringIO - _assertCountEqual = "assertItemsEqual" - _assertRaisesRegex = "assertRaisesRegexp" - _assertRegex = "assertRegexpMatches" -_add_doc(b, """Byte literal""") -_add_doc(u, """Text literal""") - - -def assertCountEqual(self, *args, **kwargs): - return getattr(self, _assertCountEqual)(*args, **kwargs) - - -def assertRaisesRegex(self, *args, **kwargs): - return getattr(self, _assertRaisesRegex)(*args, **kwargs) - - -def assertRegex(self, *args, **kwargs): - return getattr(self, _assertRegex)(*args, **kwargs) - - -if PY3: - exec_ = getattr(moves.builtins, "exec") - - def reraise(tp, value, tb=None): - try: - if value is None: - value = tp() - if value.__traceback__ is not tb: - raise value.with_traceback(tb) - raise value - finally: - value = None - tb = None - -else: - def exec_(_code_, _globs_=None, _locs_=None): - """Execute code in a namespace.""" - if _globs_ is None: - frame = sys._getframe(1) - _globs_ = frame.f_globals - if _locs_ is None: - _locs_ = frame.f_locals - del frame - elif _locs_ is None: - _locs_ = _globs_ - exec("""exec _code_ in _globs_, _locs_""") - - exec_("""def reraise(tp, value, tb=None): - try: - raise tp, value, tb - finally: - tb = None -""") - - -if sys.version_info[:2] == (3, 2): - exec_("""def raise_from(value, from_value): - try: - if from_value is None: - raise value - raise value from from_value - finally: - value = None -""") -elif sys.version_info[:2] > (3, 2): - exec_("""def raise_from(value, from_value): - try: - raise value from from_value - finally: - value = None -""") -else: - def raise_from(value, from_value): - raise value - - -print_ = getattr(moves.builtins, "print", None) -if print_ is None: - def print_(*args, **kwargs): - """The new-style print function for Python 2.4 and 2.5.""" - fp = kwargs.pop("file", sys.stdout) - if fp is None: - return - - def write(data): - if not isinstance(data, basestring): - data = str(data) - # If the file has an encoding, encode unicode with it. - if (isinstance(fp, file) and - isinstance(data, unicode) and - fp.encoding is not None): - errors = getattr(fp, "errors", None) - if errors is None: - errors = "strict" - data = data.encode(fp.encoding, errors) - fp.write(data) - want_unicode = False - sep = kwargs.pop("sep", None) - if sep is not None: - if isinstance(sep, unicode): - want_unicode = True - elif not isinstance(sep, str): - raise TypeError("sep must be None or a string") - end = kwargs.pop("end", None) - if end is not None: - if isinstance(end, unicode): - want_unicode = True - elif not isinstance(end, str): - raise TypeError("end must be None or a string") - if kwargs: - raise TypeError("invalid keyword arguments to print()") - if not want_unicode: - for arg in args: - if isinstance(arg, unicode): - want_unicode = True - break - if want_unicode: - newline = unicode("\n") - space = unicode(" ") - else: - newline = "\n" - space = " " - if sep is None: - sep = space - if end is None: - end = newline - for i, arg in enumerate(args): - if i: - write(sep) - write(arg) - write(end) -if sys.version_info[:2] < (3, 3): - _print = print_ - - def print_(*args, **kwargs): - fp = kwargs.get("file", sys.stdout) - flush = kwargs.pop("flush", False) - _print(*args, **kwargs) - if flush and fp is not None: - fp.flush() - -_add_doc(reraise, """Reraise an exception.""") - -if sys.version_info[0:2] < (3, 4): - def wraps(wrapped, assigned=functools.WRAPPER_ASSIGNMENTS, - updated=functools.WRAPPER_UPDATES): - def wrapper(f): - f = functools.wraps(wrapped, assigned, updated)(f) - f.__wrapped__ = wrapped - return f - return wrapper -else: - wraps = functools.wraps - - -def with_metaclass(meta, *bases): - """Create a base class with a metaclass.""" - # This requires a bit of explanation: the basic idea is to make a dummy - # metaclass for one level of class instantiation that replaces itself with - # the actual metaclass. - class metaclass(type): - - def __new__(cls, name, this_bases, d): - return meta(name, bases, d) - - @classmethod - def __prepare__(cls, name, this_bases): - return meta.__prepare__(name, bases) - return type.__new__(metaclass, 'temporary_class', (), {}) - - -def add_metaclass(metaclass): - """Class decorator for creating a class with a metaclass.""" - def wrapper(cls): - orig_vars = cls.__dict__.copy() - slots = orig_vars.get('__slots__') - if slots is not None: - if isinstance(slots, str): - slots = [slots] - for slots_var in slots: - orig_vars.pop(slots_var) - orig_vars.pop('__dict__', None) - orig_vars.pop('__weakref__', None) - return metaclass(cls.__name__, cls.__bases__, orig_vars) - return wrapper - - -def python_2_unicode_compatible(klass): - """ - A decorator that defines __unicode__ and __str__ methods under Python 2. - Under Python 3 it does nothing. - - To support Python 2 and 3 with a single code base, define a __str__ method - returning text and apply this decorator to the class. - """ - if PY2: - if '__str__' not in klass.__dict__: - raise ValueError("@python_2_unicode_compatible cannot be applied " - "to %s because it doesn't define __str__()." % - klass.__name__) - klass.__unicode__ = klass.__str__ - klass.__str__ = lambda self: self.__unicode__().encode('utf-8') - return klass - - -# Complete the moves implementation. -# This code is at the end of this module to speed up module loading. -# Turn this module into a package. -__path__ = [] # required for PEP 302 and PEP 451 -__package__ = __name__ # see PEP 366 @ReservedAssignment -if globals().get("__spec__") is not None: - __spec__.submodule_search_locations = [] # PEP 451 @UndefinedVariable -# Remove other six meta path importers, since they cause problems. This can -# happen if six is removed from sys.modules and then reloaded. (Setuptools does -# this for some reason.) -if sys.meta_path: - for i, importer in enumerate(sys.meta_path): - # Here's some real nastiness: Another "instance" of the six module might - # be floating around. Therefore, we can't use isinstance() to check for - # the six meta path importer, since the other six instance will have - # inserted an importer with different class. - if (type(importer).__name__ == "_SixMetaPathImporter" and - importer.name == __name__): - del sys.meta_path[i] - break - del i, importer -# Finally, add the importer to the meta path import hook. -sys.meta_path.append(_importer) diff --git a/kafka/vendor/socketpair.py b/kafka/vendor/socketpair.py deleted file mode 100644 index b55e629e..00000000 --- a/kafka/vendor/socketpair.py +++ /dev/null @@ -1,58 +0,0 @@ -# pylint: skip-file -# vendored from https://github.com/mhils/backports.socketpair -from __future__ import absolute_import - -import sys -import socket -import errno - -_LOCALHOST = '127.0.0.1' -_LOCALHOST_V6 = '::1' - -if not hasattr(socket, "socketpair"): - # Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain. - def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0): - if family == socket.AF_INET: - host = _LOCALHOST - elif family == socket.AF_INET6: - host = _LOCALHOST_V6 - else: - raise ValueError("Only AF_INET and AF_INET6 socket address families " - "are supported") - if type != socket.SOCK_STREAM: - raise ValueError("Only SOCK_STREAM socket type is supported") - if proto != 0: - raise ValueError("Only protocol zero is supported") - - # We create a connected TCP socket. Note the trick with - # setblocking(False) that prevents us from having to create a thread. - lsock = socket.socket(family, type, proto) - try: - lsock.bind((host, 0)) - lsock.listen(min(socket.SOMAXCONN, 128)) - # On IPv6, ignore flow_info and scope_id - addr, port = lsock.getsockname()[:2] - csock = socket.socket(family, type, proto) - try: - csock.setblocking(False) - if sys.version_info >= (3, 0): - try: - csock.connect((addr, port)) - except (BlockingIOError, InterruptedError): - pass - else: - try: - csock.connect((addr, port)) - except socket.error as e: - if e.errno != errno.WSAEWOULDBLOCK: - raise - csock.setblocking(True) - ssock, _ = lsock.accept() - except Exception: - csock.close() - raise - finally: - lsock.close() - return (ssock, csock) - - socket.socketpair = socketpair diff --git a/setup.py b/setup.py index fc69494b..07d9710c 100644 --- a/setup.py +++ b/setup.py @@ -172,7 +172,7 @@ def read_version(): }, download_url="https://pypi.python.org/pypi/aiokafka", license="Apache 2", - packages=["aiokafka", "kafka"], + packages=["aiokafka"], python_requires=">=3.8", install_requires=install_requires, extras_require=extras_require, diff --git a/tests/kafka/__init__.py b/tests/kafka/__init__.py deleted file mode 100644 index 329277dc..00000000 --- a/tests/kafka/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -from __future__ import absolute_import - -# Set default logging handler to avoid "No handler found" warnings. -import logging -logging.basicConfig(level=logging.INFO) - -from kafka.future import Future -Future.error_on_callbacks = True # always fail during testing diff --git a/tests/kafka/conftest.py b/tests/kafka/conftest.py deleted file mode 100644 index 04aec4b8..00000000 --- a/tests/kafka/conftest.py +++ /dev/null @@ -1,140 +0,0 @@ -from __future__ import absolute_import - -import uuid - -import pytest - -from tests.kafka.testutil import env_kafka_version, random_string -from tests.kafka.fixtures import KafkaFixture, ZookeeperFixture - -@pytest.fixture(scope="module") -def zookeeper(): - """Return a Zookeeper fixture""" - zk_instance = ZookeeperFixture.instance() - yield zk_instance - zk_instance.close() - - -@pytest.fixture(scope="module") -def kafka_broker(kafka_broker_factory): - """Return a Kafka broker fixture""" - return kafka_broker_factory()[0] - - -@pytest.fixture(scope="module") -def kafka_broker_factory(zookeeper): - """Return a Kafka broker fixture factory""" - assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests' - - _brokers = [] - def factory(**broker_params): - params = {} if broker_params is None else broker_params.copy() - params.setdefault('partitions', 4) - num_brokers = params.pop('num_brokers', 1) - brokers = tuple(KafkaFixture.instance(x, zookeeper, **params) - for x in range(num_brokers)) - _brokers.extend(brokers) - return brokers - - yield factory - - for broker in _brokers: - broker.close() - - -@pytest.fixture -def kafka_consumer(kafka_consumer_factory): - """Return a KafkaConsumer fixture""" - return kafka_consumer_factory() - - -@pytest.fixture -def kafka_consumer_factory(kafka_broker, topic, request): - """Return a KafkaConsumer factory fixture""" - _consumer = [None] - - def factory(**kafka_consumer_params): - params = {} if kafka_consumer_params is None else kafka_consumer_params.copy() - params.setdefault('client_id', 'consumer_%s' % (request.node.name,)) - params.setdefault('auto_offset_reset', 'earliest') - _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params)) - return _consumer[0] - - yield factory - - if _consumer[0]: - _consumer[0].close() - - -@pytest.fixture -def kafka_producer(kafka_producer_factory): - """Return a KafkaProducer fixture""" - yield kafka_producer_factory() - - -@pytest.fixture -def kafka_producer_factory(kafka_broker, request): - """Return a KafkaProduce factory fixture""" - _producer = [None] - - def factory(**kafka_producer_params): - params = {} if kafka_producer_params is None else kafka_producer_params.copy() - params.setdefault('client_id', 'producer_%s' % (request.node.name,)) - _producer[0] = next(kafka_broker.get_producers(cnt=1, **params)) - return _producer[0] - - yield factory - - if _producer[0]: - _producer[0].close() - -@pytest.fixture -def kafka_admin_client(kafka_admin_client_factory): - """Return a KafkaAdminClient fixture""" - yield kafka_admin_client_factory() - -@pytest.fixture -def kafka_admin_client_factory(kafka_broker): - """Return a KafkaAdminClient factory fixture""" - _admin_client = [None] - - def factory(**kafka_admin_client_params): - params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy() - _admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params)) - return _admin_client[0] - - yield factory - - if _admin_client[0]: - _admin_client[0].close() - -@pytest.fixture -def topic(kafka_broker, request): - """Return a topic fixture""" - topic_name = '%s_%s' % (request.node.name, random_string(10)) - kafka_broker.create_topics([topic_name]) - return topic_name - - -@pytest.fixture() -def send_messages(topic, kafka_producer, request): - """A factory that returns a send_messages function with a pre-populated - topic topic / producer.""" - - def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request): - """ - messages is typically `range(0,100)` - partition is an int - """ - messages_and_futures = [] # [(message, produce_future),] - for i in number_range: - # request.node.name provides the test name (including parametrized values) - encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8') - future = kafka_producer.send(topic, value=encoded_msg, partition=partition) - messages_and_futures.append((encoded_msg, future)) - kafka_producer.flush() - for (msg, f) in messages_and_futures: - assert f.succeeded() - return [msg for (msg, f) in messages_and_futures] - - return _send_messages diff --git a/tests/kafka/fixtures.py b/tests/kafka/fixtures.py deleted file mode 100644 index b6854e54..00000000 --- a/tests/kafka/fixtures.py +++ /dev/null @@ -1,651 +0,0 @@ -from __future__ import absolute_import - -import atexit -import logging -import os -import os.path -import socket -import subprocess -import time -import uuid - -import py -from kafka.vendor.six.moves import urllib, range -from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 - -from aiokafka import errors -from aiokafka.errors import InvalidReplicationFactorError -from aiokafka.protocol.admin import CreateTopicsRequest -from aiokafka.protocol.metadata import MetadataRequest -from tests.kafka.testutil import env_kafka_version, random_string -from tests.kafka.service import ExternalService, SpawnedService - -log = logging.getLogger(__name__) - - -def get_open_port(): - sock = socket.socket() - sock.bind(("127.0.0.1", 0)) - port = sock.getsockname()[1] - sock.close() - return port - - -def gen_ssl_resources(directory): - os.system(""" - cd {0} - echo Generating SSL resources in {0} - - # Step 1 - keytool -keystore kafka.server.keystore.jks -alias localhost -validity 1 \ - -genkey -storepass foobar -keypass foobar \ - -dname "CN=localhost, OU=kafka-python, O=kafka-python, L=SF, ST=CA, C=US" \ - -ext SAN=dns:localhost - - # Step 2 - openssl genrsa -out ca-key 2048 - openssl req -new -x509 -key ca-key -out ca-cert -days 1 \ - -subj "/C=US/ST=CA/O=MyOrg, Inc./CN=mydomain.com" - keytool -keystore kafka.server.truststore.jks -alias CARoot -import \ - -file ca-cert -storepass foobar -noprompt - - # Step 3 - keytool -keystore kafka.server.keystore.jks -alias localhost -certreq \ - -file cert-file -storepass foobar - openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed \ - -days 1 -CAcreateserial -passin pass:foobar - keytool -keystore kafka.server.keystore.jks -alias CARoot -import \ - -file ca-cert -storepass foobar -noprompt - keytool -keystore kafka.server.keystore.jks -alias localhost -import \ - -file cert-signed -storepass foobar -noprompt - """.format(directory)) - - -class Fixture(object): - kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2') - scala_version = os.environ.get("SCALA_VERSION", '2.8.0') - project_root = os.environ.get('PROJECT_ROOT', - os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - kafka_root = os.environ.get("KAFKA_ROOT", - os.path.join(project_root, 'servers', kafka_version, "kafka-bin")) - - def __init__(self): - self.child = None - - @classmethod - def download_official_distribution(cls, - kafka_version=None, - scala_version=None, - output_dir=None): - if not kafka_version: - kafka_version = cls.kafka_version - if not scala_version: - scala_version = cls.scala_version - if not output_dir: - output_dir = os.path.join(cls.project_root, 'servers', 'dist') - - distfile = 'kafka_%s-%s' % (scala_version, kafka_version,) - url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,) - output_file = os.path.join(output_dir, distfile + '.tgz') - - if os.path.isfile(output_file): - log.info("Found file already on disk: %s", output_file) - return output_file - - # New tarballs are .tgz, older ones are sometimes .tar.gz - try: - url = url_base + distfile + '.tgz' - log.info("Attempting to download %s", url) - response = urllib.request.urlopen(url) - except urllib.error.HTTPError: - log.exception("HTTP Error") - url = url_base + distfile + '.tar.gz' - log.info("Attempting to download %s", url) - response = urllib.request.urlopen(url) - - log.info("Saving distribution file to %s", output_file) - with open(output_file, 'w') as output_file_fd: - output_file_fd.write(response.read()) - - return output_file - - @classmethod - def test_resource(cls, filename): - return os.path.join(cls.project_root, "servers", cls.kafka_version, "resources", filename) - - @classmethod - def kafka_run_class_args(cls, *args): - result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')] - result.extend([str(arg) for arg in args]) - return result - - def kafka_run_class_env(self): - env = os.environ.copy() - env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \ - (self.test_resource("log4j.properties"),) - return env - - @classmethod - def render_template(cls, source_file, target_file, binding): - log.info('Rendering %s from template %s', target_file.strpath, source_file) - with open(source_file, "r") as handle: - template = handle.read() - assert len(template) > 0, 'Empty template %s' % (source_file,) - with open(target_file.strpath, "w") as handle: - handle.write(template.format(**binding)) - handle.flush() - os.fsync(handle) - - # fsync directory for durability - # https://blog.gocept.com/2013/07/15/reliable-file-updates-with-python/ - dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY) - os.fsync(dirfd) - os.close(dirfd) - log.debug("Template string:") - for line in template.splitlines(): - log.debug(' ' + line.strip()) - log.debug("Rendered template:") - with open(target_file.strpath, 'r') as o: - for line in o: - log.debug(' ' + line.strip()) - log.debug("binding:") - for key, value in binding.items(): - log.debug(" {key}={value}".format(key=key, value=value)) - - def dump_logs(self): - self.child.dump_logs() - - -class ZookeeperFixture(Fixture): - @classmethod - def instance(cls): - if "ZOOKEEPER_URI" in os.environ: - parse = urlparse(os.environ["ZOOKEEPER_URI"]) - (host, port) = (parse.hostname, parse.port) - fixture = ExternalService(host, port) - else: - (host, port) = ("127.0.0.1", None) - fixture = cls(host, port) - - fixture.open() - return fixture - - def __init__(self, host, port, tmp_dir=None): - super(ZookeeperFixture, self).__init__() - self.host = host - self.port = port - - self.tmp_dir = tmp_dir - - def kafka_run_class_env(self): - env = super(ZookeeperFixture, self).kafka_run_class_env() - env['LOG_DIR'] = self.tmp_dir.join('logs').strpath - return env - - def out(self, message): - log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message) - - def open(self): - if self.tmp_dir is None: - self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member - self.tmp_dir.ensure(dir=True) - - self.out("Running local instance...") - log.info(" host = %s", self.host) - log.info(" port = %s", self.port or '(auto)') - log.info(" tmp_dir = %s", self.tmp_dir.strpath) - - # Configure Zookeeper child process - template = self.test_resource("zookeeper.properties") - properties = self.tmp_dir.join("zookeeper.properties") - args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", - properties.strpath) - env = self.kafka_run_class_env() - - # Party! - timeout = 5 - max_timeout = 120 - backoff = 1 - end_at = time.time() + max_timeout - tries = 1 - auto_port = (self.port is None) - while time.time() < end_at: - if auto_port: - self.port = get_open_port() - self.out('Attempting to start on port %d (try #%d)' % (self.port, tries)) - self.render_template(template, properties, vars(self)) - self.child = SpawnedService(args, env) - self.child.start() - timeout = min(timeout, max(end_at - time.time(), 0)) - if self.child.wait_for(r"binding to port", timeout=timeout): - break - self.child.dump_logs() - self.child.stop() - timeout *= 2 - time.sleep(backoff) - tries += 1 - backoff += 1 - else: - raise RuntimeError('Failed to start Zookeeper before max_timeout') - self.out("Done!") - atexit.register(self.close) - - def close(self): - if self.child is None: - return - self.out("Stopping...") - self.child.stop() - self.child = None - self.out("Done!") - self.tmp_dir.remove() - - def __del__(self): - self.close() - - -class KafkaFixture(Fixture): - broker_user = 'alice' - broker_password = 'alice-secret' - - @classmethod - def instance(cls, broker_id, zookeeper, zk_chroot=None, - host=None, port=None, - transport='PLAINTEXT', replicas=1, partitions=2, - sasl_mechanism=None, auto_create_topic=True, tmp_dir=None): - - if zk_chroot is None: - zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") - if "KAFKA_URI" in os.environ: - parse = urlparse(os.environ["KAFKA_URI"]) - (host, port) = (parse.hostname, parse.port) - fixture = ExternalService(host, port) - else: - if host is None: - host = "localhost" - fixture = KafkaFixture(host, port, broker_id, - zookeeper, zk_chroot, - transport=transport, - replicas=replicas, partitions=partitions, - sasl_mechanism=sasl_mechanism, - auto_create_topic=auto_create_topic, - tmp_dir=tmp_dir) - - fixture.open() - return fixture - - def __init__(self, host, port, broker_id, zookeeper, zk_chroot, - replicas=1, partitions=2, transport='PLAINTEXT', - sasl_mechanism=None, auto_create_topic=True, - tmp_dir=None): - super(KafkaFixture, self).__init__() - - self.host = host - self.port = port - - self.broker_id = broker_id - self.auto_create_topic = auto_create_topic - self.transport = transport.upper() - if sasl_mechanism is not None: - self.sasl_mechanism = sasl_mechanism.upper() - else: - self.sasl_mechanism = None - self.ssl_dir = self.test_resource('ssl') - - # TODO: checking for port connection would be better than scanning logs - # until then, we need the pattern to work across all supported broker versions - # The logging format changed slightly in 1.0.0 - self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,) - # Need to wait until the broker has fetched user configs from zookeeper in case we use scram as sasl mechanism - self.scram_pattern = r"Removing Produce quota for user %s" % (self.broker_user) - - self.zookeeper = zookeeper - self.zk_chroot = zk_chroot - # Add the attributes below for the template binding - self.zk_host = self.zookeeper.host - self.zk_port = self.zookeeper.port - - self.replicas = replicas - self.partitions = partitions - - self.tmp_dir = tmp_dir - self.running = False - - self._client = None - self.sasl_config = '' - self.jaas_config = '' - - def _sasl_config(self): - if not self.sasl_enabled: - return '' - - sasl_config = ( - 'sasl.enabled.mechanisms={mechanism}\n' - 'sasl.mechanism.inter.broker.protocol={mechanism}\n' - ) - return sasl_config.format(mechanism=self.sasl_mechanism) - - def _jaas_config(self): - if not self.sasl_enabled: - return '' - - elif self.sasl_mechanism == 'PLAIN': - jaas_config = ( - 'org.apache.kafka.common.security.plain.PlainLoginModule required\n' - ' username="{user}" password="{password}" user_{user}="{password}";\n' - ) - elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"): - jaas_config = ( - 'org.apache.kafka.common.security.scram.ScramLoginModule required\n' - ' username="{user}" password="{password}";\n' - ) - else: - raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism)) - return jaas_config.format(user=self.broker_user, password=self.broker_password) - - def _add_scram_user(self): - self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user)) - args = self.kafka_run_class_args( - "kafka.admin.ConfigCommand", - "--zookeeper", - "%s:%d/%s" % (self.zookeeper.host, - self.zookeeper.port, - self.zk_chroot), - "--alter", - "--entity-type", "users", - "--entity-name", self.broker_user, - "--add-config", - "{}=[password={}]".format(self.sasl_mechanism, self.broker_password), - ) - env = self.kafka_run_class_env() - proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - - stdout, stderr = proc.communicate() - - if proc.returncode != 0: - self.out("Failed to save credentials to zookeeper!") - self.out(stdout) - self.out(stderr) - raise RuntimeError("Failed to save credentials to zookeeper!") - self.out("User created.") - - @property - def sasl_enabled(self): - return self.sasl_mechanism is not None - - def bootstrap_server(self): - return '%s:%d' % (self.host, self.port) - - def kafka_run_class_env(self): - env = super(KafkaFixture, self).kafka_run_class_env() - env['LOG_DIR'] = self.tmp_dir.join('logs').strpath - return env - - def out(self, message): - log.info("*** Kafka [%s:%s]: %s", self.host, self.port or '(auto)', message) - - def _create_zk_chroot(self): - self.out("Creating Zookeeper chroot node...") - args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain", - "-server", - "%s:%d" % (self.zookeeper.host, - self.zookeeper.port), - "create", - "/%s" % (self.zk_chroot,), - "kafka-python") - env = self.kafka_run_class_env() - proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - - stdout, stderr = proc.communicate() - - if proc.returncode != 0: - self.out("Failed to create Zookeeper chroot node") - self.out(stdout) - self.out(stderr) - raise RuntimeError("Failed to create Zookeeper chroot node") - self.out("Kafka chroot created in Zookeeper!") - - def start(self): - # Configure Kafka child process - properties = self.tmp_dir.join("kafka.properties") - jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf") - properties_template = self.test_resource("kafka.properties") - jaas_conf_template = self.test_resource("kafka_server_jaas.conf") - - args = self.kafka_run_class_args("kafka.Kafka", properties.strpath) - env = self.kafka_run_class_env() - if self.sasl_enabled: - opts = env.get('KAFKA_OPTS', '').strip() - opts += ' -Djava.security.auth.login.config={}'.format(jaas_conf.strpath) - env['KAFKA_OPTS'] = opts - self.render_template(jaas_conf_template, jaas_conf, vars(self)) - - timeout = 5 - max_timeout = 120 - backoff = 1 - end_at = time.time() + max_timeout - tries = 1 - auto_port = (self.port is None) - while time.time() < end_at: - # We have had problems with port conflicts on travis - # so we will try a different port on each retry - # unless the fixture was passed a specific port - if auto_port: - self.port = get_open_port() - self.out('Attempting to start on port %d (try #%d)' % (self.port, tries)) - self.render_template(properties_template, properties, vars(self)) - - self.child = SpawnedService(args, env) - self.child.start() - timeout = min(timeout, max(end_at - time.time(), 0)) - if self._broker_ready(timeout) and self._scram_user_present(timeout): - break - - self.child.dump_logs() - self.child.stop() - - timeout *= 2 - time.sleep(backoff) - tries += 1 - backoff += 1 - else: - raise RuntimeError('Failed to start KafkaInstance before max_timeout') - - (self._client,) = self.get_clients(1, client_id='_internal_client') - - self.out("Done!") - self.running = True - - def _broker_ready(self, timeout): - return self.child.wait_for(self.start_pattern, timeout=timeout) - - def _scram_user_present(self, timeout): - # no need to wait for scram user if scram is not used - if not self.sasl_enabled or not self.sasl_mechanism.startswith('SCRAM-SHA-'): - return True - return self.child.wait_for(self.scram_pattern, timeout=timeout) - - def open(self): - if self.running: - self.out("Instance already running") - return - - # Create directories - if self.tmp_dir is None: - self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member - self.tmp_dir.ensure(dir=True) - self.tmp_dir.ensure('logs', dir=True) - self.tmp_dir.ensure('data', dir=True) - - self.out("Running local instance...") - log.info(" host = %s", self.host) - log.info(" port = %s", self.port or '(auto)') - log.info(" transport = %s", self.transport) - log.info(" sasl_mechanism = %s", self.sasl_mechanism) - log.info(" broker_id = %s", self.broker_id) - log.info(" zk_host = %s", self.zookeeper.host) - log.info(" zk_port = %s", self.zookeeper.port) - log.info(" zk_chroot = %s", self.zk_chroot) - log.info(" replicas = %s", self.replicas) - log.info(" partitions = %s", self.partitions) - log.info(" tmp_dir = %s", self.tmp_dir.strpath) - - self._create_zk_chroot() - self.sasl_config = self._sasl_config() - self.jaas_config = self._jaas_config() - # add user to zookeeper for the first server - if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0: - self._add_scram_user() - self.start() - - atexit.register(self.close) - - def __del__(self): - self.close() - - def stop(self): - if not self.running: - self.out("Instance already stopped") - return - - self.out("Stopping...") - self.child.stop() - self.child = None - self.running = False - self.out("Stopped!") - - def close(self): - self.stop() - if self.tmp_dir is not None: - self.tmp_dir.remove() - self.tmp_dir = None - self.out("Done!") - - def dump_logs(self): - super(KafkaFixture, self).dump_logs() - self.zookeeper.dump_logs() - - def _send_request(self, request, timeout=None): - def _failure(error): - raise error - retries = 10 - while True: - node_id = self._client.least_loaded_node() - for connect_retry in range(40): - self._client.maybe_connect(node_id) - if self._client.connected(node_id): - break - self._client.poll(timeout_ms=100) - else: - raise RuntimeError('Could not connect to broker with node id %d' % (node_id,)) - - try: - future = self._client.send(node_id, request) - future.error_on_callbacks = True - future.add_errback(_failure) - self._client.poll(future=future, timeout_ms=timeout) - return future.value - except Exception as exc: - time.sleep(1) - retries -= 1 - if retries == 0: - raise exc - else: - pass # retry - - def _create_topic(self, topic_name, num_partitions=None, replication_factor=None, timeout_ms=10000): - if num_partitions is None: - num_partitions = self.partitions - if replication_factor is None: - replication_factor = self.replicas - - # Try different methods to create a topic, from the fastest to the slowest - if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas: - self._create_topic_via_metadata(topic_name, timeout_ms) - elif env_kafka_version() >= (0, 10, 1, 0): - try: - self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) - except InvalidReplicationFactorError: - # wait and try again - # on travis the brokers sometimes take a while to find themselves - time.sleep(0.5) - self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) - else: - self._create_topic_via_cli(topic_name, num_partitions, replication_factor) - - def _create_topic_via_metadata(self, topic_name, timeout_ms=10000): - self._send_request(MetadataRequest[0]([topic_name]), timeout_ms) - - def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): - request = CreateTopicsRequest[0]([(topic_name, num_partitions, - replication_factor, [], [])], timeout_ms) - response = self._send_request(request, timeout=timeout_ms) - for topic_result in response.topic_errors: - error_code = topic_result[1] - if error_code != 0: - raise errors.for_code(error_code) - - def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor): - args = self.kafka_run_class_args('kafka.admin.TopicCommand', - '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, - self.zookeeper.port, - self.zk_chroot), - '--create', - '--topic', topic_name, - '--partitions', self.partitions \ - if num_partitions is None else num_partitions, - '--replication-factor', self.replicas \ - if replication_factor is None \ - else replication_factor) - if env_kafka_version() >= (0, 10): - args.append('--if-not-exists') - env = self.kafka_run_class_env() - proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = proc.communicate() - if proc.returncode != 0: - if 'kafka.common.TopicExistsException' not in stdout: - self.out("Failed to create topic %s" % (topic_name,)) - self.out(stdout) - self.out(stderr) - raise RuntimeError("Failed to create topic %s" % (topic_name,)) - - def get_topic_names(self): - args = self.kafka_run_class_args('kafka.admin.TopicCommand', - '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, - self.zookeeper.port, - self.zk_chroot), - '--list' - ) - env = self.kafka_run_class_env() - env.pop('KAFKA_LOG4J_OPTS') - proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = proc.communicate() - if proc.returncode != 0: - self.out("Failed to list topics!") - self.out(stdout) - self.out(stderr) - raise RuntimeError("Failed to list topics!") - return stdout.decode().splitlines(False) - - def create_topics(self, topic_names, num_partitions=None, replication_factor=None): - for topic_name in topic_names: - self._create_topic(topic_name, num_partitions, replication_factor) - - def _enrich_client_params(self, params, **defaults): - params = params.copy() - for key, value in defaults.items(): - params.setdefault(key, value) - params.setdefault('bootstrap_servers', self.bootstrap_server()) - if self.sasl_enabled: - params.setdefault('sasl_mechanism', self.sasl_mechanism) - params.setdefault('security_protocol', self.transport) - if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): - params.setdefault('sasl_plain_username', self.broker_user) - params.setdefault('sasl_plain_password', self.broker_password) - return params - - @staticmethod - def _create_many_clients(cnt, cls, *args, **params): - client_id = params['client_id'] - for _ in range(cnt): - params['client_id'] = '%s_%s' % (client_id, random_string(4)) - yield cls(*args, **params) diff --git a/tests/kafka/service.py b/tests/kafka/service.py deleted file mode 100644 index 045d780e..00000000 --- a/tests/kafka/service.py +++ /dev/null @@ -1,133 +0,0 @@ -from __future__ import absolute_import - -import logging -import os -import re -import select -import subprocess -import sys -import threading -import time - -__all__ = [ - 'ExternalService', - 'SpawnedService', -] - -log = logging.getLogger(__name__) - - -class ExternalService(object): - def __init__(self, host, port): - log.info("Using already running service at %s:%d", host, port) - self.host = host - self.port = port - - def open(self): - pass - - def close(self): - pass - - -class SpawnedService(threading.Thread): - def __init__(self, args=None, env=None): - super(SpawnedService, self).__init__() - - if args is None: - raise TypeError("args parameter is required") - self.args = args - self.env = env - self.captured_stdout = [] - self.captured_stderr = [] - - self.should_die = threading.Event() - self.child = None - self.alive = False - self.daemon = True - log.info("Created service for command:") - log.info(" "+' '.join(self.args)) - log.debug("With environment:") - for key, value in self.env.items(): - log.debug(" {key}={value}".format(key=key, value=value)) - - def _spawn(self): - if self.alive: return - if self.child and self.child.poll() is None: return - - self.child = subprocess.Popen( - self.args, - preexec_fn=os.setsid, # to avoid propagating signals - env=self.env, - bufsize=1, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.alive = self.child.poll() is None - - def _despawn(self): - if self.child.poll() is None: - self.child.terminate() - self.alive = False - for _ in range(50): - if self.child.poll() is not None: - self.child = None - break - time.sleep(0.1) - else: - self.child.kill() - - def run(self): - self._spawn() - while True: - try: - (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1) - except select.error as ex: - if ex.args[0] == 4: - continue - else: - raise - - if self.child.stdout in rds: - line = self.child.stdout.readline().decode('utf-8').rstrip() - if line: - self.captured_stdout.append(line) - - if self.child.stderr in rds: - line = self.child.stderr.readline().decode('utf-8').rstrip() - if line: - self.captured_stderr.append(line) - - if self.child.poll() is not None: - self.dump_logs() - break - - if self.should_die.is_set(): - self._despawn() - break - - def dump_logs(self): - sys.stderr.write('\n'.join(self.captured_stderr)) - sys.stdout.write('\n'.join(self.captured_stdout)) - - def wait_for(self, pattern, timeout=30): - start = time.time() - while True: - if not self.is_alive(): - raise RuntimeError("Child thread died already.") - - elapsed = time.time() - start - if elapsed >= timeout: - log.error("Waiting for %r timed out after %d seconds", pattern, timeout) - return False - - if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None: - log.info("Found pattern %r in %d seconds via stdout", pattern, elapsed) - return True - if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None: - log.info("Found pattern %r in %d seconds via stderr", pattern, elapsed) - return True - time.sleep(0.1) - - def stop(self): - self.should_die.set() - self.join() diff --git a/tests/kafka/testutil.py b/tests/kafka/testutil.py deleted file mode 100644 index ec4d70bf..00000000 --- a/tests/kafka/testutil.py +++ /dev/null @@ -1,46 +0,0 @@ -from __future__ import absolute_import - -import os -import random -import re -import string -import time - - -def special_to_underscore(string, _matcher=re.compile(r'[^a-zA-Z0-9_]+')): - return _matcher.sub('_', string) - - -def random_string(length): - return "".join(random.choice(string.ascii_letters) for i in range(length)) - - -def env_kafka_version(): - """Return the Kafka version set in the OS environment as a tuple. - - Example: '0.8.1.1' --> (0, 8, 1, 1) - """ - if 'KAFKA_VERSION' not in os.environ: - return () - return tuple(map(int, os.environ['KAFKA_VERSION'].split('.'))) - - -def assert_message_count(messages, num_messages): - """Check that we received the expected number of messages with no duplicates.""" - # Make sure we got them all - assert len(messages) == num_messages - # Make sure there are no duplicates - # Note: Currently duplicates are identified only using key/value. Other attributes like topic, partition, headers, - # timestamp, etc are ignored... this could be changed if necessary, but will be more tolerant of dupes. - unique_messages = {(m.key, m.value) for m in messages} - assert len(unique_messages) == num_messages - - -class Timer(object): - def __enter__(self): - self.start = time.time() - return self - - def __exit__(self, *args): - self.end = time.time() - self.interval = self.end - self.start