Skip to content

Commit

Permalink
chore: adds pin history
Browse files Browse the repository at this point in the history
  • Loading branch information
Russell Green committed May 29, 2023
1 parent 33dc5f0 commit 3b0ca68
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 35 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/docker-publish.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
name: Create and publish a Docker image

on:
push:
branches: ["main"]
release:
types: [published]

env:
REGISTRY: ghcr.io
Expand Down
5 changes: 3 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"git.ignoreLimitWarning": true,
"cSpell.words": [],
"[python]": {
"editor.defaultFormatter": "ms-python.python"
}
"editor.defaultFormatter": "ms-python.black-formatter"
},
"python.formatting.provider": "none"
}
94 changes: 63 additions & 31 deletions src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import sys
import os
import datetime
from enum import Enum
import logging as log
import RPi.GPIO as GPIO
Expand All @@ -41,35 +42,35 @@ class InitialPinBehavior(Enum):
UNMODIFIED = 3


host = os.getenv('SERVER_HOST') or '0.0.0.0'
port = os.getenv('SERVER_PORT') or 5000
log_level = os.getenv('SERVER_LOG_LEVEL') or log.INFO
host = os.getenv("SERVER_HOST") or "0.0.0.0"
port = os.getenv("SERVER_PORT") or 5000
log_level = os.getenv("SERVER_LOG_LEVEL") or log.WARN
debug = False
gpio_pins = (7, 11, 12, 13, 15, 16, 18, 22, 29, 31, 32, 33, 35, 36, 37, 38, 40)
gpio_pin_history = {}
initial_pin_state = InitialPinBehavior.DEFAULT
app = Flask(__name__)
CORS(app)
app.url_map.strict_slashes = False


log.basicConfig(
level=log_level,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
log.StreamHandler(sys.stdout)
]
handlers=[log.StreamHandler(sys.stdout)],
)


@app.route('/healthz')
@app.route("/healthz")
def health_check():
"""Health check
Returns a value to ensure the service is up.
"""
return 'healthy'
return "healthy"


@app.route('/version')
@app.route("/version")
def version():
"""Version
Expand All @@ -78,22 +79,24 @@ def version():
return __version__


@app.route('/pins')
@app.route("/pins")
def get_all_pins():
"""Get pins
Retries a list of all GPIO pins and their current values.
"""
result = []
for pin in gpio_pins:
pin_result = {'pin': pin, 'value': get_pin_value(pin)}
pin_result = {"pin": pin, "value": get_pin_value(pin)}
history = get_pin_history(pin)
pin_result.update(history)
result.append(pin_result)

log.info("Retrieved values for all pins")
return jsonify(result)


@app.route('/pins/all/<int:value>')
@app.route("/pins/all/<int:value>")
def set_all_pins(value):
"""Set pins to value
Expand All @@ -103,45 +106,50 @@ def set_all_pins(value):
for pin in gpio_pins:
new_value = set_get_pin_value(pin, value)
changed = value != new_value
pin_result = {'pin': pin, 'value': get_pin_value(
pin), 'changed': changed}
pin_result = {"pin": pin, "value": get_pin_value(pin), "changed": changed}

history = get_pin_history(pin)
pin_result.update(history)
result.append(pin_result)

log.info("Set value of all pins to {}".format(value))
return jsonify(result)


@app.route('/pins/<int:pin>')
@app.route("/pins/<int:pin>")
def get_pin(pin):
"""Get pin value
Gets the current value for a given GPIO pin.
"""
pin_value = get_pin_value(pin)
log.info("Retrieved value for pin '{}' (value: {})".format(
str(pin), str(pin_value)))
log.info(
"Retrieved value for pin '{}' (value: {})".format(str(pin), str(pin_value))
)

return str(pin_value)


@app.route('/pins/<int:pin>/<int:value>')
@app.route("/pins/<int:pin>/<int:value>")
def set_pin(pin, value):
"""Set pin value
Sets a GPIO pin to a specified value.
"""
pin_value = set_get_pin_value(pin, value)

if (value != pin_value):
if value != pin_value:
msg = "Failed to set pin '{}'. Expected {} to be {}".format(
pin, pin_value, value)
pin, pin_value, value
)
log.exception(msg)
raise Exception(msg)

log.info("Set value for pin '{}' (value: {})".format(pin, pin_value))
return str(pin_value)


@app.route('/sensors/dht11/<int:pin>')
@app.route("/sensors/dht11/<int:pin>")
def get_sensor_dht11(pin):
"""Get DHT11 sensor reading
Expand All @@ -154,20 +162,27 @@ def get_sensor_dht11(pin):
return jsonify(result.to_dict())


@app.route('/sensors/hcsr04/<int:trigger_pin>/<int:echo_pin>')
@app.route("/sensors/hcsr04/<int:trigger_pin>/<int:echo_pin>")
def get_sensor_hcsr04(trigger_pin, echo_pin):
"""Get HC-SR04 sensor reading
Gets a reading for a HC-SR04 ultrasonic sonar distance sensor.
See: https://adafru.it/3942
"""
log.info(
"Reading HC-SR04 sensor for pin 'trigger: {}, echo: {}'".format(trigger_pin, echo_pin))
args = {'trigger_pin': trigger_pin, 'echo_pin': echo_pin}
"Reading HC-SR04 sensor for pin 'trigger: {}, echo: {}'".format(
trigger_pin, echo_pin
)
)
args = {"trigger_pin": trigger_pin, "echo_pin": echo_pin}
args.update(request.args)

sensor = HCSR04(**args)
result = sensor.read()
log.info("Retrieved HC-SR04 sensor reading for pin 'trigger: {}, echo: {}'".format(trigger_pin, echo_pin))
log.info(
"Retrieved HC-SR04 sensor reading for pin 'trigger: {}, echo: {}'".format(
trigger_pin, echo_pin
)
)
return jsonify(result)


Expand All @@ -194,6 +209,7 @@ def set_pin_value(pin, value):
Sets a GPIO pin value.
"""
GPIO.output(pin, value)
set_pin_history(pin)


def set_get_pin_value(pin, value):
Expand All @@ -205,6 +221,19 @@ def set_get_pin_value(pin, value):
return get_pin_value(pin)


def get_pin_history(pin):
return gpio_pin_history[pin] or {"lastValue": None}


def set_pin_history(pin):
history = {"lastChange": datetime.datetime.now()}
if not gpio_pin_history[pin]:
gpio_pin_history[pin] = history

record = gpio_pin_history[pin]
record.update(history)


def setup_gpio():
"""Sets up the GPIO pins
Expand Down Expand Up @@ -236,15 +265,18 @@ def set_initial_state(pin):
GPIO.output(pin, state)


if __name__ == '__main__':
log.info('Starting app at {}:{} (debug:{}). Version {}'.format(
host, port, debug, __version__))
if __name__ == "__main__":
log.info(
"Starting app at {}:{} (debug:{}). Version {}".format(
host, port, debug, __version__
)
)
try:
setup_gpio()
app.run(debug=debug, host=host, port=port)
except Exception as e:
log.error('Fatal application error occurred: {}'.format(e))
log.error("Fatal application error occurred: {}".format(e))
finally:
log.debug('App is shutting down, cleaning up GPIO')
log.debug("App is shutting down, cleaning up GPIO")
GPIO.cleanup()
log.info('GPIO has been cleaned up')
log.info("GPIO has been cleaned up")

0 comments on commit 3b0ca68

Please sign in to comment.