Skip to content

Commit

Permalink
backfill: implement loop
Browse files Browse the repository at this point in the history
Signed-off-by: Sumner Evans <sumner@beeper.com>
  • Loading branch information
sumnerevans committed Aug 3, 2023
1 parent 1af5233 commit 8502167
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 41 deletions.
205 changes: 205 additions & 0 deletions backfillqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// mautrix-imessage - A Matrix-iMessage puppeting bridge.
// Copyright (C) 2023 Tulir Asokan, Sumner Evans
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package main

import (
"context"
"time"

log "maunium.net/go/maulogger/v2"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/id"
"maunium.net/go/mautrix/util/dbutil"

"go.mau.fi/mautrix-imessage/database"
"go.mau.fi/mautrix-imessage/imessage"
)

type BackfillQueue struct {
BackfillQuery *database.BackfillQuery
reCheckChannels []chan bool
log log.Logger
}

func (bq *BackfillQueue) ReCheck() {
bq.log.Infofln("Sending re-checks to %d channels", len(bq.reCheckChannels))
for _, channel := range bq.reCheckChannels {
go func(c chan bool) {
c <- true
}(channel)
}
}

func (bq *BackfillQueue) GetNextBackfill(userID id.UserID, reCheckChannel chan bool) *database.Backfill {
for {
if backfill := bq.BackfillQuery.GetNext(userID); backfill != nil {
backfill.MarkDispatched()
return backfill
}

select {
case <-reCheckChannel:
case <-time.After(time.Minute):
}
}
}

func (user *User) HandleBackfillRequestsLoop() {
reCheckChannel := make(chan bool)
user.BackfillQueue.reCheckChannels = append(user.BackfillQueue.reCheckChannels, reCheckChannel)

for {
user.log.Infofln("Getting backfill request")
req := user.BackfillQueue.GetNextBackfill(user.MXID, reCheckChannel)
user.log.Infofln("Handling backfill request %s", req)

portal := user.bridge.GetPortalByGUID(req.PortalGUID)
user.backfillInChunks(req, portal)
req.MarkDone()
}
}

func (user *User) EnqueueImmediateBackfill(txn dbutil.Execable, priority int, portal *Portal, earliestBridgedTimestamp time.Time) {
maxMessages := user.bridge.Config.Bridge.Backfill.Immediate.MaxEvents
initialBackfill := user.bridge.DB.Backfill.NewWithValues(user.MXID, priority, portal.GUID, nil, &earliestBridgedTimestamp, maxMessages, maxMessages, 0)
initialBackfill.Insert(txn)
}

func (user *User) EnqueueDeferredBackfills(txn dbutil.Execable, portals []*Portal, startIdx int) {
now := time.Now()
numPortals := len(portals)
for stageIdx, backfillStage := range user.bridge.Config.Bridge.Backfill.Deferred {
for portalIdx, portal := range portals {
var startDate *time.Time = nil
if backfillStage.StartDaysAgo > 0 {
startDaysAgo := now.AddDate(0, 0, -backfillStage.StartDaysAgo)
startDate = &startDaysAgo
}
backfillMessages := user.bridge.DB.Backfill.NewWithValues(
user.MXID, startIdx+stageIdx*numPortals+portalIdx, portal.GUID, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay)
backfillMessages.Insert(txn)
}
}
}

func (user *User) runOnlyBackfillMode() {
log := user.bridge.ZLog.With().Str("mode", "backfill_only").Logger()
ctx := log.WithContext(context.Background())

err := user.bridge.Crypto.ShareOneTimeKeys(context.Background())
if err != nil {
// TODO do something more intelligent here
log.Fatal().Err(err).Msg("Error sharing keys")
}

if !user.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending) {
log.Fatal().Msg("The homeserver does not support Beeper's batch send endpoint, are you sure you connected to the correct endpoint?")
}

// TODO exit with a specific code if there's nothing more to backfill

// Start the backfill queue
user.handleHistorySyncsLoop()

if count, err := user.bridge.DB.Backfill.Count(ctx); err != nil {
log.Fatal().Err(err).Msg("Error retrieving backfill queue count")
} else if count > 0 {
// If there are already items in the backfill queue, then we just need to
// run the backfill queue.
return
}

// Get every chat from the database
chats, err := user.bridge.IM.GetChatsWithMessagesAfter(imessage.AppleEpoch)
if err != nil {
log.Fatal().Err(err).Msg("Error retrieving all chats")
}

chatGUIDs := make([]string, len(chats))
chatInfos := make([]*imessage.ChatInfo, len(chats))
for i, chat := range chats {
chatGUIDs[i] = chat.ChatGUID
chatInfos[i], err = user.bridge.IM.GetChatInfo(chat.ChatGUID, chat.ThreadID)
if err != nil {
log.Fatal().Err(err).Str("chat_guid", chat.ChatGUID).Msg("Error getting chat info from database")
}
chatInfos[i].JSONChatGUID = chatInfos[i].Identifier.String()
}

// Ask the cloud bridge to create room IDs for every one of the chats.
client := user.CustomIntent().Client
url := client.BuildURL(mautrix.BaseURLPath{
"_matrix", "asmux", "mxauth", "appservice", user.MXID.Localpart(), "imessagecloud",
"exec", "create_rooms_for_backfill"})
_, err = client.MakeRequest("POST", url, NewRoomBackfillRequest{
Chats: chatInfos,
}, nil)
if err != nil {
log.Fatal().Err(err).Msg("Error starting creation of backfill rooms")
}

// Wait for the rooms to be created.
var resp RoomInfoForBackfillResponse
for {
url = client.BuildURL(mautrix.BaseURLPath{
"_matrix", "asmux", "mxauth", "appservice", user.MXID.Localpart(), "imessagecloud",
"exec", "get_room_info_for_backfill"})
_, err = client.MakeRequest("POST", url, RoomInfoForBackfillRequest{
ChatGUIDs: chatGUIDs,
}, &resp)
if err != nil {
// TODO do something more intelligent here
log.Fatal().Err(err).Msg("Error requesting backfill room info")
}

if resp.Status == RoomCreationForBackfillStatusDone {
break
} else if resp.Status == RoomCreationForBackfillStatusError {
// TODO do something more intelligent here
log.Fatal().Str("error", resp.Error).Msg("Error requesting backfill room IDs")
} else if resp.Status == RoomCreationForBackfillStatusInProgress {
log.Info().Msg("Backfill room creation still in progress, waiting 5 seconds")
time.Sleep(5 * time.Second)
} else {
// TODO do something more intelligent here
log.Fatal().Err(err).Str("status", string(resp.Status)).Msg("Unknown status")
}
}

// Create all of the portals locally and enqueue backfill requests for
// all of them.
portals := []*Portal{}
txn, err := user.bridge.DB.Begin()
{
var i int
for guid, roomInfo := range resp.Rooms {
portal := user.bridge.GetPortalByGUIDWithTransaction(txn, guid)
portal.MXID = roomInfo.RoomID
portal.Update(txn)
portals = append(portals, portal)
user.EnqueueImmediateBackfill(txn, i, portal, time.UnixMilli(roomInfo.EarliestBridgedTimestamp))
i++
}
user.EnqueueDeferredBackfills(txn, portals, i)
}
if err = txn.Commit(); err != nil {
// TODO do something intelligent here
log.Fatal().Err(err).Msg("Error committing backfill room IDs")
}

return
}
17 changes: 0 additions & 17 deletions database/backfillqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,6 @@ func (bq *BackfillQuery) Count(ctx context.Context) (count int, err error) {
return
}

func (bq *BackfillQuery) HasUnstartedOrInFlight(userID id.UserID) bool {
bq.backfillQueryLock.Lock()
defer bq.backfillQueryLock.Unlock()

rows, err := bq.db.Query(getUnstartedOrInFlightQuery, userID)
if err != nil || rows == nil {
if err != nil && !errors.Is(err, sql.ErrNoRows) {
bq.log.Warnfln("Failed to query backfill queue jobs: %v", err)
}
// No rows means that there are no unstarted or in flight backfill
// requests.
return false
}
defer rows.Close()
return rows.Next()
}

func (bq *BackfillQuery) DeleteAll(userID id.UserID) {
bq.backfillQueryLock.Lock()
defer bq.backfillQueryLock.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/strukturag/libheif v1.16.2
github.com/tidwall/gjson v1.14.4
golang.org/x/crypto v0.11.0
golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b
golang.org/x/image v0.7.0
maunium.net/go/mauflag v1.0.0
maunium.net/go/maulogger/v2 v2.4.1
Expand All @@ -28,7 +29,6 @@ require (
github.com/tidwall/sjson v1.2.5 // indirect
github.com/yuin/goldmark v1.5.4 // indirect
go.mau.fi/zeroconfig v0.1.2 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw=
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI=
golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/image v0.7.0 h1:gzS29xtG1J5ybQlv0PuyfE3nmc6R4qB73m6LUUmvFuw=
golang.org/x/image v0.7.0/go.mod h1:nd/q4ef1AKKYl/4kft7g+6UyGbdiqWqTP1ZAbRoV7Rg=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
Expand Down
Loading

0 comments on commit 8502167

Please sign in to comment.