-
Notifications
You must be signed in to change notification settings - Fork 0
/
IotSensor.py
175 lines (145 loc) · 7.55 KB
/
IotSensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#
# Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# This Greengrass example simulates an IoT Sensor sending data to Greengrass at a fixed interval.
# In addition the IoT device also sends a message to update the thing shadow.
# Please refer to the AWS Greengrass Getting Started Guide, Module 5 for more information.
#
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient, AWSIoTMQTTClient
import sys
import logging
import time
import json
import argparse
import os
import re
from itertools import cycle
import random
from gg_discovery_api import GGDiscovery
from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore
from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryInvalidRequestException
MAX_DISCOVERY_RETRIES = 10 # MAX tries at discovery before giving up
GROUP_PATH = "./groupCA/" # directory storing discovery info
CA_NAME = "root-ca.crt" # stores GGC CA cert
GGC_ADDR_NAME = "ggc-host" # stores GGC host address
# Custom Shadow callback for updating the desired state in the shadow
def customShadowCallback_Update(payload, responseStatus, token):
# payload is a JSON string ready to be parsed using json.loads(...)
# in both Py2.x and Py3.x
if responseStatus == "timeout":
print("Update request " + token + " time out!")
if responseStatus == "accepted":
payloadDict = json.loads(payload)
print("~~~~~~~~~~Shadow Update Accepted~~~~~~~~~~~~~")
print("Update request with token: " + token + " accepted!")
print("property: " + str(payloadDict["state"]["desired"]["property"]))
print("~~~~~~~~~~~~~~~~~~~~~~~\n\n")
shadow_update_topic = '$aws/things/' + clientId + '/shadow/update'
logger.info("reporting state to shadow: " + shadow_update_topic)
myAWSIoTMQTTClient.publish(shadow_update_topic, json.dumps(str(payloadDict["state"]["desired"]["property"]), indent=4), 0)
if responseStatus == "rejected":
print("Update request " + token + " rejected!")
# function does basic regex check to see if value might be an ip address
def isIpAddress(value):
match = re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}', value)
if match:
return True
return False
def customCallback(client, userdata, message):
print("Received a new message: ")
print(message.payload)
print("from topic: ")
print(message.topic)
print("--------------\n\n")
AllowedActions = ['both', 'publish', 'subscribe']
# Read in command-line parameters
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
parser.add_argument("-n", "--thingName", action="store", dest="thingName", default="Bot", help="Targeted thing name")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="Iot-Sensor",
help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="sensor/test/python", help="Targeted topic")
parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
help="Use MQTT over WebSocket")
parser.add_argument("-m", "--mode", action="store", dest="mode", default="both",
help="Operation modes: %s"%str(AllowedActions))
parser.add_argument("--connect-to", action="store", dest="connectTo", default="greengrass", help="Where to connect to. Can be either awsiot or greengrass")
args = parser.parse_args()
host = args.host
iotCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
thingName = args.thingName
clientId = args.clientId
port = args.port
useWebsocket = args.useWebsocket
topic = args.topic
connectTo = args.connectTo
coreCAFile = "core-CAs.crt"
# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.INFO) # set to logging.DEBUG for additional logging
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
rootCAPath = iotCAPath
if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443
port = 443
if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883
port = 8883
if connectTo == "greengrass":
CAFile = coreCAFile
logger.info("connecting to GREENGRASS: starting discover")
print("starting discover")
discovery = GGDiscovery(clientId, host, 8443, rootCAPath, certificatePath, privateKeyPath)
myAWSIoTMQTTClient = None
if useWebsocket:
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
myAWSIoTMQTTClient.configureEndpoint(host, port)
myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
myAWSIoTMQTTClient.configureEndpoint(host, port)
myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec
myAWSIoTMQTTClient.connect()
if args.mode == 'both' or args.mode == 'subscribe':
myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)
#myAWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient(clientId)
#myAWSIoTMQTTShadowClient.configureEndpoint(host, 8883)
#myAWSIoTMQTTShadowClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
# AWSIoTMQTTShadowClient configuration
#myAWSIoTMQTTShadowClient.configureAutoReconnectBackoffTime(1, 32, 20)
#myAWSIoTMQTTShadowClient.configureConnectDisconnectTimeout(10) # 10 sec
#myAWSIoTMQTTShadowClient.configureMQTTOperationTimeout(5) # 5 sec
# Connect to AWS IoT
#myAWSIoTMQTTShadowClient.connect()
#deviceShadowHandler = myAWSIoTMQTTShadowClient.createShadowHandlerWithName(thingName, True)
# This loop simulates an IoT sensor generating a random number corresponding to the reading of a piece of equipment.
# This data will be fed into a lambda function which will generate a response after invoking an ML model.
shadow_topics = '$aws/things/' + clientId + '/shadow/update'
loopCount = 0
do = True
while do:
JSONPayload = '{"state":{"desired":{"property":' + '"' + str(random.random()) + '"}}}'
print(JSONPayload)
myAWSIoTMQTTClient.publish(topic, JSONPayload, 1)
logger.info("subscribe and set sdwCallback: topic: " + shadow_topics)
myAWSIoTMQTTClient.subscribe(shadow_topics, 0, customShadowCallback_Update)
logger.info("reporting state to shadow: " + shadow_topics)
myAWSIoTMQTTClient.publish(shadow_topics, JSONPayload, 0)
# myAWSIoTMQTTClient.shadowUpdate(JSONPayload, customShadowCallback_Update, 5)
loopCount += 1
time.sleep(20)