diff --git a/BlockServerToKafka/forwarder_config.py b/BlockServerToKafka/forwarder_config.py index d2243e38..0bc26698 100644 --- a/BlockServerToKafka/forwarder_config.py +++ b/BlockServerToKafka/forwarder_config.py @@ -29,7 +29,9 @@ class ForwarderConfig: Class that converts the pv information to a forwarder config message payload """ - def __init__(self, topic: str, epics_protocol: Protocol = Protocol.CA, schema: str = "f144") -> None: + def __init__( + self, topic: str, epics_protocol: Protocol = Protocol.CA, schema: str = "f144" + ) -> None: self.schema = schema self.topic = topic self.epics_protocol = epics_protocol diff --git a/BlockServerToKafka/kafka_producer.py b/BlockServerToKafka/kafka_producer.py index d2cab5d3..40d5c618 100644 --- a/BlockServerToKafka/kafka_producer.py +++ b/BlockServerToKafka/kafka_producer.py @@ -17,7 +17,7 @@ from typing import List from kafka import KafkaConsumer, KafkaProducer, errors -from streaming_data_types.fbschemas.forwarder_config_update_rf5k.Protocol import ( +from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) diff --git a/BlockServerToKafka/test_modules/test_forwarder_config.py b/BlockServerToKafka/test_modules/test_forwarder_config.py index 12db74c8..3ec02e2e 100644 --- a/BlockServerToKafka/test_modules/test_forwarder_config.py +++ b/BlockServerToKafka/test_modules/test_forwarder_config.py @@ -15,11 +15,11 @@ # http://opensource.org/licenses/eclipse-1.0.php import unittest -from streaming_data_types.fbschemas.forwarder_config_update_rf5k.Protocol import ( +from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) -from streaming_data_types.fbschemas.forwarder_config_update_rf5k.UpdateType import UpdateType -from streaming_data_types.forwarder_config_update_rf5k import deserialise_rf5k +from streaming_data_types.fbschemas.forwarder_config_update_fc00.UpdateType import UpdateType +from streaming_data_types.forwarder_config_update_fc00 import deserialise_fc00 from BlockServerToKafka.forwarder_config import ForwarderConfig @@ -33,7 +33,7 @@ class TestForwarderConfig(unittest.TestCase): @staticmethod def is_flatbuffers(payload): try: - deserialise_rf5k(payload) + deserialise_fc00(payload) except ValueError: return False return True @@ -51,24 +51,24 @@ def test_WHEN_new_forwarder_config_created_THEN_returns_configuration_update_con self, ): raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(output.config_change, UpdateType.ADD) def test_WHEN_forwarder_config_removed_THEN_output_has_correct_command_type(self): raw_output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(output.config_change, UpdateType.REMOVE) def test_WHEN_all_pvs_removed_THEN_output_has_correct_command_type(self): raw_output = self.kafka_forwarder.remove_all_forwarder_configuration() - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(output.config_change, UpdateType.REMOVEALL) def test_WHEN_new_forwarder_config_created_THEN_returns_flatbuffer_containing_streams_with_channels_and_converters( self, ): raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertNotEqual(0, len(output[1])) for stream in output[1]: self.assertEqual(self.test_block_1, stream.channel) @@ -80,7 +80,7 @@ def test_GIVEN_using_version_4_WHEN_new_forwarder_config_created_THEN_returns_JS ): kafka_version_4 = ForwarderConfig(epics_protocol=Protocol.PVA, topic=self.test_topic) raw_output = kafka_version_4.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertNotEqual(0, len(output[1])) for stream in output[1]: self.assertEqual(stream.protocol, Protocol.PVA) @@ -89,7 +89,7 @@ def test_GIVEN_configuration_with_one_block_WHEN_new_forwarder_config_created_TH self, ): raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(1, len(output[1])) def test_GIVEN_configuration_with_two_block_WHEN_new_forwarder_config_created_THEN_returns_JSON_containing_two_stream( @@ -98,14 +98,14 @@ def test_GIVEN_configuration_with_two_block_WHEN_new_forwarder_config_created_TH raw_output = self.kafka_forwarder.create_forwarder_configuration( self.config_with_two_blocks ) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(2, len(output[1])) def test_GIVEN_configuration_with_one_block_WHEN_new_forwarder_config_created_THEN_returns_block_pv_string( self, ): raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) stream = output[1][0] self.assertEqual(self.test_block_1, stream.channel) @@ -115,7 +115,7 @@ def test_GIVEN_configuration_with_two_blocks_WHEN_new_forwarder_config_created_T raw_output = self.kafka_forwarder.create_forwarder_configuration( self.config_with_two_blocks ) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) for blk in [self.test_block_1, self.test_block_2]: self.assertTrue(blk in [stream.channel for stream in output[1]]) @@ -127,7 +127,7 @@ def test_GIVEN_configuration_with_one_block_WHEN_removed_old_forwarder_THEN_retu self, ): raw_output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(self.test_block_1, output[1][0].channel) def test_GIVEN_configuration_with_two_blocks_WHEN_removed_old_forwarder_THEN_returns_JSON_containing_both_block_pv_string( @@ -136,6 +136,6 @@ def test_GIVEN_configuration_with_two_blocks_WHEN_removed_old_forwarder_THEN_ret raw_output = self.kafka_forwarder.remove_forwarder_configuration( self.config_with_two_blocks ) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) for blk in [self.test_block_1, self.test_block_2]: self.assertTrue(blk in [stream.channel for stream in output[1]])