Skip to content

Commit

Permalink
Merge pull request #82 from warp-contracts/twl/sommelier-writer-chunks
Browse files Browse the repository at this point in the history
Remove items with points 0, Fetch discords ids and roles in batches
  • Loading branch information
ppedziwiatr authored Apr 29, 2024
2 parents 0280393 + ec075d9 commit 0fb97f6
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 68 deletions.
8 changes: 6 additions & 2 deletions src/utils/model/warpy_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type WarpySyncerAssets struct {
Chain string `json:"chain"`
}

type SenderDiscordIdPayload struct {
Key string `json:"key"`
type WalletDiscordIdPayload struct {
WalletToDiscordId map[string]string `json:"wallet_to_id"`
}

type DiscordIdRolesPayload struct {
IdToRoles map[string][]string `json:"id_to_roles"`
}
41 changes: 24 additions & 17 deletions src/utils/warpy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@ import (
"context"
"encoding/json"
"errors"

"github.com/go-resty/resty/v2"
"github.com/sirupsen/logrus"
"github.com/warp-contracts/syncer/src/utils/bundlr"
"github.com/warp-contracts/syncer/src/utils/config"
"github.com/warp-contracts/syncer/src/utils/model"
"github.com/warp-contracts/syncer/src/utils/sequencer"
sequencer_types "github.com/warp-contracts/syncer/src/utils/sequencer/types"
)

func GetSenderRoles(httpClient *resty.Client, url string, senderDiscordId string, log *logrus.Entry) (roles *[]string, err error) {
func GetSendersRoles(httpClient *resty.Client, url string, senderDiscordIds *[]string, log *logrus.Entry) (roles *model.DiscordIdRolesPayload, err error) {
resp, err := httpClient.SetBaseURL(url).R().
SetResult([]string{}).
SetResult(model.DiscordIdRolesPayload{}).
ForceContentType("application/json").
SetQueryParams(map[string]string{
"id": senderDiscordId,
SetBody(map[string]interface{}{
"ids": *senderDiscordIds,
}).
SetHeader("Accept", "application/json").
Get("/v1/userRoles")
Post("/v1/usersRoles")

if err != nil {
log.WithError(err).Warn("Could not retrieve sender roles")
Expand All @@ -33,40 +33,40 @@ func GetSenderRoles(httpClient *resty.Client, url string, senderDiscordId string
return
}

roles, ok := resp.Result().(*[]string)
roles, ok := resp.Result().(*model.DiscordIdRolesPayload)
if !ok {
log.Warn("Failed to parse response")
return
}
return
}

func GetSenderDiscordId(httpClient *resty.Client, url string, sender string, log *logrus.Entry) (senderIdPayload *[]model.SenderDiscordIdPayload, err error) {
func GetWalletToDiscordIdMap(httpClient *resty.Client, url string, addresses *[]string, log *logrus.Entry) (senderIdPayload *model.WalletDiscordIdPayload, err error) {
if err != nil {
return
}

resp, err := httpClient.SetBaseURL(url).R().
SetResult([]model.SenderDiscordIdPayload{}).
SetResult(model.WalletDiscordIdPayload{}).
ForceContentType("application/json").
SetQueryParams(map[string]string{
"address": sender,
SetBody(map[string]interface{}{
"addresses": *addresses,
}).
SetHeader("Accept", "application/json").
Get("/warpy/user-id")
Post("/warpy/user-ids")

if err != nil {
return
}

if !resp.IsSuccess() {
log.WithField("statusCode", resp.StatusCode()).WithField("response", resp).WithField("sender", sender).
log.WithField("statusCode", resp.StatusCode()).WithField("response", resp).
Warn("Sender Discord id request has not been successful")
err = errors.New("sender Discord id request has not been successful")
return
}

senderIdPayload, ok := resp.Result().(*[]model.SenderDiscordIdPayload)
senderIdPayload, ok := resp.Result().(*model.WalletDiscordIdPayload)
if !ok {
log.Warn("Failed to parse response")

Expand All @@ -76,14 +76,21 @@ func GetSenderDiscordId(httpClient *resty.Client, url string, sender string, log
return
}

func WriteInteractionToWarpy(ctx context.Context, arweaveSigner string, input json.Marshaler, contractId string, log *logrus.Entry, sequencerClient *sequencer.Client) (interactionId string, err error) {
signer, err := bundlr.NewArweaveSigner(arweaveSigner)
func WriteInteractionToWarpy(ctx context.Context, config config.WarpySyncer, input json.Marshaler, log *logrus.Entry, sequencerClient *sequencer.Client) (interactionId string, err error) {
signer, err := bundlr.NewArweaveSigner(config.SyncerSigner)
if err != nil {
log.WithError(err).Error("Could not create Arweave Signer")
return
}

interactionId, err = sequencerClient.UploadInteraction(ctx, input, sequencer_types.WriteInteractionOptions{ContractTxId: contractId}, signer)
interactionId, err = sequencerClient.UploadInteraction(
ctx,
input,
sequencer_types.WriteInteractionOptions{ContractTxId: config.SyncerContractId, Tags: []bundlr.Tag{{
Name: "Chain",
Value: config.SyncerChain.String(),
}}},
signer)
if err != nil {
log.WithError(err).Error("Could not write interaction to Warpy")
return
Expand Down
131 changes: 82 additions & 49 deletions src/warpy_sync/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package warpy_sync
import (
"context"
"errors"
"strings"

"github.com/cenkalti/backoff"
"github.com/go-resty/resty/v2"
"github.com/warp-contracts/syncer/src/utils/config"
"github.com/warp-contracts/syncer/src/utils/model"
"github.com/warp-contracts/syncer/src/utils/monitoring"
"github.com/warp-contracts/syncer/src/utils/sequencer"
"github.com/warp-contracts/syncer/src/utils/task"
Expand Down Expand Up @@ -67,66 +67,82 @@ func (self *Writer) writeInteraction(payloads *[]InteractionPayload) (err error)
return
}

counter := 0
members := make([]Member, chunkSize)
payloadChunk := make([]InteractionPayload, 0, chunkSize)

for _, payload := range *payloads {
if payload.Points == 0 {
self.Log.WithField("from_address", payload.FromAddress).Debug("Skipping from address, points 0")
continue
}

roles, err := self.discordRoles(payload.FromAddress)
if err != nil {
self.Log.WithError(err).Error("Failed to get roles")
return err
}
if roles == nil {
self.Log.WithField("from_address", payload.FromAddress).Debug("Skipping address, not registered in warpy")
continue
payloadChunk = append(payloadChunk, payload)
if len(payloadChunk) >= chunkSize {
err = self.sendInteractionChunk(&payloadChunk)
if err != nil {
self.Log.
WithField("chunk_size", len(payloadChunk)).
WithError(err).Error("Failed to send interaction chunk")
return err
}
payloadChunk = make([]InteractionPayload, 0, chunkSize)
}

members[counter] = Member{Id: payload.FromAddress, Roles: *roles, Points: payload.Points}
counter += 1
if counter >= chunkSize {
err = self.sendInteractionChunk(&members)
counter = 0
members = make([]Member, chunkSize)
}
}
if len(payloadChunk) > 0 {
err = self.sendInteractionChunk(&payloadChunk)
if err != nil {
self.Log.WithError(err).Error("Failed to send interaction chunk")
return err
}
}
if len(members) > 0 {
err = self.sendInteractionChunk(&members)
}

return
}

func (self *Writer) sendInteractionChunk(members *[]Member) (err error) {
func (self *Writer) sendInteractionChunk(interactions *[]InteractionPayload) (err error) {
self.Log.WithField("chunk_size", len(*interactions)).
Info("Attempting to send a chunk of interaction payload")

input := Input{
Function: "addPointsForAddress",
Points: 0,
AdminId: self.Config.WarpySyncer.SyncerInteractionAdminId,
Members: *members,
addressToRoles, err := self.walletAddressToDiscordRoles(interactions)
if err != nil {
self.Log.WithError(err).Error("Failed to get roles")
return err
}
if addressToRoles == nil || len(*addressToRoles) == 0 {
self.Log.WithField("chunk_size", len(*interactions)).
Debug("Skipping writing interactions, none of the addresses registered in warpy")
return
}

if len(*members) == 1 {
self.Log.WithField("from_address", (*members)[0].Id).
WithField("points", (*members)[0].Points).
Debug("Writing interaction to Warpy...")
members := make([]Member, 0, len(*interactions))
for _, p := range *interactions {
roles, ok := (*addressToRoles)[p.FromAddress]
if ok {
members = append(members, Member{Id: p.FromAddress, Roles: roles, Points: p.Points})
} else {
self.Log.WithField("from_address", p.FromAddress).Debug("Skipping address, not registered in warpy")
}
}

if len(members) == 1 {
self.Log.WithField("from_address", (members)[0].Id).
WithField("points", (members)[0].Points).
Debug("Writing interaction to Warpy...")
} else {
self.Log.WithField("chunk_size", len(*members)).
self.Log.WithField("chunk_size", len(members)).
WithField("points_default", 0).
Debug("Writing interaction to Warpy...")
}

input := Input{
Function: "addPointsForAddress",
Points: 0,
AdminId: self.Config.WarpySyncer.SyncerInteractionAdminId,
Members: members,
}

interactionId, err := warpy.WriteInteractionToWarpy(
self.Ctx, self.Config.WarpySyncer.SyncerSigner, input, self.Config.WarpySyncer.SyncerContractId, self.Log, self.sequencerClient)
self.Ctx, self.Config.WarpySyncer, input, self.Log, self.sequencerClient)
if err != nil {
return err
}
Expand All @@ -136,7 +152,14 @@ func (self *Writer) sendInteractionChunk(members *[]Member) (err error) {
return
}

func (self *Writer) discordRoles(fromAddress string) (roles *[]string, err error) {
// Returns a map of wallet address to a list of discord roles.
// If wallet not registered in warpy the map will not contain the key.
// If wallet found but no roles the map will point to an empty collection.
func (self *Writer) walletAddressToDiscordRoles(payloads *[]InteractionPayload) (walletToRoles *map[string][]string, err error) {
addresses := make([]string, 0, len(*payloads))
for _, p := range *payloads {
addresses = append(addresses, p.FromAddress)
}
err = task.NewRetry().
WithContext(self.Ctx).
// Retries infinitely until success
Expand All @@ -149,37 +172,47 @@ func (self *Writer) discordRoles(fromAddress string) (roles *[]string, err error
}

self.monitor.GetReport().WarpySyncer.Errors.WriterFailures.Inc()
self.Log.WithError(err).WithField("from_address", fromAddress).
self.Log.WithError(err).
Warn("Could not process assets sum, retrying...")
return err
}).
Run(func() error {
senderDiscordIdPayload, err := warpy.GetSenderDiscordId(self.httpClient, self.Config.WarpySyncer.SyncerDreUrl, fromAddress, self.Log)
result, err := warpy.GetWalletToDiscordIdMap(self.httpClient, self.Config.WarpySyncer.SyncerDreUrl, &addresses, self.Log)
if err != nil {
self.Log.WithError(err).Warn("Could not retrieve sender Discord id")
self.Log.WithError(err).Warn("Could not retrieve sender Discord ids")
return err
}

if senderDiscordIdPayload == nil || len(*senderDiscordIdPayload) == 0 {
self.Log.WithField("from_address", fromAddress).
Info("Address not registered in Warpy, exiting")
if result.WalletToDiscordId == nil || len(result.WalletToDiscordId) == 0 {
self.Log.
WithField("addresses", strings.Join(addresses, ",")).
Debug("No discord ids found for specified address, exiting")
return nil
}

senderDiscordId := []model.SenderDiscordIdPayload{}
senderDiscordId = append(senderDiscordId, *senderDiscordIdPayload...)

senderRoles, err := warpy.GetSenderRoles(self.httpClient, self.Config.WarpySyncer.SyncerWarpyApiUrl, senderDiscordId[0].Key, self.Log)
ids := make([]string, 0, len(result.WalletToDiscordId))
for _, w := range addresses {
id := result.WalletToDiscordId[w]
if len(id) > 15 {
ids = append(ids, id)
} else {
self.Log.WithField("from_address", w).
Info("Address not registered in Warpy, skipping")
}
}

walletToRoles = &map[string][]string{}
rolesPayload, err := warpy.GetSendersRoles(self.httpClient, self.Config.WarpySyncer.SyncerWarpyApiUrl, &ids, self.Log)
if err != nil {
self.Log.WithError(err).Warn("Could not retrieve sender roles")
self.Log.
WithError(err).Warn("Could not retrieve senders roles")
return err
}

if senderRoles != nil {
roles = senderRoles
} else {
roles = &[]string{}
for _, w := range addresses {
if id, ok := result.WalletToDiscordId[w]; ok {
(*walletToRoles)[w] = (*rolesPayload).IdToRoles[id]
}
}

return nil
Expand Down

0 comments on commit 0fb97f6

Please sign in to comment.