diff --git a/cmd/replicatrd/replicatr/adding.go b/cmd/replicatrd/replicatr/adding.go index 11eafe52..82fa857c 100644 --- a/cmd/replicatrd/replicatr/adding.go +++ b/cmd/replicatrd/replicatr/adding.go @@ -6,19 +6,19 @@ import ( "github.com/Hubmakerlabs/replicatr/pkg/eventstore" "github.com/Hubmakerlabs/replicatr/pkg/nostr/normalize" - "github.com/nbd-wtf/go-nostr" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/tag" ) // AddEvent sends an event through then normal add pipeline, as if it was // received from a websocket. -func (rl *Relay) AddEvent(ctx Ctx, evt Event) (e error) { +func (rl *Relay) AddEvent(ctx Ctx, evt *Event) (e error) { if evt == nil { e = err.New("error: event is nil") rl.E.Ln(e) return } - for _, rejectors := range rl.RejectEvent { - if reject, msg := rejectors(ctx, evt); reject { + for _, rej := range rl.RejectEvent { + if reject, msg := rej(ctx, evt); reject { if msg == "" { e = err.New("blocked: no reason") rl.E.Ln(e) @@ -36,8 +36,11 @@ func (rl *Relay) AddEvent(ctx Ctx, evt Event) (e error) { if evt.Kind == 0 || evt.Kind == 3 || (10000 <= evt.Kind && evt.Kind < 20000) { // replaceable event, delete before storing for _, query := range rl.QueryEvents { - var ch chan *nostr.Event - ch, e = query(ctx, &nostr.Filter{Authors: []string{evt.PubKey}, Kinds: []int{evt.Kind}}) + var ch chan *Event + ch, e = query(ctx, &Filter{ + Authors: tag.T{evt.PubKey}, + Kinds: []int{evt.Kind}, + }) if rl.E.Chk(e) { continue } @@ -52,11 +55,11 @@ func (rl *Relay) AddEvent(ctx Ctx, evt Event) (e error) { d := evt.Tags.GetFirst([]string{"d", ""}) if d != nil { for _, query := range rl.QueryEvents { - var ch chan *nostr.Event - if ch, e = query(ctx, &nostr.Filter{ - Authors: []string{evt.PubKey}, + var ch chan *Event + if ch, e = query(ctx, &Filter{ + Authors: tag.T{evt.PubKey}, Kinds: []int{evt.Kind}, - Tags: nostr.TagMap{"d": []string{d.Value()}}, + Tags: TagMap{"d": []string{d.Value()}}, }); rl.E.Chk(e) { continue } diff --git a/cmd/replicatrd/replicatr/broadcasting.go b/cmd/replicatrd/replicatr/broadcasting.go index b232030c..ec955b70 100644 --- a/cmd/replicatrd/replicatr/broadcasting.go +++ b/cmd/replicatrd/replicatr/broadcasting.go @@ -2,4 +2,4 @@ package replicatr // BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions // it also doesn't attempt to store the event or trigger any reactions or callbacks -func (rl *Relay) BroadcastEvent(evt Event) { notifyListeners(evt) } +func (rl *Relay) BroadcastEvent(evt *Event) { notifyListeners(evt) } diff --git a/cmd/replicatrd/replicatr/deleting.go b/cmd/replicatrd/replicatr/deleting.go index 3f415a97..cf7a1fa1 100644 --- a/cmd/replicatrd/replicatr/deleting.go +++ b/cmd/replicatrd/replicatr/deleting.go @@ -3,17 +3,17 @@ package replicatr import ( "fmt" - "github.com/nbd-wtf/go-nostr" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/tag" ) -func (rl *Relay) handleDeleteRequest(ctx Ctx, evt Event) (e error) { +func (rl *Relay) handleDeleteRequest(ctx Ctx, evt *Event) (e error) { // event deletion -- nip09 - for _, tag := range evt.Tags { - if len(tag) >= 2 && tag[0] == "e" { + for _, t := range evt.Tags { + if len(t) >= 2 && t[0] == "e" { // first we fetch the event for _, query := range rl.QueryEvents { - var ch chan *nostr.Event - if ch, e = query(ctx, &nostr.Filter{IDs: []string{tag[1]}}); rl.E.Chk(e) { + var ch chan *Event + if ch, e = query(ctx, &Filter{IDs: tag.T{t[1]}}); rl.E.Chk(e) { continue } target := <-ch diff --git a/cmd/replicatrd/replicatr/handlers.go b/cmd/replicatrd/replicatr/handlers.go index 8040c6da..5aa53b8e 100644 --- a/cmd/replicatrd/replicatr/handlers.go +++ b/cmd/replicatrd/replicatr/handlers.go @@ -19,7 +19,7 @@ import ( ) // ServeHTTP implements http.Handler interface. -func (rl *Relay) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (rl *Relay) ServeHTTP(w ResponseWriter, r *Request) { if rl.ServiceURL == "" { rl.ServiceURL = getServiceBaseURL(r) } @@ -32,9 +32,9 @@ func (rl *Relay) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { +func (rl *Relay) HandleWebsocket(w ResponseWriter, r *Request) { var e error - var conn *websocket.Conn + var conn *Conn conn, e = rl.upgrader.Upgrade(w, r, nil) if rl.E.Chk(e) { rl.E.F("failed to upgrade websocket: %v\n", e) @@ -49,7 +49,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ws := &WebSocket{ conn: conn, Request: r, - Challenge: hex.EncodeToString(challenge), + Challenge: encodeToHex(challenge), } ctx, cancel := context.WithCancel( context.WithValue( @@ -81,12 +81,12 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) { return } switch env := envelope.(type) { - case *nostr.EventEnvelope: + case *EventEnvelope: // check id hash := sha256.Sum256(env.Event.Serialize()) id := hex.EncodeToString(hash[:]) if id != env.Event.ID { - rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "invalid: id is computed incorrectly", @@ -96,14 +96,14 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) { // check signature var ok bool if ok, e = env.Event.CheckSignature(); rl.E.Chk(e) { - rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "error: failed to verify signature"}, )) return } else if !ok { - rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "invalid: signature is invalid"}, @@ -127,14 +127,14 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) { } else { ok = true } - rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(OKEnvelope{ EventID: env.Event.ID, OK: ok, Reason: reason, })) - case *nostr.CountEnvelope: + case *CountEnvelope: if rl.CountEvents == nil { - rl.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{ + rl.E.Chk(ws.WriteJSON(ClosedEnvelope{ SubscriptionID: env.SubscriptionID, Reason: "unsupported: this relay does not support NIP-45"}, )) @@ -144,11 +144,11 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) { for _, filter := range env.Filters { total += rl.handleCountRequest(ctx, ws, &filter) } - rl.E.Chk(ws.WriteJSON(nostr.CountEnvelope{ + rl.E.Chk(ws.WriteJSON(CountEnvelope{ SubscriptionID: env.SubscriptionID, Count: &total, })) - case *nostr.ReqEnvelope: + case *ReqEnvelope: eose := sync.WaitGroup{} eose.Add(len(env.Filters)) // a context just for the "stored events" request handler @@ -164,7 +164,7 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) { if strings.HasPrefix(reason, "auth-required:") { RequestAuth(ctx) } - rl.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{ + rl.E.Chk(ws.WriteJSON(ClosedEnvelope{ SubscriptionID: env.SubscriptionID, Reason: reason}, )) @@ -177,12 +177,12 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) { // we can cancel the context and fire the EOSE message eose.Wait() cancelReqCtx(nil) - rl.E.Chk(ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))) + rl.E.Chk(ws.WriteJSON(EOSEEnvelope(env.SubscriptionID))) }() setListener(env.SubscriptionID, ws, env.Filters, cancelReqCtx) - case *nostr.CloseEnvelope: + case *CloseEnvelope: removeListenerId(ws, string(*env)) - case *nostr.AuthEnvelope: + case *AuthEnvelope: wsBaseUrl := strings.Replace(rl.ServiceURL, "http", "ws", 1) if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok { ws.AuthedPublicKey = pubkey @@ -192,12 +192,12 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) { ws.Authed = nil } ws.authLock.Unlock() - rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(OKEnvelope{ EventID: env.Event.ID, OK: true}, )) } else { - rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "error: failed to authenticate"}, @@ -205,7 +205,7 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) { } } } -func (rl *Relay) readMessages(ctx Ctx, kill func(), ws *WebSocket, conn *websocket.Conn, r *http.Request) { +func (rl *Relay) readMessages(ctx Ctx, kill func(), ws *WebSocket, conn *Conn, r *Request) { defer kill() conn.SetReadLimit(rl.MaxMessageSize) rl.E.Chk(conn.SetReadDeadline(time.Now().Add(rl.PongWait))) diff --git a/cmd/replicatrd/replicatr/helpers.go b/cmd/replicatrd/replicatr/helpers.go index dec21394..cf176012 100644 --- a/cmd/replicatrd/replicatr/helpers.go +++ b/cmd/replicatrd/replicatr/helpers.go @@ -3,7 +3,6 @@ package replicatr import ( "encoding/hex" "hash/maphash" - "net/http" "strconv" "strings" "unsafe" @@ -21,12 +20,12 @@ func pointerHasher[V any](_ maphash.Seed, k *V) uint64 { return uint64(uintptr(unsafe.Pointer(k))) } -func isOlder(prev, next Event) bool { +func isOlder(prev, next *Event) bool { p, n := prev.CreatedAt, next.CreatedAt return p < n || (p == n && prev.ID > next.ID) } -func getServiceBaseURL(r *http.Request) string { +func getServiceBaseURL(r *Request) string { host := r.Header.Get("X-Forwarded-Host") if host == "" { host = r.Host diff --git a/cmd/replicatrd/replicatr/listener.go b/cmd/replicatrd/replicatr/listener.go index 90b45d58..22ddc466 100644 --- a/cmd/replicatrd/replicatr/listener.go +++ b/cmd/replicatrd/replicatr/listener.go @@ -1,7 +1,6 @@ package replicatr import ( - "context" "fmt" "github.com/nbd-wtf/go-nostr" @@ -9,16 +8,17 @@ import ( ) type Listener struct { - filters nostr.Filters - cancel context.CancelCauseFunc + filters Filters + cancel CancelCauseFunc } -var listeners = xsync.NewTypedMapOf[*WebSocket, *xsync.MapOf[string, *Listener]](pointerHasher[WebSocket]) +var listeners = xsync.NewTypedMapOf[*WebSocket, + ListenerMap](pointerHasher[WebSocket]) -func GetListeningFilters() (respFilters nostr.Filters) { - respFilters = make(nostr.Filters, 0, listeners.Size()*2) +func GetListeningFilters() (respFilters Filters) { + respFilters = make(Filters, 0, listeners.Size()*2) // here we go through all the existing listeners - listeners.Range(func(_ *WebSocket, subs *xsync.MapOf[string, *Listener]) bool { + listeners.Range(func(_ *WebSocket, subs ListenerMap) bool { subs.Range(func(_ string, listener *Listener) bool { for _, listenerFilter := range listener.filters { for _, respFilter := range respFilters { @@ -40,8 +40,8 @@ func GetListeningFilters() (respFilters nostr.Filters) { return } -func setListener(id string, ws *WebSocket, f Filters, c context.CancelCauseFunc) { - subs, _ := listeners.LoadOrCompute(ws, func() *xsync.MapOf[string, *Listener] { +func setListener(id string, ws *WebSocket, f Filters, c CancelCauseFunc) { + subs, _ := listeners.LoadOrCompute(ws, func() ListenerMap { return xsync.NewMapOf[*Listener]() }) subs.Store(id, &Listener{filters: f, cancel: c}) @@ -64,13 +64,16 @@ func removeListenerId(ws *WebSocket, id string) { // (no need to cancel contexts as they are all inherited from the main connection context) func removeListener(ws *WebSocket) { listeners.Delete(ws) } -func notifyListeners(event Event) { - listeners.Range(func(ws *WebSocket, subs *xsync.MapOf[string, *Listener]) bool { +func notifyListeners(event *Event) { + listeners.Range(func(ws *WebSocket, subs ListenerMap) bool { subs.Range(func(id string, listener *Listener) bool { if !listener.filters.Match(event) { return true } - log.E.Chk(ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})) + log.E.Chk(ws.WriteJSON(EventEnvelope{ + SubscriptionID: &id, + Event: *event}, + )) return true }) return true diff --git a/cmd/replicatrd/replicatr/relay.go b/cmd/replicatrd/replicatr/relay.go index 4f0d690c..68bea848 100644 --- a/cmd/replicatrd/replicatr/relay.go +++ b/cmd/replicatrd/replicatr/relay.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "os" + "sync" "time" log2 "github.com/Hubmakerlabs/replicatr/pkg/log" @@ -23,29 +24,51 @@ const ( MaxMessageSize int64 = 512000 // ??? ) +// aliases so we can swap out to another package with only here changed type ( - Ctx = context.Context - Filter = *nostr.Filter - Event = *nostr.Event - Info = *nip11.RelayInformationDocument - Filters = nostr.Filters - RejectEvent func(ctx Ctx, event Event) (reject bool, msg string) - RejectFilter func(ctx Ctx, filter Filter) (reject bool, msg string) - OverwriteFilter func(ctx Ctx, filter Filter) - OverwriteDeletionOutcome func(ctx Ctx, target Event, del Event) (accept bool, msg string) - OverwriteResponseEvent func(ctx Ctx, event Event) - Events func(ctx Ctx, event Event) error + Ctx = context.Context + Info = nip11.RelayInformationDocument + Event = nostr.Event + Filter = nostr.Filter + Filters = nostr.Filters + TagMap = nostr.TagMap + EventEnvelope = nostr.EventEnvelope + OKEnvelope = nostr.OKEnvelope + CountEnvelope = nostr.CountEnvelope + ClosedEnvelope = nostr.ClosedEnvelope + ReqEnvelope = nostr.ReqEnvelope + EOSEEnvelope = nostr.EOSEEnvelope + CloseEnvelope = nostr.CloseEnvelope + AuthEnvelope = nostr.AuthEnvelope + NoticeEnvelope = nostr.NoticeEnvelope + Conn = websocket.Conn + Request = http.Request + ResponseWriter = http.ResponseWriter + Mutex = sync.Mutex + WaitGroup = sync.WaitGroup + CancelCauseFunc = context.CancelCauseFunc + ListenerMap = *xsync.MapOf[string, *Listener] +) + +// function types used in the relay state +type ( + RejectEvent func(ctx Ctx, event *Event) (reject bool, msg string) + RejectFilter func(ctx Ctx, filter *Filter) (reject bool, msg string) + OverwriteFilter func(ctx Ctx, filter *Filter) + OverwriteDeletionOutcome func(ctx Ctx, target *Event, del *Event) (accept bool, msg string) + OverwriteResponseEvent func(ctx Ctx, event *Event) + Events func(ctx Ctx, event *Event) error Hook func(ctx Ctx) - OverwriteRelayInformation func(ctx Ctx, r *http.Request, info Info) Info - QueryEvents func(ctx Ctx, filter Filter) (eventC chan Event, e error) - CountEvents func(ctx Ctx, filter Filter) (c int64, e error) - OnEventSaved func(ctx Ctx, event Event) + OverwriteRelayInformation func(ctx Ctx, r *Request, info *Info) *Info + QueryEvents func(ctx Ctx, filter *Filter) (eventC chan *Event, e error) + CountEvents func(ctx Ctx, filter *Filter) (c int64, e error) + OnEventSaved func(ctx Ctx, event *Event) ) func NewRelay(appName string) (r *Relay) { r = &Relay{ Log: log2.New(os.Stderr, appName, 0), - Info: &nip11.RelayInformationDocument{ + Info: &Info{ Software: "https://github.com/Hubmakerlabs/replicatr/cmd/khatru", Version: "n/a", SupportedNIPs: make([]int, 0), @@ -83,7 +106,7 @@ type Relay struct { OnDisconnect []Hook OnEventSaved []OnEventSaved // editing info will affect - Info Info + Info *Info *log2.Log // for establishing websockets upgrader websocket.Upgrader diff --git a/cmd/replicatrd/replicatr/responding.go b/cmd/replicatrd/replicatr/responding.go index f81ea537..a9a70f15 100644 --- a/cmd/replicatrd/replicatr/responding.go +++ b/cmd/replicatrd/replicatr/responding.go @@ -1,16 +1,14 @@ package replicatr import ( - "context" - "errors" - "sync" + err "errors" "github.com/Hubmakerlabs/replicatr/pkg/nostr/normalize" - "github.com/nbd-wtf/go-nostr" + "github.com/pkg/errors" ) -func (rl *Relay) handleRequest(ctx context.Context, id string, - eose *sync.WaitGroup, ws *WebSocket, f *nostr.Filter) (e error) { +func (rl *Relay) handleRequest(ctx Ctx, id string, + eose *WaitGroup, ws *WebSocket, f *Filter) (e error) { defer eose.Done() // overwrite the filter (for example, to eliminate some kinds or that we @@ -29,26 +27,26 @@ func (rl *Relay) handleRequest(ctx context.Context, id string, // filter we can just reject it) for _, reject := range rl.RejectFilter { if rej, msg := reject(ctx, f); rej { - rl.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(msg))) - return errors.New(normalize.OKMessage(msg, "blocked")) + rl.E.Chk(ws.WriteJSON(NoticeEnvelope(msg))) + return err.New(normalize.OKMessage(msg, "blocked")) } } // run the functions to query events (generally just one, // but we might be fetching stuff from multiple places) eose.Add(len(rl.QueryEvents)) for _, query := range rl.QueryEvents { - var ch chan *nostr.Event + var ch chan *Event if ch, e = query(ctx, f); rl.E.Chk(e) { - rl.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(e.Error()))) + rl.E.Chk(ws.WriteJSON(NoticeEnvelope(e.Error()))) eose.Done() continue } - go func(ch chan *nostr.Event) { + go func(ch chan *Event) { for event := range ch { for _, ovw := range rl.OverwriteResponseEvent { ovw(ctx, event) } - rl.E.Chk(ws.WriteJSON(nostr.EventEnvelope{ + rl.E.Chk(ws.WriteJSON(EventEnvelope{ SubscriptionID: &id, Event: *event, })) @@ -59,8 +57,9 @@ func (rl *Relay) handleRequest(ctx context.Context, id string, return nil } -func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, - filter *nostr.Filter) (subtotal int64) { +func (rl *Relay) handleCountRequest(ctx Ctx, ws *WebSocket, + filter *Filter) (subtotal int64) { + // overwrite the filter (for example, to eliminate some kinds or tags that // we know we don't support) for _, ovw := range rl.OverwriteCountFilter { @@ -69,7 +68,7 @@ func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, // then check if we'll reject this filter for _, reject := range rl.RejectCountFilter { if rej, msg := reject(ctx, filter); rej { - rl.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(msg))) + rl.E.Chk(ws.WriteJSON(NoticeEnvelope(msg))) return 0 } } @@ -78,7 +77,7 @@ func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, var res int64 for _, count := range rl.CountEvents { if res, e = count(ctx, filter); rl.E.Chk(e) { - rl.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(e.Error()))) + rl.E.Chk(ws.WriteJSON(NoticeEnvelope(e.Error()))) } subtotal += res } diff --git a/cmd/replicatrd/replicatr/start.go b/cmd/replicatrd/replicatr/start.go index 5a468781..8b6452cf 100644 --- a/cmd/replicatrd/replicatr/start.go +++ b/cmd/replicatrd/replicatr/start.go @@ -1,7 +1,6 @@ package replicatr import ( - "context" "errors" "net" "net/http" @@ -44,9 +43,8 @@ func (rl *Relay) Start(host string, port int, started ...chan bool) (e error) { } // Shutdown sends a websocket close control message to all connected clients. -func (rl *Relay) Shutdown(ctx context.Context) { - rl.httpServer.Shutdown(ctx) - +func (rl *Relay) Shutdown(ctx Ctx) { + rl.Log.E.Chk(rl.httpServer.Shutdown(ctx)) rl.clients.Range(func(conn *websocket.Conn, _ struct{}) bool { rl.E.Chk(conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))) rl.E.Chk(conn.Close()) diff --git a/cmd/replicatrd/replicatr/utils.go b/cmd/replicatrd/replicatr/utils.go index 4544f853..88b14f54 100644 --- a/cmd/replicatrd/replicatr/utils.go +++ b/cmd/replicatrd/replicatr/utils.go @@ -30,7 +30,7 @@ func GetSubscriptionID(ctx Ctx) string { return ctx.Value(subscriptionIdKey).(st func GetOpenSubscriptions(ctx Ctx) Filters { if subs, ok := listeners.Load(GetConnection(ctx)); ok { - res := make([]nostr.Filter, 0, listeners.Size()*2) + res := make(Filters, 0, listeners.Size()*2) subs.Range(func(_ string, sub *Listener) bool { res = append(res, sub.filters...) return true diff --git a/cmd/replicatrd/replicatr/websocket.go b/cmd/replicatrd/replicatr/websocket.go index 383617b9..0b203b0e 100644 --- a/cmd/replicatrd/replicatr/websocket.go +++ b/cmd/replicatrd/replicatr/websocket.go @@ -1,22 +1,15 @@ package replicatr -import ( - "net/http" - "sync" - - "github.com/fasthttp/websocket" -) - // WebSocket is a wrapper around a fasthttp/websocket with mutex locking and // NIP-42 Auth support type WebSocket struct { - conn *websocket.Conn - mutex sync.Mutex - Request *http.Request // original request - Challenge string // nip42 + conn *Conn + mutex Mutex + Request *Request // original request + Challenge string // nip42 AuthedPublicKey string Authed chan struct{} - authLock sync.Mutex + authLock Mutex } // WriteJSON writes an object as JSON to the websocket