diff --git a/Dockerfile b/Dockerfile
index a22f7d8..fe7be91 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,5 @@
FROM debian:9.5
-LABEL maintainer="Kyle Wilcox "
+LABEL MAINTAINER="Kyle Wilcox "
ENV DEBIAN_FRONTEND noninteractive
ENV LANG C.UTF-8
@@ -16,30 +16,32 @@ RUN apt-get update && apt-get install -y \
    wget \
    && \
    apt-get clean && \
-    rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* /var/cache/oracle-jdk8-installer
-
-# Copy over environment definition
-COPY environment.yml /tmp/environment.yml
+    rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Setup CONDA (https://hub.docker.com/r/continuumio/miniconda3/~/dockerfile/)
-ENV MINICONDA_VERSION latest
-RUN echo 'export PATH=/opt/conda/bin:$PATH' > /etc/profile.d/conda.sh && \
-    curl -k -o /miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-$MINICONDA_VERSION-Linux-x86_64.sh && \
+ENV MINICONDA_VERSION py38_4.8.2
+ENV MINICONDA_SHA256 5bbb193fd201ebe25f4aeb3c58ba83feced6a25982ef4afa86d5506c3656c142
+RUN curl -k -o /miniconda.sh https://repo.anaconda.com/miniconda/Miniconda3-$MINICONDA_VERSION-Linux-x86_64.sh && \
+    echo $MINICONDA_SHA256 /miniconda.sh | sha256sum --check && \
    /bin/bash /miniconda.sh -b -p /opt/conda && \
    rm /miniconda.sh && \
-    /opt/conda/bin/conda config \
-        --set always_yes yes \
-        --set changeps1 no \
-        --set show_channel_urls True \
-        && \
-    /opt/conda/bin/conda env update -n root --file /tmp/environment.yml && \
-    /opt/conda/bin/conda clean -a -y
+    /opt/conda/bin/conda update -c conda-forge -n base conda && \
+    /opt/conda/bin/conda clean -afy && \
+    /opt/conda/bin/conda init && \
+    find /opt/conda/ -follow -type f -name '*.a' -delete && \
+    find /opt/conda/ -follow -type f -name '*.js.map' -delete && \
+    /opt/conda/bin/conda install -y -c conda-forge -n base mamba pip && \
+    /opt/conda/bin/conda clean -afy
ENV PATH /opt/conda/bin:$PATH
-# Copy packrat contents and install
-ENV CODE_HOME /easyavro
-WORKDIR $CODE_HOME
-COPY . $CODE_HOME
+COPY environment.yml /tmp/
+RUN mamba env create -n runenv -f /tmp/environment.yml && \
+    echo "conda activate runenv" >> /root/.bashrc
+
+ENV PROJECT_ROOT /code
+RUN mkdir -p "$PROJECT_ROOT"
+COPY . $PROJECT_ROOT
+WORKDIR $PROJECT_ROOT
-CMD ["py.test", "-s", "-rxs", "-v"]
+CMD ["conda", "run", "-n", "runenv", "--no-capture-output", "pytest", "-s", "-rxs", "-v"]
diff --git a/README.md b/README.md
index 5f8d9e0..8d35512 100644
--- a/README.md
+++ b/README.md
@@ -171,14 +171,16 @@ The defaults are sane. They will pull offsets from the broker and set the topic
If you need to override any kafka level parameters, you may use the `kafka_conf` (`dict`) initialization parameter on `Consumer`. It will override any of the defaults the `Consumer` uses. See the documentation for the `config` parameter to [`Consumer`](https://docs.confluent.io/current/clients/confluent-kafka-python/#consumer), [`AvroConsumer`](https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.avro.AvroConsumer) and the [list of librdkafka properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#global-configuration-properties).
-##### Parameters for `consume` function
+##### Parameters for `start` function
-* `on_receive` (`Callable[[str, str], None]`) - Function that is executed (in a new thread) for each retrieved message.
+* `on_receive` (`Callable[[str, str], None]` or `Callable[[List[Message]], None]`) - Function that is executed (in a new thread) for each retrieved message.
* `on_receive_timeout` (`int`) - Seconds the `Consumer` will wait for the calls to `on_receive` to exit before moving on. By default it will wait forever. You should set this to a reasonable maximum number of seconds your `on_receive` callback will take, to prevent deadlock when the `Consumer` is exiting and trying to clean up its spawned threads.
* `timeout` (`int`) - The `timeout` parameter to the `poll` function in `confluent-kafka`. Controls how long `poll` will block while waiting for messages.
* `loop` (`bool`) - Whether the `Consumer` will keep looping for messages or break after retrieving the first batch of messages. This is useful when testing.
* `initial_wait` (`int`) - Seconds the Consumer should wait before starting to consume. This is useful when testing.
* `cleanup_every` (`int`) - Try to clean up spawned threads after this many messages.
+* `num_messages` (`int`) - Consume up to this many messages from the topic at once. This can improve throughput on high-volume topics that benefit from processing messages in batches.
+* `receive_messages_in_callback` (`bool`) - Instead of calling the `on_receive` callback with key/value pairs, call it with `confluent_kafka.Message` objects. This requires the user to call `message.key()` and `message.value()` on each. This gives the user access to other message attributes like `message.topic()` in the callback. **Setting this parameter to `True` is recommended for any new code.**
```python
from easyavro import EasyConsumer
@@ -191,7 +193,7 @@ bc = EasyConsumer(
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)
-bc.consume(on_receive=on_receive)
+bc.start(on_receive=on_receive)
```
Or pass in your own [kafka config](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties) dict.
@@ -211,7 +213,7 @@ bc = EasyConsumer(
        'offset.store.method': 'file'
    }
)
-bc.consume(on_receive=on_receive)
+bc.start(on_receive=on_receive)
```
Or pass in a value to use for the `auto.offset.reset` [topic config](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties) setting.
@@ -228,7 +230,7 @@ bc = EasyConsumer(
    kafka_topic='my-topic',
    offset='earliest'
)
-bc.consume(on_receive=on_receive)
+bc.start(on_receive=on_receive)
```
#### EasyConsumer
@@ -246,7 +248,7 @@ bc = EasyConsumer(
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)
-bc.consume(on_receive=on_receive)
+bc.start(on_receive=on_receive)
```
You can unpack data as needed in the callback function
@@ -263,13 +265,36 @@ bc = EasyConsumer(
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)
-bc.consume(on_receive=on_receive)
+bc.start(on_receive=on_receive)
```
+You can receive a list of Message objects instead of key/value pairs
-#### EasyAvroProducer
+```python
+import msgpack
+from typing import List
+from easyavro import EasyConsumer
+from confluent_kafka import Message
-If you are using a Confluent SchemaRegistry this helper exists to match your topic name to existing schemas in the registry. Your schemas must already be available in the schema registry as `[topic]-key` and `[topic]-value`.
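+# With receive_messages_in_callback=True the callback receives the whole batch of Message objects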
+def on_receive(messages: List[Message]) -> None:
+    for m in messages:
+        print(
+            "Got Message - Topic:{}\nKey:{}\nValue:{}\n".format(m.topic(), m.key(), m.value())
+        )
+
+bc = EasyConsumer(
+    kafka_brokers=['localhost:4001'],
+    consumer_group='easyavro.testing',
+    kafka_topic='my-topic',
+)
+bc.start(on_receive=on_receive, num_messages=5, receive_messages_in_callback=True)
+```
+
+
+#### EasyAvroConsumer
+
+If you are using a Confluent SchemaRegistry this helper exists to match your topic name to existing schemas in the registry. Your schemas must already be available in the schema registry as `[topic]-key` and `[topic]-value`. Pass the `schema_registry_url` parameter to EasyAvroConsumer and the rest is
+taken care of.
```python
from easyavro import EasyAvroConsumer
@@ -278,11 +303,12 @@ def on_receive(key: str, value: str) -> None:
    print("Got Key:{}\nValue:{}\n".format(key, value))
bc = EasyAvroConsumer(
+    schema_registry_url='http://localhost:4002',
    kafka_brokers=['localhost:4001'],
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)
-bc.consume(on_receive=on_receive)
+bc.start(on_receive=on_receive)
```
## Testing
@@ -320,5 +346,5 @@ docker run --net="host" easyavro
```
conda env create -f environment.yml
-py.test -s -rxs -v
+pytest -s -rxs -v
```
diff --git a/easyavro/__init__.py b/easyavro/__init__.py
index e767e14..7c0ddd0 100644
--- a/easyavro/__init__.py
+++ b/easyavro/__init__.py
@@ -1,7 +1,6 @@
#!python
# coding=utf-8
from .consumer import EasyAvroConsumer, EasyConsumer
-from .multi_consumer import EasyMultiConsumer, EasyMultiAvroConsumer
from .producer import EasyAvroProducer, EasyProducer, schema
__version__ = "3.0.0"
@@ -10,8 +9,6 @@ __all__ = [
    'EasyConsumer',
    'EasyAvroConsumer',
-    'EasyMultiConsumer',
-    'EasyMultiAvroConsumer',
    'EasyProducer',
    'EasyAvroProducer',
    'schema'
diff --git a/easyavro/consumer.py b/easyavro/consumer.py
index 741b9d9..fa4b7cd 100644
--- a/easyavro/consumer.py
+++ b/easyavro/consumer.py
@@ -17,24 +17,76 @@ class BaseConsumer:
-    def consume(self,
-                on_recieve: Callable[[str, str], None] = None,
-                on_receive: Callable[[str, str], None] = None,
-                on_recieve_timeout: int = None,
-                on_receive_timeout: int = None,
-                timeout: int = None,
-                loop: bool = True,
-                initial_wait: int = None,
-                cleanup_every: int = 1000
-                ) -> None:
-
-        if on_recieve is not None:
-            L.warning("Use `on_receive` (spelled correctly)")
-            on_receive = on_recieve
-
-        if on_recieve_timeout is not None:
-            L.warning("Use `on_receive_timeout` (spelled correctly)")
-            on_receive_timeout = on_recieve_timeout
+    def deserialize(self, message):
+        # Apply any decoding, like from an AvroConsumer
+        if hasattr(self, '_serializer') and self._serializer:
+            try:
+                if message.value() is not None:
+                    decoded_value = self._serializer.decode_message(message.value(), is_key=False)
+                    message.set_value(decoded_value)
+                if message.key() is not None:
+                    decoded_key = self._serializer.decode_message(message.key(), is_key=True)
+                    message.set_key(decoded_key)
+            except SerializerError as e:
+                raise SerializerError(
+                    "Message de-serialization failed for message at {} [{}] offset {}: {}".format(
+                        message.topic(),
+                        message.partition(),
+                        message.offset(),
+                        e
+                    )
+                )
+
+        return message
+
+    def send(self, messages, on_receive, receive_messages_in_callback):
+        callbacks = []
+
+        if messages:
+            if receive_messages_in_callback is True:
+                # Call the function once with the whole list of valid messages
+                t = threading.Thread(
+                    name='EasyAvro-on_receive',
+                    target=on_receive,
+                    args=(messages,)
+                )
+                t.start()
+                callbacks.append(t)
+            else:
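+                # One callback thread per message, called with (key, value)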
+                for m in messages:
+                    # Call the function for each valid message
+                    t = threading.Thread(
+                        name='EasyAvro-on_receive',
+                        target=on_receive,
+                        args=(m.key(), m.value())
+                    )
+                    t.start()
+                    callbacks.append(t)
+
+        return callbacks
+
+    def cleanup(self, callbacks, cleanup_every):
+        # Periodically clean up threads to prevent the list of callback_threads
+        # from becoming absolutely huge on long running Consumers
+        if len(callbacks) >= cleanup_every:
+            for x in callbacks:
+                x.join(0)
+            callbacks = [
+                x for x in callbacks if x.is_alive()
+            ]
+            cleaned = cleanup_every - len(callbacks)
+            L.info('Cleaned up {} completed threads'.format(cleaned))
+
+        return callbacks
+
+    def start(self,
+              on_receive: Callable[[str, str], None] = None,
+              on_receive_timeout: int = None,
+              timeout: int = None,
+              loop: bool = True,
+              num_messages: int = 1,
+              receive_messages_in_callback: bool = False,
+              initial_wait: int = None,
+              cleanup_every: int = 1000
+              ) -> None:
        if on_receive is None:
            def on_receive(k, v):
@@ -51,44 +103,48 @@ def on_receive(k, v):
                loops += 1
        callback_threads = []
-        self.subscribe([self.kafka_topic])
+        self.subscribe(self.kafka_topics)
        L.info("Starting consumer...")
+
+        break_loop = False
+
        try:
            while True:
                try:
-                    msg = self.poll(timeout=timeout)
-                    if msg is None:
-                        continue
-                    if msg.error():
-                        if msg.error().code() == KafkaError._PARTITION_EOF:
-                            if loop is True:
-                                continue
+                    messages = self.consume(num_messages=num_messages, timeout=timeout)
+
+                    valid_messages = []
+                    for msg in messages:
+                        if msg is None:
+                            continue
+                        if msg.error():
+                            if msg.error().code() == KafkaError._PARTITION_EOF:
+                                if loop is True:
+                                    continue
+                                else:
+                                    break_loop = True
                            else:
-                                break
+                                L.error(msg.error())
                        else:
-                            L.error(msg.error())
-                    else:
-                        # Call the function we passed in if we consumed a valid message
-                        t = threading.Thread(
-                            name='EasyAvro-on_receive',
-                            target=on_receive,
-                            args=(msg.key(), msg.value())
-                        )
-                        t.start()
-                        callback_threads.append(t)
-                except SerializerError as e:
-                    L.warning('Message deserialization failed: {}"'.format(e))
+                            try:
+                                msg = self.deserialize(msg)
+                            except SerializerError as e:
+                                L.warning('Message de-serialization failed: {}'.format(e))
+                            else:
+                                valid_messages.append(msg)
+
+                    callback_threads += self.send(
+                        valid_messages,
+                        on_receive,
+                        receive_messages_in_callback
+                    )
+                except KeyboardInterrupt:
+                    raise
                finally:
-                    # Periodically clean up threads to prevent the list of callback_threads
-                    # from becoming absolutely huge on long running Consumers
-                    if len(callback_threads) >= cleanup_every:
-                        for x in callback_threads:
-                            x.join(0)
-                        callback_threads = [
-                            x for x in callback_threads if x.is_alive()
-                        ]
-                        cleaned = cleanup_every - len(callback_threads)
-                        L.info('Cleaned up {} completed threads'.format(cleaned))
+                    callback_threads = self.cleanup(callback_threads, cleanup_every)
+
+                if break_loop is True:
+                    break
        except KeyboardInterrupt:
            L.info("Aborted via keyboard")
@@ -111,10 +167,12 @@ class EasyConsumer(BaseConsumer, Consumer):
    def __init__(self,
                 kafka_brokers: List[str],
                 consumer_group: str,
-                 kafka_topic: str,
+                 kafka_topics: List[str] = None,
+                 kafka_topic: str = None,
                 topic_config: dict = None,
                 offset: str = None,
-                 kafka_conf: dict = None) -> None:
+                 kafka_conf: dict = None,
+                 schema_registry_url: str = None) -> None:
        kafka_conf = kafka_conf or {}
@@ -123,7 +181,15 @@ def __init__(self,
        L.warning("topic_config is deprecated. 
Put these info kafka_conf") kafka_conf.update(topic_config) - self.kafka_topic = kafka_topic + # Backwards compat + if isinstance(kafka_topic, str): + L.warning("Please use the `kafka_topics` argument (List[str])") + self.kafka_topics = [kafka_topic] + else: + self.kafka_topics = kafka_topics + + if not isinstance(self.kafka_topics, list): + raise ValueError("Please supply a `kafka_topics` (List[str]) parameter") # A simplier way to set the topic offset if offset is not None and 'auto.offset.reset' not in kafka_conf: @@ -137,44 +203,11 @@ def __init__(self, 'enable.partition.eof': True, } - super().__init__( - {**conf, **kafka_conf} - ) - - -class EasyAvroConsumer(BaseConsumer, AvroConsumer): - - def __init__(self, - schema_registry_url: str, - kafka_brokers: List[str], - consumer_group: str, - kafka_topic: str, - topic_config: dict = None, - offset: str = None, - kafka_conf: dict = None) -> None: + if schema_registry_url is not None: + conf['schema.registry.url'] = schema_registry_url - kafka_conf = kafka_conf or {} + super().__init__({**conf, **kafka_conf}) - topic_config = topic_config or {} - if topic_config: - L.warning("topic_config is deprecated. Put these info kafka_conf") - kafka_conf.update(topic_config) - - self.kafka_topic = kafka_topic - - # A simplier way to set the topic offset - if offset is not None and 'auto.offset.reset' not in kafka_conf: - kafka_conf['auto.offset.reset'] = offset - - conf = { - 'bootstrap.servers': ','.join(kafka_brokers), - 'schema.registry.url': schema_registry_url, - 'client.id': self.__class__.__name__, - 'group.id': consumer_group, - 'api.version.request': 'true', - 'enable.partition.eof': True, - } - super().__init__( - {**conf, **kafka_conf} - ) +class EasyAvroConsumer(EasyConsumer, AvroConsumer): + pass \ No newline at end of file diff --git a/easyavro/multi_consumer.py b/easyavro/multi_consumer.py deleted file mode 100644 index 4e7266a..0000000 --- a/easyavro/multi_consumer.py +++ /dev/null @@ -1,185 +0,0 @@ -#!python -# coding=utf-8 -import time -import logging -import threading -from typing import List, Callable -from datetime import datetime, timedelta - -from confluent_kafka import Consumer, KafkaError -from confluent_kafka.avro import AvroConsumer -from confluent_kafka.avro.serializer import SerializerError - -L = logging.getLogger('easyavro') -L.propagate = False -L.addHandler(logging.NullHandler()) - - -class MultiConsumer: - - def start(self, - on_receive: Callable[[str, str], None] = None, - on_receive_timeout: int = None, - timeout: int = None, - loop: bool = True, - num_messages: int = 1, - initial_wait: int = None, - cleanup_every: int = 1000 - ) -> None: - - if on_receive is None: - def on_receive(k, v): - L.info("Received message:\nKey: {}\nValue: {}".format(k, v)) - - if initial_wait is not None: - initial_wait = int(initial_wait) - loops = 0 - started = datetime.now() - start_delta = timedelta(seconds=initial_wait) - while (datetime.now() - started) < start_delta: - L.info("Starting in {} seconds".format(initial_wait - loops)) - time.sleep(1) - loops += 1 - - callback_threads = [] - self.subscribe(self.kafka_topics) - L.info("Starting consumer...") - - break_loop = False - - try: - while True: - - if break_loop is True: - break - - try: - messages = self.consume(num_messages=num_messages, timeout=timeout) - - valid_messages = [] - for msg in messages: - if msg is None: - continue - if msg.error(): - if msg.error().code() == KafkaError._PARTITION_EOF: - if loop is True: - continue - else: - break_loop = True - else: - 
L.error(msg.error()) - else: - valid_messages.append(msg) - - # If we got any valid messages call the Thread with the collection - if valid_messages: - # Call the function we passed in if we consumed valid messages - t = threading.Thread( - name='EasyAvro-on_receive', - target=on_receive, - args=(valid_messages,) - ) - t.start() - callback_threads.append(t) - except SerializerError as e: - L.warning('Message deserialization failed: {}"'.format(e)) - finally: - # Periodically clean up threads to prevent the list of callback_threads - # from becoming absolutely huge on long running Consumers - if len(callback_threads) >= cleanup_every: - for x in callback_threads: - x.join(0) - callback_threads = [ - x for x in callback_threads if x.is_alive() - ] - cleaned = cleanup_every - len(callback_threads) - L.info('Cleaned up {} completed threads'.format(cleaned)) - - except KeyboardInterrupt: - L.info("Aborted via keyboard") - finally: - L.info("Waiting for on_receive callbacks to finish...") - # Block for `on_receive_timeout` for each thread that isn't finished - [ ct.join(timeout=on_receive_timeout) for ct in callback_threads ] - # Now see if any threads are still alive (didn't exit after `on_receive_timeout`) - alive_threads = [ at for at in callback_threads if at.is_alive() ] - for at in alive_threads: - L.warning('{0.name}-{0.ident} never exited and is still running'.format(at)) - - L.debug("Closing consumer...") - self.close() - L.info("Done consuming") - - -class EasyMultiConsumer(MultiConsumer, Consumer): - - def __init__(self, - kafka_brokers: List[str], - consumer_group: str, - kafka_topics: List[str], - topic_config: dict = None, - offset: str = None, - kafka_conf: dict = None) -> None: - - kafka_conf = kafka_conf or {} - - topic_config = topic_config or {} - if topic_config: - L.warning("topic_config is deprecated. Put these info kafka_conf") - kafka_conf.update(topic_config) - - self.kafka_topics = kafka_topics - - # A simplier way to set the topic offset - if offset is not None and 'auto.offset.reset' not in kafka_conf: - kafka_conf['auto.offset.reset'] = offset - - conf = { - 'bootstrap.servers': ','.join(kafka_brokers), - 'client.id': self.__class__.__name__, - 'group.id': consumer_group, - 'api.version.request': 'true', - 'enable.partition.eof': True, - } - - super().__init__( - {**conf, **kafka_conf} - ) - - -class EasyMultiAvroConsumer(MultiConsumer, AvroConsumer): - - def __init__(self, - schema_registry_url: str, - kafka_brokers: List[str], - consumer_group: str, - kafka_topics: List[str], - topic_config: dict = None, - offset: str = None, - kafka_conf: dict = None) -> None: - - kafka_conf = kafka_conf or {} - - topic_config = topic_config or {} - if topic_config: - L.warning("topic_config is deprecated. 
Put these info kafka_conf") - kafka_conf.update(topic_config) - - self.kafka_topics = kafka_topics - - # A simplier way to set the topic offset - if offset is not None and 'auto.offset.reset' not in kafka_conf: - kafka_conf['auto.offset.reset'] = offset - - conf = { - 'bootstrap.servers': ','.join(kafka_brokers), - 'schema.registry.url': schema_registry_url, - 'client.id': self.__class__.__name__, - 'group.id': consumer_group, - 'api.version.request': 'true', - 'enable.partition.eof': True, - } - - super().__init__( - {**conf, **kafka_conf} - ) diff --git a/easyavro/tests/test_avro.py b/easyavro/tests/test_avro.py index afae702..258b306 100644 --- a/easyavro/tests/test_avro.py +++ b/easyavro/tests/test_avro.py @@ -3,12 +3,14 @@ import os import uuid import time +import json +from typing import List +from unittest import TestCase from os.path import join as opj from os.path import dirname as dn from os.path import abspath as ap -from unittest import TestCase - +from confluent_kafka import Message from confluent_kafka.avro import CachedSchemaRegistryClient from easyavro import EasyAvroConsumer, EasyAvroProducer, schema @@ -20,6 +22,18 @@ L.handlers = [logging.StreamHandler()] +def psort(l): + def sort_by_uuid(p): + payload = p[1] + if isinstance(payload, str): + payload = json.loads(payload) + + if isinstance(payload, dict): + return payload['uuid'] + + return sorted(l, key=sort_by_uuid) + + class TestAvro(TestCase): def setUp(self): @@ -50,7 +64,7 @@ def setUp(self): schema_registry_url='http://{}:4002'.format(self.testhost), kafka_brokers=['{}:4001'.format(self.testhost)], consumer_group='easyavro.testing', - kafka_topic=self.topic, + kafka_topics=[self.topic], offset='earliest' ) @@ -83,8 +97,12 @@ def test_messages(self): self.bp.produce(records) # Consume - self.bc.consume(on_receive=self.on_receive, timeout=1, loop=False) - assert self.received[len(records) * -1:] == records + self.bc.start( + on_receive=self.on_receive, + timeout=1, + loop=False + ) + assert psort(self.received[len(records) * -1:]) == psort(records) def test_initial_wait(self): # Produce @@ -95,13 +113,14 @@ def test_initial_wait(self): self.bp.produce(records) # Consume - self.bc.consume( + self.bc.start( on_receive=self.on_receive, timeout=1, loop=False, - initial_wait=5 + initial_wait=5, + num_messages=5 ) - assert self.received[len(records) * -1:] == records + assert psort(self.received[len(records) * -1:]) == psort(records) def test_on_receive_timeout(self): # Produce @@ -116,7 +135,7 @@ def test_on_receive_timeout(self): schema_registry_url='http://{}:4002'.format(self.testhost), kafka_brokers=['{}:4001'.format(self.testhost)], consumer_group='easyavro.testing', - kafka_topic=self.topic, + kafka_topics=[self.topic], offset='earliest' ) received = [] @@ -124,9 +143,8 @@ def test_on_receive_timeout(self): def on_receive(key: str, value: str) -> None: time.sleep(10) raise ValueError('hi') - L.info("Received message") - bc.consume( + bc.start( on_receive=on_receive, on_receive_timeout=1, timeout=1, @@ -155,14 +173,21 @@ def test_cleanup(self): ] self.bp.produce(records) + def on_receive(messages: List[Message]) -> None: + for m in messages: + self.received.append((m.key(), m.value())) + L.info("Received message") + # Consume - self.bc.consume( - on_receive=self.on_receive, + self.bc.start( + on_receive=on_receive, timeout=1, loop=False, - cleanup_every=2 + cleanup_every=2, + num_messages=5, + receive_messages_in_callback=True ) - assert self.received[len(records) * -1:] == records + assert 
psort(self.received[len(records) * -1:]) == psort(records) def test_overflow_with_custom_config(self): self.bp = EasyAvroProducer( @@ -183,7 +208,7 @@ def test_overflow_with_custom_config(self): with self.assertRaises(BufferError): self.bp.produce(records) - def test_dont_overflow_with_batch_specified(self): + def test_do_not_overflow_with_batch_specified(self): self.bp = EasyAvroProducer( schema_registry_url='http://{}:4002'.format(self.testhost), kafka_brokers=['{}:4001'.format(self.testhost)], diff --git a/easyavro/tests/test_multi.py b/easyavro/tests/test_multi.py deleted file mode 100644 index c413269..0000000 --- a/easyavro/tests/test_multi.py +++ /dev/null @@ -1,252 +0,0 @@ -#!python -# coding=utf-8 -import os -import json -import uuid -import time -from typing import List -from unittest import TestCase - - -import msgpack -from confluent_kafka import Message - -from easyavro import EasyProducer, EasyMultiConsumer - -import logging -L = logging.getLogger('easyavro') -L.propagate = False -L.setLevel(logging.DEBUG) -L.handlers = [logging.StreamHandler()] - - -class TestMultiMsgPack(TestCase): - - def setUp(self): - - self.testhost = os.environ.get('EASYAVRO_TESTING_HOST', 'localhost') - - self.topic = 'easyavro-testing-topic-noavro' - - self.bp = EasyProducer( - kafka_brokers=['{}:4001'.format(self.testhost)], - kafka_topic=self.topic - ) - - self.bc = EasyMultiConsumer( - kafka_brokers=['{}:4001'.format(self.testhost)], - consumer_group='easyavro.testing', - kafka_topics=[self.topic], - offset='earliest', - num_messages=100 - ) - - def on_receive(messages: List[Message]) -> None: - for m in messages: - self.received.append((m.key().decode('utf-8'), msgpack.loads(m.value()))) - L.info("Received message") - self.received = [] - self.on_receive = on_receive - - def tearDown(self): - del self.bp - del self.bc - - def test_messages(self): - # Produce - m1 = str(uuid.uuid4()) - m2 = str(uuid.uuid4()) - m3 = str(uuid.uuid4()) - records = [ - ('ADD', msgpack.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ('UPDATE', msgpack.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ('DELETE', msgpack.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ('ADD', msgpack.dumps({ 'uuid': m2, 'properties': {'name': 'TEST 2' }})), - ('UPDATE', msgpack.dumps({ 'uuid': m2, 'properties': {'name': 'TEST 2' }})), - ('DELETE', msgpack.dumps({ 'uuid': m2, 'properties': {'name': 'TEST 2' }})), - ('ADD', msgpack.dumps({ 'uuid': m3, 'properties': {'name': 'TEST 3' }})), - ('UPDATE', msgpack.dumps({ 'uuid': m3, 'properties': {'name': 'TEST 3' }})), - ('DELETE', msgpack.dumps({ 'uuid': m3, 'properties': {'name': 'TEST 3' }})), - ] - self.bp.produce(records) - - # Consume - self.bc.start(on_receive=self.on_receive, timeout=1, loop=False) - - loaded_records = [ (k, msgpack.loads(v)) for (k, v) in records ] - - assert self.received[len(records) * -1:] == loaded_records - - -class TestMultiNoAvro(TestCase): - - def setUp(self): - - self.testhost = os.environ.get('EASYAVRO_TESTING_HOST', 'localhost') - - self.topic = 'easyavro-testing-topic-noavro' - - self.bp = EasyProducer( - kafka_brokers=['{}:4001'.format(self.testhost)], - kafka_topic=self.topic - ) - - self.bc = EasyMultiConsumer( - kafka_brokers=['{}:4001'.format(self.testhost)], - consumer_group='easyavro.testing', - kafka_topics=[self.topic], - offset='earliest' - ) - - def on_receive(messages: List[Message]) -> None: - for m in messages: - self.received.append((m.key().decode('utf-8'), m.value().decode('utf-8'))) - L.info("Received 
message") - self.received = [] - self.on_receive = on_receive - - def tearDown(self): - del self.bp - del self.bc - - def test_messages(self): - # Produce - m1 = str(uuid.uuid4()) - m2 = str(uuid.uuid4()) - m3 = str(uuid.uuid4()) - records = [ - ('ADD', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ('UPDATE', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ('DELETE', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ('ADD', json.dumps({ 'uuid': m2, 'properties': {'name': 'TEST 2' }})), - ('UPDATE', json.dumps({ 'uuid': m2, 'properties': {'name': 'TEST 2' }})), - ('DELETE', json.dumps({ 'uuid': m2, 'properties': {'name': 'TEST 2' }})), - ('ADD', json.dumps({ 'uuid': m3, 'properties': {'name': 'TEST 3' }})), - ('UPDATE', json.dumps({ 'uuid': m3, 'properties': {'name': 'TEST 3' }})), - ('DELETE', json.dumps({ 'uuid': m3, 'properties': {'name': 'TEST 3' }})), - ] - self.bp.produce(records) - - # Consume - self.bc.start( - on_receive=self.on_receive, - timeout=1, - loop=False - ) - - assert self.received[len(records) * -1:] == records - - def test_initial_wait(self): - # Produce - m1 = str(uuid.uuid4()) - records = [ - ('ADD', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ] - self.bp.produce(records) - - # Consume - self.bc.start( - on_receive=self.on_receive, - timeout=1, - loop=False, - initial_wait=5, - num_messages=5 - ) - assert self.received[len(records) * -1:] == records - - def test_on_receive_timeout(self): - # Produce - m1 = str(uuid.uuid4()) - records = [ - ('ADD', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ] - self.bp.produce(records) - - # Consume - bc = EasyMultiConsumer( - kafka_brokers=['{}:4001'.format(self.testhost)], - consumer_group='easyavro.testing', - kafka_topics=[self.topic], - offset='earliest', - ) - received = [] - - def on_receive(messages: List[Message]) -> None: - time.sleep(10) - raise ValueError('hi') - - bc.start( - on_receive=on_receive, - on_receive_timeout=1, - timeout=1, - loop=False, - num_messages=2 - ) - # Our callbacks take longer than the `on_receive_timeout` - # so we should receive nothing even though the callback - # might finish eventually - assert received[len(records) * -1:] == [] - - def test_cleanup(self): - # Produce - m1 = str(uuid.uuid4()) - m2 = str(uuid.uuid4()) - m3 = str(uuid.uuid4()) - records = [ - ('ADD', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ('UPDATE', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ('DELETE', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ('ADD', json.dumps({ 'uuid': m2, 'properties': {'name': 'TEST 2' }})), - ('UPDATE', json.dumps({ 'uuid': m2, 'properties': {'name': 'TEST 2' }})), - ('DELETE', json.dumps({ 'uuid': m2, 'properties': {'name': 'TEST 2' }})), - ('ADD', json.dumps({ 'uuid': m3, 'properties': {'name': 'TEST 3' }})), - ('UPDATE', json.dumps({ 'uuid': m3, 'properties': {'name': 'TEST 3' }})), - ('DELETE', json.dumps({ 'uuid': m3, 'properties': {'name': 'TEST 3' }})), - ] - self.bp.produce(records) - - # Consume - self.bc.start( - on_receive=self.on_receive, - timeout=1, - loop=False, - cleanup_every=2, - num_messages=2 - ) - assert self.received[len(records) * -1:] == records - - def test_overflow_with_custom_config(self): - self.bp = EasyProducer( - kafka_brokers=['{}:4001'.format(self.testhost)], - kafka_topic=self.topic, - kafka_conf={ - 'queue.buffering.max.messages': 1 - } - ) - - m1 = str(uuid.uuid4()) - records = [ - ('ADD', json.dumps({ 'uuid': 
m1, 'properties': {'name': 'TEST 1' }})), - ('UPDATE', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ] - - with self.assertRaises(BufferError): - self.bp.produce(records) - - def test_dont_overflow_with_batch_specified(self): - self.bp = EasyProducer( - kafka_brokers=['{}:4001'.format(self.testhost)], - kafka_topic=self.topic, - kafka_conf={ - 'queue.buffering.max.messages': 1 - } - ) - - m1 = str(uuid.uuid4()) - records = [ - ('ADD', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ('UPDATE', json.dumps({ 'uuid': m1, 'properties': {'name': 'TEST 1' }})), - ] - - # This should not error now - self.bp.produce(records, batch=1) diff --git a/easyavro/tests/test_noavro.py b/easyavro/tests/test_noavro.py index 36ec40e..468724d 100644 --- a/easyavro/tests/test_noavro.py +++ b/easyavro/tests/test_noavro.py @@ -4,10 +4,12 @@ import json import uuid import time - +from typing import List from unittest import TestCase import msgpack +from confluent_kafka import Message + from easyavro import EasyConsumer, EasyProducer import logging @@ -17,6 +19,18 @@ L.handlers = [logging.StreamHandler()] +def psort(l): + def sort_by_uuid(p): + payload = p[1] + if isinstance(payload, str): + payload = json.loads(payload) + + if isinstance(payload, dict): + return payload['uuid'] + + return sorted(l, key=sort_by_uuid) + + class TestMsgPack(TestCase): def setUp(self): @@ -33,7 +47,7 @@ def setUp(self): self.bc = EasyConsumer( kafka_brokers=['{}:4001'.format(self.testhost)], consumer_group='easyavro.testing', - kafka_topic=self.topic, + kafka_topics=[self.topic], offset='earliest' ) @@ -66,11 +80,15 @@ def test_messages(self): self.bp.produce(records) # Consume - self.bc.consume(on_receive=self.on_receive, timeout=1, loop=False) + self.bc.start( + on_receive=self.on_receive, + timeout=1, + loop=False + ) loaded_records = [ (k, msgpack.loads(v)) for (k, v) in records ] - assert self.received[len(records) * -1:] == loaded_records + assert psort(self.received[len(records) * -1:]) == psort(loaded_records) class TestNoAvro(TestCase): @@ -89,7 +107,7 @@ def setUp(self): self.bc = EasyConsumer( kafka_brokers=['{}:4001'.format(self.testhost)], consumer_group='easyavro.testing', - kafka_topic=self.topic, + kafka_topics=[self.topic], offset='earliest' ) @@ -122,9 +140,13 @@ def test_messages(self): self.bp.produce(records) # Consume - self.bc.consume(on_receive=self.on_receive, timeout=1, loop=False) + self.bc.start( + on_receive=self.on_receive, + timeout=1, + loop=False + ) - assert self.received[len(records) * -1:] == records + assert psort(self.received[len(records) * -1:]) == psort(records) def test_initial_wait(self): # Produce @@ -135,13 +157,14 @@ def test_initial_wait(self): self.bp.produce(records) # Consume - self.bc.consume( + self.bc.start( on_receive=self.on_receive, timeout=1, loop=False, - initial_wait=5 + initial_wait=5, + num_messages=5 ) - assert self.received[len(records) * -1:] == records + assert psort(self.received[len(records) * -1:]) == psort(records) def test_on_receive_timeout(self): # Produce @@ -155,7 +178,7 @@ def test_on_receive_timeout(self): bc = EasyConsumer( kafka_brokers=['{}:4001'.format(self.testhost)], consumer_group='easyavro.testing', - kafka_topic=self.topic, + kafka_topics=[self.topic], offset='earliest' ) received = [] @@ -163,9 +186,8 @@ def test_on_receive_timeout(self): def on_receive(key: str, value: str) -> None: time.sleep(10) raise ValueError('hi') - L.info("Received message") - bc.consume( + bc.start( on_receive=on_receive, 
on_receive_timeout=1, timeout=1, @@ -194,14 +216,21 @@ def test_cleanup(self): ] self.bp.produce(records) + def on_receive(messages: List[Message]) -> None: + for m in messages: + self.received.append((m.key().decode('utf-8'), m.value().decode('utf-8'))) + L.info("Received message") + # Consume - self.bc.consume( - on_receive=self.on_receive, + self.bc.start( + on_receive=on_receive, timeout=1, loop=False, - cleanup_every=2 + cleanup_every=2, + num_messages=5, + receive_messages_in_callback=True ) - assert self.received[len(records) * -1:] == records + assert psort(self.received[len(records) * -1:]) == psort(records) def test_overflow_with_custom_config(self): self.bp = EasyProducer( @@ -221,7 +250,7 @@ def test_overflow_with_custom_config(self): with self.assertRaises(BufferError): self.bp.produce(records) - def test_dont_overflow_with_batch_specified(self): + def test_do_not_overflow_with_batch_specified(self): self.bp = EasyProducer( kafka_brokers=['{}:4001'.format(self.testhost)], kafka_topic=self.topic,