Skip to content

Commit

Permalink
Merge pull request #176 from spit4520/master
Browse files Browse the repository at this point in the history
Added in GET /connections to update restarting node
  • Loading branch information
chowyu12 authored Dec 11, 2023
2 parents 805a7b8 + 4f98fae commit e3fa657
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 10 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ log/*
.vscode/settings.json
.pre-commit-config.yaml
hmq.exe
*.sw*
*.swo
*.swp
*.swn
42 changes: 38 additions & 4 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"sync"
"time"
encJson "encoding/json"

"github.com/fhmq/hmq/broker/lib/sessions"
"github.com/fhmq/hmq/broker/lib/topics"
Expand Down Expand Up @@ -406,8 +407,19 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) error{
}
}
b.clients.Store(cid, c)

pubInfo := Info{
ClientID: info.clientID,
Username: info.username,
Password: info.password,
Keepalive: info.keepalive,
WillMsg: &PubPacket{
TopicName: info.willMsg.TopicName,
Payload: info.willMsg.Payload,
},
}

b.OnlineOfflineNotification(cid, true)
b.OnlineOfflineNotification(pubInfo, true, c.lastMsgTime)
{
b.Publish(&bridge.Elements{
ClientID: msg.ClientIdentifier,
Expand Down Expand Up @@ -698,11 +710,33 @@ func (b *Broker) BroadcastUnSubscribe(topicsToUnSubscribeFrom []string) {
b.BroadcastSubOrUnsubMessage(unsub)
}

func (b *Broker) OnlineOfflineNotification(clientID string, online bool) {
type OnlineOfflineMsg struct {
ClientID string `json:"clientID"`
Online bool `json:"online"`
Timestamp string `json:"timestamp"`
ClientInfo Info `json:"info"`
LastMsgTime int64 `json:"lastMsg"`
}

func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64) {
packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
packet.TopicName = "$SYS/broker/connection/clients/" + clientID
packet.TopicName = "$SYS/broker/connection/clients/" + info.ClientID
packet.Qos = 0
packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":%v,"timestamp":"%s"}`, clientID, online, time.Now().UTC().Format(time.RFC3339)))

msg := OnlineOfflineMsg{
ClientID: info.ClientID,
Online: online,
Timestamp: time.Now().UTC().Format(time.RFC3339),
ClientInfo: info,
LastMsgTime: lastMsg,
}

if b, err := encJson.Marshal(msg); err != nil {
//This is a TERRIBLE situation, falling back to legacy format to not break API Contract
packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":%v,"timestamp":"%s"}`, info.ClientID, online, time.Now().UTC().Format(time.RFC3339)))
} else {
packet.Payload = b
}

b.PublishMessage(packet)
}
29 changes: 28 additions & 1 deletion broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type client struct {
mqueue *queue.Queue
retryTimer *time.Timer
retryTimerLock sync.Mutex
lastMsgTime int64
}

type InflightStatus uint8
Expand Down Expand Up @@ -111,6 +112,19 @@ type info struct {
remoteIP string
}

type PubPacket struct {
TopicName string `json:"topicName"`
Payload []byte `json:"payload"`
}

type Info struct {
ClientID string `json:"clientId"`
Username string `json:"username"`
Password []byte `json:"password"`
Keepalive uint16 `json:"keepalive"`
WillMsg *PubPacket `json:"willMsg"`
}

type route struct {
remoteID string
remoteUrl string
Expand All @@ -122,6 +136,7 @@ var (
)

func (c *client) init() {
c.lastMsgTime = time.Now().Unix() //mark the connection packet time as last time messaged
c.status = Connected
c.info.localIP, _, _ = net.SplitHostPort(c.conn.LocalAddr().String())
remoteAddr := c.conn.RemoteAddr()
Expand Down Expand Up @@ -185,6 +200,8 @@ func (c *client) readLoop() {
if _, isDisconnect := packet.(*packets.DisconnectPacket); isDisconnect {
c.info.willMsg = nil
c.cancelFunc()
} else {
c.lastMsgTime = time.Now().Unix()
}

msg := &Message{
Expand Down Expand Up @@ -842,8 +859,18 @@ func (c *client) Close() {

if c.typ == CLIENT {
b.BroadcastUnSubscribe(unSubTopics)
pubInfo := Info{
ClientID: c.info.clientID,
Username: c.info.username,
Password: c.info.password,
Keepalive: c.info.keepalive,
WillMsg: &PubPacket{
TopicName: c.info.willMsg.TopicName,
Payload: c.info.willMsg.Payload,
},
}
//offline notification
b.OnlineOfflineNotification(c.info.clientID, false)
b.OnlineOfflineNotification(pubInfo, false, c.lastMsgTime)
}

if c.info.willMsg != nil {
Expand Down
47 changes: 42 additions & 5 deletions broker/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,24 @@ import (
"github.com/gin-gonic/gin"
)

const (
CONNECTIONS = "api/v1/connections"
)

type ConnClient struct {
Info `json:"info"`
LastMsgTime int64 `json:"lastMsg"`
}

type resp struct {
Code int `json:"code,omitempty"`
Clients []ConnClient `json:"clients,omitempty"`
}

func InitHTTPMoniter(b *Broker) {
gin.SetMode(gin.ReleaseMode)
router := gin.Default()
router.DELETE("api/v1/connections/:clientid", func(c *gin.Context) {
router.DELETE(CONNECTIONS + "/:clientid", func(c *gin.Context) {
clientid := c.Param("clientid")
cli, ok := b.clients.Load(clientid)
if ok {
Expand All @@ -16,10 +30,33 @@ func InitHTTPMoniter(b *Broker) {
conn.Close()
}
}
resp := map[string]int{
"code": 0,
}
c.JSON(200, &resp)
r := resp{Code: 0}
c.JSON(200, &r)
})
router.GET(CONNECTIONS, func(c *gin.Context) {
conns := make([]ConnClient, 0)
b.clients.Range(func (k, v interface{}) bool {
cl, _ := v.(*client)

msg := ConnClient{
Info: Info{
ClientID: cl.info.clientID,
Username: cl.info.username,
Password: cl.info.password,
Keepalive: cl.info.keepalive,
WillMsg: &PubPacket{
TopicName: cl.info.willMsg.TopicName,
Payload: cl.info.willMsg.Payload,
},
},
LastMsgTime: cl.lastMsgTime,
}

conns = append(conns, msg)
return true
})
r := resp{Clients: conns}
c.JSON(200, &r)
})

router.Run(":" + b.config.HTTPPort)
Expand Down

0 comments on commit e3fa657

Please sign in to comment.