Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.

feat: Add kafka to docker compose #5

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
!.gitignore
TODO
__pycache__
*.DS_Store*
*.egg-info/
/build/
/dist/
/.idea/
venv/
env/
transifex_input.yaml
114 changes: 114 additions & 0 deletions tutorevent_bus_redis/patches/local-docker-compose-services
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
{% if RUN_KAFKA_SERVER %}
# needed by Kafka to keep track of nodes, topics, and messages.
zookeeper:
image: confluentinc/cp-zookeeper:6.2.1
restart: unless-stopped
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

# Events broker
kafka:
image: confluentinc/cp-server:6.2.1
depends_on:
- zookeeper
restart: unless-stopped
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:18081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

# storage layer for data schemas in Kafka
schema-registry:
image: confluentinc/cp-schema-registry:6.2.1
depends_on:
- kafka
restart: unless-stopped
ports:
- "18081:18081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:18081
{% endif %}

{% if RUN_KAFKA_SERVER and RUN_KAFKA_UI %}
# browser app for monitoring local Kafka cluster. This is quite memory- and CPU-intensive, so it should only be used for local Kafka debugging
kafka-control-center:
image: confluentinc/cp-enterprise-control-center:6.2.1
depends_on:
- kafka
- schema-registry
restart: unless-stopped
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka:29092
CONTROL_CENTER_SCHEMA_REGISTRY_URL: http://schema-registry:18081
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
{% endif %}

{% if EVENT_BUS_BACKEND %}
# This is just a stub test for showing how we could run consumers
login-consumer:
image: docker.io/overhangio/openedx:17.0.2-nightly
environment:
SERVICE_VARIANT: lms
DJANGO_SETTINGS_MODULE: lms.envs.tutor.production
command: >
./manage.py lms consume_events -t user-login -g user-activity-service {% if EVENT_BUS_BACKEND == "redis" %}--extra '{"consumer_name": "user-login-1"}'{% endif %}
restart: unless-stopped
volumes:
- ../apps/openedx/settings/lms:/openedx/edx-platform/lms/envs/tutor:ro
- ../apps/openedx/settings/cms:/openedx/edx-platform/cms/envs/tutor:ro
- ../apps/openedx/config:/openedx/config:ro
- ../../data/lms:/openedx/data
- ../../data/openedx-media:/openedx/media
depends_on:
- {% if EVENT_BUS_BACKEND == "redis" %}redis{% elif EVENT_BUS_BACKEND == "kafka" %}kafka{% endif %}

# This is just a stub test for showing how we could run consumers
tracking-consumer:
image: docker.io/overhangio/openedx:17.0.2-nightly
environment:
SERVICE_VARIANT: lms
DJANGO_SETTINGS_MODULE: lms.envs.tutor.production
command: >
./manage.py lms consume_events -t analytics -g analytics-service {% if EVENT_BUS_BACKEND == "redis" %}--extra '{"consumer_name": "analytics-1"}'{% endif %}
restart: unless-stopped
volumes:
- ../apps/openedx/settings/lms:/openedx/edx-platform/lms/envs/tutor:ro
- ../apps/openedx/settings/cms:/openedx/edx-platform/cms/envs/tutor:ro
- ../apps/openedx/config:/openedx/config:ro
- ../../data/lms:/openedx/data
- ../../data/openedx-media:/openedx/media
{%- for mount in iter_mounts(MOUNTS, "openedx", "lms") %}
- {{ mount }}
{%- endfor %}
depends_on:
- {% if EVENT_BUS_BACKEND == "redis" %}redis{% elif EVENT_BUS_BACKEND == "kafka" %}kafka{% endif %}
{% endif %}
42 changes: 42 additions & 0 deletions tutorevent_bus_redis/patches/openedx-common-settings
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Backend independent settings for event production
EVENT_BUS_PRODUCER_CONFIG = {{ EVENT_BUS_PRODUCER_CONFIG }}
SEND_CATALOG_INFO_SIGNAL = {{ EVENT_BUS_SEND_CATALOG_INFO_SIGNAL }}


{% if EVENT_BUS_BACKEND == 'redis' %}
# redis connection url
# https://redis.readthedocs.io/en/stable/examples/ssl_connection_examples.html#Connecting-to-a-Redis-instance-via-a-URL-string
EVENT_BUS_REDIS_CONNECTION_URL = "{{ EVENT_BUS_REDIS_CONNECTION_URL }}"
EVENT_BUS_TOPIC_PREFIX = "{{ EVENT_BUS_REDIS_TOPIC_PREFIX }}"
EVENT_BUS_PRODUCER = "{{ EVENT_BUS_REDIS_PRODUCER }}"
EVENT_BUS_CONSUMER = "{{ EVENT_BUS_REDIS_CONSUMER }}"
{% if EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT %}
EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT = int("{{ EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT }}")
{% endif %}
EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT = int("{{ EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT }}")
EVENT_BUS_REDIS_STREAM_MAX_LEN = int("{{ EVENT_BUS_REDIS_STREAM_MAX_LEN }}")
{% endif %}

{% if EVENT_BUS_BACKEND == 'kafka' %}
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL = "{{ EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL }}"
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS = "{{ EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS }}"
EVENT_BUS_PRODUCER = "{{ EVENT_BUS_KAFKA_PRODUCER }}"
EVENT_BUS_CONSUMER = "{{ EVENT_BUS_KAFKA_CONSUMER }}"
EVENT_BUS_TOPIC_PREFIX = "{{ EVENT_BUS_KAFKA_TOPIC_PREFIX }}"
{% endif %}

{% if EVENT_BUS_TRACKING_LOGS %}
SEND_TRACKING_EVENT_EMITTED_SIGNAL = True

# Update the backends to use the event bus
EVENT_TRACKING_BACKENDS["xapi"]["ENGINE"] = "eventtracking.backends.event_bus.EventBusRoutingBackend"
EVENT_TRACKING_BACKENDS["caliper"]["ENGINE"] = "eventtracking.backends.event_bus.EventBusRoutingBackend"

# Update backend to send events in sync mode
EVENT_TRACKING_BACKENDS["xapi"]["OPTIONS"]["backends"]["xapi"]["ENGINE"] = "event_routing_backends.backends.sync_events_router.SyncEventsRouter"
EVENT_TRACKING_BACKENDS["caliper"]["OPTIONS"]["backends"]["caliper"]["ENGINE"] = "event_routing_backends.backends.sync_events_router.SyncEventsRouter"

# Remove caliper from the tracking backends to prevent double-event-emission
EVENT_TRACKING_BACKENDS.pop("caliper")

{% endif %}
95 changes: 95 additions & 0 deletions tutorevent_bus_redis/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,108 @@
########################################
# CONFIGURATION
########################################
# FIXME: Update this to a saner config structure less likely to break, and able
# to activate and deactivate individual events more easily.
PRODUCER_CONFIG = {
'org.openedx.content_authoring.xblock.published.v1': {
'content-authoring-xblock-lifecycle':
{'event_key_field': 'xblock_info.usage_key', 'enabled': False},
'content-authoring-xblock-published':
{'event_key_field': 'xblock_info.usage_key', 'enabled': False},
},
'org.openedx.content_authoring.xblock.deleted.v1': {
'content-authoring-xblock-lifecycle':
{'event_key_field': 'xblock_info.usage_key', 'enabled': False},
},
'org.openedx.learning.auth.session.login.completed.v1': {
'user-login': {'event_key_field': 'user.pii.username', 'enabled': False},
},
'org.openedx.analytics.tracking.event.emitted.v1': {
'analytics': {'event_key_field': 'tracking_log.name', 'enabled': True}
},
}

hooks.Filters.CONFIG_DEFAULTS.add_items(
[
# Add your new settings that have default values here.
# Each new setting is a pair: (setting_name, default_value).
# Prefix your setting names with 'EVENT_BUS_REDIS_'.
("EVENT_BUS_REDIS_VERSION", __version__),

# Possible values are "kafka", "redis", or None to disable the
# event bus
("EVENT_BUS_BACKEND", "redis"),

# Settings for producing events
("EVENT_BUS_SEND_CATALOG_INFO_SIGNAL", True),
("EVENT_BUS_TRACKING_LOGS", True),
(
# FIXME: We should only install the one that's configured
"OPENEDX_EXTRA_PIP_REQUIREMENTS",
[
"edx-event-bus-redis==0.3.3",
"edx-event-bus-kafka==v5.6.0",
"openedx-events==v9.5.1",
"confluent_kafka[avro,schema-registry]",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you pin this one?

"git+https://github.com/openedx/platform-plugin-aspects.git@bmtcril/tracking_event_command",
],
),
("EVENT_BUS_PRODUCER_CONFIG", PRODUCER_CONFIG),

######################################
# redis backend settings

# If true, this will run a separate instance of redis just for the
# event bus to prevent resource conflicts with other services
# TODO: Implement this
# ("RUN_DEDICATED_REDIS_BUS_SERVER", True),

# Prefix for topics sent over the event bus
("EVENT_BUS_REDIS_TOPIC_PREFIX", "openedx"),

# Producer class which can send events to redis streams.
("EVENT_BUS_REDIS_PRODUCER", "edx_event_bus_redis.create_producer"),

# Consumer class which can consume events from redis streams.
("EVENT_BUS_REDIS_CONSUMER", "edx_event_bus_redis.RedisEventConsumer"),

# If the consumer encounters this many consecutive errors, exit with an error. This is intended to be used in a
# context where a management system (such as Kubernetes) will relaunch the consumer automatically.
# Default is "None", which means the consumer will never relaunch.
("EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT", 0),

# How long the consumer should wait for new entries in a stream.
# As we are running the consumer in a while True loop, changing this setting doesn't make much difference expect
# for changing number of monitoring messages while waiting for new events.
# https://redis.io/commands/xread/#blocking-for-data
("EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT", 60),

# Limits stream size to approximately this number
("EVENT_BUS_REDIS_STREAM_MAX_LEN", 10_000),

######################################
# Kafka backend settings
# TODO: Move hard coded settings from local-docker-compose-services here
# Version of https://github.com/openedx/event-bus-kafka to install
# This is what follows 'pip install' so you can use official versions
# or install from forks / branches / PRs here
("EVENT_BUS_KAFKA_RELEASE", "edx-event-bus-kafka=='v5.6.0'"),

# This will run schema-manager, zookeeper and kafka. Set to False if you
# are using a 3rd party to host Kafka or managing it outside of Tutor.
("RUN_KAFKA_SERVER", False),

# This will run kafka-control-center. This consumes a lot of resources,
# you can turn it off separately from the required services. Requires
# RUN_KAFKA_SERVER to be True as well.
("RUN_KAFKA_UI", False),

("EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL", "http://schema-registry:18081"),
("EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS", "kafka:29092"),
("EVENT_BUS_KAFKA_PRODUCER", "edx_event_bus_kafka.create_producer"),
("EVENT_BUS_KAFKA_CONSUMER", "edx_event_bus_kafka.KafkaEventConsumer"),
("EVENT_BUS_KAFKA_TOPIC_PREFIX", "dev"),
("EVENT_BUS_REDIS_CONNECTION_URL", "redis://{% if REDIS_USERNAME and REDIS_PASSWORD %}{{ REDIS_USERNAME }}:{{""REDIS_PASSWORD }}{% endif %}@{{ REDIS_HOST }}:{{ REDIS_PORT }}/5")
]
)

Expand Down