
Commit

Consolidate and simplify the code, document Multi features
kwilcox committed Mar 10, 2021
1 parent 0390595 commit 547a02d
Showing 8 changed files with 270 additions and 595 deletions.
42 changes: 22 additions & 20 deletions Dockerfile
@@ -1,5 +1,5 @@
FROM debian:9.5
LABEL maintainer="Kyle Wilcox <kyle@axiomdatascience.com>"
LABEL MAINTAINER="Kyle Wilcox <kyle@axds.co>"
ENV DEBIAN_FRONTEND noninteractive
ENV LANG C.UTF-8

@@ -16,30 +16,32 @@ RUN apt-get update && apt-get install -y \
wget \
&& \
apt-get clean && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* /var/cache/oracle-jdk8-installer

# Copy over environment definition
COPY environment.yml /tmp/environment.yml
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

# Setup CONDA (https://hub.docker.com/r/continuumio/miniconda3/~/dockerfile/)
ENV MINICONDA_VERSION latest
RUN echo 'export PATH=/opt/conda/bin:$PATH' > /etc/profile.d/conda.sh && \
curl -k -o /miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-$MINICONDA_VERSION-Linux-x86_64.sh && \
ENV MINICONDA_VERSION py38_4.8.2
ENV MINICONDA_SHA256 5bbb193fd201ebe25f4aeb3c58ba83feced6a25982ef4afa86d5506c3656c142
RUN curl -k -o /miniconda.sh https://repo.anaconda.com/miniconda/Miniconda3-$MINICONDA_VERSION-Linux-x86_64.sh && \
echo $MINICONDA_SHA256 /miniconda.sh | sha256sum --check && \
/bin/bash /miniconda.sh -b -p /opt/conda && \
rm /miniconda.sh && \
/opt/conda/bin/conda config \
--set always_yes yes \
--set changeps1 no \
--set show_channel_urls True \
&& \
/opt/conda/bin/conda env update -n root --file /tmp/environment.yml && \
/opt/conda/bin/conda clean -a -y
/opt/conda/bin/conda update -c conda-forge -n base conda && \
/opt/conda/bin/conda clean -afy && \
/opt/conda/bin/conda init && \
find /opt/conda/ -follow -type f -name '*.a' -delete && \
find /opt/conda/ -follow -type f -name '*.js.map' -delete && \
/opt/conda/bin/conda install -y -c conda-forge -n base mamba pip && \
/opt/conda/bin/conda clean -afy

ENV PATH /opt/conda/bin:$PATH

# Copy packrat contents and install
ENV CODE_HOME /easyavro
WORKDIR $CODE_HOME
COPY . $CODE_HOME
COPY environment.yml /tmp/
RUN mamba env create -n runenv -f /tmp/environment.yml && \
echo "conda activate runenv" >> /root/.bashrc

ENV PROJECT_ROOT /code
RUN mkdir -p "$PROJECT_ROOT"
COPY . $PROJECT_ROOT
WORKDIR $PROJECT_ROOT

CMD ["py.test", "-s", "-rxs", "-v"]
CMD ["conda", "run", "-n", "runenv", "--no-capture-output", "pytest", "-s", "-rxs", "-v"]
48 changes: 37 additions & 11 deletions README.md
@@ -171,14 +171,16 @@ The defaults are sane. They will pull offsets from the broker and set the topic

If you need to override any kafka level parameters, you may use the `kafka_conf` (`dict`) initialization parameter on `Consumer`. It will override any of the defaults the `Consumer` uses. See the documentation for the `config` parameter to [`Consumer`](https://docs.confluent.io/current/clients/confluent-kafka-python/#consumer), [`AvroConsumer`](https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.avro.AvroConsumer) and the [list of librdkafka properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#global-configuration-properties).

##### Parameters for `consume` function
##### Parameters for `start` function

* `on_receive` (`Callable[[str, str], None]`) - Function that is executed (in a new thread) for each retrieved message.
* `on_receive` (`Callable[[str, str], None]` or `Callable[[List[Message]], None]`) - Function that is executed (in a new thread) for each retrieved message, or for each batch of messages when `receive_messages_in_callback` is `True`.
* `on_receive_timeout` (`int`) - Seconds the `Consumer` will wait for the calls to `on_receive` to exit before moving on. By default it will wait forever. You should set this to a reasonable maximum number of seconds your `on_receive` callback will take, to prevent deadlock when the `Consumer` is exiting and trying to clean up its spawned threads.
* `timeout` (`int`) - The `timeout` parameter to the `poll` function in `confluent-kafka`. Controls how long `poll` will block while waiting for messages.
* `loop` (`bool`) - Whether the `Consumer` will keep looping for messages or break after retrieving the first chunk of messages. This is useful when testing.
* `initial_wait` (`int`) - Seconds the `Consumer` should wait before starting to consume. This is useful when testing.
* `cleanup_every` (`int`) - Try to clean up spawned threads after this many messages.
* `num_messages` (`int`) - Consume up to this many messages from the topic at once. This can improve throughput when dealing with high-volume topics that benefit from processing messages in batches.
* `receive_messages_in_callback` (`bool`) - Instead of calling the `on_receive` callback with key/value pairs, call it with `confluent_kafka.Message` objects. This requires the user to call `message.key()` and `message.value()` on each, and gives the user access to other message attributes like `message.topic()` in the callback. **Setting this parameter to `True` is recommended for any new code.** A sketch combining several of these optional parameters follows the first example below.

```python
from easyavro import EasyConsumer
@@ -191,7 +193,7 @@ bc = EasyConsumer(
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)
bc.consume(on_receive=on_receive)
bc.start(on_receive=on_receive)
```
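
A minimal sketch combining several of the optional `start` parameters listed above; the broker address, topic name, and all numeric values here are illustrative assumptions, not library defaults:

```python
from easyavro import EasyConsumer

def on_receive(key: str, value: str) -> None:
    print("Got Key:{}\nValue:{}\n".format(key, value))

bc = EasyConsumer(
    kafka_brokers=['localhost:4001'],  # placeholder broker address
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)
bc.start(
    on_receive=on_receive,
    on_receive_timeout=30,  # give callbacks up to 30 seconds to finish when shutting down
    timeout=10,             # poll() blocks for up to 10 seconds waiting for messages
    cleanup_every=100,      # try to join finished callback threads every 100 messages
    initial_wait=2          # mostly useful when testing against a fresh broker
)
```

In most cases only `on_receive` is required; as noted above, the defaults are sane.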

Or pass in your own [kafka config](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties) dict.
@@ -211,7 +213,7 @@ bc = EasyConsumer(
        'offset.store.method': 'file'
    }
)
bc.consume(on_receive=on_receive)
bc.start(on_receive=on_receive)
```

Or pass in a value to use for the `auto.offset.reset` [topic config](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties) setting.
@@ -228,7 +230,7 @@ bc = EasyConsumer(
    kafka_topic='my-topic',
    offset='earliest'
)
bc.consume(on_receive=on_receive)
bc.start(on_receive=on_receive)
```

#### EasyConsumer
@@ -246,7 +248,7 @@ bc = EasyConsumer(
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)
bc.consume(on_receive=on_receive)
bc.start(on_receive=on_receive)
```

You can unpack data as needed in the callback function
@@ -263,13 +265,36 @@ bc = EasyConsumer(
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)
bc.consume(on_receive=on_receive)
bc.start(on_receive=on_receive)
```
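
The unpacking callback itself sits in the collapsed part of this hunk; a minimal sketch of what such a callback might look like, assuming the message values are msgpack-encoded bytes (the broker address and topic are placeholders):

```python
import msgpack
from easyavro import EasyConsumer

def on_receive(key, value) -> None:
    # value is assumed here to be msgpack-encoded bytes; decode it however it was produced
    print("Got Key:{}\nValue:{}\n".format(key, msgpack.unpackb(value)))

bc = EasyConsumer(
    kafka_brokers=['localhost:4001'],
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)
bc.start(on_receive=on_receive)
```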

You can receive a list of Message objects instead of key/value pairs

```python
import msgpack
from typing import List
from easyavro import EasyConsumer
from confluent_kafka import Message

def on_receive(messages: List[Message]) -> None:
    for m in messages:
        print(
            "Got Message - Topic:{}\nKey:{}\nValue:{}\n".format(m.topic(), m.key(), m.value())
        )

bc = EasyConsumer(
    kafka_brokers=['localhost:4001'],
    consumer_group='easyavro.testing',
    kafka_topic='my-topic',
)
bc.start(on_receive=on_receive, num_messages=5, receive_messages_in_callback=True)
```


#### EasyAvroConsumer

If you are using a Confluent SchemaRegistry this helper exists to match your topic name to existing schemas in the registry. Your schemas must already be available in the schema registry as `[topic]-key` and `[topic]-value`. Pass the `schema_registry_url` parameter to EasyAvroConsumer and the rest is
taken care of.

```python
from easyavro import EasyAvroConsumer
@@ -278,11 +303,12 @@ def on_receive(key: str, value: str) -> None:
    print("Got Key:{}\nValue:{}\n".format(key, value))

bc = EasyAvroConsumer(
    schema_registry_url='http://localhost:4002',
    kafka_brokers=['localhost:4001'],
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)
bc.consume(on_receive=on_receive)
bc.start(on_receive=on_receive)
```

## Testing
@@ -320,5 +346,5 @@ docker run --net="host" easyavro

```
conda env create -f environment.yml
py.test -s -rxs -v
pytest -s -rxs -v
```
3 changes: 0 additions & 3 deletions easyavro/__init__.py
@@ -1,7 +1,6 @@
#!python
# coding=utf-8
from .consumer import EasyAvroConsumer, EasyConsumer
from .multi_consumer import EasyMultiConsumer, EasyMultiAvroConsumer
from .producer import EasyAvroProducer, EasyProducer, schema

__version__ = "3.0.0"
@@ -10,8 +9,6 @@
__all__ = [
    'EasyConsumer',
    'EasyAvroConsumer',
    'EasyMultiConsumer',
    'EasyMultiAvroConsumer',
    'EasyProducer',
    'EasyAvroProducer',
    'schema'
