From e6ef505166afee255b504b5f1bc9a51d00242121 Mon Sep 17 00:00:00 2001 From: Cronus <105345303+ice-cronus@users.noreply.github.com> Date: Fri, 2 Aug 2024 19:50:16 +0700 Subject: [PATCH] [IMPL] email queue processing (#176) (#187) --- application.yaml | 2 + auth/email_link/.testdata/docker-compose.yaml | 12 + auth/email_link/DDL.sql | 6 +- auth/email_link/contract.go | 12 +- auth/email_link/email_modify.go | 2 +- auth/email_link/emaillink.go | 35 ++- auth/email_link/link_start_auth.go | 152 +++++----- auth/email_link/queue.go | 267 ++++++++++++++++++ auth/email_link/users.go | 2 +- cmd/eskimo-hut/api/docs.go | 8 + cmd/eskimo-hut/api/swagger.json | 8 + cmd/eskimo-hut/api/swagger.yaml | 6 + cmd/eskimo-hut/auth.go | 5 +- cmd/eskimo-hut/contract.go | 4 +- cmd/eskimo-hut/eskimo_hut.go | 7 +- cmd/eskimo-hut/users.go | 2 +- ...rebase_phone_login_with_ice_email_login.go | 2 +- 17 files changed, 432 insertions(+), 100 deletions(-) create mode 100644 auth/email_link/.testdata/docker-compose.yaml create mode 100644 auth/email_link/queue.go diff --git a/application.yaml b/application.yaml index b530d773..1a383939 100644 --- a/application.yaml +++ b/application.yaml @@ -91,6 +91,8 @@ auth/telegram: auth/email-link: extraLoadBalancersCount: 2 wintr/connectors/storage/v2: *db + wintr/connectors/storage/v3: + url: redis://default:@localhost:6379/ fromEmailAddress: no-reply@ice.io fromEmailName: ice emailValidation: diff --git a/auth/email_link/.testdata/docker-compose.yaml b/auth/email_link/.testdata/docker-compose.yaml new file mode 100644 index 00000000..e9525650 --- /dev/null +++ b/auth/email_link/.testdata/docker-compose.yaml @@ -0,0 +1,12 @@ +# SPDX-License-Identifier: ice License 1.0 + +version: '3.7' +services: + eskimo-dfly: + container_name: eskimo-dfly + image: 'docker.dragonflydb.io/dragonflydb/dragonfly' + ulimits: + memlock: -1 + ports: + - 6379:6379 + command: "dragonfly --dbnum=1" \ No newline at end of file diff --git a/auth/email_link/DDL.sql b/auth/email_link/DDL.sql index 099b70d1..f2638e4d 100644 --- a/auth/email_link/DDL.sql +++ b/auth/email_link/DDL.sql @@ -9,6 +9,7 @@ CREATE TABLE IF NOT EXISTS email_link_sign_ins ( previously_issued_token_seq BIGINT DEFAULT 0 NOT NULL, confirmation_code_wrong_attempts_count BIGINT DEFAULT 0 NOT NULL, email TEXT NOT NULL, + language TEXT NOT NULL, confirmation_code TEXT, user_id TEXT, phone_number_to_email_migration_user_id TEXT, @@ -33,4 +34,7 @@ CREATE TABLE IF NOT EXISTS sign_ins_per_ip ( login_attempts BIGINT DEFAULT 0 NOT NULL CONSTRAINT sign_ins_per_ip_login_attempts_count CHECK (login_attempts <= 10), ip TEXT NOT NULL, PRIMARY KEY (login_session_number, ip) -); \ No newline at end of file +); + +ALTER TABLE email_link_sign_ins + ADD COLUMN IF NOT EXISTS language TEXT NOT NULL DEFAULT 'en'; \ No newline at end of file diff --git a/auth/email_link/contract.go b/auth/email_link/contract.go index 09589f30..b8c1e1ec 100644 --- a/auth/email_link/contract.go +++ b/auth/email_link/contract.go @@ -7,6 +7,7 @@ import ( "embed" "io" "mime/multipart" + "sync" "text/template" stdlibtime "time" @@ -17,6 +18,7 @@ import ( "github.com/ice-blockchain/eskimo/users" wintrauth "github.com/ice-blockchain/wintr/auth" "github.com/ice-blockchain/wintr/connectors/storage/v2" + storagev3 "github.com/ice-blockchain/wintr/connectors/storage/v3" "github.com/ice-blockchain/wintr/email" "github.com/ice-blockchain/wintr/time" ) @@ -29,9 +31,10 @@ type ( } Client interface { IceUserIDClient - SendSignInLinkToEmail(ctx context.Context, emailValue, deviceUniqueID, language, clientIP string) (loginSession string, err error) + SendSignInLinkToEmail(ctx context.Context, emailValue, deviceUniqueID, language, clientIP string) (queuePos int64, rateLimit, loginSession string, err error) SignIn(ctx context.Context, loginFlowToken, confirmationCode string) (tokens *auth.Tokens, emailConfirmed bool, err error) UpdateMetadata(ctx context.Context, userID string, metadata *users.JSON) (*users.JSON, error) + CheckHealth(ctx context.Context) error RefreshToken(ctx context.Context, token *wintrauth.IceToken) (tokens *auth.Tokens, err error) } IceUserIDClient interface { @@ -83,18 +86,24 @@ const ( sameIPCheckRate = 24 * stdlibtime.Hour duplicatedSignInRequestsInLessThan = 2 * stdlibtime.Second + loginQueueKey = "login_queue" + loginRateLimitKey = "login_rate_limit" + initEmailRateLimit = "1000:1m" ) type ( languageCode = string client struct { + queueDB storagev3.DB db *storage.DB cfg *config shutdown func() error authClient wintrauth.Client userModifier UserModifier + cancel context.CancelFunc emailClients []email.Client fromRecipients []fromRecipient + queueWg sync.WaitGroup emailClientLBIndex uint64 } config struct { @@ -177,4 +186,5 @@ var ( modifyEmailType, notifyEmailChangedType, } + errAlreadyEnqueued = errors.New("already enqueued") ) diff --git a/auth/email_link/email_modify.go b/auth/email_link/email_modify.go index 412ed315..44d64aa9 100644 --- a/auth/email_link/email_modify.go +++ b/auth/email_link/email_modify.go @@ -44,7 +44,7 @@ func (c *client) handleEmailModification(ctx context.Context, els *emailLinkSign if notifyEmail != "" { now := time.Now() resetConfirmationCode := generateConfirmationCode() - uErr := c.upsertEmailLinkSignIn(ctx, oldEmail, els.DeviceUniqueID, resetConfirmationCode, now) + uErr := c.upsertEmailLinkSignIn(ctx, oldEmail, els.DeviceUniqueID, resetConfirmationCode, els.Language, now) if uErr != nil { return multierror.Append( //nolint:wrapcheck // . errors.Wrapf(c.resetEmailModification(ctx, usr.ID, oldEmail), "[reset] resetEmailModification failed for email:%v", oldEmail), diff --git a/auth/email_link/emaillink.go b/auth/email_link/emaillink.go index 706852fe..56a9ddf8 100644 --- a/auth/email_link/emaillink.go +++ b/auth/email_link/emaillink.go @@ -19,6 +19,7 @@ import ( "github.com/ice-blockchain/wintr/auth" appcfg "github.com/ice-blockchain/wintr/config" "github.com/ice-blockchain/wintr/connectors/storage/v2" + storagev3 "github.com/ice-blockchain/wintr/connectors/storage/v3" "github.com/ice-blockchain/wintr/email" "github.com/ice-blockchain/wintr/log" "github.com/ice-blockchain/wintr/time" @@ -29,15 +30,20 @@ func init() { loadEmailMagicLinkTranslationTemplates() } -func NewClient(ctx context.Context, userModifier UserModifier, authClient auth.Client) Client { +//nolint:funlen // . +func NewClient(ctx context.Context, cancel context.CancelFunc, userModifier UserModifier, authClient auth.Client) Client { cfg := loadConfiguration() cfg.validate() db := storage.MustConnect(ctx, ddl, applicationYamlKey) - + //nolint:contextcheck // Used in queue processing. + queueDB := storagev3.MustConnect(context.Background(), applicationYamlKey) + log.Panic(errors.Wrapf(queueDB.SetNX(ctx, loginRateLimitKey, initEmailRateLimit, 0).Err(), "failed to init email sending rate limit")) //nolint:revive // . cl := &client{ cfg: cfg, shutdown: db.Close, db: db, + cancel: cancel, + queueDB: queueDB, authClient: authClient, userModifier: userModifier, emailClients: make([]email.Client, 0, cfg.ExtraLoadBalancersCount+1), @@ -52,6 +58,7 @@ func NewClient(ctx context.Context, userModifier UserModifier, authClient auth.C cl.emailClients = append(cl.emailClients, email.New(fmt.Sprintf("%v/%v", applicationYamlKey, i+1))) cl.fromRecipients = append(cl.fromRecipients, fromRecipient{nestedCfg.FromEmailName, nestedCfg.FromEmailAddress}) } + go cl.processEmailQueue(ctx) } go cl.startOldLoginAttemptsCleaner(ctx) @@ -68,9 +75,33 @@ func NewROClient(ctx context.Context) IceUserIDClient { } func (c *client) Close() error { + if c.cancel != nil { + c.cancel() + } + c.queueWg.Wait() + return errors.Wrap(c.shutdown(), "closing auth/emaillink repository failed") } +func (c *client) CheckHealth(ctx context.Context) error { + return errors.Wrapf(c.checkQueueDBHealth(ctx), "[health-check] failed to ping queueDB/dfly for email client") +} + +func (c *client) checkQueueDBHealth(ctx context.Context) error { + if resp := c.queueDB.Ping(ctx); resp.Err() != nil || resp.Val() != "PONG" { + if resp.Err() == nil { + resp.SetErr(errors.Errorf("response `%v` is not `PONG`", resp.Val())) + } + + return errors.Wrap(resp.Err(), "[health-check] failed to ping DB") + } + if !c.queueDB.IsRW(ctx) { + return errors.New("db is not writeable") + } + + return nil +} + func loadConfiguration() *config { var cfg config appcfg.MustLoadFromKey(applicationYamlKey, &cfg) diff --git a/auth/email_link/link_start_auth.go b/auth/email_link/link_start_auth.go index 99c0c057..fb42d51b 100644 --- a/auth/email_link/link_start_auth.go +++ b/auth/email_link/link_start_auth.go @@ -22,75 +22,88 @@ import ( "github.com/ice-blockchain/wintr/time" ) -//nolint:funlen,gocognit,revive //. -func (c *client) SendSignInLinkToEmail(ctx context.Context, emailValue, deviceUniqueID, language, clientIP string) (loginSession string, err error) { +//nolint:funlen,gocognit,revive,gocritic,lll //. +func (c *client) SendSignInLinkToEmail(ctx context.Context, emailValue, deviceUniqueID, language, clientIP string) (posInQueue int64, rateLimit, loginSession string, err error) { if ctx.Err() != nil { - return "", errors.Wrap(ctx.Err(), "send sign in link to email failed because context failed") + return 0, "", "", errors.Wrap(ctx.Err(), "send sign in link to email failed because context failed") } - id := loginID{emailValue, deviceUniqueID} now := time.Now() + id := loginID{emailValue, deviceUniqueID} loginSessionNumber := now.Time.Unix() / int64(sameIPCheckRate.Seconds()) + oldEmail := users.ConfirmedEmail(ctx) + if oldEmail == "" { + posInQueue, rateLimit, err = c.enqueueLoginAttempt(ctx, now, emailValue) + if err != nil { + if errors.Is(err, errAlreadyEnqueued) { + loginSession, err = c.getExistingLoginSession(ctx, &id, loginSessionNumber, clientIP) + + return posInQueue, rateLimit, loginSession, errors.Wrapf(err, "failed to fetch existing login session for email %v", id.Email) + } + + return 0, "", "", errors.Wrapf(err, "failed to enqueue email %v", emailValue) + } + } + if vErr := c.validateEmailSignIn(ctx, &id); vErr != nil { - return "", errors.Wrapf(vErr, "can't validate email sign in for:%#v", id) + return 0, "", "", errors.Wrapf(vErr, "can't validate email sign in for:%#v", id) } - oldEmail := users.ConfirmedEmail(ctx) if oldEmail != "" { loginSessionNumber = 0 clientIP = "" //nolint:revive // . oldID := loginID{oldEmail, deviceUniqueID} if vErr := c.validateEmailModification(ctx, emailValue, &oldID); vErr != nil { - return "", errors.Wrapf(vErr, "can't validate modification email for:%#v", oldID) + return 0, "", "", errors.Wrapf(vErr, "can't validate modification email for:%#v", oldID) } } confirmationCode := generateConfirmationCode() loginSession, err = c.generateLoginSession(&id, clientIP, oldEmail, loginSessionNumber) if err != nil { - return "", errors.Wrap(err, "can't call generateLoginSession") + return 0, "", "", errors.Wrap(err, "can't call generateLoginSession") } - if loginSessionNumber > 0 && clientIP != "" && userIDForPhoneNumberToEmailMigration(ctx) == "" { - if ipErr := c.upsertIPLoginAttempt(ctx, &id, clientIP, loginSessionNumber); ipErr != nil { - return "", errors.Wrapf(ipErr, "failed increment login attempts for IP:%v (session num %v)", clientIP, loginSessionNumber) - } - } - if uErr := c.upsertEmailLinkSignIn(ctx, id.Email, id.DeviceUniqueID, confirmationCode, now); uErr != nil { + if uErr := c.upsertEmailLinkSignIn(ctx, id.Email, id.DeviceUniqueID, confirmationCode, language, now); uErr != nil { if errors.Is(uErr, ErrUserDuplicate) { - oldLoginSession, oErr := c.restoreOldLoginSession(ctx, &id, clientIP, oldEmail, loginSessionNumber) + oldLoginSession, oErr := c.restoreOldLoginSession(&id, clientIP, oldEmail, loginSessionNumber) if oErr != nil { - return "", multierror.Append( //nolint:wrapcheck // . + return 0, "", "", multierror.Append( //nolint:wrapcheck // . errors.Wrapf(oErr, "failed to calculate oldLoginSession"), errors.Wrapf(uErr, "failed to store/update email link sign ins for id:%#v", id), ).ErrorOrNil() } - return oldLoginSession, nil + return posInQueue, rateLimit, oldLoginSession, nil } - return "", multierror.Append( //nolint:wrapcheck // . - errors.Wrapf(c.decrementIPLoginAttempts(ctx, clientIP, loginSessionNumber), "[rollback] failed to rollback login attempts for ip"), - errors.Wrapf(uErr, "failed to store/update email link sign ins for id:%#v", id), - ).ErrorOrNil() + return 0, "", "", errors.Wrapf(uErr, "failed to store/update email link sign ins for id:%#v", id) + } + if oldEmail != "" { + if sendModEmailErr := c.sendEmailWithType(ctx, modifyEmailType, language, []string{id.Email}, []string{confirmationCode}); sendModEmailErr != nil { + return 0, "", loginSession, errors.Wrapf(sendModEmailErr, "failed to send validation email for id:%#v", id) + } } - if sErr := c.sendConfirmationCode(ctx, &id, oldEmail, confirmationCode, language); sErr != nil { - return "", multierror.Append( //nolint:wrapcheck // . - errors.Wrapf(c.decrementIPLoginAttempts(ctx, clientIP, loginSessionNumber), "[rollback] failed to rollback login attempts for ip"), - errors.Wrapf(sErr, "can't send magic link for id:%#v", id), - ).ErrorOrNil() + + return posInQueue, rateLimit, loginSession, nil +} + +func (c *client) getExistingLoginSession(ctx context.Context, id *loginID, loginSessionNumber int64, clientIP string) (loginSession string, err error) { + _, sErr := c.getEmailLinkSignInByPk(ctx, id, "") + if sErr != nil { + return "", errors.Wrapf(sErr, "failed to get user info by email:%v", id.Email) + } + loginSession, err = c.generateLoginSession(id, clientIP, "", loginSessionNumber) + if err != nil { + return "", errors.Wrap(err, "can't call generateLoginSession") } return loginSession, nil } -func (c *client) restoreOldLoginSession(ctx context.Context, id *loginID, clientIP, oldEmail string, loginSessionNumber int64) (string, error) { +func (c *client) restoreOldLoginSession(id *loginID, clientIP, oldEmail string, loginSessionNumber int64) (string, error) { oldLoginSession, dErr := c.generateLoginSession(id, clientIP, oldEmail, loginSessionNumber) if dErr != nil { - return "", multierror.Append( //nolint:wrapcheck // . - errors.Wrapf(c.decrementIPLoginAttempts(ctx, clientIP, loginSessionNumber), "[rollback] failed to rollback login attempts for ip"), - errors.Wrap(dErr, "can't generate loginSession"), - ).ErrorOrNil() + return "", errors.Wrap(dErr, "can't generate loginSession") } - return oldLoginSession, errors.Wrapf(c.decrementIPLoginAttempts(ctx, clientIP, loginSessionNumber), - "failed to rollback login attempts for ip due to reuse of loginSession") + return oldLoginSession, nil } func (c *client) validateEmailSignIn(ctx context.Context, id *loginID) error { @@ -112,19 +125,6 @@ func (c *client) validateEmailSignIn(ctx context.Context, id *loginID) error { return nil } -func (c *client) decrementIPLoginAttempts(ctx context.Context, ip string, loginSessionNumber int64) error { - if ip != "" && loginSessionNumber > 0 && userIDForPhoneNumberToEmailMigration(ctx) == "" { - sql := `UPDATE sign_ins_per_ip SET - login_attempts = GREATEST(sign_ins_per_ip.login_attempts - 1, 0) - WHERE ip = $1 AND login_session_number = $2` - _, err := storage.Exec(ctx, c.db, sql, ip, loginSessionNumber) - - return errors.Wrapf(err, "failed to decrease login attempts for ip %v lsn %v", ip, loginSessionNumber) - } - - return nil -} - func (c *client) validateEmailModification(ctx context.Context, newEmail string, oldID *loginID) error { if iErr := c.isUserExist(ctx, newEmail); !storage.IsErr(iErr, storage.ErrNotFound) { if iErr != nil { @@ -149,19 +149,8 @@ func (c *client) validateEmailModification(ctx context.Context, newEmail string, return nil } -func (c *client) sendConfirmationCode(ctx context.Context, id *loginID, oldEmail, confirmationCode, language string) error { - var emailType string - if oldEmail != "" { - emailType = modifyEmailType - } else { - emailType = signInEmailType - } - - return errors.Wrapf(c.sendEmailWithType(ctx, emailType, id.Email, language, confirmationCode), "failed to send validation email for id:%#v", id) -} - //nolint:funlen // . -func (c *client) sendEmailWithType(ctx context.Context, emailType, toEmail, language, confirmationCode string) error { +func (c *client) sendEmailWithType(ctx context.Context, emailType, language string, toEmails, confirmationCodes []string) error { var tmpl *emailTemplate tmpl, ok := allEmailLinkTemplates[emailType][language] if !ok { @@ -174,17 +163,26 @@ func (c *client) sendEmailWithType(ctx context.Context, emailType, toEmail, lang AppName string TeamName string }{ - Email: toEmail, - ConfirmationCode: confirmationCode, PetName: c.cfg.PetName, AppName: c.cfg.AppName, TeamName: c.cfg.TeamName, + Email: "{{.Email}}", + ConfirmationCode: "{{.ConfirmationCode}}", } dataSubject := struct { AppName string }{ AppName: c.cfg.AppName, } + participants := make([]email.Participant, 0, len(toEmails)) + for i := range toEmails { + participants = append(participants, email.Participant{ + Name: "", + Email: toEmails[i], + SubstitutionFields: map[string]string{"{{.ConfirmationCode}}": confirmationCodes[i], "{{.Email}}": toEmails[i]}, + }) + } + lbIdx := atomic.AddUint64(&c.emailClientLBIndex, 1) % uint64(c.cfg.ExtraLoadBalancersCount+1) return errors.Wrapf(c.emailClients[lbIdx].Send(ctx, &email.Parcel{ @@ -197,27 +195,26 @@ func (c *client) sendEmailWithType(ctx context.Context, emailType, toEmail, lang Name: c.fromRecipients[lbIdx].FromEmailName, Email: c.fromRecipients[lbIdx].FromEmailAddress, }, - }, email.Participant{ - Name: "", - Email: toEmail, - }), "failed to send email with type:%v for user with email:%v", emailType, toEmail) + }, participants...), "failed to send email with type:%v for user with emails:%v", emailType, toEmails) } -//nolint:lll // . -func (c *client) upsertEmailLinkSignIn(ctx context.Context, toEmail, deviceUniqueID, code string, now *time.Time) error { +//nolint:lll,revive // . +func (c *client) upsertEmailLinkSignIn(ctx context.Context, toEmail, deviceUniqueID, code, language string, now *time.Time) error { confirmationCodeWrongAttempts := 0 - params := []any{now.Time, toEmail, deviceUniqueID, code, confirmationCodeWrongAttempts, userIDForPhoneNumberToEmailMigration(ctx)} + params := []any{now.Time, toEmail, deviceUniqueID, code, language, confirmationCodeWrongAttempts, userIDForPhoneNumberToEmailMigration(ctx)} sql := fmt.Sprintf(`INSERT INTO email_link_sign_ins ( created_at, email, device_unique_id, confirmation_code, + language, confirmation_code_wrong_attempts_count, phone_number_to_email_migration_user_id) - VALUES ($1, $2, $3, $4, $5, NULLIF($6,'')) + VALUES ($1, $2, $3, $4,$5, $6, NULLIF($7,'')) ON CONFLICT (email, device_unique_id) DO UPDATE SET created_at = EXCLUDED.created_at, confirmation_code = EXCLUDED.confirmation_code, + language = EXCLUDED.language, confirmation_code_wrong_attempts_count = EXCLUDED.confirmation_code_wrong_attempts_count, phone_number_to_email_migration_user_id = COALESCE(NULLIF(EXCLUDED.phone_number_to_email_migration_user_id,''),email_link_sign_ins.phone_number_to_email_migration_user_id), email_confirmed_at = null, @@ -234,25 +231,6 @@ func (c *client) upsertEmailLinkSignIn(ctx context.Context, toEmail, deviceUniqu return errors.Wrapf(err, "failed to insert/update email link sign ins record for email:%v", toEmail) } -func (c *client) upsertIPLoginAttempt(ctx context.Context, id *loginID, clientIP string, loginSessionNumber int64) error { - sql := `INSERT INTO sign_ins_per_ip (ip, login_session_number, login_attempts) - VALUES ($1, $2, 1) - ON CONFLICT (login_session_number, ip) DO UPDATE - SET login_attempts = sign_ins_per_ip.login_attempts + 1` - _, err := storage.Exec(ctx, c.db, sql, clientIP, loginSessionNumber) - if err != nil { - if storage.IsErr(err, storage.ErrCheckFailed) { - err = errors.Wrapf(ErrTooManyAttempts, "user %#v is blocked due to a lot of requests from IP %v", id, clientIP) - - return terror.New(err, map[string]any{"ip": clientIP}) - } - - return errors.Wrapf(err, "failed to increment login attempts from IP %v", clientIP) - } - - return nil -} - func (c *client) generateMagicLinkPayload(id *loginID, oldEmail string, now *time.Time) (string, error) { token := jwt.NewWithClaims(jwt.SigningMethodHS256, loginFlowToken{ RegisteredClaims: &jwt.RegisteredClaims{ diff --git a/auth/email_link/queue.go b/auth/email_link/queue.go new file mode 100644 index 00000000..70fa9215 --- /dev/null +++ b/auth/email_link/queue.go @@ -0,0 +1,267 @@ +// SPDX-License-Identifier: ice License 1.0 + +package emaillinkiceauth + +import ( + "context" + "fmt" + "math" + "math/rand/v2" + "strconv" + "strings" + stdlibtime "time" + + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" + "github.com/redis/go-redis/v9" + + "github.com/ice-blockchain/wintr/connectors/storage/v2" + "github.com/ice-blockchain/wintr/email" + "github.com/ice-blockchain/wintr/log" + "github.com/ice-blockchain/wintr/time" +) + +//nolint:funlen // . +func (c *client) enqueueLoginAttempt(ctx context.Context, now *time.Time, userEmail string) (queuePosition int64, rateLimit string, err error) { + var result []redis.Cmder + result, err = c.queueDB.TxPipelined(ctx, func(pipeliner redis.Pipeliner) error { + if zErr := pipeliner.ZAddNX(ctx, loginQueueKey, redis.Z{ + Score: float64(now.Nanosecond()), + Member: userEmail, + }).Err(); zErr != nil { + return zErr //nolint:wrapcheck // . + } + if zRankErr := pipeliner.ZRank(ctx, loginQueueKey, userEmail).Err(); zRankErr != nil { + return zRankErr //nolint:wrapcheck // . + } + + return pipeliner.Get(ctx, loginRateLimitKey).Err() + }) + if err != nil { + return 0, "", errors.Wrapf(err, "failed to enqueue email") + } + errs := make([]error, 0, len(result)) + for idx := 2; idx >= 0; idx-- { + cmdRes := result[idx] + if cmdRes.Err() != nil { + errs = append(errs, errors.Wrapf(cmdRes.Err(), "failed to enqueue email because of failed %v", cmdRes.String())) + + continue + } + switch idx { + case 2: //nolint:gomnd // Index in pipeline. + strCmd := cmdRes.(*redis.StringCmd) //nolint:errcheck,forcetypeassert // . + rateLimit = strCmd.Val() + case 1: + intCmd := cmdRes.(*redis.IntCmd) //nolint:errcheck,forcetypeassert // . + queuePosition = intCmd.Val() + 1 + case 0: + intCmd := cmdRes.(*redis.IntCmd) //nolint:errcheck,forcetypeassert // . + if intCmd.Val() == 0 { + return queuePosition, rateLimit, errAlreadyEnqueued + } + } + } + if cmdErr := multierror.Append(nil, errs...).ErrorOrNil(); cmdErr != nil { + return queuePosition, rateLimit, errors.Wrapf(cmdErr, "failed to enqueue email %v", userEmail) + } + + return queuePosition, rateLimit, nil +} + +//nolint:funlen,gocognit,revive,contextcheck // Keep processing in signle place. +func (c *client) processEmailQueue(rootCtx context.Context) { + lastProcessed := time.Now() + + emailQueueLock := storage.NewMutex(c.db, loginQueueKey) + lockCtx, lockCancel := context.WithTimeout(context.Background(), 30*stdlibtime.Second) //nolint:gomnd // . + if err := emailQueueLock.Lock(lockCtx); err != nil { + if !errors.Is(err, storage.ErrMutexNotLocked) { + log.Panic(errors.Wrapf(err, "failed to obtain emailQueueLock for email queue")) + } + } + lockCancel() + c.queueWg.Add(1) + defer func() { + log.Error(errors.Wrapf(c.queueDB.Close(), "failed to close email queue db")) + c.queueWg.Done() + }() + for rootCtx.Err() == nil { + now := time.Now() + reqCtx, reqCancel := context.WithTimeout(context.Background(), 30*stdlibtime.Second) //nolint:gomnd // . + if lockErr := emailQueueLock.EnsureLocked(reqCtx); lockErr != nil { + reqCancel() + if errors.Is(lockErr, storage.ErrTxAborted) { + return + } + if errors.Is(lockErr, storage.ErrMutexNotLocked) { + _ = wait(rootCtx, stdlibtime.Duration(1+rand.IntN(4))*stdlibtime.Second) //nolint:errcheck,gosec,gomnd // Nothing to rollback. + + continue + } + } + reqCancel() + reqCtx, reqCancel = context.WithTimeout(context.Background(), 30*stdlibtime.Second) //nolint:gomnd // . + emails, scores, rateLimit, err := c.dequeueNextEmails(reqCtx) //nolint:contextcheck // Background context. + if err != nil { + log.Error(errors.Wrapf(err, "failed to fetch next %v emails in queue", email.MaxBatchSize)) + reqCancel() + _ = wait(rootCtx, 1*stdlibtime.Second) //nolint:errcheck // Noting to rollback. + + continue + } + reqCancel() + + if len(emails) == 0 { + log.Info("No emails in queue for sending") + _ = wait(rootCtx, 10*stdlibtime.Second) //nolint:errcheck,gomnd // Nothing to rollback. + + continue + } + + rlCount, rlDuration, rlErr := parseRateLimit(rateLimit) + if rlErr != nil { + log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails, scores), "failed to rollback emails %#v back to queue", emails)) + log.Panic(errors.Wrapf(rlErr, "failed to parse rate limit for email queue %v", rateLimit)) //nolint:revive // . + } + limit := int(math.Min(float64(rlCount), float64(len(emails)))) + if rlCount < len(emails) { + log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails[rlCount:], scores), "failed to rollback emails %#v back to queue cuz rate limit %v is less than batch %v", emails[rlCount:], rlCount, email.MaxBatchSize)) //nolint:lll // . + emails = emails[:rlCount] + } + + reqCtx, reqCancel = context.WithTimeout(context.Background(), 30*stdlibtime.Second) //nolint:gomnd // . + loginInformation, err := c.fetchLoginInformationForEmailBatch(reqCtx, now, emails, limit) + if err != nil { + log.Error(errors.Wrapf(err, "failed to fetch login information for emails: %v", emails)) + reqCancel() + log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails, scores), "failed to rollback emails %#v back to queue", emails)) + _ = wait(rootCtx, 1*stdlibtime.Second) //nolint:errcheck // Already rolled back. + + continue + } + reqCancel() + lastTimeBatchProcessingDuration := time.Now().Sub(*lastProcessed.Time) + rateLimitEstimationDuration := lastTimeBatchProcessingDuration * stdlibtime.Duration(int64(rlCount)/int64(len(emails))) + if rateLimitEstimationDuration < rlDuration { + oneBatchProcessingTimeToRespectRateLimit := stdlibtime.Duration(int64(len(emails))/int64(rlCount)) * rlDuration + if wait(rootCtx, oneBatchProcessingTimeToRespectRateLimit) != nil { + log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(emails, scores), "failed to rollback fetched emails %#v back to queue", emails)) + + continue + } + } + reqCtx, reqCancel = context.WithTimeout(context.Background(), 30*stdlibtime.Second) //nolint:gomnd // . + if failed, sErr := c.sendEmails(reqCtx, loginInformation); sErr != nil { + reqCancel() + log.Error(errors.Wrapf(sErr, "failed to send email batch for emails %#v", failed)) + log.Error(errors.Wrapf(c.rollbackEmailsBackToQueue(failed, scores), "failed to rollback failed emails %#v back to queue", failed)) + stdlibtime.Sleep(1 * stdlibtime.Second) + + continue + } + reqCancel() + lastProcessed = time.Now() + } +} + +func (c *client) rollbackEmailsBackToQueue(failed []string, scores map[string]int64) error { + rollbackCtx, rollbackCancel := context.WithTimeout(context.Background(), 30*stdlibtime.Second) //nolint:gomnd // . + defer rollbackCancel() + failedZ := make([]redis.Z, 0, len(failed)) + for _, failedEmail := range failed { + failedZ = append(failedZ, redis.Z{ + Score: float64(scores[failedEmail]), + Member: failedEmail, + }) + } + + return errors.Wrapf(c.queueDB.ZAddNX(rollbackCtx, loginQueueKey, failedZ...).Err(), "failed to rollback unsent emails %#v", failed) +} + +//nolint:gocritic,revive // We need all the results from the pipeline +func (c *client) dequeueNextEmails(ctx context.Context) (emailsBatch []string, scores map[string]int64, rateLimit string, err error) { + var pipeRes []redis.Cmder + pipeRes, err = c.queueDB.TxPipelined(ctx, func(pipeliner redis.Pipeliner) error { + if zpopErr := pipeliner.ZPopMin(ctx, loginQueueKey, email.MaxBatchSize).Err(); zpopErr != nil { + return zpopErr //nolint:wrapcheck // . + } + + return pipeliner.Get(ctx, loginRateLimitKey).Err() + }) + if err != nil { + return nil, nil, "", errors.Wrapf(err, "failed to fetch email queue batch") + } + if zpopErr := pipeRes[0].Err(); zpopErr != nil { + return nil, nil, "", errors.Wrapf(zpopErr, "failed to fetch %v email queue batch", pipeRes[0].String()) + } + if len(pipeRes) > 1 { + if rateErr := pipeRes[1].Err(); rateErr != nil { + return nil, nil, "", errors.Wrapf(rateErr, "failed to fetch %v email sending rate", pipeRes[1].String()) + } + } + batch := pipeRes[0].(*redis.ZSliceCmd).Val() //nolint:forcetypeassert // . + emailsBatch = make([]string, 0, len(batch)) + scores = make(map[string]int64, 0) + for _, itemInBatch := range batch { + emailsBatch = append(emailsBatch, itemInBatch.Member.(string)) //nolint:forcetypeassert // . + scores[emailsBatch[len(emailsBatch)-1]] = int64(itemInBatch.Score) + } + rate := pipeRes[1].(*redis.StringCmd).Val() //nolint:forcetypeassert // . + + return emailsBatch, scores, rate, nil +} + +func (c *client) fetchLoginInformationForEmailBatch(ctx context.Context, now *time.Time, emails []string, limit int) ([]*emailLinkSignIn, error) { + sql := fmt.Sprintf(` + SELECT * FROM public.email_link_sign_ins + WHERE email = ANY($1) AND created_at > ($2::TIMESTAMP - (%[2]v * interval '1 second')) + ORDER BY created_at DESC + LIMIT %[1]v;`, limit, c.cfg.EmailValidation.ExpirationTime.Seconds()) + res, err := storage.Select[emailLinkSignIn](ctx, c.db, sql, emails, now.Time) + + return res, err +} + +func parseRateLimit(rateLimit string) (int, stdlibtime.Duration, error) { + spl := strings.Split(rateLimit, ":") + rateLimitCount, rlErr := strconv.Atoi(spl[0]) + if rlErr != nil { + return 0, stdlibtime.Duration(0), rlErr + } + rateLimitDuration, rlErr := stdlibtime.ParseDuration(spl[1]) + if rlErr != nil { + return 0, stdlibtime.Duration(0), rlErr + } + + return rateLimitCount, rateLimitDuration, nil +} + +func (c *client) sendEmails(ctx context.Context, emails []*emailLinkSignIn) (failed []string, err error) { + emailsByLanguage := make(map[string][]string) + confCodesByLanguage := make(map[string][]string) + for _, userEmail := range emails { + emailsByLanguage[userEmail.Language] = append(emailsByLanguage[userEmail.Language], userEmail.Email) + confCodesByLanguage[userEmail.Language] = append(confCodesByLanguage[userEmail.Language], userEmail.ConfirmationCode) + } + var mErr *multierror.Error + for language := range emailsByLanguage { + if sErr := c.sendEmailWithType(ctx, signInEmailType, language, emailsByLanguage[language], confCodesByLanguage[language]); sErr != nil { + mErr = multierror.Append(mErr, errors.Wrapf(sErr, "failed to send emails for language %v: %#v", language, emailsByLanguage[language])) + failed = append(failed, emailsByLanguage[language]...) + } + } + + return failed, mErr.ErrorOrNil() //nolint:wrapcheck // . +} + +func wait(ctx context.Context, d stdlibtime.Duration) error { + select { + case <-stdlibtime.After(d): + return nil + case <-ctx.Done(): + log.Info("cancelled") + + return context.Canceled + } +} diff --git a/auth/email_link/users.go b/auth/email_link/users.go index 99f6e20e..7d1293a0 100644 --- a/auth/email_link/users.go +++ b/auth/email_link/users.go @@ -87,7 +87,7 @@ func (c *client) getUserByIDOrPk(ctx context.Context, userID string, id *loginID phone_number_to_email_migration_user_id, email, $3 AS device_unique_id, - 'en' AS language, + language, COALESCE((account_metadata.metadata -> 'hash_code')::BIGINT,0) AS hash_code, account_metadata.metadata, 2 AS idx diff --git a/cmd/eskimo-hut/api/docs.go b/cmd/eskimo-hut/api/docs.go index a1442f2e..344471b6 100644 --- a/cmd/eskimo-hut/api/docs.go +++ b/cmd/eskimo-hut/api/docs.go @@ -2208,6 +2208,14 @@ const docTemplate = `{ "loginSession": { "type": "string", "example": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJPbmxpbmUgSldUIEJ1aWxkZXIiLCJpYXQiOjE2ODQzMjQ0NTYsImV4cCI6MTcxNTg2MDQ1NiwiYXVkIjoiIiwic3ViIjoianJvY2tldEBleGFtcGxlLmNvbSIsIm90cCI6IjUxMzRhMzdkLWIyMWEtNGVhNi1hNzk2LTAxOGIwMjMwMmFhMCJ9.q3xa8Gwg2FVCRHLZqkSedH3aK8XBqykaIy85rRU40nM" + }, + "positionInQueue": { + "type": "integer", + "example": 675 + }, + "rateLimit": { + "type": "string", + "example": "1000:24h" } } }, diff --git a/cmd/eskimo-hut/api/swagger.json b/cmd/eskimo-hut/api/swagger.json index a42d7b03..e045e64d 100644 --- a/cmd/eskimo-hut/api/swagger.json +++ b/cmd/eskimo-hut/api/swagger.json @@ -2201,6 +2201,14 @@ "loginSession": { "type": "string", "example": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJPbmxpbmUgSldUIEJ1aWxkZXIiLCJpYXQiOjE2ODQzMjQ0NTYsImV4cCI6MTcxNTg2MDQ1NiwiYXVkIjoiIiwic3ViIjoianJvY2tldEBleGFtcGxlLmNvbSIsIm90cCI6IjUxMzRhMzdkLWIyMWEtNGVhNi1hNzk2LTAxOGIwMjMwMmFhMCJ9.q3xa8Gwg2FVCRHLZqkSedH3aK8XBqykaIy85rRU40nM" + }, + "positionInQueue": { + "type": "integer", + "example": 675 + }, + "rateLimit": { + "type": "string", + "example": "1000:24h" } } }, diff --git a/cmd/eskimo-hut/api/swagger.yaml b/cmd/eskimo-hut/api/swagger.yaml index db5bc260..2912d2e6 100644 --- a/cmd/eskimo-hut/api/swagger.yaml +++ b/cmd/eskimo-hut/api/swagger.yaml @@ -6,6 +6,12 @@ definitions: loginSession: example: eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJPbmxpbmUgSldUIEJ1aWxkZXIiLCJpYXQiOjE2ODQzMjQ0NTYsImV4cCI6MTcxNTg2MDQ1NiwiYXVkIjoiIiwic3ViIjoianJvY2tldEBleGFtcGxlLmNvbSIsIm90cCI6IjUxMzRhMzdkLWIyMWEtNGVhNi1hNzk2LTAxOGIwMjMwMmFhMCJ9.q3xa8Gwg2FVCRHLZqkSedH3aK8XBqykaIy85rRU40nM type: string + positionInQueue: + example: 675 + type: integer + rateLimit: + example: 1000:24h + type: string type: object main.CreateUserRequestBody: properties: diff --git a/cmd/eskimo-hut/auth.go b/cmd/eskimo-hut/auth.go index c6a89693..bc573349 100644 --- a/cmd/eskimo-hut/auth.go +++ b/cmd/eskimo-hut/auth.go @@ -63,7 +63,8 @@ func (s *service) SendSignInLinkToEmail( //nolint:gocritic,funlen // . return nil, server.BadRequest(err, invalidEmail) } ctx = emaillink.ContextWithPhoneNumberToEmailMigration(ctx, req.Data.UserID) //nolint:revive // Not a problem. - loginSession, err := s.authEmailLinkClient.SendSignInLinkToEmail(ctx, email, req.Data.DeviceUniqueID, req.Data.Language, req.ClientIP.String()) + posInQueue, rateLimit, loginSession, err := s.authEmailLinkClient.SendSignInLinkToEmail(ctx, email, req.Data.DeviceUniqueID, + req.Data.Language, req.ClientIP.String()) if err != nil { switch { case errors.Is(err, emaillink.ErrUserBlocked): @@ -83,7 +84,7 @@ func (s *service) SendSignInLinkToEmail( //nolint:gocritic,funlen // . } } - return server.OK[Auth](&Auth{LoginSession: loginSession}), nil + return server.OK[Auth](&Auth{LoginSession: loginSession, PositionInQueue: posInQueue, RateLimit: rateLimit}), nil } // SignIn godoc diff --git a/cmd/eskimo-hut/contract.go b/cmd/eskimo-hut/contract.go index 719f7b88..11ed03f3 100644 --- a/cmd/eskimo-hut/contract.go +++ b/cmd/eskimo-hut/contract.go @@ -146,7 +146,9 @@ type ( KycFaceAvailable bool `json:"kycFaceAvailable,omitempty" example:"true"` } Auth struct { - LoginSession string `json:"loginSession" example:"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJPbmxpbmUgSldUIEJ1aWxkZXIiLCJpYXQiOjE2ODQzMjQ0NTYsImV4cCI6MTcxNTg2MDQ1NiwiYXVkIjoiIiwic3ViIjoianJvY2tldEBleGFtcGxlLmNvbSIsIm90cCI6IjUxMzRhMzdkLWIyMWEtNGVhNi1hNzk2LTAxOGIwMjMwMmFhMCJ9.q3xa8Gwg2FVCRHLZqkSedH3aK8XBqykaIy85rRU40nM"` //nolint:lll // . + RateLimit string `json:"rateLimit" example:"1000:24h"` + LoginSession string `json:"loginSession" example:"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJPbmxpbmUgSldUIEJ1aWxkZXIiLCJpYXQiOjE2ODQzMjQ0NTYsImV4cCI6MTcxNTg2MDQ1NiwiYXVkIjoiIiwic3ViIjoianJvY2tldEBleGFtcGxlLmNvbSIsIm90cCI6IjUxMzRhMzdkLWIyMWEtNGVhNi1hNzk2LTAxOGIwMjMwMmFhMCJ9.q3xa8Gwg2FVCRHLZqkSedH3aK8XBqykaIy85rRU40nM"` //nolint:lll // . + PositionInQueue int64 `json:"positionInQueue" example:"675"` } RefreshedToken struct { *auth.Tokens diff --git a/cmd/eskimo-hut/eskimo_hut.go b/cmd/eskimo-hut/eskimo_hut.go index dbec758f..8e2333c0 100644 --- a/cmd/eskimo-hut/eskimo_hut.go +++ b/cmd/eskimo-hut/eskimo_hut.go @@ -53,7 +53,7 @@ func (s *service) RegisterRoutes(router *server.Router) { func (s *service) Init(ctx context.Context, cancel context.CancelFunc) { s.usersProcessor = users.StartProcessor(ctx, cancel) - s.authEmailLinkClient = emaillink.NewClient(ctx, s.usersProcessor, server.Auth(ctx)) + s.authEmailLinkClient = emaillink.NewClient(ctx, cancel, s.usersProcessor, server.Auth(ctx)) s.telegramAuthClient = telegramauth.NewClient(ctx, server.Auth(ctx)) s.tokenRefresher = auth.NewRefresher(server.Auth(ctx), s.authEmailLinkClient, s.telegramAuthClient) s.socialRepository = social.New(ctx, s.usersProcessor) @@ -78,5 +78,8 @@ func (s *service) Close(ctx context.Context) error { func (s *service) CheckHealth(ctx context.Context) error { log.Debug("checking health...", "package", "users") - return errors.Wrapf(s.usersProcessor.CheckHealth(ctx), "processor health check failed") + return multierror.Append( //nolint:wrapcheck // Not needed. + errors.Wrapf(s.usersProcessor.CheckHealth(ctx), "processor health check failed"), + errors.Wrapf(s.authEmailLinkClient.CheckHealth(ctx), "email client health check failed"), + ).ErrorOrNil() } diff --git a/cmd/eskimo-hut/users.go b/cmd/eskimo-hut/users.go index 2c0238be..00c9c72f 100644 --- a/cmd/eskimo-hut/users.go +++ b/cmd/eskimo-hut/users.go @@ -234,7 +234,7 @@ func (s *service) emailUpdateRequested( language = oldUser.Language } - if loginSession, err = s.authEmailLinkClient.SendSignInLinkToEmail( + if _, _, loginSession, err = s.authEmailLinkClient.SendSignInLinkToEmail( users.ConfirmedEmailContext(ctx, loggedInUser.Email), newEmail, deviceID, language, "", ); err != nil { diff --git a/cmd/scripts/merge_firebase_phone_login_with_ice_email_login/merge_firebase_phone_login_with_ice_email_login.go b/cmd/scripts/merge_firebase_phone_login_with_ice_email_login/merge_firebase_phone_login_with_ice_email_login.go index 9c021905..7916222b 100644 --- a/cmd/scripts/merge_firebase_phone_login_with_ice_email_login/merge_firebase_phone_login_with_ice_email_login.go +++ b/cmd/scripts/merge_firebase_phone_login_with_ice_email_login/merge_firebase_phone_login_with_ice_email_login.go @@ -47,7 +47,7 @@ type ( func main() { usersProcessor := users.StartProcessor(context.Background(), func() {}) authClient := auth.New(context.Background(), applicationYamlAuthKey) - authEmailLinkClient := emaillink.NewClient(context.Background(), usersProcessor, authClient) + authEmailLinkClient := emaillink.NewClient(context.Background(), nil, usersProcessor, authClient) db := storage.MustConnect(context.Background(), ddl, applicationYamlEskimoKey) defer db.Close() defer usersProcessor.Close()