Benny/opensea streaming #1343
Conversation
Looking good! The email digest stuff looks A-OK to me. For OpenSea streaming, I left a few comments about how we store and update the bloom filter, but everything else looks great!
emails/digest.go
Outdated
```diff
@@ -218,6 +202,13 @@ func getDigest(c context.Context, stg *storage.Client, f *publicapi.FeedAPI, q *
includeCommunities = *overrides.IncludeTopCommunities
}

topPostCount, _ := util.FindFirst([]int{overrides.FirstPostCount, defaultFirstPostCount, 5}, func(i int) bool {
```
Are these top posts or first posts? There seems to be a mix of terminology.
emails/digest.go
Outdated
```diff
@@ -227,11 +218,44 @@ func getDigest(c context.Context, stg *storage.Client, f *publicapi.FeedAPI, q *
Selected: selectedCollections,
Include: includeCommunities,
},
FirstPostCount: topPostCount,
```
Same as above, re: top posts vs first posts
emails/digest.go
Outdated
```go
}
return result, nil
}

func contractToUserFacing(ctx context.Context, q *coredb.Queries, l *dataloader.Loaders, collection coredb.Contract) UserFacingContract {
func getOverrides(c context.Context, stg *storage.Client) (DigestValueOverrides, error) {
_, err := stg.Bucket(env.GetString("CONFIGURATION_BUCKET")).Object(overrideFile).Attrs(c)
```
Can we make a variable for `stg.Bucket(env.GetString("CONFIGURATION_BUCKET")).Object(overrideFile)`, since we're using it several times in this function?
```go
for _, p := range second.TopPosts {
	seenPostPositions[p.Position] = true
}
for _, p := range second.TopCommunities {
```
Do we need to do this loop for `TopGalleries` and `TopFirstPosts`, too?
```diff
@@ -357,7 +359,7 @@ func sendDigestEmailsToAllUsers(c context.Context, v DigestValues, queries *core
})
}

func sendDigestEmailToUser(c context.Context, u coredb.PiiUserView, emailRecipient persist.Email, digestValues DigestValues, s *sendgrid.Client, sendRealEmail bool) (*rest.Response, error) {
```
What's the net effect of removing the `sendRealEmail` parameter? For example, if we get here on dev, would we send emails to all the dummy PII-scrubbed email addresses we have in the database?
I realized it was unnecessary because on anything but prod, we only send emails to admin-verified email accounts.
```go
_, message, err := conn.ReadMessage()
if err != nil {
	logger.For(ctx).Error(err)
	break
```
Assume we error here and this `break` is executed. What happens? Would it be the case that the service keeps running (because of the ticker loop below), but we wouldn't be listening for messages anymore? Seems like we'd want to log the error and restart the connection, or something along those lines.
```go
var oe openseaEvent
err = json.Unmarshal(message, &oe)
if err != nil {
```
If there's an unmarshaling error, I feel like we should log it so we know what happened.
This one is interesting: with the JS SDK we're able to filter down to specific events, but because we're doing this in Go we get all incoming events of every type. Most of them error out here simply because they're the wrong event type and don't match the JSON schema we defined, so it would get really spammy if we logged this.
```go
c.String(http.StatusOK, "OK")
})

router.GET("/updateBloomFilter", func(c *gin.Context) {
```
Is this used by anything, or is it just for debugging?
It's just used for debugging for now, but I imagine it could be used when a user signs up, if we want to add that functionality later?
opensea-streamer/main.go
Outdated
```go
continue
}

bf, err = resetBloomFilter(ctx, queries, bloomCache)
```
Updating `bf` here won't change the reference being used by `streamOpenseaTransfers`. It'll change what the outer `bf` variable in `main()` points to, but it's a pointer (not a pointer to a pointer), so it isn't going to change what `streamOpenseaTransfers` sees.
Assuming it's quick to update the bloom filter -- and I think that's true, right? -- we probably don't need to use Redis at all. I was thinking we'd want to use it because it has its own bloom filter implementation and we wouldn't need to store everything in memory on a server if we were using Redis. But given that we're actually storing everything in memory here, and given that rebuilding the filter is probably pretty quick, I don't see much benefit to using Redis. At best, it saves us a small amount of time at startup, but only if there hasn't been a change in the number of active wallets we're tracking.
Instead, let's get rid of Redis entirely. We can use an `atomic.Pointer[bloomFilter]` to track the active bloom filter, and when we update the bloom filter, use the atomic pointer's `Store` function to point to the newly created filter. In your processing loop, you'd just want to load the current reference every time you're going to use the bloom filter. There's some overhead, but using `atomic` is generally much more efficient than using locks, and I think for our purposes it'll be okay. Something like:
```go
// check if the wallet is in the bloom filter
chainAddress, err := persist.NewL1ChainAddress(persist.Address(win.Payload.ToAccount.Address.String()), win.Payload.Item.NFTID.Chain).MarshalJSON()
if err != nil {
	logger.For(ctx).Error(err)
	continue
}
bf := myAtomicPointer.Load()
if !bf.Test(chainAddress) {
	continue
}
```
I think you'll also want to put the atomic.Pointer in the global scope to make it easier to ensure that all methods are using the same reference to the same atomic pointer.
db/queries/core/query.sql
Outdated
```sql
-- name: CountActiveWallets :one
select count(w.*) from users u join wallets w on w.id = any(u.wallets) where not u.deleted and not w.deleted and not u.universal;
```
One edge case here: if one wallet gets added and a different wallet gets deleted, the count stays the same but our active wallets have changed. We could either:
- skip this check for now and just update the filter every hour, or
- use a query that checks whether any active wallets have been added since <timestamp>, and check that every hour
I think either is fine; if updating the bloom filter is fast, updating it hourly (even if it hasn't changed) sounds okay to me. As more and more people start using Gallery, we'll probably end up seeing a change in our active wallets every hour anyway.
Changes:
- opensea-streamer: a service that listens over a websocket for all token transfers and sends them to the token-processing service