Skip to content

Commit

Permalink
patch: eventDispatcher metrics, remove event type label and introduce…
Browse files Browse the repository at this point in the history
… scheme label

- remove event type label
- add scheme label
- add tests for eventDispatcher metrics
  • Loading branch information
denopink committed Oct 14, 2024
1 parent 5359d6d commit 6c3cc9c
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 183 deletions.
2 changes: 1 addition & 1 deletion deviceStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func statusMetadata(d device.Interface) map[string]string {
}

func statusEventType(id device.ID, subtype string) string {
return fmt.Sprintf("device-status/%s/%s", id, subtype)
return fmt.Sprintf("%s/%s/%s", DeviceStatusEventScheme, id, subtype)
}

func onlinePayload(t time.Time, d device.Interface) []byte {
Expand Down
4 changes: 2 additions & 2 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ type outboundEnvelope struct {
cancel func()
}

// eventTypeContextKey is the internal key type for storing the event type
type eventTypeContextKey struct{}
// schemeContextKey is the internal key type for storing the event type
type schemeContextKey struct{}
114 changes: 61 additions & 53 deletions eventDispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"io"
"net/http"
"runtime/debug"
"strings"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -88,21 +87,18 @@ func NewEventDispatcher(om OutboundMeasures, o *Outbounder, urlFilter URLFilter)
func (d *eventDispatcher) OnDeviceEvent(event *device.Event) {
// TODO improve how we test dispatchEvent & dispatchTo
var (
err error
message *wrp.Message
eventType = unknown
url = unknown
code = messageDroppedCode
err error
scheme = unknown
)

defer func() {
if r := recover(); nil != r {
d.logger.Debug("stacktrace from panic", zap.String("stacktrace", string(debug.Stack())), zap.Any("panic", r))
switch event.Type {
case device.Connect, device.Disconnect, device.MessageReceived:
d.logger.Error("Dropped message, event not sent", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, panicReason), zap.String(urlLabel, url), zap.Any("panic", r))
d.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: panicReason, urlLabel: url}).Add(1.0)
d.outboundEvents.With(prometheus.Labels{eventLabel: eventType, reasonLabel: panicReason, urlLabel: url, outcomeLabel: failureOutcome}).Add(1.0)
d.logger.Error("Dropped message, event not sent", zap.String(schemeLabel, scheme), zap.String(codeLabel, messageDroppedCode), zap.String(reasonLabel, panicReason), zap.Any("panic", r))
d.droppedMessages.With(prometheus.Labels{schemeLabel: scheme, codeLabel: messageDroppedCode, reasonLabel: panicReason}).Add(1.0)
d.outboundEvents.With(prometheus.Labels{schemeLabel: scheme, reasonLabel: panicReason, outcomeLabel: failureOutcome}).Add(1.0)
}
}
}()
Expand All @@ -114,70 +110,81 @@ func (d *eventDispatcher) OnDeviceEvent(event *device.Event) {

switch event.Type {
case device.Connect:
eventType, message = newOnlineMessage(d.source, event.Device)
url, err = d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message)
scheme = wrp.SchemeEvent
eventType, message := newOnlineMessage(d.source, event.Device)
_, err = d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message)
if err != nil {
d.logger.Error("Error dispatching online event", zap.Any("eventType", eventType), zap.Any("destination", message.Destination), zap.Error(err))
}

case device.Disconnect:
eventType, message = newOfflineMessage(d.source, event.Device)
url, err = d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message)
scheme = wrp.SchemeEvent
eventType, message := newOfflineMessage(d.source, event.Device)
_, err = d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message)
if err != nil {
d.logger.Error("Error dispatching offline event", zap.Any("eventType", eventType), zap.Any("destination", message.Destination), zap.Error(err))
}
case device.MessageReceived:
if routable, ok := event.Message.(wrp.Routable); ok {
destination := routable.To()
contentType := event.Format.ContentType()
if strings.HasPrefix(destination, EventPrefix) {
var l wrp.Locator
if l, err = wrp.ParseLocator(destination); err == nil {
eventType = l.Authority
url, err = d.dispatchEvent(eventType, contentType, event.Contents)
if err != nil {
d.logger.Error("Error dispatching event", zap.Any("eventType", eventType), zap.Any("destination", destination), zap.Error(err))
}
}
} else if strings.HasPrefix(destination, DNSPrefix) {
eventType = event.Type.String()
unfilteredURL := destination[len(DNSPrefix):]
url, err = d.dispatchTo(unfilteredURL, contentType, event.Contents, eventType)
if err != nil {
d.logger.Error("Error dispatching to endpoint", zap.Any("destination", destination), zap.Error(err))
}
} else {
eventType = event.Type.String()
err = ErrorUnroutableDestination
d.logger.Error("Unroutable destination", zap.Any("destination", destination))
}
scheme, err = d.routeMessageReceivedEvent(event)
if err != nil {
scheme = unknown
}
default:
eventType = event.Type.String()
err = ErrorUnsupportedEvent
if routable, ok := event.Message.(wrp.Routable); ok {
url = routable.To()
}
}

var outboundEventsLabels prometheus.Labels
if err != nil {
reason := getDroppedMessageReason(err)
outboundEventsLabels = prometheus.Labels{eventLabel: eventType, reasonLabel: reason, urlLabel: url, outcomeLabel: failureOutcome}
outboundEventsLabels = prometheus.Labels{schemeLabel: scheme, reasonLabel: reason, outcomeLabel: failureOutcome}
if errors.Is(err, ErrorUnsupportedEvent) {
d.logger.Debug("Dropped message, event not sent", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, reason), zap.String(urlLabel, url), zap.Error(err))
d.logger.Debug("Dropped message, event not sent", zap.String(schemeLabel, scheme), zap.String(codeLabel, messageDroppedCode), zap.String(reasonLabel, reason), zap.Error(err))
} else {
d.logger.Error("Dropped message, event not sent", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, reason), zap.String(urlLabel, url), zap.Error(err))
d.logger.Error("Dropped message, event not sent", zap.String(schemeLabel, scheme), zap.String(codeLabel, messageDroppedCode), zap.String(reasonLabel, reason), zap.Error(err))
}

d.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: reason, urlLabel: url}).Add(1.0)
d.droppedMessages.With(prometheus.Labels{schemeLabel: scheme, codeLabel: messageDroppedCode, reasonLabel: reason}).Add(1.0)
} else {
outboundEventsLabels = prometheus.Labels{eventLabel: eventType, reasonLabel: noErrReason, urlLabel: url, outcomeLabel: successOutcome}
outboundEventsLabels = prometheus.Labels{schemeLabel: scheme, reasonLabel: noErrReason, outcomeLabel: successOutcome}
}

d.outboundEvents.With(outboundEventsLabels).Add(1.0)
}

func (d *eventDispatcher) routeMessageReceivedEvent(event *device.Event) (scheme string, err error) {
routable, ok := event.Message.(wrp.Routable)
if !ok {
return "", errors.New("wrp event message is not routable")
}

destination := routable.To()
contentType := event.Format.ContentType()
var l wrp.Locator
if l, err = wrp.ParseLocator(destination); err != nil {
return "", err
}

scheme = l.Scheme
eventType := l.Authority
switch scheme {
case wrp.SchemeEvent:
_, err = d.dispatchEvent(eventType, contentType, event.Contents)
case wrp.SchemeDNS:
url := l.Authority + l.Ignored
// `l.Authority + l.Ignored` is used because incoming dns events are expected to have the format `dns:some_url` or dns:some_scheme://some_url.
_, err = d.dispatchTo(url, contentType, event.Contents)
default:
scheme = unknown
err = ErrorUnroutableDestination
}

if err != nil {
d.logger.Error("Error dispatching event", zap.String(schemeLabel, scheme), zap.Any("destination", destination), zap.Error(err))
}

return scheme, err
}

// send wraps the given request in an outboundEnvelope together with a cancellable context,
// then asynchronously sends that request to the outbounds channel. This method will
// block on the outbound channel only as long as the context is not canceled, i.e. does not time out.
Expand Down Expand Up @@ -214,18 +221,18 @@ func (d *eventDispatcher) newRequest(url, contentType string, body io.Reader) (*
return request, nil
}

func (d *eventDispatcher) dispatchEvent(eventType, contentType string, contents []byte) (string, error) {
func (d *eventDispatcher) dispatchEvent(authority, contentType string, contents []byte) (string, error) {
url := unknown
endpoints, ok := d.eventMap.Get(eventType, DefaultEventType)
endpoints, ok := d.eventMap.Get(authority, DefaultEventType)
if !ok {
// allow no endpoints, but log an error since this means that we're dropping
// traffic explicitly because of configuration
return url, fmt.Errorf("%w: %s", ErrorNoEndpointConfiguredForEvent, eventType)
return url, fmt.Errorf("%w: %s", ErrorNoEndpointConfiguredForEvent, authority)
}

ctx := context.WithValue(
context.Background(), eventTypeContextKey{},
eventType,
context.Background(), schemeContextKey{},
wrp.SchemeEvent,
)

for _, url = range endpoints {
Expand Down Expand Up @@ -264,7 +271,7 @@ func (d *eventDispatcher) encodeAndDispatchEvent(eventType string, format wrp.Fo
return url, nil
}

func (d *eventDispatcher) dispatchTo(unfiltered string, contentType string, contents []byte, eventType string) (string, error) {
func (d *eventDispatcher) dispatchTo(unfiltered string, contentType string, contents []byte) (string, error) {
var (
err error
url = unfiltered
Expand All @@ -281,7 +288,8 @@ func (d *eventDispatcher) dispatchTo(unfiltered string, contentType string, cont
}

return request.URL.String(), d.send(
context.WithValue(context.Background(), eventTypeContextKey{}, eventType),
context.WithValue(context.Background(), schemeContextKey{},
wrp.SchemeDNS),
request,
)
}
Expand Down
Loading

0 comments on commit 6c3cc9c

Please sign in to comment.