Bind-Mount Volume for configuration file
volumes :
- ' ./publish/:/publish/'
- ' ./cfg-data/:/cfg-data/'
Default app configuration
{
"MQTT" : {
"HOST" : " ie-databus" ,
"PORT" : " 1883" ,
"USERNAME" : " edge" ,
"PASSWORD" : " edge"
},
"KAFKA" : {
"HOST" : " 192.168.253.143" ,
"PORT" : " 9092"
},
"CONSUMER_TOPICS" : [
{
"MQTT" : " ie/d/kafka" ,
"KAFKA" : " EdgeDevice"
}
],
"PRODUCER_TOPICS" : [
{
"MQTT" : " ie/d/j/simatic/v1/s7c1/dp/r/#" ,
"KAFKA" : " EdgeDevice" ,
"KEY" : " S7-Connector"
}
]
}
def readJsonFile (path ):
with open (path ) as file :
configJson = json .load (file )
return configJson
print ('Global: reading configuration file' )
try :
configuration = readJsonFile ('./config/config-default.json' )
configJson = readJsonFile ('/cfg-data/config.json' )
except :
print ('Warning, using default configuration because reading config.json file failed' )
client = mqtt .Client ()
client .username_pw_set (config .MQTT ['USERNAME' ], config .MQTT ['PASSWORD' ])
client .on_connect = on_connect
Connect MQTT-Client to Databus
client .connect (config .MQTT ['HOST' ], int (config .MQTT ['PORT' ]), 60 )
Subscribe to Topics on Databus
def on_connect (client , userdata , flags , rc ):
print ('Connected to MQTT-Broker with result code ' + str (rc ))
for topic in config .TOPICS :
print ('Subribed to Topic: ' + topic ['MQTT' ])
client .subscribe (topic ['MQTT' ])
Publish to Topic on Databus
client .publish (topic , msg )
def on_message (client , userdata , msg ):
print (msg .topic + ' ' + str (msg .payload ))
// do something
adminClient = AdminClient ({'bootstrap.servers' : config .KAFKA ['HOST' ] + ':' + config .KAFKA ['PORT' ]})
new_topics = [NewTopic (topic ['KAFKA' ], num_partitions = 3 , replication_factor = 1 ) for topic in config .TOPICS ]
fs = adminClient .create_topics (new_topics )
producer = Producer ({'bootstrap.servers' : config .KAFKA ['HOST' ] + ':' + config .KAFKA ['PORT' ]})
def produce (topic , data , key ):
producer .poll (0 )
producer .produce (topic , data , key = key , callback = delivery_report )
consumer = Consumer ({
'bootstrap.servers' : config .KAFKA ['HOST' ] + ':' + config .KAFKA ['PORT' ],
'group.id' : 'edgedevice' ,
'auto.offset.reset' : 'earliest'
})
kafkaTopics = [topic ['KAFKA' ] for topic in config .TOPICS ]
consumer .subscribe (kafkaTopics )
while True :
msg = consumer .poll (timeout = 1.0 )
if msg is None :
continue
if msg .error ():
print ('Consumer error: {}' .format (msg .error ()))
continue
print (msg .topic ())
for topic in config .TOPICS :
if (topic ['KAFKA' ] == msg .topic ()):
value = msg .value ().decode ('utf-8' )
print ('Received message: {}' .format (value ))
mqtt .publish (topic ['MQTT' ], value )
consumer .close ()