From f92b206f8d475c450397b2a13aabb3ecc67dd168 Mon Sep 17 00:00:00 2001 From: mleku Date: Wed, 3 Jan 2024 10:07:50 +0000 Subject: [PATCH] some type alieses for neater function signatures --- cmd/replicatrd/main.go | 8 +-- cmd/replicatrd/replicatr/adding.go | 37 +++++++------ cmd/replicatrd/replicatr/broadcasting.go | 6 +-- cmd/replicatrd/replicatr/deleting.go | 11 ++-- cmd/replicatrd/replicatr/handlers.go | 68 ++++++++++++------------ cmd/replicatrd/replicatr/helpers.go | 7 ++- cmd/replicatrd/replicatr/listener.go | 41 ++++++-------- cmd/replicatrd/replicatr/relay.go | 62 ++++++++++++--------- cmd/replicatrd/replicatr/responding.go | 49 +++++++++-------- cmd/replicatrd/replicatr/start.go | 16 +++--- cmd/replicatrd/replicatr/utils.go | 24 +++------ cmd/replicatrd/replicatr/websocket.go | 19 ++++--- pkg/log/log.go | 10 ++-- pkg/nostr/normalize/normalize.go | 9 ++++ pkg/nostr/pool.go | 5 +- pkg/relay/add-event.go | 10 ++-- pkg/relay/handlers.go | 42 +++++++-------- pkg/relay/serve-req.go | 14 ++--- pkg/relay/start.go | 10 ++-- 19 files changed, 225 insertions(+), 223 deletions(-) diff --git a/cmd/replicatrd/main.go b/cmd/replicatrd/main.go index f10adec2..4181fb26 100644 --- a/cmd/replicatrd/main.go +++ b/cmd/replicatrd/main.go @@ -15,14 +15,14 @@ func main() { log2.SetLogLevel(log2.Trace) rl := replicatr.NewRelay(appName) db := &badger.BadgerBackend{Path: "/tmp/replicatr-badger"} - if e:= db.Init(); rl.Log.E.Chk(e) { - rl.Log.E.F("unable to start database: '%s'", e) + if e:= db.Init(); rl.E.Chk(e) { + rl.E.F("unable to start database: '%s'", e) os.Exit(1) } rl.StoreEvent = append(rl.StoreEvent, db.SaveEvent) rl.QueryEvents = append(rl.QueryEvents, db.QueryEvents) rl.CountEvents = append(rl.CountEvents, db.CountEvents) rl.DeleteEvent = append(rl.DeleteEvent, db.DeleteEvent) - rl.Log.I.Ln("running on :3334") - rl.Log.E.Chk(http.ListenAndServe(":3334", rl)) + rl.I.Ln("running on :3334") + rl.E.Chk(http.ListenAndServe(":3334", rl)) } diff --git a/cmd/replicatrd/replicatr/adding.go b/cmd/replicatrd/replicatr/adding.go index 9edd92c8..11eafe52 100644 --- a/cmd/replicatrd/replicatr/adding.go +++ b/cmd/replicatrd/replicatr/adding.go @@ -1,28 +1,31 @@ package replicatr import ( - "context" - "errors" + err "errors" "fmt" - "github.com/fiatjaf/eventstore" + "github.com/Hubmakerlabs/replicatr/pkg/eventstore" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/normalize" "github.com/nbd-wtf/go-nostr" ) -// AddEvent sends an event through then normal add pipeline, as if it was received from a websocket. -func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (e error) { +// AddEvent sends an event through then normal add pipeline, as if it was +// received from a websocket. +func (rl *Relay) AddEvent(ctx Ctx, evt Event) (e error) { if evt == nil { - return errors.New("error: event is nil") + e = err.New("error: event is nil") + rl.E.Ln(e) + return } for _, rejectors := range rl.RejectEvent { if reject, msg := rejectors(ctx, evt); reject { if msg == "" { - e = errors.New("blocked: no reason") - rl.Log.E.Ln(e) + e = err.New("blocked: no reason") + rl.E.Ln(e) return } else { - e = errors.New(nostr.NormalizeOKMessage(msg, "blocked")) - rl.Log.E.Ln(e) + e = err.New(normalize.OKMessage(msg, "blocked")) + rl.E.Ln(e) return } } @@ -35,12 +38,12 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (e error) { for _, query := range rl.QueryEvents { var ch chan *nostr.Event ch, e = query(ctx, &nostr.Filter{Authors: []string{evt.PubKey}, Kinds: []int{evt.Kind}}) - if rl.Log.E.Chk(e) { + if rl.E.Chk(e) { continue } if previous := <-ch; previous != nil && isOlder(previous, evt) { for _, del := range rl.DeleteEvent { - rl.Log.D.Chk(del(ctx, previous)) + rl.D.Chk(del(ctx, previous)) } } } @@ -54,12 +57,12 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (e error) { Authors: []string{evt.PubKey}, Kinds: []int{evt.Kind}, Tags: nostr.TagMap{"d": []string{d.Value()}}, - }); rl.Log.E.Chk(e) { + }); rl.E.Chk(e) { continue } if previous := <-ch; previous != nil && isOlder(previous, evt) { for _, del := range rl.DeleteEvent { - rl.Log.E.Chk(del(ctx, previous)) + rl.E.Chk(del(ctx, previous)) } } } @@ -67,12 +70,12 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (e error) { } // store for _, store := range rl.StoreEvent { - if saveErr := store(ctx, evt); rl.Log.E.Chk(saveErr){ + if saveErr := store(ctx, evt); rl.E.Chk(saveErr) { switch { - case errors.Is(saveErr, eventstore.ErrDupEvent): + case err.Is(saveErr, eventstore.ErrDupEvent): return nil default: - return fmt.Errorf(nostr.NormalizeOKMessage(saveErr.Error(), "error")) + return fmt.Errorf(normalize.OKMessage(saveErr.Error(), "error")) } } } diff --git a/cmd/replicatrd/replicatr/broadcasting.go b/cmd/replicatrd/replicatr/broadcasting.go index 6a8dd450..b232030c 100644 --- a/cmd/replicatrd/replicatr/broadcasting.go +++ b/cmd/replicatrd/replicatr/broadcasting.go @@ -1,9 +1,5 @@ package replicatr -import ( - "github.com/nbd-wtf/go-nostr" -) - // BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions // it also doesn't attempt to store the event or trigger any reactions or callbacks -func (rl *Relay) BroadcastEvent(evt *nostr.Event) { notifyListeners(evt) } +func (rl *Relay) BroadcastEvent(evt Event) { notifyListeners(evt) } diff --git a/cmd/replicatrd/replicatr/deleting.go b/cmd/replicatrd/replicatr/deleting.go index 8153d175..3f415a97 100644 --- a/cmd/replicatrd/replicatr/deleting.go +++ b/cmd/replicatrd/replicatr/deleting.go @@ -1,20 +1,19 @@ package replicatr import ( - "context" "fmt" "github.com/nbd-wtf/go-nostr" ) -func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) (e error) { +func (rl *Relay) handleDeleteRequest(ctx Ctx, evt Event) (e error) { // event deletion -- nip09 for _, tag := range evt.Tags { if len(tag) >= 2 && tag[0] == "e" { // first we fetch the event for _, query := range rl.QueryEvents { var ch chan *nostr.Event - if ch, e = query(ctx, &nostr.Filter{IDs: []string{tag[1]}}); rl.Log.E.Chk(e) { + if ch, e = query(ctx, &nostr.Filter{IDs: []string{tag[1]}}); rl.E.Chk(e) { continue } target := <-ch @@ -34,20 +33,18 @@ func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) (e e if acceptDeletion { // delete it for _, del := range rl.DeleteEvent { - rl.Log.E.Chk(del(ctx, target)) + rl.E.Chk(del(ctx, target)) } } else { // fail and stop here e = fmt.Errorf("blocked: %s", msg) - rl.Log.E.Chk(e) + rl.E.Ln(e) return } - // don't try to query this same event again break } } } - return nil } diff --git a/cmd/replicatrd/replicatr/handlers.go b/cmd/replicatrd/replicatr/handlers.go index 9f3237a8..8040c6da 100644 --- a/cmd/replicatrd/replicatr/handlers.go +++ b/cmd/replicatrd/replicatr/handlers.go @@ -6,7 +6,7 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" - "errors" + err "errors" "net/http" "strings" "sync" @@ -36,8 +36,8 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { var e error var conn *websocket.Conn conn, e = rl.upgrader.Upgrade(w, r, nil) - if rl.Log.E.Chk(e) { - rl.Log.E.F("failed to upgrade websocket: %v\n", e) + if rl.E.Chk(e) { + rl.E.F("failed to upgrade websocket: %v\n", e) return } rl.clients.Store(conn, struct{}{}) @@ -45,7 +45,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { // NIP-42 challenge challenge := make([]byte, 8) _, e = rand.Read(challenge) - rl.Log.E.Chk(e) + rl.E.Chk(e) ws := &WebSocket{ conn: conn, Request: r, @@ -64,7 +64,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ticker.Stop() cancel() if _, ok := rl.clients.Load(conn); ok { - rl.Log.E.Chk(conn.Close()) + _ = conn.Close() rl.clients.Delete(conn) removeListener(ws) } @@ -73,7 +73,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { go rl.watcher(ctx, kill, ticker, ws) } -func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSocket) { +func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) { var e error envelope := nostr.ParseMessage(message) if envelope == nil { @@ -86,7 +86,7 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc hash := sha256.Sum256(env.Event.Serialize()) id := hex.EncodeToString(hash[:]) if id != env.Event.ID { - rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "invalid: id is computed incorrectly", @@ -95,15 +95,15 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc } // check signature var ok bool - if ok, e = env.Event.CheckSignature(); rl.Log.E.Chk(e) { - rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + if ok, e = env.Event.CheckSignature(); rl.E.Chk(e) { + rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "error: failed to verify signature"}, )) return } else if !ok { - rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "invalid: signature is invalid"}, @@ -119,22 +119,22 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc writeErr = rl.AddEvent(ctx, &env.Event) } var reason string - if !rl.Log.E.Chk(writeErr) { - ok = true - } else { + if ok = !rl.E.Chk(writeErr); !ok { reason = writeErr.Error() if strings.HasPrefix(reason, "auth-required:") { RequestAuth(ctx) } + } else { + ok = true } - rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ EventID: env.Event.ID, OK: ok, Reason: reason, })) case *nostr.CountEnvelope: if rl.CountEvents == nil { - rl.Log.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{ + rl.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{ SubscriptionID: env.SubscriptionID, Reason: "unsupported: this relay does not support NIP-45"}, )) @@ -144,7 +144,7 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc for _, filter := range env.Filters { total += rl.handleCountRequest(ctx, ws, &filter) } - rl.Log.E.Chk(ws.WriteJSON(nostr.CountEnvelope{ + rl.E.Chk(ws.WriteJSON(nostr.CountEnvelope{ SubscriptionID: env.SubscriptionID, Count: &total, })) @@ -158,17 +158,17 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc // handle each filter separately -- dispatching events as they're loaded from databases for _, filter := range env.Filters { e = rl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, &filter) - if rl.Log.E.Chk(e) { + if rl.E.Chk(e) { // fail everything if any filter is rejected reason := e.Error() if strings.HasPrefix(reason, "auth-required:") { RequestAuth(ctx) } - rl.Log.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{ + rl.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{ SubscriptionID: env.SubscriptionID, Reason: reason}, )) - cancelReqCtx(errors.New("filter rejected")) + cancelReqCtx(err.New("filter rejected")) return } } @@ -177,7 +177,7 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc // we can cancel the context and fire the EOSE message eose.Wait() cancelReqCtx(nil) - rl.Log.E.Chk(ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))) + rl.E.Chk(ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID))) }() setListener(env.SubscriptionID, ws, env.Filters, cancelReqCtx) case *nostr.CloseEnvelope: @@ -192,12 +192,12 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc ws.Authed = nil } ws.authLock.Unlock() - rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ EventID: env.Event.ID, OK: true}, )) } else { - rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "error: failed to authenticate"}, @@ -205,13 +205,13 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc } } } -func (rl *Relay) readMessages(ctx context.Context, kill func(), ws *WebSocket, conn *websocket.Conn, r *http.Request) { +func (rl *Relay) readMessages(ctx Ctx, kill func(), ws *WebSocket, conn *websocket.Conn, r *http.Request) { defer kill() conn.SetReadLimit(rl.MaxMessageSize) - rl.Log.E.Chk(conn.SetReadDeadline(time.Now().Add(rl.PongWait))) + rl.E.Chk(conn.SetReadDeadline(time.Now().Add(rl.PongWait))) conn.SetPongHandler(func(string) (e error) { e = conn.SetReadDeadline(time.Now().Add(rl.PongWait)) - rl.Log.E.Chk(e) + rl.E.Chk(e) return }) for _, onConnect := range rl.OnConnect { @@ -222,7 +222,7 @@ func (rl *Relay) readMessages(ctx context.Context, kill func(), ws *WebSocket, c var typ int var message []byte typ, message, e = conn.ReadMessage() - if rl.Log.E.Chk(e) { + if e != nil { if websocket.IsUnexpectedCloseError( e, websocket.CloseNormalClosure, // 1000 @@ -230,20 +230,20 @@ func (rl *Relay) readMessages(ctx context.Context, kill func(), ws *WebSocket, c websocket.CloseNoStatusReceived, // 1005 websocket.CloseAbnormalClosure, // 1006 ) { - rl.Log.E.F("unexpected close error from %s: %v\n", + rl.E.F("unexpected close error from %s: %v\n", r.Header.Get("X-Forwarded-For"), e) } return } if typ == websocket.PingMessage { - rl.Log.E.Chk(ws.WriteMessage(websocket.PongMessage, nil)) + rl.E.Chk(ws.WriteMessage(websocket.PongMessage, nil)) continue } go rl.processMessages(message, ctx, ws) } } -func (rl *Relay) watcher(ctx context.Context, kill func(), ticker *time.Ticker, ws *WebSocket) { +func (rl *Relay) watcher(ctx Ctx, kill func(), ticker *time.Ticker, ws *WebSocket) { var e error defer kill() for { @@ -251,9 +251,9 @@ func (rl *Relay) watcher(ctx context.Context, kill func(), ticker *time.Ticker, case <-ctx.Done(): return case <-ticker.C: - if e = ws.WriteMessage(websocket.PingMessage, nil); rl.Log.E.Chk(e) { + if e = ws.WriteMessage(websocket.PingMessage, nil); rl.E.Chk(e) { if !strings.HasSuffix(e.Error(), "use of closed network connection") { - rl.Log.E.F("error writing ping: %v; closing websocket\n", e) + rl.E.F("error writing ping: %v; closing websocket\n", e) } return } @@ -263,9 +263,9 @@ func (rl *Relay) watcher(ctx context.Context, kill func(), ticker *time.Ticker, func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/nostr+json") - info := *rl.Info - for _, ovw := range rl.OverwriteRelayInformation { + info := rl.Info + for _, ovw := range rl.OverwriteRelayInfo { info = ovw(r.Context(), r, info) } - rl.Log.E.Chk(json.NewEncoder(w).Encode(info)) + rl.E.Chk(json.NewEncoder(w).Encode(info)) } diff --git a/cmd/replicatrd/replicatr/helpers.go b/cmd/replicatrd/replicatr/helpers.go index 6fae0ed6..dec21394 100644 --- a/cmd/replicatrd/replicatr/helpers.go +++ b/cmd/replicatrd/replicatr/helpers.go @@ -8,7 +8,6 @@ import ( "strings" "unsafe" - "github.com/nbd-wtf/go-nostr" log2 "mleku.online/git/log" ) @@ -22,9 +21,9 @@ func pointerHasher[V any](_ maphash.Seed, k *V) uint64 { return uint64(uintptr(unsafe.Pointer(k))) } -func isOlder(previous, next *nostr.Event) bool { - return previous.CreatedAt < next.CreatedAt || - (previous.CreatedAt == next.CreatedAt && previous.ID > next.ID) +func isOlder(prev, next Event) bool { + p, n := prev.CreatedAt, next.CreatedAt + return p < n || (p == n && prev.ID > next.ID) } func getServiceBaseURL(r *http.Request) string { diff --git a/cmd/replicatrd/replicatr/listener.go b/cmd/replicatrd/replicatr/listener.go index c2a728cb..90b45d58 100644 --- a/cmd/replicatrd/replicatr/listener.go +++ b/cmd/replicatrd/replicatr/listener.go @@ -15,43 +15,36 @@ type Listener struct { var listeners = xsync.NewTypedMapOf[*WebSocket, *xsync.MapOf[string, *Listener]](pointerHasher[WebSocket]) -func GetListeningFilters() nostr.Filters { - respfilters := make(nostr.Filters, 0, listeners.Size()*2) - +func GetListeningFilters() (respFilters nostr.Filters) { + respFilters = make(nostr.Filters, 0, listeners.Size()*2) // here we go through all the existing listeners listeners.Range(func(_ *WebSocket, subs *xsync.MapOf[string, *Listener]) bool { subs.Range(func(_ string, listener *Listener) bool { - for _, listenerfilter := range listener.filters { - for _, respfilter := range respfilters { - // check if this filter specifically is already added to respfilters - if nostr.FilterEqual(listenerfilter, respfilter) { - goto nextconn + for _, listenerFilter := range listener.filters { + for _, respFilter := range respFilters { + // check if this filter specifically is already added to respFilters + if nostr.FilterEqual(listenerFilter, respFilter) { + goto next } } - - // field not yet present on respfilters, add it - respfilters = append(respfilters, listenerfilter) - + // field not yet present on respFilters, add it + respFilters = append(respFilters, listenerFilter) // continue to the next filter - nextconn: + next: continue } - return true }) - return true }) - - // respfilters will be a slice with all the distinct filter we currently have active - return respfilters + return } -func setListener(id string, ws *WebSocket, filters nostr.Filters, cancel context.CancelCauseFunc) { +func setListener(id string, ws *WebSocket, f Filters, c context.CancelCauseFunc) { subs, _ := listeners.LoadOrCompute(ws, func() *xsync.MapOf[string, *Listener] { return xsync.NewMapOf[*Listener]() }) - subs.Store(id, &Listener{filters: filters, cancel: cancel}) + subs.Store(id, &Listener{filters: f, cancel: c}) } // remove a specific subscription id from listeners for a given ws client @@ -69,17 +62,15 @@ func removeListenerId(ws *WebSocket, id string) { // remove WebSocket conn from listeners // (no need to cancel contexts as they are all inherited from the main connection context) -func removeListener(ws *WebSocket) { - listeners.Delete(ws) -} +func removeListener(ws *WebSocket) { listeners.Delete(ws) } -func notifyListeners(event *nostr.Event) { +func notifyListeners(event Event) { listeners.Range(func(ws *WebSocket, subs *xsync.MapOf[string, *Listener]) bool { subs.Range(func(id string, listener *Listener) bool { if !listener.filters.Match(event) { return true } - ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event}) + log.E.Chk(ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})) return true }) return true diff --git a/cmd/replicatrd/replicatr/relay.go b/cmd/replicatrd/replicatr/relay.go index d4db9c82..4f0d690c 100644 --- a/cmd/replicatrd/replicatr/relay.go +++ b/cmd/replicatrd/replicatr/relay.go @@ -7,7 +7,7 @@ import ( "time" log2 "github.com/Hubmakerlabs/replicatr/pkg/log" - + "github.com/fasthttp/websocket" "github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/nip11" @@ -23,6 +23,25 @@ const ( MaxMessageSize int64 = 512000 // ??? ) +type ( + Ctx = context.Context + Filter = *nostr.Filter + Event = *nostr.Event + Info = *nip11.RelayInformationDocument + Filters = nostr.Filters + RejectEvent func(ctx Ctx, event Event) (reject bool, msg string) + RejectFilter func(ctx Ctx, filter Filter) (reject bool, msg string) + OverwriteFilter func(ctx Ctx, filter Filter) + OverwriteDeletionOutcome func(ctx Ctx, target Event, del Event) (accept bool, msg string) + OverwriteResponseEvent func(ctx Ctx, event Event) + Events func(ctx Ctx, event Event) error + Hook func(ctx Ctx) + OverwriteRelayInformation func(ctx Ctx, r *http.Request, info Info) Info + QueryEvents func(ctx Ctx, filter Filter) (eventC chan Event, e error) + CountEvents func(ctx Ctx, filter Filter) (c int64, e error) + OnEventSaved func(ctx Ctx, event Event) +) + func NewRelay(appName string) (r *Relay) { r = &Relay{ Log: log2.New(os.Stderr, appName, 0), @@ -46,31 +65,26 @@ func NewRelay(appName string) (r *Relay) { return } -type ( - RejectEvent func(ctx context.Context, event *nostr.Event) (reject bool, msg string) - RejectFilter func(ctx context.Context, filter *nostr.Filter) (reject bool, msg string) -) - type Relay struct { - ServiceURL string - RejectEvent []RejectEvent - RejectFilter []RejectFilter - RejectCountFilter []func(ctx context.Context, filter *nostr.Filter) (reject bool, msg string) - OverwriteDeletionOutcome []func(ctx context.Context, target *nostr.Event, deletion *nostr.Event) (acceptDeletion bool, msg string) - OverwriteResponseEvent []func(ctx context.Context, event *nostr.Event) - OverwriteFilter []func(ctx context.Context, filter *nostr.Filter) - OverwriteCountFilter []func(ctx context.Context, filter *nostr.Filter) - OverwriteRelayInformation []func(ctx context.Context, r *http.Request, info nip11.RelayInformationDocument) nip11.RelayInformationDocument - StoreEvent []func(ctx context.Context, event *nostr.Event) error - DeleteEvent []func(ctx context.Context, event *nostr.Event) error - QueryEvents []func(ctx context.Context, filter *nostr.Filter) (chan *nostr.Event, error) - CountEvents []func(ctx context.Context, filter *nostr.Filter) (int64, error) - OnConnect []func(ctx context.Context) - OnDisconnect []func(ctx context.Context) - OnEventSaved []func(ctx context.Context, event *nostr.Event) + ServiceURL string + RejectEvent []RejectEvent + RejectFilter []RejectFilter + RejectCountFilter []RejectFilter + OverwriteDeletionOutcome []OverwriteDeletionOutcome + OverwriteResponseEvent []OverwriteResponseEvent + OverwriteFilter []OverwriteFilter + OverwriteCountFilter []OverwriteFilter + OverwriteRelayInfo []OverwriteRelayInformation + StoreEvent []Events + DeleteEvent []Events + QueryEvents []QueryEvents + CountEvents []CountEvents + OnConnect []Hook + OnDisconnect []Hook + OnEventSaved []OnEventSaved // editing info will affect - Info *nip11.RelayInformationDocument - Log *log2.Logger + Info Info + *log2.Log // for establishing websockets upgrader websocket.Upgrader // keep a connection reference to all connected clients for Server.Shutdown diff --git a/cmd/replicatrd/replicatr/responding.go b/cmd/replicatrd/replicatr/responding.go index 29ede36e..f81ea537 100644 --- a/cmd/replicatrd/replicatr/responding.go +++ b/cmd/replicatrd/replicatr/responding.go @@ -5,21 +5,22 @@ import ( "errors" "sync" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/normalize" "github.com/nbd-wtf/go-nostr" ) -func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGroup, ws *WebSocket, - filter *nostr.Filter) (e error) { +func (rl *Relay) handleRequest(ctx context.Context, id string, + eose *sync.WaitGroup, ws *WebSocket, f *nostr.Filter) (e error) { defer eose.Done() - // overwrite the filter (for example, to eliminate some kinds or - // that we know we don't support) + // overwrite the filter (for example, to eliminate some kinds or that we + // know we don't support) for _, ovw := range rl.OverwriteFilter { - ovw(ctx, filter) + ovw(ctx, f) } - if filter.Limit < 0 { + if f.Limit < 0 { e = errors.New("blocked: filter invalidated") - rl.Log.E.Chk(e) + rl.E.Chk(e) return } // then check if we'll reject this filter (we apply this after overwriting @@ -27,9 +28,9 @@ func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGr // that we know we don't support, and then if the end result is an empty // filter we can just reject it) for _, reject := range rl.RejectFilter { - if reject, msg := reject(ctx, filter); reject { - rl.Log.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(msg))) - return errors.New(nostr.NormalizeOKMessage(msg, "blocked")) + if rej, msg := reject(ctx, f); rej { + rl.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(msg))) + return errors.New(normalize.OKMessage(msg, "blocked")) } } // run the functions to query events (generally just one, @@ -37,8 +38,8 @@ func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGr eose.Add(len(rl.QueryEvents)) for _, query := range rl.QueryEvents { var ch chan *nostr.Event - if ch, e = query(ctx, filter); rl.Log.E.Chk(e) { - rl.Log.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(e.Error()))) + if ch, e = query(ctx, f); rl.E.Chk(e) { + rl.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(e.Error()))) eose.Done() continue } @@ -47,7 +48,10 @@ func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGr for _, ovw := range rl.OverwriteResponseEvent { ovw(ctx, event) } - rl.Log.E.Chk(ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event})) + rl.E.Chk(ws.WriteJSON(nostr.EventEnvelope{ + SubscriptionID: &id, + Event: *event, + })) } eose.Done() }(ch) @@ -55,29 +59,28 @@ func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGr return nil } -func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter *nostr.Filter) int64 { - // overwrite the filter (for example, to eliminate some kinds or tags that we know we don't support) +func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, + filter *nostr.Filter) (subtotal int64) { + // overwrite the filter (for example, to eliminate some kinds or tags that + // we know we don't support) for _, ovw := range rl.OverwriteCountFilter { ovw(ctx, filter) } - // then check if we'll reject this filter for _, reject := range rl.RejectCountFilter { - if rejecting, msg := reject(ctx, filter); rejecting { - ws.WriteJSON(nostr.NoticeEnvelope(msg)) + if rej, msg := reject(ctx, filter); rej { + rl.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(msg))) return 0 } } - // run the functions to count (generally it will be just one) - var subtotal int64 = 0 var e error var res int64 for _, count := range rl.CountEvents { - if res, e = count(ctx, filter); rl.Log.E.Chk(e) { - rl.Log.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(e.Error()))) + if res, e = count(ctx, filter); rl.E.Chk(e) { + rl.E.Chk(ws.WriteJSON(nostr.NoticeEnvelope(e.Error()))) } subtotal += res } - return subtotal + return } diff --git a/cmd/replicatrd/replicatr/start.go b/cmd/replicatrd/replicatr/start.go index f4029865..5a468781 100644 --- a/cmd/replicatrd/replicatr/start.go +++ b/cmd/replicatrd/replicatr/start.go @@ -20,7 +20,7 @@ func (rl *Relay) Router() *http.ServeMux { func (rl *Relay) Start(host string, port int, started ...chan bool) (e error) { addr := net.JoinHostPort(host, strconv.Itoa(port)) var ln net.Listener - if ln, e = net.Listen("tcp", addr); rl.Log.E.Chk(e) { + if ln, e = net.Listen("tcp", addr); rl.E.Chk(e) { return } rl.Addr = ln.Addr().String() @@ -32,13 +32,13 @@ func (rl *Relay) Start(host string, port int, started ...chan bool) (e error) { IdleTimeout: 30 * time.Second, } // notify caller that we're starting - for _, started := range started { - close(started) + for _, s := range started { + close(s) } - if e := rl.httpServer.Serve(ln); errors.Is(e, http.ErrServerClosed) { + if e = rl.httpServer.Serve(ln); errors.Is(e, http.ErrServerClosed) { return nil - } else if e != nil { - return e + } else if rl.Log.E.Chk(e) { + return } return } @@ -48,8 +48,8 @@ func (rl *Relay) Shutdown(ctx context.Context) { rl.httpServer.Shutdown(ctx) rl.clients.Range(func(conn *websocket.Conn, _ struct{}) bool { - rl.Log.E.Chk(conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))) - rl.Log.E.Chk(conn.Close()) + rl.E.Chk(conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))) + rl.E.Chk(conn.Close()) rl.clients.Delete(conn) return true }) diff --git a/cmd/replicatrd/replicatr/utils.go b/cmd/replicatrd/replicatr/utils.go index 7d4fbf30..4544f853 100644 --- a/cmd/replicatrd/replicatr/utils.go +++ b/cmd/replicatrd/replicatr/utils.go @@ -1,8 +1,6 @@ package replicatr import ( - "context" - "github.com/nbd-wtf/go-nostr" "github.com/sebest/xff" ) @@ -12,33 +10,25 @@ const ( subscriptionIdKey ) -func RequestAuth(ctx context.Context) { +func RequestAuth(ctx Ctx) { ws := GetConnection(ctx) ws.authLock.Lock() if ws.Authed == nil { ws.Authed = make(chan struct{}) } ws.authLock.Unlock() - ws.WriteJSON(nostr.AuthEnvelope{Challenge: &ws.Challenge}) + log.E.Chk(ws.WriteJSON(nostr.AuthEnvelope{Challenge: &ws.Challenge})) } -func GetConnection(ctx context.Context) *WebSocket { - return ctx.Value(wsKey).(*WebSocket) -} +func GetConnection(ctx Ctx) *WebSocket { return ctx.Value(wsKey).(*WebSocket) } -func GetAuthed(ctx context.Context) string { - return GetConnection(ctx).AuthedPublicKey -} +func GetAuthed(ctx Ctx) string { return GetConnection(ctx).AuthedPublicKey } -func GetIP(ctx context.Context) string { - return xff.GetRemoteAddr(GetConnection(ctx).Request) -} +func GetIP(ctx Ctx) string { return xff.GetRemoteAddr(GetConnection(ctx).Request) } -func GetSubscriptionID(ctx context.Context) string { - return ctx.Value(subscriptionIdKey).(string) -} +func GetSubscriptionID(ctx Ctx) string { return ctx.Value(subscriptionIdKey).(string) } -func GetOpenSubscriptions(ctx context.Context) []nostr.Filter { +func GetOpenSubscriptions(ctx Ctx) Filters { if subs, ok := listeners.Load(GetConnection(ctx)); ok { res := make([]nostr.Filter, 0, listeners.Size()*2) subs.Range(func(_ string, sub *Listener) bool { diff --git a/cmd/replicatrd/replicatr/websocket.go b/cmd/replicatrd/replicatr/websocket.go index 23f70705..383617b9 100644 --- a/cmd/replicatrd/replicatr/websocket.go +++ b/cmd/replicatrd/replicatr/websocket.go @@ -7,27 +7,26 @@ import ( "github.com/fasthttp/websocket" ) +// WebSocket is a wrapper around a fasthttp/websocket with mutex locking and +// NIP-42 Auth support type WebSocket struct { - conn *websocket.Conn - mutex sync.Mutex - - // original request - Request *http.Request - - // nip42 - Challenge string + conn *websocket.Conn + mutex sync.Mutex + Request *http.Request // original request + Challenge string // nip42 AuthedPublicKey string Authed chan struct{} - - authLock sync.Mutex + authLock sync.Mutex } +// WriteJSON writes an object as JSON to the websocket func (ws *WebSocket) WriteJSON(any any) (e error) { ws.mutex.Lock() defer ws.mutex.Unlock() return ws.conn.WriteJSON(any) } +// WriteMessage writes a message with a given websocket type specifier func (ws *WebSocket) WriteMessage(t int, b []byte) (e error) { ws.mutex.Lock() defer ws.mutex.Unlock() diff --git a/pkg/log/log.go b/pkg/log/log.go index a63a22ec..73eed39f 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -69,7 +69,7 @@ var ( Trace: "TRC", } - // log is your generic Logger creation invocation that uses the version data + // log is your generic Log creation invocation that uses the version data // in version.go that provides the current compilation path prefix for making // relative paths for log printing code locations. lvlStrs = map[string]Level{ @@ -122,8 +122,8 @@ type ( LevelSpec struct { Name string } - // Logger is a set of log printers for the various Level items. - Logger struct { + // Log is a set of log printers for the various Level items. + Log struct { F, E, W, I, D, T LevelPrinter } ) @@ -139,8 +139,8 @@ func GetLoc(skip int) (output string) { // // this copies the interface of stdlib log but we don't respect the settings // because a logger without timestamps is retarded -func New(writer io.Writer, appID string, _ int) (l *Logger) { - return &Logger{ +func New(writer io.Writer, appID string, _ int) (l *Log) { + return &Log{ getOnePrinter(writer, appID, Fatal), getOnePrinter(writer, appID, Error), getOnePrinter(writer, appID, Warn), diff --git a/pkg/nostr/normalize/normalize.go b/pkg/nostr/normalize/normalize.go index bf48cf0c..1d206b55 100644 --- a/pkg/nostr/normalize/normalize.go +++ b/pkg/nostr/normalize/normalize.go @@ -88,3 +88,12 @@ func URL(u string) string { p.Path = strings.TrimRight(p.Path, "/") return p.String() } + +// OKMessage takes a string message that is to be sent in an `OK` or `CLOSED` command +// and prefixes it with ": " if it doesn't already have an acceptable prefix. +func OKMessage(reason string, prefix string) string { + if idx := strings.Index(reason, ": "); idx == -1 || strings.IndexByte(reason[0:idx], ' ') != -1 { + return prefix + ": " + reason + } + return reason +} diff --git a/pkg/nostr/pool.go b/pkg/nostr/pool.go index 571f7cd4..877beb93 100644 --- a/pkg/nostr/pool.go +++ b/pkg/nostr/pool.go @@ -3,11 +3,12 @@ package nostr import ( "context" "fmt" + "sync" + "time" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/nip1" "github.com/Hubmakerlabs/replicatr/pkg/nostr/normalize" "github.com/fiatjaf/generic-ristretto/z" - "sync" - "time" "github.com/puzpuzpuz/xsync/v2" ) diff --git a/pkg/relay/add-event.go b/pkg/relay/add-event.go index 50259654..139a1ac5 100644 --- a/pkg/relay/add-event.go +++ b/pkg/relay/add-event.go @@ -35,12 +35,12 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nip1.Event) (e error) { if ch, e = query(ctx, &nip1.Filter{ Authors: []string{evt.PubKey}, Kinds: kinds.T{evt.Kind}, - }); rl.Log.E.Chk(e) { + }); rl.E.Chk(e) { continue } if previous := <-ch; previous != nil && isOlder(previous, evt) { for _, del := range rl.DeleteEvent { - rl.Log.E.Chk(del(ctx, previous)) + rl.E.Chk(del(ctx, previous)) } } } @@ -53,7 +53,7 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nip1.Event) (e error) { Authors: []string{evt.PubKey}, Kinds: kinds.T{evt.Kind}, Tags: nip1.TagMap{"d": []string{d.Value()}}, - }); rl.Log.E.Chk(e) { + }); rl.E.Chk(e) { continue } if previous := <-ch; previous != nil && isOlder(previous, evt) { @@ -95,7 +95,7 @@ func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nip1.Event) (e er if len(tag) >= 2 && tag[0] == "e" { // first we fetch the event for _, query := range rl.QueryEvents { - if ch, e = query(ctx, &nip1.Filter{IDs: []string{tag[1]}}); rl.Log.E.Chk(e) { + if ch, e = query(ctx, &nip1.Filter{IDs: []string{tag[1]}}); rl.E.Chk(e) { continue } target := <-ch @@ -115,7 +115,7 @@ func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nip1.Event) (e er if acceptDeletion { // delete it for _, del := range rl.DeleteEvent { - rl.Log.E.Chk(del(ctx, target)) + rl.E.Chk(del(ctx, target)) } } else { // fail and stop here diff --git a/pkg/relay/handlers.go b/pkg/relay/handlers.go index fedf4a40..b6fda7f0 100644 --- a/pkg/relay/handlers.go +++ b/pkg/relay/handlers.go @@ -38,7 +38,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { var e error var conn *websocket.Conn if conn, e = rl.upgrader.Upgrade(w, r, nil); fails(e) { - rl.Log.E.F("failed to upgrade websocket: %v\n", e) + rl.E.F("failed to upgrade websocket: %v\n", e) return } rl.clients.Store(conn, struct{}{}) @@ -49,7 +49,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { var n int if n, e = rand.Read(challenge); fails(e) { - rl.Log.E.F("only read %d bytes from system CSPRNG", n) + rl.E.F("only read %d bytes from system CSPRNG", n) } ws := &WebSocket{ conn: conn, @@ -70,7 +70,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ticker.Stop() cancel() if _, ok := rl.clients.Load(conn); ok { - rl.Log.D.Chk(conn.Close()) + rl.D.Chk(conn.Close()) rl.clients.Delete(conn) removeListener(ws) } @@ -78,9 +78,9 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { go func() { defer kill() conn.SetReadLimit(rl.MaxMessageSize) - rl.Log.E.Chk(conn.SetReadDeadline(time.Now().Add(rl.PongWait))) + rl.E.Chk(conn.SetReadDeadline(time.Now().Add(rl.PongWait))) conn.SetPongHandler(func(string) (e error) { - rl.Log.E.Chk(conn.SetReadDeadline(time.Now().Add(rl.PongWait))) + rl.E.Chk(conn.SetReadDeadline(time.Now().Add(rl.PongWait))) return nil }) for _, onConnect := range rl.OnConnect { @@ -99,13 +99,13 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { websocket.CloseNoStatusReceived, // 1005 websocket.CloseAbnormalClosure, // 1006 ) { - rl.Log.E.F("unexpected close error from %s: %v\n", r.Header.Get("X-Forwarded-For"), e) + rl.E.F("unexpected close error from %s: %v\n", r.Header.Get("X-Forwarded-For"), e) } return } - rl.Log.D.F("received message on websocket: '%s'", string(message)) + rl.D.F("received message on websocket: '%s'", string(message)) if typ == websocket.PingMessage { - rl.Log.D.Chk(ws.WriteMessage(websocket.PongMessage, nil)) + rl.D.Chk(ws.WriteMessage(websocket.PongMessage, nil)) continue } go func(message []byte) { @@ -121,7 +121,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { hash := sha256.Sum256(env.Event.ToCanonical().Bytes()) id := hex.EncodeToString(hash[:]) if nip1.EventID(id) != env.Event.ID { - rl.Log.E.Chk(ws.WriteJSON(nip1.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(nip1.OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "invalid: id is computed incorrectly", @@ -129,15 +129,15 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { return } // check signature - if ok, e = env.Event.CheckSignature(); rl.Log.E.Chk(e) { - rl.Log.E.Chk(ws.WriteJSON(nip1.OKEnvelope{ + if ok, e = env.Event.CheckSignature(); rl.E.Chk(e) { + rl.E.Chk(ws.WriteJSON(nip1.OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "error: failed to verify signature", })) return } else if !ok { - rl.Log.E.Chk(ws.WriteJSON(nip1.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(nip1.OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "invalid: signature is invalid", @@ -163,11 +163,11 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { RequestAuth(ctx) } } - rl.Log.D.Chk(ws.WriteJSON(nip1.OKEnvelope{ + rl.D.Chk(ws.WriteJSON(nip1.OKEnvelope{ EventID: env.Event.ID, OK: ok, Reason: reason})) case *nip45.CountRequestEnvelope: if rl.CountEvents == nil { - rl.Log.E.Chk(ws.WriteJSON(nip1.ClosedEnvelope{ + rl.E.Chk(ws.WriteJSON(nip1.ClosedEnvelope{ SubscriptionID: env.SubscriptionID, Reason: "unsupported: " + "this relay does not support NIP-45", @@ -178,7 +178,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { for _, filter := range env.Filters { total += rl.handleCountRequest(ctx, ws, filter) } - rl.Log.D.Chk(ws.WriteJSON(nip45.CountResponseEnvelope{ + rl.D.Chk(ws.WriteJSON(nip45.CountResponseEnvelope{ SubscriptionID: env.SubscriptionID, Count: total, })) @@ -201,7 +201,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { if strings.HasPrefix(reason, "auth-required:") { RequestAuth(ctx) } - rl.Log.D.Chk(ws.WriteJSON(nip1.ClosedEnvelope{ + rl.D.Chk(ws.WriteJSON(nip1.ClosedEnvelope{ SubscriptionID: env.SubscriptionID, Reason: reason, })) @@ -215,7 +215,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { // EOSE message eose.Wait() cancelReqCtx(nil) - rl.Log.E.Chk(ws.WriteJSON(nip1.EOSEEnvelope{ + rl.E.Chk(ws.WriteJSON(nip1.EOSEEnvelope{ SubscriptionID: env.SubscriptionID})) }() @@ -231,10 +231,10 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ws.AuthedPublicKey = pubkey close(ws.Authed) - rl.Log.E.Chk(ws.WriteJSON(nip1.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(nip1.OKEnvelope{ EventID: env.Event.ID, OK: true})) } else { - rl.Log.E.Chk(ws.WriteJSON(nip1.OKEnvelope{ + rl.E.Chk(ws.WriteJSON(nip1.OKEnvelope{ EventID: env.Event.ID, OK: false, Reason: "error: failed to authenticate", @@ -258,7 +258,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { if !strings.HasSuffix(e.Error(), "use of closed network connection", ) { - rl.Log.E.F("error writing ping: %v; "+ + rl.E.F("error writing ping: %v; "+ "closing websocket\n", e) } return @@ -274,5 +274,5 @@ func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) { for _, ovw := range rl.OverwriteRelayInformation { info = ovw(r.Context(), r, info) } - rl.Log.E.Chk(json.NewEncoder(w).Encode(info)) + rl.E.Chk(json.NewEncoder(w).Encode(info)) } diff --git a/pkg/relay/serve-req.go b/pkg/relay/serve-req.go index d64feabd..bcf2048c 100644 --- a/pkg/relay/serve-req.go +++ b/pkg/relay/serve-req.go @@ -26,7 +26,7 @@ func (rl *Relay) handleRequest(ctx context.Context, id nip1.SubscriptionID, // filter we can just reject it) for _, reject := range rl.RejectFilter { if reject, msg := reject(ctx, filter); reject { - rl.Log.D.Chk(ws.WriteJSON(nip1.NoticeEnvelope{Text: msg})) + rl.D.Chk(ws.WriteJSON(nip1.NoticeEnvelope{Text: msg})) return errors.New(nip1.OKMessage(nip1.OKBlocked, msg)) } } @@ -35,8 +35,8 @@ func (rl *Relay) handleRequest(ctx context.Context, id nip1.SubscriptionID, eose.Add(len(rl.QueryEvents)) for _, query := range rl.QueryEvents { var ch chan *nip1.Event - if ch, e = query(ctx, filter); rl.Log.E.Chk(e) { - rl.Log.D.Chk(ws.WriteJSON(nip1.NoticeEnvelope{Text: e.Error()})) + if ch, e = query(ctx, filter); rl.E.Chk(e) { + rl.D.Chk(ws.WriteJSON(nip1.NoticeEnvelope{Text: e.Error()})) eose.Done() continue } @@ -45,7 +45,7 @@ func (rl *Relay) handleRequest(ctx context.Context, id nip1.SubscriptionID, for _, ovw := range rl.OverwriteResponseEvent { ovw(ctx, event) } - rl.Log.D.Chk(ws.WriteJSON(nip1.EventEnvelope{SubscriptionID: id, Event: event})) + rl.D.Chk(ws.WriteJSON(nip1.EventEnvelope{SubscriptionID: id, Event: event})) } eose.Done() }(ch) @@ -61,7 +61,7 @@ func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter * // then check if we'll reject this filter for _, reject := range rl.RejectCountFilter { if rejecting, msg := reject(ctx, filter); rejecting { - rl.Log.D.Chk(ws.WriteJSON(nip1.NoticeEnvelope{Text: msg})) + rl.D.Chk(ws.WriteJSON(nip1.NoticeEnvelope{Text: msg})) return 0 } } @@ -70,8 +70,8 @@ func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter * for _, count := range rl.CountEvents { var e error var res int64 - if res, e = count(ctx, filter); rl.Log.E.Chk(e) { - rl.Log.D.Chk(ws.WriteJSON(nip1.NoticeEnvelope{Text: e.Error()})) + if res, e = count(ctx, filter); rl.E.Chk(e) { + rl.D.Chk(ws.WriteJSON(nip1.NoticeEnvelope{Text: e.Error()})) } subtotal += res } diff --git a/pkg/relay/start.go b/pkg/relay/start.go index 3ea4b3fe..6961286a 100644 --- a/pkg/relay/start.go +++ b/pkg/relay/start.go @@ -20,7 +20,7 @@ func (rl *Relay) Router() *http.ServeMux { func (rl *Relay) Start(host string, port int, started ...chan bool) (e error) { addr := net.JoinHostPort(host, strconv.Itoa(port)) var ln net.Listener - if ln, e = net.Listen("tcp", addr); rl.Log.E.Chk(e) { + if ln, e = net.Listen("tcp", addr); rl.E.Chk(e) { return } rl.Addr = ln.Addr().String() @@ -37,7 +37,7 @@ func (rl *Relay) Start(host string, port int, started ...chan bool) (e error) { } if e = rl.httpServer.Serve(ln); errors.Is(e, http.ErrServerClosed) { return nil - } else if rl.Log.E.Chk(e) { + } else if rl.E.Chk(e) { return } else { return nil @@ -46,10 +46,10 @@ func (rl *Relay) Start(host string, port int, started ...chan bool) (e error) { // Shutdown sends a websocket close control message to all connected clients. func (rl *Relay) Shutdown(ctx context.Context) { - rl.Log.E.Chk(rl.httpServer.Shutdown(ctx)) + rl.E.Chk(rl.httpServer.Shutdown(ctx)) rl.clients.Range(func(conn *websocket.Conn, _ struct{}) bool { - rl.Log.E.Chk(conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))) - rl.Log.E.Chk(conn.Close()) + rl.E.Chk(conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))) + rl.E.Chk(conn.Close()) rl.clients.Delete(conn) return true })