diff --git a/pkg/workers/send_work.go b/pkg/workers/send_work.go
new file mode 100644
index 00000000..1bdf0002
--- /dev/null
+++ b/pkg/workers/send_work.go
@@ -0,0 +1,178 @@
+package workers
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	masa "github.com/masa-finance/masa-oracle/pkg"
+	"github.com/masa-finance/masa-oracle/pkg/pubsub"
+	"github.com/masa-finance/masa-oracle/pkg/workers/messages"
+	"github.com/multiformats/go-multiaddr"
+
+	"github.com/asynkron/protoactor-go/actor"
+
+	pubsub2 "github.com/libp2p/go-libp2p-pubsub"
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	workerTimeout = 30 * time.Second
+	queueTimeout  = 8 * time.Second
+)
+
+func SendWork(node *masa.OracleNode, m *pubsub2.Message) {
+	logrus.Infof("Sending work from node %s", node.Host.ID())
+	var wg sync.WaitGroup
+	props := actor.PropsFromProducer(NewWorker(node))
+	pid := node.ActorEngine.Spawn(props)
+	message := createWorkMessage(m, pid)
+
+	responseCollector := make(chan *pubsub2.Message, 100)
+	timeout := time.After(queueTimeout)
+
+	if node.IsStaked && node.IsWorker() {
+		wg.Add(1)
+		go handleLocalWorker(node, pid, message, &wg, responseCollector)
+	}
+
+	handleRemoteWorkers(node, message, props, &wg, responseCollector)
+
+	go queueResponses(responseCollector, timeout)
+
+	wg.Wait()
+}
+
+func createWorkMessage(m *pubsub2.Message, pid *actor.PID) *messages.Work {
+	return &messages.Work{
+		Data:   string(m.Data),
+		Sender: pid,
+		Id:     m.ReceivedFrom.String(),
+		Type:   int64(pubsub.CategoryTwitter),
+	}
+}
+
+func handleLocalWorker(node *masa.OracleNode, pid *actor.PID, message *messages.Work, wg *sync.WaitGroup, responseCollector chan<- *pubsub2.Message) {
+	defer wg.Done()
+	logrus.Info("Sending work to local worker")
+	future := node.ActorEngine.RequestFuture(pid, message, workerTimeout)
+	result, err := future.Result()
+	if err != nil {
+		handleWorkerError(err, responseCollector)
+		return
+	}
+	processWorkerResponse(result, node.Host.ID(), responseCollector)
+}
+
+func handleRemoteWorkers(node *masa.OracleNode, message *messages.Work, props *actor.Props, wg *sync.WaitGroup, responseCollector chan<- *pubsub2.Message) {
+	logrus.Info("Sending work to remote workers")
+	peers := node.NodeTracker.GetAllNodeData()
+	for _, p := range peers {
+		if !isEligibleRemoteWorker(p, node, message) {
+			continue
+		}
+		// Dispatch each eligible peer once, using its first IPv4 address.
+		for _, addr := range p.Multiaddrs {
+			ipAddr, err := addr.ValueForProtocol(multiaddr.P_IP4)
+			if err != nil {
+				continue
+			}
+			wg.Add(1)
+			go handleRemoteWorker(node, p, ipAddr, props, message, wg, responseCollector)
+			break
+		}
+	}
+}
+
+func isEligibleRemoteWorker(p pubsub.NodeData, node *masa.OracleNode, message *messages.Work) bool {
+	nodeData := node.NodeTracker.GetNodeData(p.PeerId.String())
+	return p.PeerId.String() != node.Host.ID().String() &&
+		p.IsStaked &&
+		nodeData != nil &&
+		nodeData.CanDoWork(pubsub.WorkerCategory(message.Type))
+}
+
+func handleRemoteWorker(node *masa.OracleNode, p pubsub.NodeData, ipAddr string, props *actor.Props, message *messages.Work, wg *sync.WaitGroup, responseCollector chan<- *pubsub2.Message) {
+	defer wg.Done()
+	logrus.WithFields(logrus.Fields{
+		"ip":   ipAddr,
+		"peer": p.PeerId,
+	}).Info("Handling remote worker")
+
+	spawned, err := node.ActorRemote.SpawnNamed(fmt.Sprintf("%s:4001", ipAddr), "worker", "peer", -1)
+	if err != nil {
+		logrus.WithError(err).WithField("ip", ipAddr).Error("Failed to spawn remote worker")
+		handleWorkerError(err, responseCollector)
+		return
+	}
+
+	spawnedPID := spawned.Pid
+	if spawnedPID == nil {
+		err := fmt.Errorf("failed to spawn remote worker: PID is nil for IP %s", ipAddr)
+		logrus.WithFields(logrus.Fields{
+			"ip":   ipAddr,
+			"peer": p.PeerId,
+		}).Error(err)
+		handleWorkerError(err, responseCollector)
+		return
+	}
+
+	client := node.ActorEngine.Spawn(props)
+	node.ActorEngine.Send(spawnedPID, &messages.Connect{Sender: client})
+
+	future := node.ActorEngine.RequestFuture(spawnedPID, message, workerTimeout)
+	result, err := future.Result()
+	if err != nil {
+		logrus.WithError(err).WithFields(logrus.Fields{
+			"ip":   ipAddr,
+			"peer": p.PeerId,
+		}).Error("Error getting result from remote worker")
+		handleWorkerError(err, responseCollector)
+		return
+	}
+
+	logrus.WithFields(logrus.Fields{
+		"ip":   ipAddr,
+		"peer": p.PeerId,
+	}).Info("Successfully processed remote worker response")
+	processWorkerResponse(result, p.PeerId, responseCollector)
+}
+
+func handleWorkerError(err error, responseCollector chan<- *pubsub2.Message) {
+	logrus.Errorf("[-] Error with worker: %v", err)
+	responseCollector <- &pubsub2.Message{
+		ValidatorData: map[string]interface{}{"error": err.Error()},
+	}
+}
+
+func processWorkerResponse(result interface{}, workerID interface{}, responseCollector chan<- *pubsub2.Message) {
+	response, ok := result.(*messages.Response)
+	if !ok {
+		logrus.Errorf("[-] Invalid response type from worker")
+		return
+	}
+	msg, err := getResponseMessage(response)
+	if err != nil {
+		logrus.Errorf("[-] Error converting worker response: %v", err)
+		return
+	}
+	logrus.Infof("Received response from worker %v, sending to responseCollector", workerID)
+	responseCollector <- msg
+}
+
+func queueResponses(responseCollector <-chan *pubsub2.Message, timeout <-chan time.Time) {
+	var responses []*pubsub2.Message
+	for {
+		select {
+		case response := <-responseCollector:
+			responses = append(responses, response)
+			logrus.Infof("Adding response from %s to responses list. Total responses: %d", response.ReceivedFrom, len(responses))
+		case <-timeout:
+			logrus.Infof("Timeout reached, sending all responses to workerDoneCh. Total responses: %d", len(responses))
Total responses: %d", len(responses)) + for _, resp := range responses { + workerDoneCh <- resp + } + return + } + } +} diff --git a/pkg/workers/types.go b/pkg/workers/types.go new file mode 100644 index 00000000..47248d01 --- /dev/null +++ b/pkg/workers/types.go @@ -0,0 +1,66 @@ +package workers + +import ( + "github.com/asynkron/protoactor-go/actor" + pubsub2 "github.com/libp2p/go-libp2p-pubsub" + masa "github.com/masa-finance/masa-oracle/pkg" +) + +type WorkerType string + +const ( + Discord WorkerType = "discord" + DiscordProfile WorkerType = "discord-profile" + DiscordChannelMessages WorkerType = "discord-channel-messages" + DiscordSentiment WorkerType = "discord-sentiment" + TelegramSentiment WorkerType = "telegram-sentiment" + TelegramChannelMessages WorkerType = "telegram-channel-messages" + DiscordGuildChannels WorkerType = "discord-guild-channels" + DiscordUserGuilds WorkerType = "discord-user-guilds" + LLMChat WorkerType = "llm-chat" + Twitter WorkerType = "twitter" + TwitterFollowers WorkerType = "twitter-followers" + TwitterProfile WorkerType = "twitter-profile" + TwitterSentiment WorkerType = "twitter-sentiment" + TwitterTrends WorkerType = "twitter-trends" + Web WorkerType = "web" + WebSentiment WorkerType = "web-sentiment" + Test WorkerType = "test" +) + +var WORKER = struct { + Discord, DiscordProfile, DiscordChannelMessages, DiscordSentiment, TelegramSentiment, TelegramChannelMessages, DiscordGuildChannels, DiscordUserGuilds, LLMChat, Twitter, TwitterFollowers, TwitterProfile, TwitterSentiment, TwitterTrends, Web, WebSentiment, Test WorkerType +}{ + Discord: Discord, + DiscordProfile: DiscordProfile, + DiscordChannelMessages: DiscordChannelMessages, + DiscordSentiment: DiscordSentiment, + TelegramSentiment: TelegramSentiment, + TelegramChannelMessages: TelegramChannelMessages, + DiscordGuildChannels: DiscordGuildChannels, + DiscordUserGuilds: DiscordUserGuilds, + LLMChat: LLMChat, + Twitter: Twitter, + TwitterFollowers: TwitterFollowers, + TwitterProfile: TwitterProfile, + TwitterSentiment: TwitterSentiment, + TwitterTrends: TwitterTrends, + Web: Web, + WebSentiment: WebSentiment, + Test: Test, +} + +var ( + clients = actor.NewPIDSet() + workerStatusCh = make(chan *pubsub2.Message) + workerDoneCh = make(chan *pubsub2.Message) +) + +type ChanResponse struct { + Response map[string]interface{} + ChannelId string +} + +type Worker struct { + Node *masa.OracleNode +} diff --git a/pkg/workers/workers.go b/pkg/workers/workers.go index 28da302b..1e0b674e 100644 --- a/pkg/workers/workers.go +++ b/pkg/workers/workers.go @@ -3,8 +3,6 @@ package workers import ( "context" "encoding/json" - "fmt" - "sync" "time" "github.com/libp2p/go-libp2p/core/peer" @@ -15,8 +13,6 @@ import ( "github.com/masa-finance/masa-oracle/pkg/pubsub" "github.com/masa-finance/masa-oracle/pkg/workers/messages" - "github.com/multiformats/go-multiaddr" - "github.com/asynkron/protoactor-go/actor" "github.com/ipfs/go-cid" @@ -26,87 +22,35 @@ import ( "github.com/sirupsen/logrus" ) -type WorkerType string - -const ( - Discord WorkerType = "discord" - DiscordProfile WorkerType = "discord-profile" - DiscordChannelMessages WorkerType = "discord-channel-messages" - DiscordSentiment WorkerType = "discord-sentiment" - TelegramSentiment WorkerType = "telegram-sentiment" - TelegramChannelMessages WorkerType = "telegram-channel-messages" - DiscordGuildChannels WorkerType = "discord-guild-channels" - DiscordUserGuilds WorkerType = "discord-user-guilds" - LLMChat WorkerType = "llm-chat" - Twitter WorkerType = "twitter" - 
-	TwitterProfile          WorkerType = "twitter-profile"
-	TwitterSentiment        WorkerType = "twitter-sentiment"
-	TwitterTrends           WorkerType = "twitter-trends"
-	Web                     WorkerType = "web"
-	WebSentiment            WorkerType = "web-sentiment"
-	Test                    WorkerType = "test"
-)
-
-var WORKER = struct {
-	Discord, DiscordProfile, DiscordChannelMessages, DiscordSentiment, TelegramSentiment, TelegramChannelMessages, DiscordGuildChannels, DiscordUserGuilds, LLMChat, Twitter, TwitterFollowers, TwitterProfile, TwitterSentiment, TwitterTrends, Web, WebSentiment, Test WorkerType
-}{
-	Discord:                 Discord,
-	DiscordProfile:          DiscordProfile,
-	DiscordChannelMessages:  DiscordChannelMessages,
-	DiscordSentiment:        DiscordSentiment,
-	TelegramSentiment:       TelegramSentiment,
-	TelegramChannelMessages: TelegramChannelMessages,
-	DiscordGuildChannels:    DiscordGuildChannels,
-	DiscordUserGuilds:       DiscordUserGuilds,
-	LLMChat:                 LLMChat,
-	Twitter:                 Twitter,
-	TwitterFollowers:        TwitterFollowers,
-	TwitterProfile:          TwitterProfile,
-	TwitterSentiment:        TwitterSentiment,
-	TwitterTrends:           TwitterTrends,
-	Web:                     Web,
-	WebSentiment:            WebSentiment,
-	Test:                    Test,
-}
-
-var (
-	clients        = actor.NewPIDSet()
-	workerStatusCh = make(chan *pubsub2.Message)
-	workerDoneCh   = make(chan *pubsub2.Message)
-)
-
 // WorkerTypeToCategory maps WorkerType to WorkerCategory
 func WorkerTypeToCategory(wt WorkerType) pubsub.WorkerCategory {
+	logrus.Infof("Mapping WorkerType %s to WorkerCategory", wt)
 	switch wt {
 	case Discord, DiscordProfile, DiscordChannelMessages, DiscordSentiment, DiscordGuildChannels, DiscordUserGuilds:
+		logrus.Info("WorkerType is related to Discord")
 		return pubsub.CategoryDiscord
 	case TelegramSentiment, TelegramChannelMessages:
+		logrus.Info("WorkerType is related to Telegram")
 		return pubsub.CategoryTelegram
 	case Twitter, TwitterFollowers, TwitterProfile, TwitterSentiment, TwitterTrends:
+		logrus.Info("WorkerType is related to Twitter")
 		return pubsub.CategoryTwitter
 	case Web, WebSentiment:
+		logrus.Info("WorkerType is related to Web")
 		return pubsub.CategoryWeb
 	default:
+		logrus.Warn("WorkerType is invalid or not recognized")
 		return -1 // Invalid category
 	}
 }
 
-type ChanResponse struct {
-	Response  map[string]interface{}
-	ChannelId string
-}
-
-type Worker struct {
-	Node *masa.OracleNode
-}
-
 // NewWorker creates a new instance of the Worker actor.
 // It implements the actor.Receiver interface, allowing it to receive and handle messages.
 //
 // Returns:
 // - An instance of the Worker struct that implements the actor.Receiver interface.
 func NewWorker(node *masa.OracleNode) actor.Producer {
+	logrus.Info("Creating a new Worker actor")
 	return func() actor.Actor {
 		return &Worker{Node: node}
 	}
@@ -115,39 +59,48 @@ func NewWorker(node *masa.OracleNode) actor.Producer {
 
 // Receive is the message handling method for the Worker actor.
 // It receives messages through the actor context and processes them based on their type.
 func (a *Worker) Receive(ctx actor.Context) {
+	logrus.Infof("Worker received a message of type %T from %s", ctx.Message(), ctx.Sender())
 	switch m := ctx.Message().(type) {
 	case *messages.Connect:
+		logrus.Info("Handling Connect message")
 		a.HandleConnect(ctx, m)
 	case *actor.Started:
+		logrus.Info("Actor has started")
 		if a.Node.IsWorker() {
 			a.HandleLog(ctx, "[+] Actor started")
 		}
 	case *actor.Stopping:
+		logrus.Info("Actor is stopping")
 		if a.Node.IsWorker() {
 			a.HandleLog(ctx, "[+] Actor stopping")
 		}
 	case *actor.Stopped:
+		logrus.Info("Actor has stopped")
 		if a.Node.IsWorker() {
 			a.HandleLog(ctx, "[+] Actor stopped")
 		}
 	case *messages.Work:
+		logrus.Info("Handling Work message")
 		if a.Node.IsWorker() {
 			a.HandleWork(ctx, m, a.Node)
 		}
 	case *messages.Response:
+		logrus.Info("Handling Response message")
 		msg := &pubsub2.Message{}
 		err := json.Unmarshal([]byte(m.Value), msg)
 		if err != nil {
+			logrus.Info("Unmarshalling Response message failed, attempting to get response message directly")
 			msg, err = getResponseMessage(m)
 			if err != nil {
 				logrus.Errorf("[-] Error getting response message: %v", err)
 				return
 			}
 		}
+		logrus.Infof("Successfully handled Response message from %s, sending to workerDoneCh", msg.ReceivedFrom)
 		workerDoneCh <- msg
 		ctx.Poison(ctx.Self())
 	default:
-		logrus.Warningf("[+] Received unknown message: %T", m)
+		logrus.Warningf("[+] Received unknown message: %T from %s", m, ctx.Sender())
 	}
 }
@@ -164,13 +117,16 @@ func (a *Worker) Receive(ctx actor.Context) {
 
 // It then creates a CID (version 1) from the multihash and returns the CID as a string.
 // If an error occurs during the multihash computation or CID creation, it is returned.
 func computeCid(str string) (string, error) {
+	logrus.Infof("Computing CID for string: %s", str)
 	// Create a multihash from the string
 	mhHash, err := mh.Sum([]byte(str), mh.SHA2_256, -1)
 	if err != nil {
+		logrus.Errorf("Error computing multihash for string: %s, error: %v", str, err)
 		return "", err
 	}
 	// Create a CID from the multihash
 	cidKey := cid.NewCidV1(cid.Raw, mhHash).String()
+	logrus.Infof("Computed CID: %s", cidKey)
 	return cidKey, nil
 }
@@ -185,10 +141,12 @@ func computeCid(str string) (string, error) {
 
 // - A pointer to a pubsub2.Message object constructed from the response data.
 // - An error if there is an issue with unmarshalling the response data.
 func getResponseMessage(response *messages.Response) (*pubsub2.Message, error) {
+	logrus.Info("Converting Response message to pubsub2.Message")
 	responseData := map[string]interface{}{}
 	err := json.Unmarshal([]byte(response.Value), &responseData)
 	if err != nil {
+		logrus.Errorf("Error unmarshalling Response message: %v", err)
 		return nil, err
 	}
 	msg := &pubsub2.Message{
@@ -197,142 +155,10 @@ func getResponseMessage(response *messages.Response) (*pubsub2.Message, error) {
 		ValidatorData: responseData["ValidatorData"],
 		Local:         responseData["Local"].(bool),
 	}
+	logrus.Infof("Successfully converted Response message to pubsub2.Message from %s", msg.ReceivedFrom)
 	return msg, nil
 }
 
-// SendWork is a function that sends work to a node. It takes two parameters:
-// node: A pointer to a masa.OracleNode object. This is the node to which the work will be sent.
-// m: A pointer to a pubsub2.Message object. This is the message that contains the work to be sent.
-func SendWork(node *masa.OracleNode, m *pubsub2.Message) {
-	var wg sync.WaitGroup
-	props := actor.PropsFromProducer(NewWorker(node))
-	pid := node.ActorEngine.Spawn(props)
-	message := &messages.Work{Data: string(m.Data), Sender: pid, Id: m.ReceivedFrom.String(), Type: int64(pubsub.CategoryTwitter)}
-	n := 0
-
-	responseCollector := make(chan *pubsub2.Message, 100) // Buffered channel to collect responses
-	timeout := time.After(8 * time.Second)
-
-	// Local worker
-	if node.IsStaked && node.IsWorker() {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			future := node.ActorEngine.RequestFuture(pid, message, 30*time.Second)
-			result, err := future.Result()
-			if err != nil {
-				logrus.Errorf("[-] Error receiving response from local worker: %v", err)
-				responseCollector <- &pubsub2.Message{
-					ValidatorData: map[string]interface{}{"error": err.Error()},
-				}
-				return
-			}
-			response := result.(*messages.Response)
-			msg := &pubsub2.Message{}
-			rErr := json.Unmarshal([]byte(response.Value), msg)
-			if rErr != nil {
-				gMsg, gErr := getResponseMessage(result.(*messages.Response))
-				if gErr != nil {
-					logrus.Errorf("[-] Error getting response message: %v", gErr)
-					responseCollector <- &pubsub2.Message{
-						ValidatorData: map[string]interface{}{"error": gErr.Error()},
-					}
-					return
-				}
-				msg = gMsg
-			}
-			responseCollector <- msg
-			n++
-		}()
-	}
-
-	// Remote workers
-	peers := node.NodeTracker.GetAllNodeData()
-	for _, p := range peers {
-		for _, addr := range p.Multiaddrs {
-			ipAddr, _ := addr.ValueForProtocol(multiaddr.P_IP4)
-			if (p.PeerId.String() != node.Host.ID().String()) &&
-				p.IsStaked &&
-				node.NodeTracker.GetNodeData(p.PeerId.String()).CanDoWork(pubsub.WorkerCategory(message.Type)) {
-				logrus.Infof("[+] Worker Address: %s", ipAddr)
-				wg.Add(1)
-				go func(p pubsub.NodeData) {
-					defer wg.Done()
-					spawned, err := node.ActorRemote.SpawnNamed(fmt.Sprintf("%s:4001", ipAddr), "worker", "peer", -1)
-					if err != nil {
-						logrus.Debugf("[-] Error spawning remote worker: %v", err)
-						responseCollector <- &pubsub2.Message{
-							ValidatorData: map[string]interface{}{"error": err.Error()},
-						}
-						return
-					}
-					spawnedPID := spawned.Pid
-					logrus.Infof("[+] Worker Address: %s", spawnedPID)
-					if spawnedPID == nil {
-						logrus.Errorf("[-] Spawned PID is nil for IP: %s", ipAddr)
-						responseCollector <- &pubsub2.Message{
-							ValidatorData: map[string]interface{}{"error": "Spawned PID is nil"},
-						}
-						return
-					}
-					client := node.ActorEngine.Spawn(props)
-					node.ActorEngine.Send(spawnedPID, &messages.Connect{Sender: client})
-					future := node.ActorEngine.RequestFuture(spawnedPID, message, 30*time.Second)
-					result, fErr := future.Result()
-					if fErr != nil {
-						logrus.Debugf("[-] Error receiving response from remote worker: %v", fErr)
-						responseCollector <- &pubsub2.Message{
-							ValidatorData: map[string]interface{}{"error": fErr.Error()},
-						}
-						return
-					}
-					response := result.(*messages.Response)
-					msg := &pubsub2.Message{}
-					rErr := json.Unmarshal([]byte(response.Value), &msg)
-					if rErr != nil {
-						gMsg, gErr := getResponseMessage(response)
-						if gErr != nil {
-							logrus.Errorf("[-] Error getting response message: %v", gErr)
-							responseCollector <- &pubsub2.Message{
-								ValidatorData: map[string]interface{}{"error": gErr.Error()},
-							}
-							return
-						}
-						if gMsg != nil {
-							msg = gMsg
-						}
-					}
-					responseCollector <- msg
-					n++
-					// cap at 3 for performance
-					if n == len(peers) || n == 3 {
-						logrus.Info("[+] All workers have responded")
-						responseCollector <- msg
-					}
-				}(p)
-			}
-		}
-	}
-
-	// Queue responses and send to workerDoneCh
-	go func() {
-		var responses []*pubsub2.Message
-		for {
-			select {
-			case response := <-responseCollector:
-				responses = append(responses, response)
-			case <-timeout:
-				for _, resp := range responses {
-					workerDoneCh <- resp
-				}
-				return
-			}
-		}
-	}()
-
-	wg.Wait()
-}
-
 // MonitorWorkers monitors worker data by subscribing to the completed work topic,
 // computing a CID for each received data, and writing the data to the database.
 //
@@ -346,6 +172,7 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) {
 // marshals the data to JSON, and writes it to the database using the WriteData function.
 // The monitoring continues until the context is done.
 func MonitorWorkers(ctx context.Context, node *masa.OracleNode) {
+	logrus.Info("Starting MonitorWorkers to monitor worker data")
 	node.WorkerTracker = &pubsub.WorkerEventTracker{WorkerStatusCh: workerStatusCh}
 	err := node.PubSubManager.AddSubscription(config.TopicWithVersion(config.WorkerTopic), node.WorkerTracker, true)
 	if err != nil {
@@ -353,6 +180,7 @@ func MonitorWorkers(ctx context.Context, node *masa.OracleNode) {
 	}
 
 	// Register self as a remote node for the network
+	logrus.Info("Registering self as a remote node for the network")
 	node.ActorRemote.Register("peer", actor.PropsFromProducer(NewWorker(node)))
 
 	if node.WorkerTracker == nil {
@@ -384,31 +212,38 @@ func MonitorWorkers(ctx context.Context, node *masa.OracleNode) {
 			startTime = time.Now()
 			go SendWork(node, work)
 		case data := <-workerDoneCh:
+			logrus.Infof("Processing data from workerDoneCh, received from %s", data.ReceivedFrom)
 			validatorDataMap, ok := data.ValidatorData.(map[string]interface{})
 			if !ok {
 				logrus.Errorf("[-] Error asserting type: %v", ok)
 				continue
 			}
-			if ch, ok := rcm.Get(validatorDataMap["ChannelId"].(string)); ok {
-				validatorData, err := json.Marshal(validatorDataMap["Response"])
-				if err != nil {
-					logrus.Errorf("[-] Error marshalling data.ValidatorData: %v", err)
+			if validatorDataMap["ChannelId"] != nil {
+				if ch, ok := rcm.Get(validatorDataMap["ChannelId"].(string)); ok {
+					validatorData, err := json.Marshal(validatorDataMap["Response"])
+					if err != nil {
+						logrus.Errorf("[-] Error marshalling data.ValidatorData: %v", err)
+						continue
+					}
+					ch <- validatorData
+					defer close(ch)
+				} else {
+					logrus.Debugf("Channel not found for ChannelId: %v", validatorDataMap["ChannelId"])
 					continue
 				}
-				ch <- validatorData
-				defer close(ch)
 			} else {
-				logrus.Debugf("Error processing data.ValidatorData: %v", data.ValidatorData)
+				logrus.Debugf("ChannelId is nil in validatorDataMap: %v", validatorDataMap)
 				continue
 			}
 			processValidatorData(data, validatorDataMap, &startTime, node)
 		case <-ticker.C:
-			logrus.Info("[+] worker tick")
+			logrus.Info("[+] Worker tick")
 		case <-ctx.Done():
+			logrus.Info("Context done, stopping MonitorWorkers")
 			return
 		}
 	}
@@ -423,7 +258,7 @@ func MonitorWorkers(ctx context.Context, node *masa.OracleNode) {
  * @param {masa.OracleNode} node - The OracleNode instance.
  */
 func processValidatorData(data *pubsub2.Message, validatorDataMap map[string]interface{}, startTime *time.Time, node *masa.OracleNode) {
-	logrus.Infof("[+] Work validatorDataMap %s", validatorDataMap)
+	logrus.Infof("[+] Processing validator data from %s: %v", data.ReceivedFrom, validatorDataMap)
 	if response, ok := validatorDataMap["Response"].(map[string]interface{}); ok {
 		if _, ok := response["error"].(string); ok {
 			logrus.Infof("[+] Work failed %s", response["error"])
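
The new SendWork path is a timed fan-out/collect: one goroutine per worker feeds the buffered responseCollector, queueResponses drains it until the 8-second queueTimeout fires, and whatever arrived by then is forwarded to workerDoneCh. Below is a minimal, dependency-free sketch of the same pattern; every name in it is illustrative and nothing in it is part of the patch.

package main

import (
	"fmt"
	"time"
)

// result stands in for *pubsub2.Message in the patch.
type result struct{ from string }

// fanOutCollect dispatches one goroutine per worker and collects replies
// until either every worker answers or the deadline fires.
func fanOutCollect(workers []string, deadline time.Duration) []result {
	// Buffered like responseCollector, so workers that finish after the
	// deadline can still send without blocking and then exit.
	collector := make(chan result, len(workers))
	timeout := time.After(deadline)

	for _, w := range workers {
		go func(w string) {
			time.Sleep(10 * time.Millisecond) // stand-in for RequestFuture(...).Result()
			collector <- result{from: w}
		}(w)
	}

	var responses []result
	for {
		select {
		case r := <-collector:
			responses = append(responses, r)
			if len(responses) == len(workers) {
				return responses // everyone answered before the deadline
			}
		case <-timeout:
			return responses // deadline hit; late replies stay in the buffer
		}
	}
}

func main() {
	got := fanOutCollect([]string{"worker-a", "worker-b", "worker-c"}, time.Second)
	fmt.Printf("collected %d responses\n", len(got))
}

One consequence of the patch's version worth noting: queueResponses returns only when the timeout fires, so each SendWork call keeps its collector goroutine alive for the full queueTimeout even after every worker has answered, whereas the sketch returns early once all expected responses are in.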
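computeCid's derivation (a SHA2-256 multihash over the raw bytes, wrapped in a CIDv1 with the raw codec) is deterministic, so it can be checked in isolation. A small sketch using the same go-cid and go-multihash calls as the patch; the input string here is arbitrary.

package main

import (
	"fmt"

	"github.com/ipfs/go-cid"
	mh "github.com/multiformats/go-multihash"
)

func main() {
	// Same two steps as computeCid: multihash first, then wrap in a CIDv1.
	sum, err := mh.Sum([]byte("hello"), mh.SHA2_256, -1) // -1 = default digest length
	if err != nil {
		panic(err)
	}
	// CIDv1 with the raw codec; String() renders base32, so the output
	// carries the "bafkrei" prefix typical of raw sha2-256 CIDs.
	fmt.Println(cid.NewCidV1(cid.Raw, sum).String())
}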