
Commit

Merge pull request #34 from Nasdaq/release-0.4.x
Adding support for timeout and number of message settings
ruchirvaninasdaq authored Dec 13, 2022
2 parents fee3ba9 + df0e2c1 commit 6b81b70
Showing 8 changed files with 44 additions and 47 deletions.
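
In practical terms, the release moves the consume limits out of the bundled consumer-properties.json and into the caller's Kafka configuration: timeout (seconds) and num_messages (records per consume call) can now be set per client, and fall back to 10 and 500 when omitted. A minimal usage sketch under that assumption; the endpoint, file name, and topic are placeholders, and the OAuth fields of the security configuration are not part of this diff:

```python
import json

from ncdssdk.src.main.python.ncdsclient.NCDSClient import NCDSClient

# Assumed: an auth properties file with the usual OAuth settings (not shown in this commit).
with open("client-authentication-config.json") as f:
    security_cfg = json.load(f)

kafka_cfg = {
    "bootstrap.servers": "{streams_endpoint_url}:9094",  # placeholder endpoint
    "auto.offset.reset": "earliest",
    "timeout": 10,        # new in 0.4.x: consume timeout in seconds
    "num_messages": 500,  # new in 0.4.x: max records per consume call
}

ncds_client = NCDSClient(security_cfg, kafka_cfg)
records = ncds_client.top_messages("GIDS")  # example topic name
for record in records:
    print(record.value())
```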
15 changes: 5 additions & 10 deletions ncdssdk/src/main/python/ncdsclient/NCDSClient.py
@@ -1,13 +1,10 @@
import logging
from importlib import resources
from ncdssdk.src.main.python.ncdsclient.internal.utils import ConsumerConfig
from ncdssdk.src.main.python.ncdsclient.internal.utils.AuthenticationConfigLoader import AuthenticationConfigLoader
from ncdssdk.src.main.python.ncdsclient.consumer.NasdaqKafkaAvroConsumer import NasdaqKafkaAvroConsumer
from ncdssdk.src.main.python.ncdsclient.internal.utils import IsItPyTest
import ncdssdk.src.main.resources as sysresources
import json
from confluent_kafka import OFFSET_END
from ncdssdk.src.main.python.ncdsclient.internal.utils import LoggingConfig
from ncdssdk.src.main.python.ncdsclient.internal.utils.KafkaConfigLoader import KafkaConfigLoader


class NCDSClient:
@@ -27,14 +24,12 @@ def __init__(self, security_cfg, kafka_cfg):
self.nasdaq_kafka_avro_consumer = None
LoggingConfig.create_logger()
self.logger = logging.getLogger(__name__)
self.kafka_cfg = kafka_cfg
self.kafka_config_loader = KafkaConfigLoader()

if kafka_cfg:
kafka_cfg['logger'] = logging.getLogger(__name__)

- with resources.open_text(sysresources, "consumer-properties.json") as f:
- self.consumer_props = json.load(f)
- f.close()

try:
auth_config_loader = AuthenticationConfigLoader()
if security_cfg is not None and auth_config_loader.validate_security_config(security_cfg):
@@ -103,7 +98,7 @@ def top_messages(self, topic_name, timestamp=None):
kafka_consumer = self.ncds_kafka_consumer(topic_name, timestamp)
self.logger.debug("kafka_consumer is now trying to consume")
records = kafka_consumer.consume(
- self.consumer_props[ConsumerConfig.NUM_MESSAGES], self.consumer_props[ConsumerConfig.TIMEOUT])
+ self.kafka_cfg[self.kafka_config_loader.NUM_MESSAGES], self.kafka_cfg[self.kafka_config_loader.TIMEOUT])
return records

def get_sample_messages(self, topic_name, message_name, all_messages):
@@ -127,7 +122,7 @@ def get_sample_messages(self, topic_name, message_name, all_messages):

while not found:
messages = kafka_consumer.consume(
- self.consumer_props[ConsumerConfig.NUM_MESSAGES], self.consumer_props[ConsumerConfig.TIMEOUT])
+ self.kafka_cfg[self.kafka_config_loader.NUM_MESSAGES], self.kafka_cfg[self.kafka_config_loader.TIMEOUT])
if not messages or self.end_of_data(kafka_consumer):
print(
"--------------------------------END of Stream------------------")
ncdssdk/src/main/python/ncdsclient/consumer/NasdaqKafkaAvroConsumer.py
@@ -6,10 +6,10 @@
from ncdssdk.src.main.python.ncdsclient.internal.utils.KafkaConfigLoader import KafkaConfigLoader
from ncdssdk.src.main.python.ncdsclient.internal.utils import IsItPyTest, SeekToMidnight
from confluent_kafka import TopicPartition, OFFSET_INVALID, OFFSET_END, OFFSET_BEGINNING
import ncdssdk.src.main.python.ncdsclient.internal.utils.ConsumerConfig as config
from datetime import datetime
from ncdssdk.src.main.python.ncdsclient.internal.utils.Oauth import Oauth
import datetime
from pprint import pformat


class NasdaqKafkaAvroConsumer():
@@ -37,17 +37,17 @@ def __init__(self, security_cfg, kafka_cfg):

self.logger = logging.getLogger(__name__)

- kafka_config_loader = KafkaConfigLoader()
+ self.kafka_config_loader = KafkaConfigLoader()
auth_config_loader = AuthenticationConfigLoader()
if self.kafka_cfg is None:
if IsItPyTest.is_py_test():
- pytest_kafka_cfg = kafka_config_loader.load_test_config()
+ pytest_kafka_cfg = self.kafka_config_loader.load_test_config()
self.kafka_props = pytest_kafka_cfg
else:
raise Exception("Kafka Configuration not defined")
else:
self.kafka_props = self.kafka_cfg
- kafka_config_loader.validate_and_add_specific_properties(
+ self.kafka_config_loader.validate_and_add_specific_properties(
self.kafka_props)

if self.security_cfg is None:
@@ -61,6 +61,8 @@ def __init__(self, security_cfg, kafka_cfg):
self.read_schema_topic.set_security_props(self.security_props)
self.read_schema_topic.set_kafka_props(self.kafka_props)
self.client_ID = auth_config_loader.get_client_id(self.security_props)
self.logger.info("Consumer Config: ")
self.logger.info(pformat(self.kafka_cfg))

def get_kafka_consumer(self, stream_name, timestamp=None):
"""
@@ -87,8 +89,7 @@ def get_kafka_consumer(self, stream_name, timestamp=None):
if timestamp is None:
self.logger.debug("Timestamp is none")

- auto_offset_cfg = self.kafka_props.get(
- config.AUTO_OFFSET_RESET_CONFIG)
+ auto_offset_cfg = self.kafka_props.get(self.kafka_config_loader.AUTO_OFFSET_RESET_CONFIG)
if auto_offset_cfg == "earliest" or auto_offset_cfg == "smallest" or auto_offset_cfg == "beginning":
self.logger.debug(
f"Auto offset reset config set to: {auto_offset_cfg}")
@@ -104,7 +105,7 @@ def get_kafka_consumer(self, stream_name, timestamp=None):
self.logger.debug(
"offset: " + str(topic_partition.offset) + ", timestamp: " + str(timestamp))
offsets_for_times = kafka_consumer.offsets_for_times(
- [topic_partition], timeout=5)
+ [topic_partition], self.kafka_cfg.TIMEOUT)
except Exception as e:
self.logger.exception(e)
sys.exit(0)
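
For context on the call above: confluent_kafka's offsets_for_times expects TopicPartition objects whose offset field carries a timestamp in milliseconds, returns partitions whose offset is the earliest offset at or after that time, and takes a request timeout in seconds as its second argument. A small standalone illustration with a placeholder broker and topic:

```python
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "localhost:9094",  # placeholder
    "group.id": "offset-lookup-example",
})

# Look up the first offset at or after 2022-12-13 00:00:00 UTC on partition 0.
timestamp_ms = 1670889600000
tp = TopicPartition("example-topic", 0, timestamp_ms)

result = consumer.offsets_for_times([tp], timeout=10)
print(result[0].offset)  # offset the consumer could seek or assign to
consumer.close()
```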
@@ -130,9 +131,8 @@ def get_consumer(self, avro_schema, stream_name):
Returns:
a :class:`.KafkaAvroConsumer` instance with a key and value deserializer set through the avro_schema parameter
"""
- if 'auto.offset.reset' not in self.kafka_props:
- self.kafka_props[config.AUTO_OFFSET_RESET_CONFIG] = "earliest"
- self.kafka_props[config.GROUP_ID_CONFIG] = f'{self.client_ID}_{stream_name}_{datetime.datetime.today().day}'
+ if 'group.id' not in self.kafka_props:
+ self.kafka_props[self.kafka_config_loader.GROUP_ID_CONFIG] = f'{self.client_ID}_{stream_name}_{datetime.datetime.today().day}'
return KafkaAvroConsumer(self.kafka_props, avro_schema)

def get_schema_for_topic(self, topic):
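
A behavioural note on the get_consumer change above: the fallback group.id is now generated only when the caller has not supplied one, so a client can pin its own consumer group through kafka_cfg, while auto.offset.reset handling moves into KafkaConfigLoader. The generated fallback has roughly this shape (client id and stream name are placeholders):

```python
import datetime

client_id = "example-client-id"  # placeholder for the OAuth client id
stream_name = "NLSUTP"           # placeholder stream/topic name

group_id = f"{client_id}_{stream_name}_{datetime.datetime.today().day}"
print(group_id)  # e.g. example-client-id_NLSUTP_13
```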
ncdssdk/src/main/python/ncdsclient/internal/BasicKafkaConsumer.py
@@ -5,6 +5,8 @@
MessageField)
import logging

from ncdssdk.src.main.python.ncdsclient.internal.utils.KafkaConfigLoader import KafkaConfigLoader


class BasicKafkaConsumer(DeserializingConsumer):
"""
@@ -20,8 +22,12 @@ class BasicKafkaConsumer(DeserializingConsumer):
def __init__(self, config, key_deserializer, value_deserializer):
config["key.deserializer"] = key_deserializer
config["value.deserializer"] = value_deserializer.decode
+ kafka_config = config.copy()
+ del kafka_config[KafkaConfigLoader().TIMEOUT]
+ del kafka_config[KafkaConfigLoader().NUM_MESSAGES]

self.logger = logging.getLogger(__name__)
- super(BasicKafkaConsumer, self).__init__(config)
+ super(BasicKafkaConsumer, self).__init__(kafka_config)

def ensure_assignment(self):
"""
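
The copy-and-delete in __init__ above is needed because confluent_kafka validates its configuration and rejects properties that librdkafka does not recognise, so the SDK-level timeout and num_messages keys must be stripped before the dictionary reaches DeserializingConsumer. A standalone sketch of that split, using the plain Consumer and a placeholder broker:

```python
from confluent_kafka import Consumer

user_cfg = {
    "bootstrap.servers": "localhost:9094",  # placeholder
    "group.id": "example-group",
    "timeout": 10,         # SDK-level setting, unknown to librdkafka
    "num_messages": 500,   # SDK-level setting, unknown to librdkafka
}

# Keep the SDK settings aside and hand librdkafka only what it understands;
# passing user_cfg directly would raise a KafkaException for the unknown keys.
sdk_keys = ("timeout", "num_messages")
consume_opts = {k: user_cfg[k] for k in sdk_keys}
kafka_cfg = {k: v for k, v in user_cfg.items() if k not in sdk_keys}

consumer = Consumer(kafka_cfg)
consumer.subscribe(["example-topic"])
messages = consumer.consume(consume_opts["num_messages"], consume_opts["timeout"])
consumer.close()
```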
17 changes: 6 additions & 11 deletions ncdssdk/src/main/python/ncdsclient/internal/ReadSchemaTopic.py
@@ -9,7 +9,6 @@
import ncdssdk.src.main.resources.schemas as schemas
from ncdssdk.src.main.python.ncdsclient.internal.KafkaAvroConsumer import KafkaAvroConsumer
from confluent_kafka import TopicPartition
import ncdssdk.src.main.python.ncdsclient.internal.utils.ConsumerConfig as config
from confluent_kafka import OFFSET_BEGINNING


@@ -29,22 +28,18 @@ def __init__(self):
self.security_props = None
self.kafka_props = {}
self.logger = logging.getLogger(__name__)

- with resources.open_text(sysresources, "consumer-properties.json") as f:
- self.consumer_props = json.load(f)
- f.close()

- self.num_messages = self.consumer_props[config.NUM_MESSAGES]
- self.timeout = self.consumer_props[config.TIMEOUT]
+ self.kafka_config_loader = KafkaConfigLoader()

def read_schema(self, topic):
auth_config_loader = AuthenticationConfigLoader()
schema_consumer = self.get_consumer(
"Control-" + auth_config_loader.get_client_id(self.security_props))
latest_record = None
+ num_messages = self.kafka_props[self.kafka_config_loader.NUM_MESSAGES]
+ timeout = self.kafka_props[self.kafka_config_loader.TIMEOUT]
while True:
schema_messages = schema_consumer.consume(
- self.num_messages, self.timeout)
+ num_messages, timeout)
if not schema_messages:
break
for message in reversed(schema_messages):
@@ -110,8 +105,8 @@ def get_consumer(self, client_id):
if IsItPyTest.is_py_test():
self.kafka_props = KafkaConfigLoader.load_test_config()

- self.kafka_props[config.AUTO_OFFSET_RESET_CONFIG] = 'earliest'
- self.kafka_props[config.GROUP_ID_CONFIG] = f'{client_id}1'
+ self.kafka_props[self.kafka_config_loader.AUTO_OFFSET_RESET_CONFIG] = 'earliest'
+ self.kafka_props[self.kafka_config_loader.GROUP_ID_CONFIG] = f'{client_id}1'

kafka_avro_consumer = KafkaAvroConsumer(
self.kafka_props, ctrl_msg_schema)

This file was deleted.

ncdssdk/src/main/python/ncdsclient/internal/utils/KafkaConfigLoader.py
@@ -13,6 +13,10 @@ class KafkaConfigLoader:

def __init__(self):
self.BOOTSTRAP_SERVERS = "bootstrap.servers"
self.AUTO_OFFSET_RESET_CONFIG = 'auto.offset.reset'
self.GROUP_ID_CONFIG = 'group.id'
self.TIMEOUT = 'timeout'
self.NUM_MESSAGES = 'num_messages'
self.logger = logging.getLogger(__name__)

@staticmethod
@@ -38,5 +42,12 @@ def validate_and_add_specific_properties(self, p):
if not p[self.BOOTSTRAP_SERVERS]:
raise Exception(
"bootstrap.servers Properties is not set in the Kafka Configuration")
if not p[self.AUTO_OFFSET_RESET_CONFIG]:
self.AUTO_OFFSET_RESET_CONFIG = "earliest"
if self.TIMEOUT not in p:
p[self.TIMEOUT] = 10
if self.NUM_MESSAGES not in p:
p[self.NUM_MESSAGES] = 500

self.nasdaq_specific_config(p)
return p
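
The net effect of the new checks is that both settings become optional: when the caller's Kafka configuration omits them, the loader falls back to a 10-second timeout and 500 messages per consume call. A self-contained sketch that mirrors the defaulting logic (it does not import the SDK):

```python
DEFAULT_TIMEOUT = 10         # seconds, matching the loader above
DEFAULT_NUM_MESSAGES = 500   # records per consume call, matching the loader above

def apply_consume_defaults(cfg: dict) -> dict:
    """Fill in the SDK-level consume settings when the caller leaves them out."""
    cfg.setdefault("timeout", DEFAULT_TIMEOUT)
    cfg.setdefault("num_messages", DEFAULT_NUM_MESSAGES)
    return cfg

print(apply_consume_defaults({"bootstrap.servers": "localhost:9094"}))
# -> {'bootstrap.servers': 'localhost:9094', 'timeout': 10, 'num_messages': 500}
```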
14 changes: 4 additions & 10 deletions ncdssdk_client/src/main/python/ncdsclient/NCDSSession.py
@@ -3,12 +3,12 @@
import sys
import logging
from ncdssdk.src.main.python.ncdsclient.NCDSClient import NCDSClient
from ncdssdk.src.main.python.ncdsclient.internal.utils.KafkaConfigLoader import KafkaConfigLoader
from ncdssdk_client.src.main.python.ncdsclient.utils.ValidateInput import ValidateInput
from confluent_kafka import KafkaException
from importlib import resources
import ncdssdk_client.src.main.python.resources as configresources
import ncdssdk.src.main.resources as sysresources
from ncdssdk.src.main.python.ncdsclient.internal.utils import ConsumerConfig
import logging


@@ -35,6 +35,7 @@ def main(self):
def main(self):
self.security_cfg = load_auth_properties(self.auth_props_file)
self.kafka_cfg = load_kafka_config(self.kafka_props_file)
self.kafka_config_loader = KafkaConfigLoader()

cmd_to_validate = ValidateInput(self.cmd)
cmd_to_validate.validate_user_input()
@@ -118,12 +119,11 @@ def cont_stream_cmd(self):

try:
while True:
- message = consumer.poll(sys.maxsize)
+ message = consumer.poll(self.kafka_cfg[self.kafka_config_loader.NUM_MESSAGES])
if message is None:
print(f"No Records Found for the Topic: {self.topic}")
else:
print(f"value :" + str(message.value()))
- consumer.commit(message=message, asynchronous=True)

except KafkaException as e:
logging.exception(f"Error in cont stream {e.args[0].str()}")
@@ -144,14 +144,10 @@ def filter_stream_cmd(self):
consumer = ncds_client.ncds_kafka_consumer(
self.topic) if not self.timestamp else ncds_client.ncds_kafka_consumer(self.topic, self.timestamp)

- with resources.open_text(sysresources, "consumer-properties.json") as f:
- consumer_props = json.load(f)
- f.close()

try:
while True:
messages = consumer.consume(
- consumer_props[ConsumerConfig.NUM_MESSAGES], consumer_props[ConsumerConfig.TIMEOUT])
+ self.kafka_cfg[self.kafka_config_loader.NUM_MESSAGES], self.kafka_cfg[self.kafka_config_loader.TIMEOUT])
self.logger.debug(
f"number of messages consumed: {len(messages)}")
if len(messages) == 0:
Expand All @@ -172,8 +168,6 @@ def filter_stream_cmd(self):

except KeyError:
pass

- consumer.commit(message=message, asynchronous=True)
except Exception as e:
logging.exception(f"Error in filter stream: {e}")

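
On the client side, the same two settings now travel in the Kafka properties file read by load_kafka_config; the sys.maxsize poll and the bundled consumer-properties.json are gone. A plausible way to produce such a file (field values and the file name are placeholders; security settings live in the separate auth properties file):

```python
import json

kafka_props = {
    "bootstrap.servers": "{streams_endpoint_url}:9094",  # placeholder
    "auto.offset.reset": "earliest",
    "timeout": 10,          # seconds per consume/poll call
    "num_messages": 500,    # records per consume call
}

with open("kafka-config.json", "w") as f:  # hypothetical file name
    json.dump(kafka_props, f, indent=2)
```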
2 changes: 1 addition & 1 deletion setup.py
@@ -8,7 +8,7 @@

setup(
name='ncdssdk',
- version='0.2.0',
+ version='0.4.0',
description='A Python SDK for developing applications to access the NCDS API',
long_description=long_description,
long_description_content_type='text/markdown',
