-
Notifications
You must be signed in to change notification settings - Fork 230
/
debug.go
237 lines (200 loc) · 7.16 KB
/
debug.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
package debug
import (
"fmt"
"log/slog"
"strings"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
)
// Options contains configuration settings for the debug output.
type Options struct {
Enable bool `yaml:"enable" json:"enable"` // non-zero field for enabling hook using file-based config
ShowPacketData bool `yaml:"show_packet_data" json:"show_packet_data"` // include decoded packet data (default false)
ShowPings bool `yaml:"show_pings" json:"show_pings"` // show ping requests and responses (default false)
ShowPasswords bool `yaml:"show_passwords" json:"show_passwords"` // show connecting user passwords (default false)
}
// Hook is a debugging hook which logs additional low-level information from the server.
type Hook struct {
mqtt.HookBase
config *Options
Log *slog.Logger
}
// ID returns the ID of the hook.
func (h *Hook) ID() string {
return "debug"
}
// Provides indicates that this hook provides all methods.
func (h *Hook) Provides(b byte) bool {
return true
}
// Init is called when the hook is initialized.
func (h *Hook) Init(config any) error {
if _, ok := config.(*Options); !ok && config != nil {
return mqtt.ErrInvalidConfigType
}
if config == nil {
config = new(Options)
}
h.config = config.(*Options)
return nil
}
// SetOpts is called when the hook receives inheritable server parameters.
func (h *Hook) SetOpts(l *slog.Logger, opts *mqtt.HookOptions) {
h.Log = l
h.Log.Debug("", "method", "SetOpts")
}
// Stop is called when the hook is stopped.
func (h *Hook) Stop() error {
h.Log.Debug("", "method", "Stop")
return nil
}
// OnStarted is called when the server starts.
func (h *Hook) OnStarted() {
h.Log.Debug("", "method", "OnStarted")
}
// OnStopped is called when the server stops.
func (h *Hook) OnStopped() {
h.Log.Debug("", "method", "OnStopped")
}
// OnPacketRead is called when a new packet is received from a client.
func (h *Hook) OnPacketRead(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
if (pk.FixedHeader.Type == packets.Pingresp || pk.FixedHeader.Type == packets.Pingreq) && !h.config.ShowPings {
return pk, nil
}
h.Log.Debug(fmt.Sprintf("%s << %s", strings.ToUpper(packets.PacketNames[pk.FixedHeader.Type]), cl.ID), "m", h.packetMeta(pk))
return pk, nil
}
// OnPacketSent is called when a packet is sent to a client.
func (h *Hook) OnPacketSent(cl *mqtt.Client, pk packets.Packet, b []byte) {
if (pk.FixedHeader.Type == packets.Pingresp || pk.FixedHeader.Type == packets.Pingreq) && !h.config.ShowPings {
return
}
h.Log.Debug(fmt.Sprintf("%s >> %s", strings.ToUpper(packets.PacketNames[pk.FixedHeader.Type]), cl.ID), "m", h.packetMeta(pk))
}
// OnRetainMessage is called when a published message is retained (or retain deleted/modified).
func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
h.Log.Debug("retained message on topic", "m", h.packetMeta(pk))
}
// OnQosPublish is called when a publish packet with Qos is issued to a subscriber.
func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, resends int) {
h.Log.Debug("inflight out", "m", h.packetMeta(pk))
}
// OnQosComplete is called when the Qos flow for a message has been completed.
func (h *Hook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) {
h.Log.Debug("inflight complete", "m", h.packetMeta(pk))
}
// OnQosDropped is called the Qos flow for a message expires.
func (h *Hook) OnQosDropped(cl *mqtt.Client, pk packets.Packet) {
h.Log.Debug("inflight dropped", "m", h.packetMeta(pk))
}
// OnLWTSent is called when a Will Message has been issued from a disconnecting client.
func (h *Hook) OnLWTSent(cl *mqtt.Client, pk packets.Packet) {
h.Log.Debug("sent lwt for client", "method", "OnLWTSent", "client", cl.ID)
}
// OnRetainedExpired is called when the server clears expired retained messages.
func (h *Hook) OnRetainedExpired(filter string) {
h.Log.Debug("retained message expired", "method", "OnRetainedExpired", "topic", filter)
}
// OnClientExpired is called when the server clears an expired client.
func (h *Hook) OnClientExpired(cl *mqtt.Client) {
h.Log.Debug("client session expired", "method", "OnClientExpired", "client", cl.ID)
}
// StoredClients is called when the server restores clients from a store.
func (h *Hook) StoredClients() (v []storage.Client, err error) {
h.Log.Debug("", "method", "StoredClients")
return v, nil
}
// StoredSubscriptions is called when the server restores subscriptions from a store.
func (h *Hook) StoredSubscriptions() (v []storage.Subscription, err error) {
h.Log.Debug("", "method", "StoredSubscriptions")
return v, nil
}
// StoredRetainedMessages is called when the server restores retained messages from a store.
func (h *Hook) StoredRetainedMessages() (v []storage.Message, err error) {
h.Log.Debug("", "method", "StoredRetainedMessages")
return v, nil
}
// StoredInflightMessages is called when the server restores inflight messages from a store.
func (h *Hook) StoredInflightMessages() (v []storage.Message, err error) {
h.Log.Debug("", "method", "StoredInflightMessages")
return v, nil
}
// StoredSysInfo is called when the server restores system info from a store.
func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
h.Log.Debug("", "method", "StoredSysInfo")
return v, nil
}
// packetMeta adds additional type-specific metadata to the debug logs.
func (h *Hook) packetMeta(pk packets.Packet) map[string]any {
m := map[string]any{}
switch pk.FixedHeader.Type {
case packets.Connect:
m["id"] = pk.Connect.ClientIdentifier
m["clean"] = pk.Connect.Clean
m["keepalive"] = pk.Connect.Keepalive
m["version"] = pk.ProtocolVersion
m["username"] = string(pk.Connect.Username)
if h.config.ShowPasswords {
m["password"] = string(pk.Connect.Password)
}
if pk.Connect.WillFlag {
m["will_topic"] = pk.Connect.WillTopic
m["will_payload"] = string(pk.Connect.WillPayload)
}
case packets.Publish:
m["topic"] = pk.TopicName
m["payload"] = string(pk.Payload)
m["raw"] = pk.Payload
m["qos"] = pk.FixedHeader.Qos
m["id"] = pk.PacketID
case packets.Connack:
fallthrough
case packets.Disconnect:
fallthrough
case packets.Puback:
fallthrough
case packets.Pubrec:
fallthrough
case packets.Pubrel:
fallthrough
case packets.Pubcomp:
m["id"] = pk.PacketID
m["reason"] = int(pk.ReasonCode)
if pk.ReasonCode > packets.CodeSuccess.Code && pk.ProtocolVersion == 5 {
m["reason_string"] = pk.Properties.ReasonString
}
case packets.Subscribe:
f := map[string]int{}
ids := map[string]int{}
for _, v := range pk.Filters {
f[v.Filter] = int(v.Qos)
ids[v.Filter] = v.Identifier
}
m["filters"] = f
m["subids"] = f
case packets.Unsubscribe:
f := []string{}
for _, v := range pk.Filters {
f = append(f, v.Filter)
}
m["filters"] = f
case packets.Suback:
fallthrough
case packets.Unsuback:
r := []int{}
for _, v := range pk.ReasonCodes {
r = append(r, int(v))
}
m["reasons"] = r
case packets.Auth:
// tbd
}
if h.config.ShowPacketData {
m["packet"] = pk
}
return m
}