Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Benny/opensea streaming #1343

Merged
merged 34 commits into from
Jan 11, 2024
Merged

Benny/opensea streaming #1343

merged 34 commits into from
Jan 11, 2024

Conversation

benny-conn
Copy link
Collaborator

@benny-conn benny-conn commented Jan 3, 2024

Changes:

  • Added a new singleton service called opensea-streamer that listens with a websocket to all token transfers and sends them to the token-processing service
  • Added a handler on the token processing service for the webhook
  • Added some non-blocking and unrelated email digest adjustments

@github-actions github-actions bot added the query label Jan 4, 2024
opensea-streamer/main.go Fixed Show fixed Hide fixed
Copy link
Contributor

@radazen radazen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good! The email digest stuff looks A-OK to me. For OpenSea streaming, I left a few comments about how we store and update the bloom filter, but everything else looks great!

emails/digest.go Outdated
@@ -218,6 +202,13 @@ func getDigest(c context.Context, stg *storage.Client, f *publicapi.FeedAPI, q *
includeCommunities = *overrides.IncludeTopCommunities
}

topPostCount, _ := util.FindFirst([]int{overrides.FirstPostCount, defaultFirstPostCount, 5}, func(i int) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these top posts or first posts? There seems to be a mix of terminology.

emails/digest.go Outdated
@@ -227,11 +218,44 @@ func getDigest(c context.Context, stg *storage.Client, f *publicapi.FeedAPI, q *
Selected: selectedCollections,
Include: includeCommunities,
},
FirstPostCount: topPostCount,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, re: top posts vs first posts

emails/digest.go Outdated
}
return result, nil
}

func contractToUserFacing(ctx context.Context, q *coredb.Queries, l *dataloader.Loaders, collection coredb.Contract) UserFacingContract {
func getOverrides(c context.Context, stg *storage.Client) (DigestValueOverrides, error) {
_, err := stg.Bucket(env.GetString("CONFIGURATION_BUCKET")).Object(overrideFile).Attrs(c)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make a variable for stg.Bucket(env.GetString("CONFIGURATION_BUCKET")).Object(overrideFile) since we're using it several times in this function?

for _, p := range second.TopPosts {
seenPostPositions[p.Position] = true
}
for _, p := range second.TopCommunities {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do this loop for TopGalleries and TopFirstPosts, too?

@@ -357,7 +359,7 @@ func sendDigestEmailsToAllUsers(c context.Context, v DigestValues, queries *core
})
}

func sendDigestEmailToUser(c context.Context, u coredb.PiiUserView, emailRecipient persist.Email, digestValues DigestValues, s *sendgrid.Client, sendRealEmail bool) (*rest.Response, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the net effect of removing the sendRealEmail parameter? For example, if we get here on dev, would we send emails to all the dummy PII-scrubbed email addresses we have in the database?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized it was unnecessary because on anything but prod we only send emails to Admin verified email accounts.

_, message, err := conn.ReadMessage()
if err != nil {
logger.For(ctx).Error(err)
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assume we error here and this break is executed. What happens? Would it be the case that the service keeps running (because of the ticker loop below), but we wouldn't be listening for messages anymore? Seems like we'd want to log the error and restart the connection or something along those lines.


var oe openseaEvent
err = json.Unmarshal(message, &oe)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's an unmarshaling error, I feel like we should log it so we know what happened

Copy link
Collaborator Author

@benny-conn benny-conn Jan 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is an interesting one because with the JS SDK we are able to filter down to specific events but because we are doing this in go we get all incoming events from every type of event. Because of this, most of them error out here simply because they are the wrong event type and don't match the JSON schema we defined. It will get really spammy if we log this.

c.String(http.StatusOK, "OK")
})

router.GET("/updateBloomFilter", func(c *gin.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used by anything, or is it just for debugging?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just used for debugging for now but I imagine can be used when a user signs up if we want to add that functionality later?

continue
}

bf, err = resetBloomFilter(ctx, queries, bloomCache)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating bf here won't change the reference being used by streamOpenseaTransfers. It'll change what the outer bf variable in main() points to, but it's a pointer -- not a pointer to a pointer -- so it isn't going to change what streamOpenseaTransfers sees.

Assuming it's quick to update the bloom filter -- and I think that's true, right? -- we probably don't need to use Redis at all. I was thinking we'd want to use it because it has its own bloom filter implementation and we wouldn't need to store everything in memory on a server if we were using Redis. But given that we're actually storing everything in memory here, and given that rebuilding the filter is probably pretty quick, I don't see much benefit to using Redis. At best, it saves us a small amount of time at startup, but only if there hasn't been a change in the number of active wallets we're tracking.

Instead, let's get rid of Redis entirely. We can use an atomic.Pointer[bloomFilter] to track the active bloom filter, and when we update the bloom filter, use the atomic pointer's Store function to point to the newly created filter. In your processing loop, you'd just want to load the current reference every time you're going to use the bloom filter. There's some overhead, but using atomic is generally much more efficient than using locks, and I think for our purposes it'll be okay. Something like:

// check if the wallet is in the bloom filter
chainAddress, err := persist.NewL1ChainAddress(persist.Address(win.Payload.ToAccount.Address.String()), win.Payload.Item.NFTID.Chain).MarshalJSON()
if err != nil {
	logger.For(ctx).Error(err)
	continue
}
bf := myAtomicPointer.Load()
if !bf.Test(chainAddress) {
	continue
}

I think you'll also want to put the atomic.Pointer in the global scope to make it easier to ensure that all methods are using the same reference to the same atomic pointer.



-- name: CountActiveWallets :one
select count(w.*) from users u join wallets w on w.id = any(u.wallets) where not u.deleted and not w.deleted and not u.universal;
Copy link
Contributor

@radazen radazen Jan 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One edge case here: if one wallet gets added and a different wallet gets deleted, the count stays the same but our active wallets have changed. We could either:

  • skip this check for now and just update the filter every hour, or
  • use a query that checks whether any active wallets have been added since <timestamp>, and check that every hour

I think either is fine; if updating the bloom filter is fast, updating it hourly (even if it hasn't changed) sounds okay to me. As more and more people start using Gallery, we probably will end up seeing a change in our active wallets every hour anyway.

@benny-conn benny-conn merged commit 7a70e2a into main Jan 11, 2024
6 checks passed
@benny-conn benny-conn deleted the benny/opensea-streaming branch January 11, 2024 22:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants