Skip to content

Commit

Permalink
Merge pull request #16 from fishcakeday/reuse-relay-connection
Browse files Browse the repository at this point in the history
Reuse relay connection
  • Loading branch information
believethehype authored Apr 25, 2023
2 parents 049602e + ab13d1f commit f1f40de
Showing 1 changed file with 130 additions and 17 deletions.
147 changes: 130 additions & 17 deletions nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"strings"
"sync"
Expand All @@ -32,6 +34,21 @@ var nip57Receipt nostr.Event
var zapEventSerializedStr string
var nip57ReceiptRelays []string

// Relay connections
type RelayConnection struct {
URL string
relay *nostr.Relay
lastUsed time.Time
closeChan chan bool
}

var relayConnections = make(map[string]*RelayConnection)
var relayConnectionsMutex sync.Mutex
var connectionTimeout = 30 * time.Minute
var ignoreRelayDuration = 5 * time.Minute
var ignoredRelays = make(map[string]time.Time)
var ignoredRelaysMutex sync.Mutex

func Nip57DescriptionHash(zapEventSerialized string) string {
hash := sha256.Sum256([]byte(zapEventSerialized))
hashString := hex.EncodeToString(hash[:])
Expand Down Expand Up @@ -195,6 +212,90 @@ func GetNostrProfileMetaData(npub string) (nostr.ProfileMetadata, error) {

}

func ignoreRelay(url string) {
ignoredRelaysMutex.Lock()
defer ignoredRelaysMutex.Unlock()
ignoredRelays[url] = time.Now()
}

func isRelayIgnored(url string) bool {
ignoredRelaysMutex.Lock()
defer ignoredRelaysMutex.Unlock()

if t, ok := ignoredRelays[url]; ok {
if time.Since(t) < ignoreRelayDuration {
return true
}
delete(ignoredRelays, url)
}
return false
}

func isBrokenPipeError(err error) bool {
var netErr net.Error
if errors.As(err, &netErr) {
if strings.Contains(netErr.Error(), "write: broken pipe") {
return true
}
}
return false
}

func getRelayConnection(url string) (*nostr.Relay, error) {
if isRelayIgnored(url) {
return nil, fmt.Errorf("relay %s is being ignored", url)
}

relayConnectionsMutex.Lock()
defer relayConnectionsMutex.Unlock()

if relayConn, ok := relayConnections[url]; ok {
relayConn.lastUsed = time.Now()
return relayConn.relay, nil
}

ctx := context.WithValue(context.Background(), "url", url)
relay, err := nostr.RelayConnect(ctx, url)
if err != nil {
ignoreRelay(url)
return nil, err
}

relayConn := &RelayConnection{
URL: url,
relay: relay,
lastUsed: time.Now(),
closeChan: make(chan bool),
}
relayConnections[url] = relayConn

go func() {
select {
case <-time.After(connectionTimeout):
relayConnectionsMutex.Lock()
if time.Since(relayConn.lastUsed) >= connectionTimeout {
relay.Close()
delete(relayConnections, url)
}
relayConnectionsMutex.Unlock()
case <-relayConn.closeChan:
}
}()

return relay, nil
}

func closeRelayConnection(url string) {
relayConnectionsMutex.Lock()
defer relayConnectionsMutex.Unlock()

if relayConn, ok := relayConnections[url]; ok {
relayConn.closeChan <- true
relayConn.relay.Close()
delete(relayConnections, url)
}
}

func publishNostrEvent(ev nostr.Event, relays []string) {
// Add more relays, remove trailing slashes, and ensure unique relays
relays = uniqueSlice(cleanUrls(append(relays, Relays...)))
Expand All @@ -209,24 +310,36 @@ func publishNostrEvent(ev nostr.Event, relays []string) {
go func(url string) {
defer wg.Done()

ctx := context.WithValue(context.Background(), "url", url)
relay, err := nostr.RelayConnect(ctx, url)
if err != nil {
log.Printf("Error connecting to relay %s: %v", url, err)
return
var err error
var relay *nostr.Relay
var status nostr.Status
maxRetries := 3

for i := 0; i < maxRetries; i++ {
relay, err = getRelayConnection(url)
if err != nil {
log.Printf("Error connecting to relay %s: %v", url, err)
return
}

time.Sleep(3 * time.Second)

ctx := context.WithValue(context.Background(), "url", url)
status, err = relay.Publish(ctx, ev)
if err != nil {
log.Printf("Error publishing to relay %s: %v", url, err)

if isBrokenPipeError(err) {
closeRelayConnection(url) // Close the broken connection
continue // Retry connection and publish
}
} else {
log.Printf("[NOSTR] published to %s: %s", url, status.String()) // Convert the nostr.Status value to a string
break
}

time.Sleep(3 * time.Second)
}
defer relay.Close()

time.Sleep(3 * time.Second)

status, err := relay.Publish(ctx, ev)
if err != nil {
log.Printf("Error publishing to relay %s: %v", url, err)
} else {
log.Printf("[NOSTR] published to %s: %s", url, status)
}

time.Sleep(3 * time.Second)
}(url)
}

Expand Down

0 comments on commit f1f40de

Please sign in to comment.