Skip to content

Commit

Permalink
Refactor MQTT endpoints + Introduce End-to-End encryption using RSA a…
Browse files Browse the repository at this point in the history
…nd AES keys + finetune PTZ
  • Loading branch information
cedricve committed Oct 20, 2023
1 parent 24136f8 commit 0c70ab6
Show file tree
Hide file tree
Showing 13 changed files with 610 additions and 303 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ Next to attaching the configuration file, it is also possible to override the co
| `AGENT_KERBEROSVAULT_DIRECTORY` | The directory, in the provider, where the recordings will be stored in. | "" |
| `AGENT_DROPBOX_ACCESS_TOKEN` | The Access Token from your Dropbox app, that is used to leverage the Dropbox SDK. | "" |
| `AGENT_DROPBOX_DIRECTORY` | The directory, in the provider, where the recordings will be stored in. | "" |
| `AGENT_ENCRYPTION` | Enable 'true' or disable 'false' end-to-end encryption through MQTT (recordings will follow). | "false" |
| `AGENT_ENCRYPTION_FINGERPRINT` | The fingerprint of the keypair (public/private keys), so you know which one to use. | "" |
| `AGENT_ENCRYPTION_PRIVATE_KEY` | The private key (assymetric/RSA) to decryptand sign requests send over MQTT. | "" |
| `AGENT_ENCRYPTION_SYMMETRIC_KEY` | The symmetric key (AES) to encrypt and decrypt request send over MQTT. | "" |

## Contribute with Codespaces

Expand Down
3 changes: 2 additions & 1 deletion machinery/data/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,6 @@
"hub_key": "",
"hub_private_key": "",
"hub_site": "",
"condition_uri": ""
"condition_uri": "",
"encryption": {}
}
51 changes: 29 additions & 22 deletions machinery/src/cloud/Cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,19 +458,17 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod
// Allocate frame
frame := ffmpeg.AllocVideoFrame()

key := ""
hubKey := ""
if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" {
key = config.S3.Publickey
hubKey = config.S3.Publickey
} else if config.Cloud == "kstorage" && config.KStorage != nil && config.KStorage.CloudKey != "" {
key = config.KStorage.CloudKey
hubKey = config.KStorage.CloudKey
}
// This is the new way ;)
if config.HubKey != "" {
key = config.HubKey
hubKey = config.HubKey
}

topic := "kerberos/" + key + "/device/" + config.Key + "/live"

lastLivestreamRequest := int64(0)

var cursorError error
Expand All @@ -491,7 +489,27 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod
continue
}
log.Log.Info("HandleLiveStreamSD: Sending base64 encoded images to MQTT.")
sendImage(frame, topic, mqttClient, pkt, decoder, decoderMutex)
_, err := computervision.GetRawImage(frame, pkt, decoder, decoderMutex)
if err == nil {
bytes, _ := computervision.ImageToBytes(&frame.Image)
encoded := base64.StdEncoding.EncodeToString(bytes)

valueMap := make(map[string]interface{})
valueMap["image"] = encoded
message := models.Message{
Payload: models.Payload{
Action: "receive-sd-stream",
DeviceId: configuration.Config.Key,
Value: valueMap,
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
} else {
log.Log.Info("HandleRequestConfig: something went wrong while sending acknowledge config to hub: " + string(payload))
}
}
}

// Cleanup the frame.
Expand All @@ -505,15 +523,6 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod
log.Log.Debug("HandleLiveStreamSD: finished")
}

func sendImage(frame *ffmpeg.VideoFrame, topic string, mqttClient mqtt.Client, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
_, err := computervision.GetRawImage(frame, pkt, decoder, decoderMutex)
if err == nil {
bytes, _ := computervision.ImageToBytes(&frame.Image)
encoded := base64.StdEncoding.EncodeToString(bytes)
mqttClient.Publish(topic, 0, false, encoded)
}
}

func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, codecs []av.CodecData, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {

config := configuration.Config
Expand All @@ -532,25 +541,23 @@ func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *mod

if config.Capture.ForwardWebRTC == "true" {
// We get a request with an offer, but we'll forward it.
for m := range communication.HandleLiveHDHandshake {
/*for m := range communication.HandleLiveHDHandshake {
// Forward SDP
m.CloudKey = config.Key
request, err := json.Marshal(m)
if err == nil {
mqttClient.Publish("kerberos/webrtc/request", 2, false, request)
}
}
}*/
} else {
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
key := config.Key + "/" + handshake.Cuuid
webrtc.CandidatesMutex.Lock()
key := config.Key + "/" + handshake.SessionID
_, ok := webrtc.CandidateArrays[key]
if !ok {
webrtc.CandidateArrays[key] = make(chan string, 30)
webrtc.CandidateArrays[key] = make(chan string)
}
webrtc.CandidatesMutex.Unlock()
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake, webrtc.CandidateArrays[key])

}
Expand Down
2 changes: 1 addition & 1 deletion machinery/src/components/Kerberos.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
}

// Handle livestream HD (high resolution over WEBRTC)
communication.HandleLiveHDHandshake = make(chan models.SDPPayload, 1)
communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 1)
if subStreamEnabled {
livestreamHDCursor := subQueue.Latest()
go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, subStreams, subDecoder, &decoderMutex)
Expand Down
18 changes: 16 additions & 2 deletions machinery/src/computervision/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,23 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
if config.Offline != "true" {
if mqttClient != nil {
if hubKey != "" {
mqttClient.Publish("kerberos/"+hubKey+"/device/"+deviceKey+"/motion", 2, false, "motion")
message := models.Message{
Payload: models.Payload{
Action: "motion",
DeviceId: configuration.Config.Key,
Value: map[string]interface{}{
"timestamp": time.Now().Unix(),
},
},
}
payload, err := models.PackageMQTTMessage(configuration, message)
if err == nil {
mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload)
} else {
log.Log.Info("ProcessMotion: failed to package MQTT message: " + err.Error())
}
} else {
mqttClient.Publish("kerberos/device/"+deviceKey+"/motion", 2, false, "motion")
mqttClient.Publish("kerberos/agent/"+deviceKey, 2, false, "motion")
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions machinery/src/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,24 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) {
case "AGENT_DROPBOX_DIRECTORY":
configuration.Config.Dropbox.Directory = value
break

/* When encryption is enabled */
case "AGENT_ENCRYPTION":
if value == "true" {
configuration.Config.Encryption.Enabled = true
} else {
configuration.Config.Encryption.Enabled = false
}
break
case "AGENT_ENCRYPTION_FINGERPRINT":
configuration.Config.Encryption.Fingerprint = value
break
case "AGENT_ENCRYPTION_PRIVATE_KEY":
configuration.Config.Encryption.PrivateKey = value
break
case "AGENT_ENCRYPTION_SYMMETRIC_KEY":
configuration.Config.Encryption.SymmetricKey = value
break
}
}
}
Expand Down
148 changes: 148 additions & 0 deletions machinery/src/encryption/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package encryption

import (
"bytes"
"crypto"
"crypto/aes"
"crypto/cipher"
"crypto/md5"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
"encoding/base64"
"errors"
"hash"
)

// DecryptWithPrivateKey decrypts data with private key
func DecryptWithPrivateKey(ciphertext string, privateKey *rsa.PrivateKey) ([]byte, error) {

// decode our encrypted string into cipher bytes
cipheredValue, _ := base64.StdEncoding.DecodeString(ciphertext)

// decrypt the data
out, err := rsa.DecryptPKCS1v15(nil, privateKey, cipheredValue)

return out, err
}

// SignWithPrivateKey signs data with private key
func SignWithPrivateKey(data []byte, privateKey *rsa.PrivateKey) ([]byte, error) {

// hash the data with sha256
hashed := sha256.Sum256(data)

// sign the data
signature, err := rsa.SignPKCS1v15(nil, privateKey, crypto.SHA256, hashed[:])

return signature, err
}

func AesEncrypt(content string, password string) (string, error) {
salt := make([]byte, 8)
_, err := rand.Read(salt)
if err != nil {
return "", err
}
key, iv, err := DefaultEvpKDF([]byte(password), salt)

block, err := aes.NewCipher(key)
if err != nil {
return "", err
}

mode := cipher.NewCBCEncrypter(block, iv)
cipherBytes := PKCS5Padding([]byte(content), aes.BlockSize)
mode.CryptBlocks(cipherBytes, cipherBytes)

data := make([]byte, 16+len(cipherBytes))
copy(data[:8], []byte("Salted__"))
copy(data[8:16], salt)
copy(data[16:], cipherBytes)

cipherText := base64.StdEncoding.EncodeToString(data)
return cipherText, nil
}

func AesDecrypt(cipherText string, password string) (string, error) {
data, err := base64.StdEncoding.DecodeString(cipherText)
if err != nil {
return "", err
}
if string(data[:8]) != "Salted__" {
return "", errors.New("invalid crypto js aes encryption")
}

salt := data[8:16]
cipherBytes := data[16:]
key, iv, err := DefaultEvpKDF([]byte(password), salt)
if err != nil {
return "", err
}

block, err := aes.NewCipher(key)
if err != nil {
return "", err
}

mode := cipher.NewCBCDecrypter(block, iv)
mode.CryptBlocks(cipherBytes, cipherBytes)

result := PKCS5UnPadding(cipherBytes)
return string(result), nil
}

// https://stackoverflow.com/questions/27677236/encryption-in-javascript-and-decryption-with-php/27678978#27678978
// https://github.com/brix/crypto-js/blob/8e6d15bf2e26d6ff0af5277df2604ca12b60a718/src/evpkdf.js#L55
func EvpKDF(password []byte, salt []byte, keySize int, iterations int, hashAlgorithm string) ([]byte, error) {
var block []byte
var hasher hash.Hash
derivedKeyBytes := make([]byte, 0)
switch hashAlgorithm {
case "md5":
hasher = md5.New()
default:
return []byte{}, errors.New("not implement hasher algorithm")
}
for len(derivedKeyBytes) < keySize*4 {
if len(block) > 0 {
hasher.Write(block)
}
hasher.Write(password)
hasher.Write(salt)
block = hasher.Sum([]byte{})
hasher.Reset()

for i := 1; i < iterations; i++ {
hasher.Write(block)
block = hasher.Sum([]byte{})
hasher.Reset()
}
derivedKeyBytes = append(derivedKeyBytes, block...)
}
return derivedKeyBytes[:keySize*4], nil
}

func DefaultEvpKDF(password []byte, salt []byte) (key []byte, iv []byte, err error) {
// https://github.com/brix/crypto-js/blob/8e6d15bf2e26d6ff0af5277df2604ca12b60a718/src/cipher-core.js#L775
keySize := 256 / 32
ivSize := 128 / 32
derivedKeyBytes, err := EvpKDF(password, salt, keySize+ivSize, 1, "md5")
if err != nil {
return []byte{}, []byte{}, err
}
return derivedKeyBytes[:keySize*4], derivedKeyBytes[keySize*4:], nil
}

// https://stackoverflow.com/questions/41579325/golang-how-do-i-decrypt-with-des-cbc-and-pkcs7
func PKCS5UnPadding(src []byte) []byte {
length := len(src)
unpadding := int(src[length-1])
return src[:(length - unpadding)]
}

func PKCS5Padding(src []byte, blockSize int) []byte {
padding := blockSize - len(src)%blockSize
padtext := bytes.Repeat([]byte{byte(padding)}, padding)
return append(src, padtext...)
}
2 changes: 1 addition & 1 deletion machinery/src/models/Communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Communication struct {
HandleHeartBeat chan string
HandleLiveSD chan int64
HandleLiveHDKeepalive chan string
HandleLiveHDHandshake chan SDPPayload
HandleLiveHDHandshake chan RequestHDStreamPayload
HandleLiveHDPeers chan string
HandleONVIF chan OnvifAction
IsConfiguring *abool.AtomicBool
Expand Down
9 changes: 9 additions & 0 deletions machinery/src/models/Config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
HubPrivateKey string `json:"hub_private_key" bson:"hub_private_key"`
HubSite string `json:"hub_site" bson:"hub_site"`
ConditionURI string `json:"condition_uri" bson:"condition_uri"`
Encryption *Encryption `json:"encryption" bson:"encryption"`
}

// Capture defines which camera type (Id) you are using (IP, USB or Raspberry Pi camera),
Expand Down Expand Up @@ -157,3 +158,11 @@ type Dropbox struct {
AccessToken string `json:"access_token,omitempty" bson:"access_token,omitempty"`
Directory string `json:"directory,omitempty" bson:"directory,omitempty"`
}

// Encryption
type Encryption struct {
Enabled bool `json:"enabled" bson:"enabled"`
Fingerprint string `json:"fingerprint" bson:"fingerprint"`
PrivateKey string `json:"private_key" bson:"private_key"`
SymmetricKey string `json:"symmetric_key" bson:"symmetric_key"`
}
Loading

0 comments on commit 0c70ab6

Please sign in to comment.