Skip to content

Latest commit

 

History

History
173 lines (147 loc) · 4.12 KB

implementation.md

File metadata and controls

173 lines (147 loc) · 4.12 KB

Implementation

App Configuration

Bind-Mount Volume for configuration file

volumes:
    - './publish/:/publish/'
    - './cfg-data/:/cfg-data/'

Default app configuration

{
  "MQTT": {
    "HOST": "ie-databus",
    "PORT": "1883",
    "USERNAME": "edge",
    "PASSWORD": "edge"
  },
  "KAFKA": {
    "HOST": "192.168.253.143",
    "PORT": "9092"
  },
  "CONSUMER_TOPICS": [
    {
      "MQTT": "ie/d/kafka",
      "KAFKA": "EdgeDevice"
    }
  ],
  "PRODUCER_TOPICS": [
    {
      "MQTT": "ie/d/j/simatic/v1/s7c1/dp/r/#",
      "KAFKA": "EdgeDevice",
      "KEY": "S7-Connector"
    }
  ]
}

Read configuration file

def readJsonFile(path):
    """Parse the JSON document stored at ``path`` and return the result.

    Args:
        path: Location of the JSON file to open.

    Returns:
        The Python object produced by ``json.load`` for the file content.
    """
    with open(path) as handle:
        return json.load(handle)


# Load the configuration at start-up: the packaged defaults first, then the
# file expected in the bind-mounted /cfg-data/ volume.
print('Global: reading configuration file')
try:
    configuration = readJsonFile('./config/config-default.json')
    configJson = readJsonFile('/cfg-data/config.json')
except (OSError, ValueError):
    # OSError: a file is missing or unreadable.
    # ValueError: the content is not valid JSON (json.JSONDecodeError is a
    # ValueError subclass). Anything else (e.g. KeyboardInterrupt or a
    # programming error) is deliberately no longer swallowed.
    # NOTE(review): a failure while reading the defaults file also lands here
    # and leaves `configuration` unset — confirm that file always ships with
    # the app.
    print('Warning, using default configuration because reading config.json file failed')

Connect to Databus

MQTT-Client options

# Create the MQTT client and give it the username/password from the MQTT
# section of the configuration.
client = mqtt.Client()
client.username_pw_set(config.MQTT['USERNAME'], config.MQTT['PASSWORD'])
# Register on_connect as the client's connect callback.
client.on_connect = on_connect

Connect MQTT-Client to Databus

# PORT is stored as a string in the default configuration, hence the int() conversion.
# NOTE(review): the third argument (60) is presumably the keepalive interval
# in seconds — confirm against the MQTT client API.
client.connect(config.MQTT['HOST'], int(config.MQTT['PORT']), 60)

Subscribe to Topics on Databus

def on_connect(client, userdata, flags, rc):
    """Connect callback: log the result code and subscribe to the configured topics.

    Args:
        client: MQTT client instance used to issue the subscriptions.
        userdata: Not used by this callback.
        flags: Not used by this callback.
        rc: Result code of the connection attempt; only logged here.
    """
    print('Connected to MQTT-Broker with result code ' + str(rc))
    # NOTE(review): the default configuration shown in this document defines
    # CONSUMER_TOPICS and PRODUCER_TOPICS, not TOPICS — confirm how
    # config.TOPICS is populated.
    for topic in config.TOPICS:
        # Each entry is a mapping whose 'MQTT' value is the Databus topic name.
        print('Subscribed to Topic: ' + topic['MQTT'])
        client.subscribe(topic['MQTT'])

Publish to Topic on Databus

# Send payload `msg` to the MQTT topic `topic` through the client.
client.publish(topic, msg)

On Message

def on_message(client, userdata, msg):
    """Message callback: print the topic and raw payload of a received message.

    Args:
        client: Not used by this callback.
        userdata: Not used by this callback.
        msg: Received message; its ``topic`` and ``payload`` attributes are printed.
    """
    print(msg.topic + ' ' + str(msg.payload))
    # do something

Connect to Apache Kafka

Admin Client

# Kafka admin client. The bootstrap address "<HOST>:<PORT>" is built by string
# concatenation, so config.KAFKA['PORT'] must be a string (it is "9092" in the
# default configuration).
adminClient = AdminClient({'bootstrap.servers': config.KAFKA['HOST'] + ':' + config.KAFKA['PORT']})

Create Topic

# One NewTopic per entry in config.TOPICS, named by the entry's 'KAFKA' value,
# each with 3 partitions and a replication factor of 1.
new_topics = [NewTopic(topic['KAFKA'], num_partitions=3, replication_factor=1) for topic in config.TOPICS]
# NOTE(review): the value returned by create_topics is kept in `fs` but not
# inspected in this snippet — confirm creation failures are handled elsewhere.
fs = adminClient.create_topics(new_topics)

Producer

# Kafka producer pointed at the "<HOST>:<PORT>" address from the KAFKA section
# of the configuration (both values are concatenated as strings).
producer = Producer({'bootstrap.servers': config.KAFKA['HOST'] + ':' + config.KAFKA['PORT']})

Produce Message

def produce(topic, data, key):
    """Hand one message to the module-level Kafka ``producer``.

    Args:
        topic: Kafka topic name forwarded to ``producer.produce``.
        data: Message value forwarded to ``producer.produce``.
        key: Message key forwarded to ``producer.produce``.
    """
    # Zero-timeout poll before producing.
    # NOTE(review): presumably this serves delivery callbacks of earlier
    # messages — confirm against the Kafka client documentation.
    producer.poll(0)
    # delivery_report is registered as the callback for this message.
    producer.produce(topic, data, key=key, callback=delivery_report)

Consumer

# Kafka consumer for the configured broker address, using consumer group
# 'edgedevice' and 'auto.offset.reset' set to 'earliest'.
consumer = Consumer({
    'bootstrap.servers': config.KAFKA['HOST'] + ':' + config.KAFKA['PORT'],
    'group.id': 'edgedevice',
    'auto.offset.reset': 'earliest'
    })

Subscribe to Topics

# Collect the 'KAFKA' topic name of every entry in config.TOPICS and subscribe
# the consumer to all of them in one call.
kafkaTopics = [topic['KAFKA'] for topic in config.TOPICS]
    
consumer.subscribe(kafkaTopics)

Poll loop

# Poll Kafka forever and forward every received record to the MQTT topic(s)
# mapped to its Kafka topic in config.TOPICS.
# The loop has no exit condition, so the consumer is closed in a `finally`
# clause; otherwise close() could never be reached when the loop is
# interrupted (e.g. by KeyboardInterrupt or an unexpected exception).
try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            # poll() returned no message; try again.
            continue
        if msg.error():
            print('Consumer error: {}'.format(msg.error()))
            continue
        print(msg.topic())
        for topic in config.TOPICS:
            if topic['KAFKA'] == msg.topic():
                value = msg.value().decode('utf-8')
                print('Received message: {}'.format(value))
                # NOTE(review): other snippets in this document publish via
                # `client.publish(...)`; confirm that `mqtt` here refers to an
                # object that actually exposes publish().
                mqtt.publish(topic['MQTT'], value)
finally:
    consumer.close()