From ca0f00966a8c487185900bef4c7c00e6c4e937a6 Mon Sep 17 00:00:00 2001 From: Bob Stevens <35038919+restevens402@users.noreply.github.com> Date: Wed, 21 Aug 2024 15:01:05 -0700 Subject: [PATCH 01/10] Refactor network address handling and improve IP retrieval Restructure how multiaddresses and IP addresses are handled within the codebase to improve maintainability and efficiency. Added logic for obtaining GCP external IP addresses and restructured public address retrieval. Moved HTTP client functionality to a new package and updated references accordingly. --- cmd/masa-node/main.go | 8 +- pkg/network/address.go | 182 ++++++++++++------ .../handlers => network}/http_client.go | 2 +- pkg/network/kdht.go | 20 +- pkg/oracle_node.go | 20 +- pkg/pubsub/node_data.go | 39 ++-- pkg/workers/handlers/llm.go | 3 +- 7 files changed, 168 insertions(+), 106 deletions(-) rename pkg/{workers/handlers => network}/http_client.go (99%) diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index ff934f8f..7caf3762 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -6,6 +6,8 @@ import ( "os/signal" "syscall" + "github.com/multiformats/go-multiaddr" + "github.com/masa-finance/masa-oracle/internal/versioning" "github.com/masa-finance/masa-oracle/pkg/workers" @@ -133,10 +135,10 @@ func main() { }() // Get the multiaddress and IP address of the node - multiAddr := node.GetMultiAddrs().String() // Get the multiaddress - ipAddr := node.Host.Addrs()[0].String() // Get the IP address + multiAddr := node.GetMultiAddrs() // Get the multiaddress + ipAddr, err := multiAddr.ValueForProtocol(multiaddr.P_IP4) // Get the IP address // Display the welcome message with the multiaddress and IP address - config.DisplayWelcomeMessage(multiAddr, ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion) + config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion) <-ctx.Done() } diff --git a/pkg/network/address.go b/pkg/network/address.go index 22533824..7b7995db 100644 --- a/pkg/network/address.go +++ b/pkg/network/address.go @@ -2,28 +2,19 @@ package network import ( "fmt" + "io" "net" - "os" + "net/http" "strings" - "github.com/chyeh/pubip" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/sirupsen/logrus" ) -// getOutboundIP returns the outbound IP address of the current machine 172.17.0.2 10.0.0.2 etc -func getOutboundIP() string { - conn, err := net.Dial("udp", "8.8.8.8:80") - if err != nil { - logrus.Warn("[-] Error getting outbound IP") - } - defer conn.Close() - localAddr := conn.LocalAddr().String() - idx := strings.LastIndex(localAddr, ":") - return localAddr[0:idx] -} +// The URL of the GCP metadata server for the external IP +const externalIPURL = "http://metadata.google.internal/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip" // GetMultiAddressesForHost returns the multiaddresses for the host func GetMultiAddressesForHost(host host.Host) ([]multiaddr.Multiaddr, error) { @@ -56,82 +47,165 @@ func GetMultiAddressesForHostQuiet(host host.Host) []multiaddr.Multiaddr { return ma } -// GetPriorityAddress returns the best public or private IP address -func GetPriorityAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { - bestAddr := getBestPublicAddress(addrs) - if bestAddr != nil { - return bestAddr +// getPublicMultiAddress returns the best public IP address +func getPublicMultiAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { + ipBytes, err := Get("https://api.ipify.org?format=text", nil) + externalIP := net.ParseIP(string(ipBytes)) + if err != nil { + logrus.Warnf("[-] Failed to get public IP: %v", err) + return nil + } + if externalIP == nil || externalIP.IsPrivate() { + return nil + } + + var addrToCopy multiaddr.Multiaddr + for _, addr := range addrs { + addrToCopy = addr + break } + publicMultiaddr, err := replaceIPComponent(addrToCopy, externalIP.String()) + if err != nil { + logrus.Warnf("[-] Failed to create multiaddr with public IP: %v", err) + return nil + } + return publicMultiaddr +} +// GetPriorityAddress returns the best public or private IP address +func GetPriorityAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { var bestPrivateAddr multiaddr.Multiaddr + bestPublicAddr := getPublicMultiAddress(addrs) for _, addr := range addrs { ipComponent, err := addr.ValueForProtocol(multiaddr.P_IP4) if err != nil { - continue // Not an IP address + ipComponent, err = addr.ValueForProtocol(multiaddr.P_IP6) + if err != nil { + continue // Not an IP address + } } - ip := net.ParseIP(ipComponent) if ip == nil || ip.IsLoopback() { continue // Skip invalid or loopback addresses } - if ip.IsPrivate() { + // If it's the first private address found or if it's preferred over the current best, keep it if bestPrivateAddr == nil || isPreferredAddress(addr) { bestPrivateAddr = addr } + } else { + // If it's the first public address found or if it's preferred over the current best, keep it + if bestPublicAddr == nil || isPreferredAddress(addr) { + bestPublicAddr = addr + } } } - - if bestPrivateAddr != nil { - return bestPrivateAddr + var baseAddr multiaddr.Multiaddr + // Prefer public addresses over private ones + if bestPublicAddr != nil { + baseAddr = bestPublicAddr + } else if bestPrivateAddr != nil { + baseAddr = bestPrivateAddr + } else { + logrus.Warn("No address matches the priority criteria, returning the first entry") + baseAddr = addrs[0] } - - if len(addrs) > 0 { - logrus.Warn("[-] No suitable address found, returning the first entry") - return addrs[0] + logrus.Infof("Best public address: %s", bestPublicAddr) + logrus.Debugf("Best private address: %s", bestPrivateAddr) + logrus.Debugf("Base address: %s", baseAddr) + gcpAddr := replaceGCPAddress(baseAddr) + if gcpAddr != nil { + baseAddr = gcpAddr } - - return nil + return baseAddr } -// getBestPublicAddress returns the best public IP address -func getBestPublicAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { - var externalIP net.IP +func replaceGCPAddress(addr multiaddr.Multiaddr) multiaddr.Multiaddr { + // After finding the best address, try to get the GCP external IP var err error - - if os.Getenv("ENV") == "local" { - externalIP = net.ParseIP(getOutboundIP()) - } else { - externalIP, err = pubip.Get() + var bestAddr multiaddr.Multiaddr + gotExternalIP, externalIP := getGCPExternalIP() + if gotExternalIP && externalIP != "" { + bestAddr, err = replaceIPComponent(addr, externalIP) if err != nil { - logrus.Warnf("[-] Failed to get public IP: %v", err) + logrus.Warnf("Failed to replace IP component: %s", err) return nil } } + logrus.Debug("Got external IP: ", gotExternalIP) + logrus.Debug("Address after replacing IP component: ", bestAddr) + return bestAddr +} - if externalIP == nil || externalIP.IsPrivate() { - return nil +func replaceIPComponent(maddr multiaddr.Multiaddr, newIP string) (multiaddr.Multiaddr, error) { + var components []multiaddr.Multiaddr + for _, component := range multiaddr.Split(maddr) { + if component.Protocols()[0].Code == multiaddr.P_IP4 || component.Protocols()[0].Code == multiaddr.P_IP6 { + // Create a new IP component + newIPComponent, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s", newIP)) + if err != nil { + return nil, err + } + components = append(components, newIPComponent) + } else { + components = append(components, component) + } } + return multiaddr.Join(components...), nil +} + +//func contains(slice []multiaddr.Multiaddr, item multiaddr.Multiaddr) bool { +// for _, a := range slice { +// if a.Equal(item) { +// return true +// } +// } +// return false +//} + +func getGCPExternalIP() (bool, string) { + + // Create a new HTTP client with a specific timeout + client := &http.Client{} - // Create a new multiaddr with the public IP - publicAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s", externalIP.String())) + // Make a request to the metadata server + req, err := http.NewRequest("GET", externalIPURL, nil) if err != nil { - logrus.Warnf("[-] Failed to create multiaddr with public IP: %v", err) - return nil + return false, "" } - // Find a suitable port from existing addresses - for _, addr := range addrs { - if strings.HasPrefix(addr.String(), "/ip4/") { - port, err := addr.ValueForProtocol(multiaddr.P_TCP) - if err == nil { - return publicAddr.Encapsulate(multiaddr.StringCast("/tcp/" + port)) - } - } + // GCP metadata server requires this specific header + req.Header.Add("Metadata-Flavor", "Google") + + // Perform the request + resp, err := client.Do(req) + if err != nil { + return false, "" } + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + logrus.Error(err) + } + }(resp.Body) - return publicAddr + // Check if the metadata server returns a successful status code + if resp.StatusCode != http.StatusOK { + logrus.Debug("Metadata server response status: ", resp.StatusCode) + return false, "" + } + //Read the external IP from the response + body, err := io.ReadAll(resp.Body) + if err != nil { + return true, "" + } + // Check that the response is a valid IP address + if net.ParseIP(string(body)) == nil { + return false, "" + } + logrus.Debug("External IP from metadata server: ", string(body)) + return true, string(body) } // isPreferredAddress checks if the multiaddress contains the UDP protocol diff --git a/pkg/workers/handlers/http_client.go b/pkg/network/http_client.go similarity index 99% rename from pkg/workers/handlers/http_client.go rename to pkg/network/http_client.go index 50f873b2..4a5ffa18 100644 --- a/pkg/workers/handlers/http_client.go +++ b/pkg/network/http_client.go @@ -1,4 +1,4 @@ -package handlers +package network import ( "bytes" diff --git a/pkg/network/kdht.go b/pkg/network/kdht.go index 5bb8a78e..baf4de40 100644 --- a/pkg/network/kdht.go +++ b/pkg/network/kdht.go @@ -2,6 +2,7 @@ package network import ( "context" + "encoding/json" "strings" "sync" "time" @@ -11,9 +12,9 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/multiformats/go-multiaddr" "github.com/sirupsen/logrus" + "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/pubsub" ) @@ -29,8 +30,8 @@ type dbValidator struct{} func (dbValidator) Validate(_ string, _ []byte) error { return nil } func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil } -func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr, - protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool) (*dht.IpfsDHT, error) { +func WithDht(ctx context.Context, host host.Host, protocolId, prefix protocol.ID, + peerChan chan PeerEvent, nodeData *pubsub.NodeData) (*dht.IpfsDHT, error) { options := make([]dht.Option, 0) options = append(options, dht.BucketSize(100)) // Adjust bucket size options = append(options, dht.Concurrency(100)) // Increase concurrency @@ -39,6 +40,11 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul options = append(options, dht.ProtocolPrefix(prefix)) options = append(options, dht.NamespacedValidator("db", dbValidator{})) + bootstrapNodes, err := GetBootNodesMultiAddress(config.GetInstance().Bootnodes) + if err != nil { + return nil, err + } + kademliaDHT, err := dht.New(ctx, host, options...) if err != nil { return nil, err @@ -122,13 +128,13 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul } }(stream) // Close the stream when done - multiaddr, err := GetMultiAddressesForHost(host) + // Convert NodeData to JSON + jsonData, err := json.Marshal(nodeData) if err != nil { - logrus.Errorf("[-] Error getting multiaddresses for host: %s", err) + logrus.Error("[-] Error marshalling NodeData:", err) return } - multaddrString := GetPriorityAddress(multiaddr) - _, err = stream.Write(pubsub.GetSelfNodeDataJson(host, isStaked, multaddrString.String())) + _, err = stream.Write(jsonData) if err != nil { logrus.Errorf("[-] Error writing to stream: %s", err) return diff --git a/pkg/oracle_node.go b/pkg/oracle_node.go index 1089c56f..8baa70dc 100644 --- a/pkg/oracle_node.go +++ b/pkg/oracle_node.go @@ -176,11 +176,6 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) { func (node *OracleNode) Start() (err error) { logrus.Infof("[+] Starting node with ID: %s", node.GetMultiAddrs().String()) - bootNodeAddrs, err := myNetwork.GetBootNodesMultiAddress(config.GetInstance().Bootnodes) - if err != nil { - return err - } - node.Host.SetStreamHandler(node.Protocol, node.handleStream) node.Host.SetStreamHandler(config.ProtocolWithVersion(config.NodeDataSyncProtocol), node.ReceiveNodeData) @@ -192,7 +187,9 @@ func (node *OracleNode) Start() (err error) { go node.ListenToNodeTracker() go node.handleDiscoveredPeers() - node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked) + myNodeData := pubsub2.GetSelfNodeData(node.Host, node.IsStaked, node.priorityAddrs) + + node.DHT, err = myNetwork.WithDht(node.Context, node.Host, node.Protocol, config.MasaPrefix, node.PeerChan, myNodeData) if err != nil { return err } @@ -205,18 +202,9 @@ func (node *OracleNode) Start() (err error) { nodeData := node.NodeTracker.GetNodeData(node.Host.ID().String()) if nodeData == nil { - publicKeyHex := masacrypto.KeyManagerInstance().EthAddress - ma := myNetwork.GetMultiAddressesForHostQuiet(node.Host) - nodeData = pubsub2.NewNodeData(ma[0], node.Host.ID(), publicKeyHex, pubsub2.ActivityJoined) - nodeData.IsStaked = node.IsStaked + nodeData = myNodeData nodeData.SelfIdentified = true - nodeData.IsDiscordScraper = node.IsDiscordScraper - nodeData.IsTelegramScraper = node.IsTelegramScraper - nodeData.IsTwitterScraper = node.IsTwitterScraper - nodeData.IsWebScraper = node.IsWebScraper - nodeData.IsValidator = node.IsValidator } - nodeData.Joined() node.NodeTracker.HandleNodeData(*nodeData) diff --git a/pkg/pubsub/node_data.go b/pkg/pubsub/node_data.go index 65ae139d..3a35d80a 100644 --- a/pkg/pubsub/node_data.go +++ b/pkg/pubsub/node_data.go @@ -179,7 +179,7 @@ func (n *NodeData) Joined() { n.Version = config.GetInstance().Version - logMessage := fmt.Sprintf("[+] %s node joined: %s", map[bool]string{true: "Staked", false: "Unstaked"}[n.IsStaked], n.Address()) + logMessage := fmt.Sprintf("[+] %s node joined: %s", map[bool]string{true: "Staked", false: "Unstaked"}[n.IsStaked], n.MultiaddrsString) if n.IsStaked { logrus.Info(logMessage) } else { @@ -255,31 +255,22 @@ func (n *NodeData) UpdateAccumulatedUptime() { n.AccumulatedUptimeStr = n.AccumulatedUptime.String() } -// GetSelfNodeDataJson converts the local node's data into a JSON byte array. +// GetSelfNodeData converts the local node's data into a JSON byte array. // It populates a NodeData struct with the node's ID, staking status, and Ethereum address. // The NodeData struct is then marshalled into a JSON byte array. // Returns nil if there is an error marshalling to JSON. -func GetSelfNodeDataJson(host host.Host, isStaked bool, multiaddrString string) []byte { +func GetSelfNodeData(host host.Host, isStaked bool, addr multiaddr.Multiaddr) *NodeData { // Create and populate NodeData - nodeData := NodeData{ - PeerId: host.ID(), - MultiaddrsString: multiaddrString, - IsStaked: isStaked, - EthAddress: masacrypto.KeyManagerInstance().EthAddress, - IsTwitterScraper: config.GetInstance().TwitterScraper, - IsDiscordScraper: config.GetInstance().DiscordScraper, - IsTelegramScraper: config.GetInstance().TelegramScraper, - IsWebScraper: config.GetInstance().WebScraper, - IsValidator: config.GetInstance().Validator, - IsActive: true, - Version: versioning.ProtocolVersion, - } - - // Convert NodeData to JSON - jsonData, err := json.Marshal(nodeData) - if err != nil { - logrus.Error("[-] Error marshalling NodeData:", err) - return nil - } - return jsonData + nodeData := NewNodeData(addr, host.ID(), masacrypto.KeyManagerInstance().EthAddress, ActivityJoined) + nodeData.MultiaddrsString = addr.String() + nodeData.IsStaked = isStaked + nodeData.IsTwitterScraper = config.GetInstance().TwitterScraper + nodeData.IsDiscordScraper = config.GetInstance().DiscordScraper + nodeData.IsTelegramScraper = config.GetInstance().TelegramScraper + nodeData.IsWebScraper = config.GetInstance().WebScraper + nodeData.IsValidator = config.GetInstance().Validator + nodeData.IsActive = true + nodeData.Version = versioning.ProtocolVersion + + return nodeData } diff --git a/pkg/workers/handlers/llm.go b/pkg/workers/handlers/llm.go index 658fcf77..28aa8da1 100644 --- a/pkg/workers/handlers/llm.go +++ b/pkg/workers/handlers/llm.go @@ -7,6 +7,7 @@ import ( "github.com/sirupsen/logrus" "github.com/masa-finance/masa-oracle/pkg/config" + "github.com/masa-finance/masa-oracle/pkg/network" "github.com/masa-finance/masa-oracle/pkg/workers/types" ) @@ -40,7 +41,7 @@ func (h *LLMChatHandler) HandleWork(data []byte) data_types.WorkResponse { if err != nil { return data_types.WorkResponse{Error: fmt.Sprintf("unable to marshal LLM chat data: %v", err)} } - resp, err := Post(uri, jsnBytes, nil) + resp, err := network.Post(uri, jsnBytes, nil) if err != nil { return data_types.WorkResponse{Error: fmt.Sprintf("unable to post LLM chat data: %v", err)} } From 10d3512832b00eeafc3f9baeb9c058f6162dd7b3 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Wed, 21 Aug 2024 21:39:22 -0700 Subject: [PATCH 02/10] Update max remote workers --- pkg/workers/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/workers/config.go b/pkg/workers/config.go index 7da59bef..7ce75acd 100644 --- a/pkg/workers/config.go +++ b/pkg/workers/config.go @@ -23,7 +23,7 @@ var DefaultConfig = WorkerConfig{ MaxRetries: 1, MaxSpawnAttempts: 1, WorkerBufferSize: 100, - MaxRemoteWorkers: 1, + MaxRemoteWorkers: 10, } var workerConfig *WorkerConfig From 6b4f94cbdc3d4fd477e19c63af023a205d358d12 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Wed, 21 Aug 2024 22:05:46 -0700 Subject: [PATCH 03/10] Improve worker selection resilience - Handle invalid multiaddresses gracefully - Continue searching for eligible workers on errors - Add more detailed logging for debugging - Prevent potential nil pointer dereference - Log warning if no workers found --- pkg/workers/worker_selection.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/workers/worker_selection.go b/pkg/workers/worker_selection.go index 9e1d6dcb..661ddce9 100644 --- a/pkg/workers/worker_selection.go +++ b/pkg/workers/worker_selection.go @@ -11,57 +11,64 @@ import ( masa "github.com/masa-finance/masa-oracle/pkg" "github.com/masa-finance/masa-oracle/pkg/pubsub" - "github.com/masa-finance/masa-oracle/pkg/workers/types" + data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) // GetEligibleWorkers Uses the new NodeTracker method to get the eligible workers for a given message type // I'm leaving this returning an array so that we can easily increase the number of workers in the future func GetEligibleWorkers(node *masa.OracleNode, category pubsub.WorkerCategory, config *WorkerConfig) ([]data_types.Worker, *data_types.Worker) { - var workers []data_types.Worker nodes := node.NodeTracker.GetEligibleWorkerNodes(category) var localWorker *data_types.Worker - // Shuffle the node list first to avoid always selecting the same node rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] }) - logrus.Info("checking connections to eligible workers") + logrus.Info("Checking connections to eligible workers") start := time.Now() for _, eligible := range nodes { if eligible.PeerId.String() == node.Host.ID().String() { localWorker = &data_types.Worker{IsLocal: true, NodeData: eligible} continue } + addr, err := multiaddr.NewMultiaddr(eligible.MultiaddrsString) if err != nil { - logrus.Errorf("error creating multiaddress: %s", err.Error()) + logrus.Warnf("Error creating multiaddress for peer %s: %v", eligible.PeerId.String(), err) continue } + peerInfo, err := peer.AddrInfoFromP2pAddr(addr) if err != nil { - logrus.Errorf("Failed to get peer info: %s", err) + logrus.Warnf("Failed to get peer info for %s: %v", eligible.PeerId.String(), err) continue } + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), config.ConnectionTimeout) - defer cancel() // Cancel the context when done to release resources - if err := node.Host.Connect(ctxWithTimeout, *peerInfo); err != nil { - logrus.Debugf("Failed to connect to peer: %v", err) + err = node.Host.Connect(ctxWithTimeout, *peerInfo) + cancel() + if err != nil { + logrus.Warnf("Failed to connect to peer %s: %v", eligible.PeerId.String(), err) continue } + workers = append(workers, data_types.Worker{IsLocal: false, NodeData: eligible, AddrInfo: peerInfo}) - // print duration of worker selection in seconds with floating point precision dur := time.Since(start).Milliseconds() logrus.Infof("Worker selection took %v milliseconds", dur) break } - // make sure we get the local node in the list + if localWorker == nil { nd := node.NodeTracker.GetNodeData(node.Host.ID().String()) - if nd.CanDoWork(category) { + if nd != nil && nd.CanDoWork(category) { localWorker = &data_types.Worker{IsLocal: true, NodeData: *nd} } } + + if len(workers) == 0 && localWorker == nil { + logrus.Warn("No eligible workers found, including local worker") + } + return workers, localWorker } From f0ce842ef801d48710d06c83b9332e6840e65486 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Wed, 21 Aug 2024 22:54:32 -0700 Subject: [PATCH 04/10] Remove IsActive and timeout checks from CanDoWork method This commit simplifies the worker eligibility criteria in the CanDoWork method of the NodeData struct. The following changes were made: - Removed the check for node active status (IsActive) - Removed the worker timeout check - Retained the check for staked status (IsStaked) The method now only considers if a node is staked and configured for the specific worker type when determining eligibility. This change allows for more inclusive worker participation, as nodes are no longer excluded based on active status or timeout conditions. --- pkg/pubsub/node_data.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/pubsub/node_data.go b/pkg/pubsub/node_data.go index 3a35d80a..815e0293 100644 --- a/pkg/pubsub/node_data.go +++ b/pkg/pubsub/node_data.go @@ -119,12 +119,7 @@ func (wc WorkerCategory) String() string { // CanDoWork checks if the node can perform work of the specified WorkerType. // It returns true if the node is configured for the given worker type, false otherwise. func (n *NodeData) CanDoWork(workerType WorkerCategory) bool { - - if !n.WorkerTimeout.IsZero() && time.Since(n.WorkerTimeout) < 16*time.Minute { - logrus.Infof("[+] Skipping worker %s due to timeout", n.PeerId) - return false - } - if !(n.IsStaked && n.IsActive) { + if !n.IsStaked { return false } switch workerType { From e973942e6f8d1208a99d37d17c26bfcce09405f2 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Wed, 21 Aug 2024 23:45:31 -0700 Subject: [PATCH 05/10] extend context deadline timeout for a connection attempt --- pkg/workers/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/workers/config.go b/pkg/workers/config.go index 7ce75acd..ba266114 100644 --- a/pkg/workers/config.go +++ b/pkg/workers/config.go @@ -19,7 +19,7 @@ type WorkerConfig struct { var DefaultConfig = WorkerConfig{ WorkerTimeout: 55 * time.Second, WorkerResponseTimeout: 30 * time.Second, - ConnectionTimeout: 1 * time.Second, + ConnectionTimeout: 10 * time.Second, MaxRetries: 1, MaxSpawnAttempts: 1, WorkerBufferSize: 100, From 039dc8db79deed1407c51772cb1ba245029b974a Mon Sep 17 00:00:00 2001 From: Bob Stevens <35038919+restevens402@users.noreply.github.com> Date: Fri, 23 Aug 2024 04:46:00 -0700 Subject: [PATCH 06/10] Add `MergeMultiaddresses` method and update nodes management Introduce a `MergeMultiaddresses` method to `NodeData` to handle multiaddress management more efficiently. Update the oracle node logic to use this method for merging incoming multiaddresses instead of replacing them, and add a new `NewWorker` function to initialize workers, enhancing logging and error handling for multiaddress processing. --- pkg/oracle_node.go | 6 +----- pkg/pubsub/node_data.go | 13 +++++++++++++ pkg/workers/types/request_response.go | 27 +++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/pkg/oracle_node.go b/pkg/oracle_node.go index 8baa70dc..9a4361b2 100644 --- a/pkg/oracle_node.go +++ b/pkg/oracle_node.go @@ -261,12 +261,8 @@ func (node *OracleNode) handleStream(stream network.Stream) { logrus.Warnf("[-] Received data from unexpected peer %s", remotePeer) return } + nodeData.MergeMultiaddresses(stream.Conn().RemoteMultiaddr()) - multiAddr := stream.Conn().RemoteMultiaddr() - nodeData.Multiaddrs = []pubsub2.JSONMultiaddr{{Multiaddr: multiAddr}} - - // newNodeData := pubsub2.NewNodeData(multiAddr, remotePeer, nodeData.EthAddress, pubsub2.ActivityJoined) - // newNodeData.IsStaked = nodeData.IsStaked err = node.NodeTracker.AddOrUpdateNodeData(&nodeData, false) if err != nil { logrus.Error("[-] Error adding or updating node data: ", err) diff --git a/pkg/pubsub/node_data.go b/pkg/pubsub/node_data.go index 815e0293..2471bcb0 100644 --- a/pkg/pubsub/node_data.go +++ b/pkg/pubsub/node_data.go @@ -269,3 +269,16 @@ func GetSelfNodeData(host host.Host, isStaked bool, addr multiaddr.Multiaddr) *N return nodeData } + +func (n *NodeData) MergeMultiaddresses(addr multiaddr.Multiaddr) { + addrExists := false + for _, existingAddr := range n.Multiaddrs { + if addr.Equal(existingAddr.Multiaddr) { + addrExists = true + break + } + } + if !addrExists { + n.Multiaddrs = append(n.Multiaddrs, JSONMultiaddr{Multiaddr: addr}) + } +} diff --git a/pkg/workers/types/request_response.go b/pkg/workers/types/request_response.go index 359c63cb..902a5577 100644 --- a/pkg/workers/types/request_response.go +++ b/pkg/workers/types/request_response.go @@ -2,6 +2,8 @@ package data_types import ( "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/sirupsen/logrus" masa "github.com/masa-finance/masa-oracle/pkg" "github.com/masa-finance/masa-oracle/pkg/pubsub" @@ -15,6 +17,31 @@ type Worker struct { Node *masa.OracleNode } +func NewWorker(isLocal bool, nd *pubsub.NodeData) *Worker { + var ma multiaddr.Multiaddr + if len(nd.Multiaddrs) > 0 { + ma = nd.Multiaddrs[0].Multiaddr + } else { + var err error + ma, err = multiaddr.NewMultiaddr(nd.MultiaddrsString) + if err != nil { + logrus.Error(err) + return nil + } + } + ip, err := ma.ValueForProtocol(multiaddr.P_IP4) + if err != nil { + logrus.Error(err) + } + return &Worker{ + IsLocal: isLocal, + IPAddr: ip, + AddrInfo: nil, + NodeData: pubsub.NodeData{}, + Node: nil, + } +} + type WorkRequest struct { WorkType WorkerType `json:"workType,omitempty"` RequestId string `json:"requestId,omitempty"` From b49a1e88427ab7e89eb119882cf77c59bbfa1c90 Mon Sep 17 00:00:00 2001 From: Bob Stevens <35038919+restevens402@users.noreply.github.com> Date: Fri, 23 Aug 2024 04:50:44 -0700 Subject: [PATCH 07/10] Switch to DHT for peer address lookup Replaced the multiaddress-based peer information retrieval with a DHT lookup to simplify the process. Removed unnecessary imports and optimized the code for finding and connecting to peers in the Distributed Hash Table (DHT). --- pkg/workers/worker_selection.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/pkg/workers/worker_selection.go b/pkg/workers/worker_selection.go index 661ddce9..237616aa 100644 --- a/pkg/workers/worker_selection.go +++ b/pkg/workers/worker_selection.go @@ -5,8 +5,6 @@ import ( "math/rand/v2" "time" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" "github.com/sirupsen/logrus" masa "github.com/masa-finance/masa-oracle/pkg" @@ -33,27 +31,22 @@ func GetEligibleWorkers(node *masa.OracleNode, category pubsub.WorkerCategory, c continue } - addr, err := multiaddr.NewMultiaddr(eligible.MultiaddrsString) + // Use the DHT to find the peer's address information + peerInfo, err := node.DHT.FindPeer(context.Background(), eligible.PeerId) if err != nil { - logrus.Warnf("Error creating multiaddress for peer %s: %v", eligible.PeerId.String(), err) - continue - } - - peerInfo, err := peer.AddrInfoFromP2pAddr(addr) - if err != nil { - logrus.Warnf("Failed to get peer info for %s: %v", eligible.PeerId.String(), err) + logrus.Warnf("Failed to find peer %s in DHT: %v", eligible.PeerId.String(), err) continue } ctxWithTimeout, cancel := context.WithTimeout(context.Background(), config.ConnectionTimeout) - err = node.Host.Connect(ctxWithTimeout, *peerInfo) + err = node.Host.Connect(ctxWithTimeout, peerInfo) cancel() if err != nil { logrus.Warnf("Failed to connect to peer %s: %v", eligible.PeerId.String(), err) continue } - workers = append(workers, data_types.Worker{IsLocal: false, NodeData: eligible, AddrInfo: peerInfo}) + workers = append(workers, data_types.Worker{IsLocal: false, NodeData: eligible, AddrInfo: &peerInfo}) dur := time.Since(start).Milliseconds() logrus.Infof("Worker selection took %v milliseconds", dur) break From be75d537b8f1bf978748c75622419a85738016c0 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Fri, 23 Aug 2024 22:42:25 -0700 Subject: [PATCH 08/10] Improve Twitter API rate limit error handling and propagation - Modified ScrapeTweetsByQuery to immediately return rate limit errors - Updated TwitterQueryHandler to properly propagate scraper errors - Adjusted handleWorkResponse to correctly handle and return errors to the client - Ensured rate limit errors are logged and returned with appropriate HTTP status codes - Improved error message clarity for better debugging and user feedback This commit enhances the system's ability to detect, log, and respond to Twitter API rate limit errors, providing clearer feedback to both developers and end-users when such limits are encountered. --- go.mod | 2 -- go.sum | 4 ---- pkg/api/handlers_data.go | 29 +++++++++++++---------------- pkg/scrapers/twitter/tweets.go | 22 ++++++++++------------ pkg/workers/handlers/twitter.go | 14 +++++++++++--- pkg/workers/worker_manager.go | 31 ++++++++++++++++++++++++------- 6 files changed, 58 insertions(+), 44 deletions(-) diff --git a/go.mod b/go.mod index b7f31610..b7e3cbcf 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.22.0 require ( github.com/cenkalti/backoff/v4 v4.3.0 - github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 github.com/dgraph-io/badger v1.6.2 github.com/ethereum/go-ethereum v1.14.8 @@ -128,7 +127,6 @@ require ( github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect github.com/jbenet/goprocess v0.1.4 // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kennygrant/sanitize v1.2.4 // indirect github.com/klauspost/compress v1.17.9 // indirect diff --git a/go.sum b/go.sum index 0355289e..115d6b32 100644 --- a/go.sum +++ b/go.sum @@ -70,8 +70,6 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764= github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U= -github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c h1:++BhWlmSX+n8m3O4gPfy3S4PTZ0TMzH6nelerBLPUng= -github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c/go.mod h1:C7ma6h458jTWT65mXC58L1Q6hnEtr0unur8cMc0UEXM= github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= @@ -372,8 +370,6 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= -github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= -github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= diff --git a/pkg/api/handlers_data.go b/pkg/api/handlers_data.go index c7c3950e..b7ab278a 100644 --- a/pkg/api/handlers_data.go +++ b/pkg/api/handlers_data.go @@ -26,7 +26,7 @@ import ( "github.com/masa-finance/masa-oracle/pkg/scrapers/discord" "github.com/masa-finance/masa-oracle/pkg/scrapers/telegram" "github.com/masa-finance/masa-oracle/pkg/workers" - "github.com/masa-finance/masa-oracle/pkg/workers/types" + data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) type LLMChat struct { @@ -122,25 +122,22 @@ func handleWorkResponse(c *gin.Context, responseCh chan data_types.WorkResponse, select { case response := <-responseCh: if response.Error != "" { - c.JSON(http.StatusExpectationFailed, response) + logrus.Errorf("[+] Work error: %s", response.Error) + if strings.Contains(response.Error, "Rate limit exceeded") { + c.JSON(http.StatusTooManyRequests, gin.H{"error": "Twitter API rate limit exceeded"}) + } else { + c.JSON(http.StatusInternalServerError, gin.H{"error": response.Error}) + } wg.Done() return } - if data, ok := response.Data.(string); ok && IsBase64(data) { - decodedData, err := base64.StdEncoding.DecodeString(response.Data.(string)) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to decode base64 data"}) - return - } - var jsonData map[string]interface{} - err = json.Unmarshal(decodedData, &jsonData) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to parse JSON data"}) - return - } - response.Data = jsonData + + if response.Data == nil { + c.JSON(http.StatusNotFound, gin.H{"error": "No data returned"}) + wg.Done() + return } - response.WorkRequest = nil + c.JSON(http.StatusOK, response) wg.Done() return diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index c45d26bd..21c0b717 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "path/filepath" + "strings" _ "github.com/lib/pq" @@ -128,6 +129,7 @@ func ScrapeTweetsForSentiment(query string, count int, model string) (string, st func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { scraper := auth() var tweets []*TweetResult + var lastError error if scraper == nil { return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials") @@ -138,23 +140,19 @@ func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { // Perform the search with the specified query and count for tweetResult := range scraper.SearchTweets(context.Background(), query, count) { - var tweet TweetResult if tweetResult.Error != nil { - tweet = TweetResult{ - Tweet: nil, - Error: tweetResult.Error, - } - } else { - tweet = TweetResult{ - Tweet: &tweetResult.Tweet, - Error: nil, + lastError = tweetResult.Error + logrus.Warnf("[+] Error encountered while scraping tweet: %v", tweetResult.Error) + if strings.Contains(tweetResult.Error.Error(), "Rate limit exceeded") { + return nil, fmt.Errorf("Twitter API rate limit exceeded (429 error)") } + continue } - tweets = append(tweets, &tweet) + tweets = append(tweets, &TweetResult{Tweet: &tweetResult.Tweet, Error: nil}) } - if len(tweets) == 0 { - return nil, fmt.Errorf("no tweets found for the given query") + if len(tweets) == 0 && lastError != nil { + return nil, lastError } return tweets, nil diff --git a/pkg/workers/handlers/twitter.go b/pkg/workers/handlers/twitter.go index 6db38d2d..e16ce3d9 100644 --- a/pkg/workers/handlers/twitter.go +++ b/pkg/workers/handlers/twitter.go @@ -6,7 +6,7 @@ import ( "github.com/sirupsen/logrus" "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" - "github.com/masa-finance/masa-oracle/pkg/workers/types" + data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) type TwitterQueryHandler struct{} @@ -16,17 +16,25 @@ type TwitterSentimentHandler struct{} type TwitterTrendsHandler struct{} func (h *TwitterQueryHandler) HandleWork(data []byte) data_types.WorkResponse { - logrus.Infof("[+] TwitterQueryHandler %s", data) + logrus.Infof("[+] TwitterQueryHandler input: %s", data) dataMap, err := JsonBytesToMap(data) if err != nil { + logrus.Errorf("[+] TwitterQueryHandler error parsing data: %v", err) return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse twitter query data: %v", err)} } count := int(dataMap["count"].(float64)) query := dataMap["query"].(string) + + logrus.Infof("[+] Scraping tweets for query: %s, count: %d", query, count) + resp, err := twitter.ScrapeTweetsByQuery(query, count) if err != nil { - return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter query: %v", err)} + logrus.Errorf("[+] TwitterQueryHandler error scraping tweets: %v", err) + return data_types.WorkResponse{Error: err.Error()} } + + logrus.Infof("[+] TwitterQueryHandler response: %d tweets found", len(resp)) + return data_types.WorkResponse{Data: resp} } diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index d4966cc6..c8c53a30 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -15,6 +15,7 @@ import ( masa "github.com/masa-finance/masa-oracle/pkg" "github.com/masa-finance/masa-oracle/pkg/config" + "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" "github.com/masa-finance/masa-oracle/pkg/workers/handlers" data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) @@ -214,13 +215,29 @@ func (whm *WorkHandlerManager) ExecuteWork(workRequest data_types.WorkRequest) ( go func() { startTime := time.Now() workResponse := handler.HandleWork(workRequest.Data) - if workResponse.Error == "" { - duration := time.Since(startTime) - whm.mu.Lock() - handlerInfo := whm.handlers[workRequest.WorkType] - handlerInfo.CallCount++ - handlerInfo.TotalRuntime += duration - whm.mu.Unlock() + duration := time.Since(startTime) + whm.mu.Lock() + handlerInfo := whm.handlers[workRequest.WorkType] + handlerInfo.CallCount++ + handlerInfo.TotalRuntime += duration + whm.mu.Unlock() + + if workResponse.Error != "" { + logrus.Errorf("[+] Work error for %s: %s", workRequest.WorkType, workResponse.Error) + } else if workResponse.Data == nil { + logrus.Warnf("[+] Work response for %s: No data returned", workRequest.WorkType) + } else { + switch data := workResponse.Data.(type) { + case []*twitter.TweetResult: + logrus.Infof("[+] Work response for %s: %d tweets returned", workRequest.WorkType, len(data)) + if len(data) > 0 && data[0].Tweet != nil { + tweet := data[0].Tweet + logrus.Infof("[+] First tweet: ID: %s, Text: %s, Author: %s, CreatedAt: %s", + tweet.ID, tweet.Text, tweet.Username, tweet.TimeParsed) + } + default: + logrus.Infof("[+] Work response for %s: Data successfully returned (type: %T)", workRequest.WorkType, workResponse.Data) + } } responseChan <- workResponse }() From c463858d2ef90f2ca8852e0bc28f124a511f8017 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Fri, 23 Aug 2024 22:44:15 -0700 Subject: [PATCH 09/10] chore: remove unused publishWorkRequest function - Deleted the publishWorkRequest function from pkg/api/handlers_data.go - This function was not being used in the current codebase - Removing it simplifies the code and reduces maintenance overhead --- pkg/api/handlers_data.go | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/pkg/api/handlers_data.go b/pkg/api/handlers_data.go index b7ab278a..78bd11c8 100644 --- a/pkg/api/handlers_data.go +++ b/pkg/api/handlers_data.go @@ -78,30 +78,6 @@ func SendWorkRequest(api *API, requestID string, workType data_types.WorkerType, return nil } -// SendWorkRequest sends a work request to the PubSubManager for processing by a worker. -// It marshals the request details into JSON and publishes it to the configured topic. -// -// Parameters: -// - api: The API instance containing the Node and PubSubManager. -// - requestID: A unique identifier for the request. -// - request: The type of work to be performed by the worker. -// - bodyBytes: The request body in byte slice format. -// -// Returns: -// - error: An error object if the request could not be published, otherwise nil. -func publishWorkRequest(api *API, requestID string, request data_types.WorkerType, bodyBytes []byte) error { - workRequest := map[string]string{ - "request": string(request), - "request_id": requestID, - "body": string(bodyBytes), - } - jsn, err := json.Marshal(workRequest) - if err != nil { - return err - } - return api.Node.PubSubManager.Publish(config.TopicWithVersion(config.WorkerTopic), jsn) -} - // handleWorkResponse processes the response from a worker and sends it back to the client. // It listens on the provided response channel for a response or a timeout signal. // If a response is received within the timeout period, it unmarshals the JSON response and sends it back to the client. From 74236f72d6d53349453da6fc4b4c6d40a1f03a16 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Fri, 23 Aug 2024 23:11:35 -0700 Subject: [PATCH 10/10] refactor: handleWorkResponse with functional programming concepts - Decompose handleWorkResponse into smaller, focused functions - Introduce higher-order function for error response handling - Separate concerns for improved modularity and testability - Reduce mutable state and side effects where possible - Maintain idiomatic Go while incorporating functional principles - Improve error handling granularity and response structure --- pkg/api/handlers_data.go | 86 ++++++++++++++++++++++++++-------------- 1 file changed, 56 insertions(+), 30 deletions(-) diff --git a/pkg/api/handlers_data.go b/pkg/api/handlers_data.go index 78bd11c8..49bbe225 100644 --- a/pkg/api/handlers_data.go +++ b/pkg/api/handlers_data.go @@ -86,44 +86,70 @@ func SendWorkRequest(api *API, requestID string, workType data_types.WorkerType, // Parameters: // - c: The gin.Context object, which provides the context for the HTTP request. // - responseCh: A channel that receives the worker's response as a byte slice. -func handleWorkResponse(c *gin.Context, responseCh chan data_types.WorkResponse, wg *sync.WaitGroup) { +func handleWorkResponse(c *gin.Context, responseCh <-chan data_types.WorkResponse, wg *sync.WaitGroup) { cfg, err := LoadConfig() if err != nil { - logrus.Errorf("Failed to load API cfg: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) + handleError(c, "Failed to load API cfg", err) return } - for { - select { - case response := <-responseCh: - if response.Error != "" { - logrus.Errorf("[+] Work error: %s", response.Error) - if strings.Contains(response.Error, "Rate limit exceeded") { - c.JSON(http.StatusTooManyRequests, gin.H{"error": "Twitter API rate limit exceeded"}) - } else { - c.JSON(http.StatusInternalServerError, gin.H{"error": response.Error}) - } - wg.Done() - return - } + select { + case response := <-responseCh: + handleResponse(c, response, wg) + case <-time.After(cfg.WorkerResponseTimeout): + handleTimeout(c) + case <-c.Done(): + // Context cancelled, no action needed + } +} - if response.Data == nil { - c.JSON(http.StatusNotFound, gin.H{"error": "No data returned"}) - wg.Done() - return - } +func handleResponse(c *gin.Context, response data_types.WorkResponse, wg *sync.WaitGroup) { + defer wg.Done() - c.JSON(http.StatusOK, response) - wg.Done() - return - case <-time.After(cfg.WorkerResponseTimeout): - c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out in API layer"}) - return - case <-c.Done(): - return - } + if response.Error != "" { + handleErrorResponse(c, response) + return + } + + if response.Data == nil { + c.JSON(http.StatusNotFound, gin.H{ + "error": "No data returned", + "workerPeerId": response.WorkerPeerId, + }) + return + } + + c.JSON(http.StatusOK, response) +} + +func handleErrorResponse(c *gin.Context, response data_types.WorkResponse) { + logrus.Errorf("[+] Work error: %s", response.Error) + + errorResponse := func(status int, message string) { + c.JSON(status, gin.H{ + "error": message, + "details": response.Error, + "workerPeerId": response.WorkerPeerId, + }) } + + switch { + case strings.Contains(response.Error, "Twitter API rate limit exceeded (429 error)"): + errorResponse(http.StatusTooManyRequests, "Twitter API rate limit exceeded") + case strings.Contains(response.Error, "no workers could process"): + errorResponse(http.StatusServiceUnavailable, "No available workers to process the request") + default: + errorResponse(http.StatusInternalServerError, "An error occurred while processing the request") + } +} + +func handleError(c *gin.Context, message string, err error) { + logrus.Errorf("%s: %v", message, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"}) +} + +func handleTimeout(c *gin.Context) { + c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out in API layer"}) } // GetLLMModelsHandler returns a gin.HandlerFunc that retrieves the available LLM models.