Skip to content

Commit

Permalink
all tidy and ready to start haxing
Browse files Browse the repository at this point in the history
  • Loading branch information
mleku committed Jan 3, 2024
1 parent f92b206 commit e0637e4
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 102 deletions.
23 changes: 13 additions & 10 deletions cmd/replicatrd/replicatr/adding.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ import (

"github.com/Hubmakerlabs/replicatr/pkg/eventstore"
"github.com/Hubmakerlabs/replicatr/pkg/nostr/normalize"
"github.com/nbd-wtf/go-nostr"
"github.com/Hubmakerlabs/replicatr/pkg/nostr/tag"
)

// AddEvent sends an event through then normal add pipeline, as if it was
// received from a websocket.
func (rl *Relay) AddEvent(ctx Ctx, evt Event) (e error) {
func (rl *Relay) AddEvent(ctx Ctx, evt *Event) (e error) {
if evt == nil {
e = err.New("error: event is nil")
rl.E.Ln(e)
return
}
for _, rejectors := range rl.RejectEvent {
if reject, msg := rejectors(ctx, evt); reject {
for _, rej := range rl.RejectEvent {
if reject, msg := rej(ctx, evt); reject {
if msg == "" {
e = err.New("blocked: no reason")
rl.E.Ln(e)
Expand All @@ -36,8 +36,11 @@ func (rl *Relay) AddEvent(ctx Ctx, evt Event) (e error) {
if evt.Kind == 0 || evt.Kind == 3 || (10000 <= evt.Kind && evt.Kind < 20000) {
// replaceable event, delete before storing
for _, query := range rl.QueryEvents {
var ch chan *nostr.Event
ch, e = query(ctx, &nostr.Filter{Authors: []string{evt.PubKey}, Kinds: []int{evt.Kind}})
var ch chan *Event
ch, e = query(ctx, &Filter{
Authors: tag.T{evt.PubKey},
Kinds: []int{evt.Kind},
})
if rl.E.Chk(e) {
continue
}
Expand All @@ -52,11 +55,11 @@ func (rl *Relay) AddEvent(ctx Ctx, evt Event) (e error) {
d := evt.Tags.GetFirst([]string{"d", ""})
if d != nil {
for _, query := range rl.QueryEvents {
var ch chan *nostr.Event
if ch, e = query(ctx, &nostr.Filter{
Authors: []string{evt.PubKey},
var ch chan *Event
if ch, e = query(ctx, &Filter{
Authors: tag.T{evt.PubKey},
Kinds: []int{evt.Kind},
Tags: nostr.TagMap{"d": []string{d.Value()}},
Tags: TagMap{"d": []string{d.Value()}},
}); rl.E.Chk(e) {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/replicatrd/replicatr/broadcasting.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ package replicatr

// BroadcastEvent emits an event to all listeners whose filters' match, skipping all filters and actions
// it also doesn't attempt to store the event or trigger any reactions or callbacks
func (rl *Relay) BroadcastEvent(evt Event) { notifyListeners(evt) }
func (rl *Relay) BroadcastEvent(evt *Event) { notifyListeners(evt) }
12 changes: 6 additions & 6 deletions cmd/replicatrd/replicatr/deleting.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package replicatr
import (
"fmt"

"github.com/nbd-wtf/go-nostr"
"github.com/Hubmakerlabs/replicatr/pkg/nostr/tag"
)

func (rl *Relay) handleDeleteRequest(ctx Ctx, evt Event) (e error) {
func (rl *Relay) handleDeleteRequest(ctx Ctx, evt *Event) (e error) {
// event deletion -- nip09
for _, tag := range evt.Tags {
if len(tag) >= 2 && tag[0] == "e" {
for _, t := range evt.Tags {
if len(t) >= 2 && t[0] == "e" {
// first we fetch the event
for _, query := range rl.QueryEvents {
var ch chan *nostr.Event
if ch, e = query(ctx, &nostr.Filter{IDs: []string{tag[1]}}); rl.E.Chk(e) {
var ch chan *Event
if ch, e = query(ctx, &Filter{IDs: tag.T{t[1]}}); rl.E.Chk(e) {
continue
}
target := <-ch
Expand Down
40 changes: 20 additions & 20 deletions cmd/replicatrd/replicatr/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

// ServeHTTP implements http.Handler interface.
func (rl *Relay) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (rl *Relay) ServeHTTP(w ResponseWriter, r *Request) {
if rl.ServiceURL == "" {
rl.ServiceURL = getServiceBaseURL(r)
}
Expand All @@ -32,9 +32,9 @@ func (rl *Relay) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
func (rl *Relay) HandleWebsocket(w ResponseWriter, r *Request) {
var e error
var conn *websocket.Conn
var conn *Conn
conn, e = rl.upgrader.Upgrade(w, r, nil)
if rl.E.Chk(e) {
rl.E.F("failed to upgrade websocket: %v\n", e)
Expand All @@ -49,7 +49,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
ws := &WebSocket{
conn: conn,
Request: r,
Challenge: hex.EncodeToString(challenge),
Challenge: encodeToHex(challenge),
}
ctx, cancel := context.WithCancel(
context.WithValue(
Expand Down Expand Up @@ -81,12 +81,12 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) {
return
}
switch env := envelope.(type) {
case *nostr.EventEnvelope:
case *EventEnvelope:
// check id
hash := sha256.Sum256(env.Event.Serialize())
id := hex.EncodeToString(hash[:])
if id != env.Event.ID {
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "invalid: id is computed incorrectly",
Expand All @@ -96,14 +96,14 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) {
// check signature
var ok bool
if ok, e = env.Event.CheckSignature(); rl.E.Chk(e) {
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "error: failed to verify signature"},
))
return
} else if !ok {
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "invalid: signature is invalid"},
Expand All @@ -127,14 +127,14 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) {
} else {
ok = true
}
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(OKEnvelope{
EventID: env.Event.ID,
OK: ok,
Reason: reason,
}))
case *nostr.CountEnvelope:
case *CountEnvelope:
if rl.CountEvents == nil {
rl.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{
rl.E.Chk(ws.WriteJSON(ClosedEnvelope{
SubscriptionID: env.SubscriptionID,
Reason: "unsupported: this relay does not support NIP-45"},
))
Expand All @@ -144,11 +144,11 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) {
for _, filter := range env.Filters {
total += rl.handleCountRequest(ctx, ws, &filter)
}
rl.E.Chk(ws.WriteJSON(nostr.CountEnvelope{
rl.E.Chk(ws.WriteJSON(CountEnvelope{
SubscriptionID: env.SubscriptionID,
Count: &total,
}))
case *nostr.ReqEnvelope:
case *ReqEnvelope:
eose := sync.WaitGroup{}
eose.Add(len(env.Filters))
// a context just for the "stored events" request handler
Expand All @@ -164,7 +164,7 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) {
if strings.HasPrefix(reason, "auth-required:") {
RequestAuth(ctx)
}
rl.E.Chk(ws.WriteJSON(nostr.ClosedEnvelope{
rl.E.Chk(ws.WriteJSON(ClosedEnvelope{
SubscriptionID: env.SubscriptionID,
Reason: reason},
))
Expand All @@ -177,12 +177,12 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) {
// we can cancel the context and fire the EOSE message
eose.Wait()
cancelReqCtx(nil)
rl.E.Chk(ws.WriteJSON(nostr.EOSEEnvelope(env.SubscriptionID)))
rl.E.Chk(ws.WriteJSON(EOSEEnvelope(env.SubscriptionID)))
}()
setListener(env.SubscriptionID, ws, env.Filters, cancelReqCtx)
case *nostr.CloseEnvelope:
case *CloseEnvelope:
removeListenerId(ws, string(*env))
case *nostr.AuthEnvelope:
case *AuthEnvelope:
wsBaseUrl := strings.Replace(rl.ServiceURL, "http", "ws", 1)
if pubkey, ok := nip42.ValidateAuthEvent(&env.Event, ws.Challenge, wsBaseUrl); ok {
ws.AuthedPublicKey = pubkey
Expand All @@ -192,20 +192,20 @@ func (rl *Relay) processMessages(message []byte, ctx Ctx, ws *WebSocket) {
ws.Authed = nil
}
ws.authLock.Unlock()
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(OKEnvelope{
EventID: env.Event.ID,
OK: true},
))
} else {
rl.E.Chk(ws.WriteJSON(nostr.OKEnvelope{
rl.E.Chk(ws.WriteJSON(OKEnvelope{
EventID: env.Event.ID,
OK: false,
Reason: "error: failed to authenticate"},
))
}
}
}
func (rl *Relay) readMessages(ctx Ctx, kill func(), ws *WebSocket, conn *websocket.Conn, r *http.Request) {
func (rl *Relay) readMessages(ctx Ctx, kill func(), ws *WebSocket, conn *Conn, r *Request) {
defer kill()
conn.SetReadLimit(rl.MaxMessageSize)
rl.E.Chk(conn.SetReadDeadline(time.Now().Add(rl.PongWait)))
Expand Down
5 changes: 2 additions & 3 deletions cmd/replicatrd/replicatr/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package replicatr
import (
"encoding/hex"
"hash/maphash"
"net/http"
"strconv"
"strings"
"unsafe"
Expand All @@ -21,12 +20,12 @@ func pointerHasher[V any](_ maphash.Seed, k *V) uint64 {
return uint64(uintptr(unsafe.Pointer(k)))
}

func isOlder(prev, next Event) bool {
func isOlder(prev, next *Event) bool {
p, n := prev.CreatedAt, next.CreatedAt
return p < n || (p == n && prev.ID > next.ID)
}

func getServiceBaseURL(r *http.Request) string {
func getServiceBaseURL(r *Request) string {
host := r.Header.Get("X-Forwarded-Host")
if host == "" {
host = r.Host
Expand Down
27 changes: 15 additions & 12 deletions cmd/replicatrd/replicatr/listener.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package replicatr

import (
"context"
"fmt"

"github.com/nbd-wtf/go-nostr"
"github.com/puzpuzpuz/xsync/v2"
)

type Listener struct {
filters nostr.Filters
cancel context.CancelCauseFunc
filters Filters
cancel CancelCauseFunc
}

var listeners = xsync.NewTypedMapOf[*WebSocket, *xsync.MapOf[string, *Listener]](pointerHasher[WebSocket])
var listeners = xsync.NewTypedMapOf[*WebSocket,
ListenerMap](pointerHasher[WebSocket])

func GetListeningFilters() (respFilters nostr.Filters) {
respFilters = make(nostr.Filters, 0, listeners.Size()*2)
func GetListeningFilters() (respFilters Filters) {
respFilters = make(Filters, 0, listeners.Size()*2)
// here we go through all the existing listeners
listeners.Range(func(_ *WebSocket, subs *xsync.MapOf[string, *Listener]) bool {
listeners.Range(func(_ *WebSocket, subs ListenerMap) bool {
subs.Range(func(_ string, listener *Listener) bool {
for _, listenerFilter := range listener.filters {
for _, respFilter := range respFilters {
Expand All @@ -40,8 +40,8 @@ func GetListeningFilters() (respFilters nostr.Filters) {
return
}

func setListener(id string, ws *WebSocket, f Filters, c context.CancelCauseFunc) {
subs, _ := listeners.LoadOrCompute(ws, func() *xsync.MapOf[string, *Listener] {
func setListener(id string, ws *WebSocket, f Filters, c CancelCauseFunc) {
subs, _ := listeners.LoadOrCompute(ws, func() ListenerMap {
return xsync.NewMapOf[*Listener]()
})
subs.Store(id, &Listener{filters: f, cancel: c})
Expand All @@ -64,13 +64,16 @@ func removeListenerId(ws *WebSocket, id string) {
// (no need to cancel contexts as they are all inherited from the main connection context)
func removeListener(ws *WebSocket) { listeners.Delete(ws) }

func notifyListeners(event Event) {
listeners.Range(func(ws *WebSocket, subs *xsync.MapOf[string, *Listener]) bool {
func notifyListeners(event *Event) {
listeners.Range(func(ws *WebSocket, subs ListenerMap) bool {
subs.Range(func(id string, listener *Listener) bool {
if !listener.filters.Match(event) {
return true
}
log.E.Chk(ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &id, Event: *event}))
log.E.Chk(ws.WriteJSON(EventEnvelope{
SubscriptionID: &id,
Event: *event},
))
return true
})
return true
Expand Down
57 changes: 40 additions & 17 deletions cmd/replicatrd/replicatr/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/http"
"os"
"sync"
"time"

log2 "github.com/Hubmakerlabs/replicatr/pkg/log"
Expand All @@ -23,29 +24,51 @@ const (
MaxMessageSize int64 = 512000 // ???
)

// aliases so we can swap out to another package with only here changed
type (
Ctx = context.Context
Filter = *nostr.Filter
Event = *nostr.Event
Info = *nip11.RelayInformationDocument
Filters = nostr.Filters
RejectEvent func(ctx Ctx, event Event) (reject bool, msg string)
RejectFilter func(ctx Ctx, filter Filter) (reject bool, msg string)
OverwriteFilter func(ctx Ctx, filter Filter)
OverwriteDeletionOutcome func(ctx Ctx, target Event, del Event) (accept bool, msg string)
OverwriteResponseEvent func(ctx Ctx, event Event)
Events func(ctx Ctx, event Event) error
Ctx = context.Context
Info = nip11.RelayInformationDocument
Event = nostr.Event
Filter = nostr.Filter
Filters = nostr.Filters
TagMap = nostr.TagMap
EventEnvelope = nostr.EventEnvelope
OKEnvelope = nostr.OKEnvelope
CountEnvelope = nostr.CountEnvelope
ClosedEnvelope = nostr.ClosedEnvelope
ReqEnvelope = nostr.ReqEnvelope
EOSEEnvelope = nostr.EOSEEnvelope
CloseEnvelope = nostr.CloseEnvelope
AuthEnvelope = nostr.AuthEnvelope
NoticeEnvelope = nostr.NoticeEnvelope
Conn = websocket.Conn
Request = http.Request
ResponseWriter = http.ResponseWriter
Mutex = sync.Mutex
WaitGroup = sync.WaitGroup
CancelCauseFunc = context.CancelCauseFunc
ListenerMap = *xsync.MapOf[string, *Listener]
)

// function types used in the relay state
type (
RejectEvent func(ctx Ctx, event *Event) (reject bool, msg string)
RejectFilter func(ctx Ctx, filter *Filter) (reject bool, msg string)
OverwriteFilter func(ctx Ctx, filter *Filter)
OverwriteDeletionOutcome func(ctx Ctx, target *Event, del *Event) (accept bool, msg string)
OverwriteResponseEvent func(ctx Ctx, event *Event)
Events func(ctx Ctx, event *Event) error
Hook func(ctx Ctx)
OverwriteRelayInformation func(ctx Ctx, r *http.Request, info Info) Info
QueryEvents func(ctx Ctx, filter Filter) (eventC chan Event, e error)
CountEvents func(ctx Ctx, filter Filter) (c int64, e error)
OnEventSaved func(ctx Ctx, event Event)
OverwriteRelayInformation func(ctx Ctx, r *Request, info *Info) *Info
QueryEvents func(ctx Ctx, filter *Filter) (eventC chan *Event, e error)
CountEvents func(ctx Ctx, filter *Filter) (c int64, e error)
OnEventSaved func(ctx Ctx, event *Event)
)

func NewRelay(appName string) (r *Relay) {
r = &Relay{
Log: log2.New(os.Stderr, appName, 0),
Info: &nip11.RelayInformationDocument{
Info: &Info{
Software: "https://github.com/Hubmakerlabs/replicatr/cmd/khatru",
Version: "n/a",
SupportedNIPs: make([]int, 0),
Expand Down Expand Up @@ -83,7 +106,7 @@ type Relay struct {
OnDisconnect []Hook
OnEventSaved []OnEventSaved
// editing info will affect
Info Info
Info *Info
*log2.Log
// for establishing websockets
upgrader websocket.Upgrader
Expand Down
Loading

0 comments on commit e0637e4

Please sign in to comment.