Skip to content

Commit

Permalink
some type alieses for neater function signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
mleku committed Jan 3, 2024
1 parent e027af6 commit f92b206
Show file tree
Hide file tree
Showing 19 changed files with 225 additions and 223 deletions.
8 changes: 4 additions & 4 deletions cmd/replicatrd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ func main() {
log2.SetLogLevel(log2.Trace)
rl := replicatr.NewRelay(appName)
db := &badger.BadgerBackend{Path: "/tmp/replicatr-badger"}
if e:= db.Init(); rl.Log.E.Chk(e) {
rl.Log.E.F("unable to start database: '%s'", e)
if e:= db.Init(); rl.E.Chk(e) {
rl.E.F("unable to start database: '%s'", e)
os.Exit(1)
}
rl.StoreEvent = append(rl.StoreEvent, db.SaveEvent)
rl.QueryEvents = append(rl.QueryEvents, db.QueryEvents)
rl.CountEvents = append(rl.CountEvents, db.CountEvents)
rl.DeleteEvent = append(rl.DeleteEvent, db.DeleteEvent)
rl.Log.I.Ln("running on :3334")
rl.Log.E.Chk(http.ListenAndServe(":3334", rl))
rl.I.Ln("running on :3334")
rl.E.Chk(http.ListenAndServe(":3334", rl))
}
37 changes: 20 additions & 17 deletions cmd/replicatrd/replicatr/adding.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
package replicatr

import (
"context"
"errors"
err "errors"
"fmt"

"github.com/fiatjaf/eventstore"
"github.com/Hubmakerlabs/replicatr/pkg/eventstore"
"github.com/Hubmakerlabs/replicatr/pkg/nostr/normalize"
"github.com/nbd-wtf/go-nostr"
)

// AddEvent sends an event through then normal add pipeline, as if it was received from a websocket.
func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (e error) {
// AddEvent sends an event through then normal add pipeline, as if it was
// received from a websocket.
func (rl *Relay) AddEvent(ctx Ctx, evt Event) (e error) {
if evt == nil {
return errors.New("error: event is nil")
e = err.New("error: event is nil")
rl.E.Ln(e)
return
}
for _, rejectors := range rl.RejectEvent {
if reject, msg := rejectors(ctx, evt); reject {
if msg == "" {
e = errors.New("blocked: no reason")
rl.Log.E.Ln(e)
e = err.New("blocked: no reason")
rl.E.Ln(e)
return
} else {
e = errors.New(nostr.NormalizeOKMessage(msg, "blocked"))
rl.Log.E.Ln(e)
e = err.New(normalize.OKMessage(msg, "blocked"))
rl.E.Ln(e)
return
}
}
Expand All @@ -35,12 +38,12 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (e error) {
for _, query := range rl.QueryEvents {
var ch chan *nostr.Event
ch, e = query(ctx, &nostr.Filter{Authors: []string{evt.PubKey}, Kinds: []int{evt.Kind}})
if rl.Log.E.Chk(e) {
if rl.E.Chk(e) {
continue
}
if previous := <-ch; previous != nil && isOlder(previous, evt) {
for _, del := range rl.DeleteEvent {
rl.Log.D.Chk(del(ctx, previous))
rl.D.Chk(del(ctx, previous))
}
}
}
Expand All @@ -54,25 +57,25 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *nostr.Event) (e error) {
Authors: []string{evt.PubKey},
Kinds: []int{evt.Kind},
Tags: nostr.TagMap{"d": []string{d.Value()}},
}); rl.Log.E.Chk(e) {
}); rl.E.Chk(e) {
continue
}
if previous := <-ch; previous != nil && isOlder(previous, evt) {
for _, del := range rl.DeleteEvent {
rl.Log.E.Chk(del(ctx, previous))
rl.E.Chk(del(ctx, previous))
}
}
}
}
}
// store
for _, store := range rl.StoreEvent {
if saveErr := store(ctx, evt); rl.Log.E.Chk(saveErr){
if saveErr := store(ctx, evt); rl.E.Chk(saveErr) {
switch {
case errors.Is(saveErr, eventstore.ErrDupEvent):
case err.Is(saveErr, eventstore.ErrDupEvent):
return nil
default:
return fmt.Errorf(nostr.NormalizeOKMessage(saveErr.Error(), "error"))
return fmt.Errorf(normalize.OKMessage(saveErr.Error(), "error"))
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions cmd/replicatrd/replicatr/broadcasting.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package replicatr

import (
"github.com/nbd-wtf/go-nostr"
)

// BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions
// it also doesn't attempt to store the event or trigger any reactions or callbacks
func (rl *Relay) BroadcastEvent(evt *nostr.Event) { notifyListeners(evt) }
func (rl *Relay) BroadcastEvent(evt Event) { notifyListeners(evt) }
11 changes: 4 additions & 7 deletions cmd/replicatrd/replicatr/deleting.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package replicatr

import (
"context"
"fmt"

"github.com/nbd-wtf/go-nostr"
)

func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) (e error) {
func (rl *Relay) handleDeleteRequest(ctx Ctx, evt Event) (e error) {
// event deletion -- nip09
for _, tag := range evt.Tags {
if len(tag) >= 2 && tag[0] == "e" {
// first we fetch the event
for _, query := range rl.QueryEvents {
var ch chan *nostr.Event
if ch, e = query(ctx, &nostr.Filter{IDs: []string{tag[1]}}); rl.Log.E.Chk(e) {
if ch, e = query(ctx, &nostr.Filter{IDs: []string{tag[1]}}); rl.E.Chk(e) {
continue
}
target := <-ch
Expand All @@ -34,20 +33,18 @@ func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *nostr.Event) (e e
if acceptDeletion {
// delete it
for _, del := range rl.DeleteEvent {
rl.Log.E.Chk(del(ctx, target))
rl.E.Chk(del(ctx, target))
}
} else {
// fail and stop here
e = fmt.Errorf("blocked: %s", msg)
rl.Log.E.Chk(e)
rl.E.Ln(e)
return
}

// don't try to query this same event again
break
}
}
}

return nil
}
68 changes: 34 additions & 34 deletions cmd/replicatrd/replicatr/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
err "errors"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -36,16 +36,16 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
var e error
var conn *websocket.Conn
conn, e = rl.upgrader.Upgrade(w, r, nil)
if rl.Log.E.Chk(e) {
rl.Log.E.F("failed to upgrade websocket: %v\n", e)
if rl.E.Chk(e) {
rl.E.F("failed to upgrade websocket: %v\n", e)
return
}
rl.clients.Store(conn, struct{}{})
ticker := time.NewTicker(rl.PingPeriod)
// NIP-42 challenge
challenge := make([]byte, 8)
_, e = rand.Read(challenge)
rl.Log.E.Chk(e)
rl.E.Chk(e)
ws := &WebSocket{
conn: conn,
Request: r,
Expand All @@ -64,7 +64,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
ticker.Stop()
cancel()
if _, ok := rl.clients.Load(conn); ok {
rl.Log.E.Chk(conn.Close())
_ = conn.Close()
rl.clients.Delete(conn)
removeListener(ws)
}
Expand All @@ -73,7 +73,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
go rl.watcher(ctx, kill, ticker, ws)
}

func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSocket) {
func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) {
var e error
envelope := nostr.ParseMessage(message)
if envelope == nil {
Expand All @@ -86,7 +86,7 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc
hash := sha256.Sum256(env.Event.Serialize())
id := hex.EncodeToString(hash[:])
if id != env.Event.ID {
rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "invalid: id is computed incorrectly",
Expand All @@ -95,15 +95,15 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc
}
// check signature
var ok bool
if ok, e = env.Event.CheckSignature(); rl.Log.E.Chk(e) {
rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
if ok, e = env.Event.CheckSignature(); rl.E.Chk(e) {
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "error: failed to verify signature"},
))
return
} else if !ok {
rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "invalid: signature is invalid"},
Expand All @@ -119,22 +119,22 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc
writeErr = rl.AddEvent(ctx, &env.Event)
}
var reason string
if !rl.Log.E.Chk(writeErr) {
ok = true
} else {
if ok = !rl.E.Chk(writeErr); !ok {
reason = writeErr.Error()
if strings.HasPrefix(reason, "auth-required:") {
RequestAuth(ctx)
}
} else {
ok = true
}
rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: ok,
Reason: reason,
}))
case *nostr.CountEnvelope:
if rl.CountEvents == nil {
rl.Log.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{
rl.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{
SubscriptionID: env.SubscriptionID,
Reason: "unsupported: this relay does not support NIP-45"},
))
Expand All @@ -144,7 +144,7 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc
for _, filter := range env.Filters {
total += rl.handleCountRequest(ctx, ws, &filter)
}
rl.Log.E.Chk(ws.WriteJSON(nostr.CountEnvelope{
rl.E.Chk(ws.WriteJSON(nostr.CountEnvelope{
SubscriptionID: env.SubscriptionID,
Count: &total,
}))
Expand All @@ -158,17 +158,17 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc
// handle each filter separately -- dispatching events as they're loaded from databases
for _, filter := range env.Filters {
e = rl.handleRequest(reqCtx, env.SubscriptionID, &eose, ws, &filter)
if rl.Log.E.Chk(e) {
if rl.E.Chk(e) {
// fail everything if any filter is rejected
reason := e.Error()
if strings.HasPrefix(reason, "auth-required:") {
RequestAuth(ctx)
}
rl.Log.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{
rl.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{
SubscriptionID: env.SubscriptionID,
Reason: reason},
))
cancelReqCtx(errors.New("filter rejected"))
cancelReqCtx(err.New("filter rejected"))
return
}
}
Expand All @@ -177,7 +177,7 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc
// we can cancel the context and fire the EOSE message
eose.Wait()
cancelReqCtx(nil)
rl.Log.E.Chk(ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID)))
rl.E.Chk(ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID)))
}()
setListener(env.SubscriptionID, ws, env.Filters, cancelReqCtx)
case *nostr.CloseEnvelope:
Expand All @@ -192,26 +192,26 @@ func (rl *Relay) processMessages(message []byte, ctx context.Context, ws *WebSoc
ws.Authed = nil
}
ws.authLock.Unlock()
rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: true},
))
} else {
rl.Log.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "error: failed to authenticate"},
))
}
}
}
func (rl *Relay) readMessages(ctx context.Context, kill func(), ws *WebSocket, conn *websocket.Conn, r *http.Request) {
func (rl *Relay) readMessages(ctx Ctx, kill func(), ws *WebSocket, conn *websocket.Conn, r *http.Request) {
defer kill()
conn.SetReadLimit(rl.MaxMessageSize)
rl.Log.E.Chk(conn.SetReadDeadline(time.Now().Add(rl.PongWait)))
rl.E.Chk(conn.SetReadDeadline(time.Now().Add(rl.PongWait)))
conn.SetPongHandler(func(string) (e error) {
e = conn.SetReadDeadline(time.Now().Add(rl.PongWait))
rl.Log.E.Chk(e)
rl.E.Chk(e)
return
})
for _, onConnect := range rl.OnConnect {
Expand All @@ -222,38 +222,38 @@ func (rl *Relay) readMessages(ctx context.Context, kill func(), ws *WebSocket, c
var typ int
var message []byte
typ, message, e = conn.ReadMessage()
if rl.Log.E.Chk(e) {
if e != nil {
if websocket.IsUnexpectedCloseError(
e,
websocket.CloseNormalClosure, // 1000
websocket.CloseGoingAway, // 1001
websocket.CloseNoStatusReceived, // 1005
websocket.CloseAbnormalClosure, // 1006
) {
rl.Log.E.F("unexpected close error from %s: %v\n",
rl.E.F("unexpected close error from %s: %v\n",
r.Header.Get("X-Forwarded-For"), e)
}
return
}
if typ == websocket.PingMessage {
rl.Log.E.Chk(ws.WriteMessage(websocket.PongMessage, nil))
rl.E.Chk(ws.WriteMessage(websocket.PongMessage, nil))
continue
}
go rl.processMessages(message, ctx, ws)
}
}

func (rl *Relay) watcher(ctx context.Context, kill func(), ticker *time.Ticker, ws *WebSocket) {
func (rl *Relay) watcher(ctx Ctx, kill func(), ticker *time.Ticker, ws *WebSocket) {
var e error
defer kill()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if e = ws.WriteMessage(websocket.PingMessage, nil); rl.Log.E.Chk(e) {
if e = ws.WriteMessage(websocket.PingMessage, nil); rl.E.Chk(e) {
if !strings.HasSuffix(e.Error(), "use of closed network connection") {
rl.Log.E.F("error writing ping: %v; closing websocket\n", e)
rl.E.F("error writing ping: %v; closing websocket\n", e)
}
return
}
Expand All @@ -263,9 +263,9 @@ func (rl *Relay) watcher(ctx context.Context, kill func(), ticker *time.Ticker,

func (rl *Relay) HandleNIP11(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/nostr+json")
info := *rl.Info
for _, ovw := range rl.OverwriteRelayInformation {
info := rl.Info
for _, ovw := range rl.OverwriteRelayInfo {
info = ovw(r.Context(), r, info)
}
rl.Log.E.Chk(json.NewEncoder(w).Encode(info))
rl.E.Chk(json.NewEncoder(w).Encode(info))
}
Loading

0 comments on commit f92b206

Please sign in to comment.