From f3a1e3f85b613ba920eec3151e22361d22f4dd25 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Fri, 7 Jul 2023 16:03:56 -0600 Subject: [PATCH 01/13] config: add options for backfill queue Signed-off-by: Sumner Evans --- config/bridge.go | 12 ++++++++++++ config/upgrade.go | 3 +++ example-config.yaml | 47 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/config/bridge.go b/config/bridge.go index 73627135..b7dbb7d7 100644 --- a/config/bridge.go +++ b/config/bridge.go @@ -27,6 +27,12 @@ import ( "maunium.net/go/mautrix/id" ) +type DeferredConfig struct { + StartDaysAgo int `yaml:"start_days_ago"` + MaxBatchEvents int `yaml:"max_batch_events"` + BatchDelay int `yaml:"batch_delay"` +} + type BridgeConfig struct { User id.UserID `yaml:"user"` @@ -48,9 +54,15 @@ type BridgeConfig struct { DoublePuppetServerURL string `yaml:"double_puppet_server_url"` Backfill struct { Enable bool `yaml:"enable"` + OnlyBackfill bool `yaml:"only_backfill"` InitialLimit int `yaml:"initial_limit"` InitialSyncMaxAge float64 `yaml:"initial_sync_max_age"` UnreadHoursThreshold int `yaml:"unread_hours_threshold"` + Immediate struct { + MaxEvents int `yaml:"max_events"` + } `yaml:"immediate"` + + Deferred []DeferredConfig `yaml:"deferred"` } `yaml:"backfill"` PeriodicSync bool `yaml:"periodic_sync"` FindPortalsIfEmpty bool `yaml:"find_portals_if_db_empty"` diff --git a/config/upgrade.go b/config/upgrade.go index 0f8150c5..15bdcb05 100644 --- a/config/upgrade.go +++ b/config/upgrade.go @@ -80,6 +80,9 @@ func DoUpgrade(helper *up.Helper) { } helper.Copy(up.Bool, "bridge", "backfill", "enable") helper.Copy(up.Int, "bridge", "backfill", "unread_hours_threshold") + helper.Copy(up.Bool, "bridge", "backfill", "only_backfill") + helper.Copy(up.Int, "bridge", "backfill", "immediate", "max_events") + helper.Copy(up.List, "bridge", "backfill", "deferred") helper.Copy(up.Bool, "bridge", "periodic_sync") helper.Copy(up.Bool, "bridge", "find_portals_if_db_empty") if legacyMediaViewerURL, ok := helper.Get(up.Str, "bridge", "media_viewer_url"); ok && legacyMediaViewerURL != "" { diff --git a/example-config.yaml b/example-config.yaml index a3ae067f..fb5d04d4 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -158,6 +158,53 @@ bridge: # If a backfilled chat is older than this number of hours, mark it as read even if it's unread on iMessage. # Set to -1 to let any chat be unread. unread_hours_threshold: 720 + + ######################################################################### + # The settings below are only applicable if you are: # + # # + # 1. Using batch sending, which is no longer supported in Synapse. # + # 2. Running the bridge in backfill-only mode connecting to another # + # instance for portal creation via websocket commands. # + # # + # In other words, unless you are Beeper, the rest of the backfill # + # section very likely does not apply to you. # + ######################################################################### + # Is this bridge only meant for backfilling chats? + only_backfill: false + + # Settings for immediate backfills. These backfills should generally be small and their main purpose is + # to populate each of the initial chats (as configured by max_initial_conversations) with a few messages + # so that you can continue conversations without losing context. + immediate: + # The maximum number of events to backfill initially. + max_events: 25 + # Settings for deferred backfills. The purpose of these backfills are to fill in the rest of + # the chat history that was not covered by the immediate backfills. + # These backfills generally should happen at a slower pace so as not to overload the homeserver. + # Each deferred backfill config should define a "stage" of backfill (i.e. the last week of messages). + # The fields are as follows: + # - start_days_ago: the number of days ago to start backfilling from. + # To indicate the start of time, use -1. For example, for a week ago, use 7. + # - max_batch_events: the number of events to send per batch. + # - batch_delay: the number of seconds to wait before backfilling each batch. + deferred: + # Last Week + - start_days_ago: 7 + max_batch_events: 50 + batch_delay: 5 + # Last Month + - start_days_ago: 30 + max_batch_events: 100 + batch_delay: 10 + # Last 3 months + - start_days_ago: 90 + max_batch_events: 250 + batch_delay: 10 + # The start of time + - start_days_ago: -1 + max_batch_events: 500 + batch_delay: 10 + # Whether or not the bridge should periodically resync chat and contact info. periodic_sync: true # Should the bridge look through joined rooms to find existing portals if the database has none? From 67aac768498d7945b7697ab1112d5facbe0bca6f Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Mon, 31 Jul 2023 15:09:19 -0600 Subject: [PATCH 02/13] db: add backfill queue Signed-off-by: Sumner Evans --- database/backfillqueue.go | 332 ++++++++++++++++++ database/database.go | 5 + database/message.go | 10 + database/upgrades/00-latest-schema.sql | 33 +- database/upgrades/21-prioritized-backfill.sql | 32 ++ 5 files changed, 411 insertions(+), 1 deletion(-) create mode 100644 database/backfillqueue.go create mode 100644 database/upgrades/21-prioritized-backfill.sql diff --git a/database/backfillqueue.go b/database/backfillqueue.go new file mode 100644 index 00000000..0e664a74 --- /dev/null +++ b/database/backfillqueue.go @@ -0,0 +1,332 @@ +// mautrix-imessage - A Matrix-iMessage puppeting bridge. +// Copyright (C) 2023 Tulir Asokan, Sumner Evans +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package database + +import ( + "context" + "database/sql" + "errors" + "fmt" + "sync" + "time" + + log "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix/id" + "maunium.net/go/mautrix/util/dbutil" +) + +type BackfillQuery struct { + db *Database + log log.Logger + + backfillQueryLock sync.Mutex +} + +func (bq *BackfillQuery) New() *Backfill { + return &Backfill{ + db: bq.db, + log: bq.log, + } +} + +func (bq *BackfillQuery) NewWithValues(userID id.UserID, priority int, portalGUID string, timeStart, timeEnd *time.Time, maxBatchEvents, maxTotalEvents, batchDelay int) *Backfill { + return &Backfill{ + db: bq.db, + log: bq.log, + UserID: userID, + Priority: priority, + PortalGUID: portalGUID, + TimeStart: timeStart, + TimeEnd: timeEnd, + MaxBatchEvents: maxBatchEvents, + MaxTotalEvents: maxTotalEvents, + BatchDelay: batchDelay, + } +} + +const ( + getNextBackfillQuery = ` + SELECT queue_id, user_mxid, priority, portal_guid, time_start, time_end, max_batch_events, max_total_events, batch_delay + FROM backfill_queue + WHERE user_mxid=$1 + AND ( + dispatch_time IS NULL + OR ( + dispatch_time < $2 + AND completed_at IS NULL + ) + ) + ORDER BY priority + LIMIT 1 + ` + getIncompleteCountQuery = ` + SELECT COUNT(*) + FROM backfill_queue + WHERE user_mxid=$1 AND completed_at IS NULL + ` + getCountQuery = ` + SELECT COUNT(*) + FROM backfill_queue + WHERE user_mxid=$1 + ` + getUnstartedOrInFlightQuery = ` + SELECT 1 + FROM backfill_queue + WHERE user_mxid=$1 + AND (dispatch_time IS NULL OR completed_at IS NULL) + LIMIT 1 + ` +) + +// GetNext returns the next backfill to perform +func (bq *BackfillQuery) GetNext(userID id.UserID) (backfill *Backfill) { + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() + + rows, err := bq.db.Query(getNextBackfillQuery, userID, time.Now().Add(-15*time.Minute)) + if err != nil || rows == nil { + bq.log.Errorfln("Failed to query next backfill queue job: %v", err) + return + } + defer rows.Close() + if rows.Next() { + backfill = bq.New().Scan(rows) + } + return +} + +func (bq *BackfillQuery) IncompleteCount(ctx context.Context, userID id.UserID) (incompleteCount int, err error) { + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() + + row := bq.db.QueryRowContext(ctx, getIncompleteCountQuery, userID) + err = row.Scan(&incompleteCount) + return +} + +func (bq *BackfillQuery) Count(ctx context.Context, userID id.UserID) (count int, err error) { + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() + + row := bq.db.QueryRowContext(ctx, getCountQuery, userID) + err = row.Scan(&count) + return +} + +func (bq *BackfillQuery) HasUnstartedOrInFlight(userID id.UserID) bool { + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() + + rows, err := bq.db.Query(getUnstartedOrInFlightQuery, userID) + if err != nil || rows == nil { + if err != nil && !errors.Is(err, sql.ErrNoRows) { + bq.log.Warnfln("Failed to query backfill queue jobs: %v", err) + } + // No rows means that there are no unstarted or in flight backfill + // requests. + return false + } + defer rows.Close() + return rows.Next() +} + +func (bq *BackfillQuery) DeleteAll(userID id.UserID) { + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() + _, err := bq.db.Exec("DELETE FROM backfill_queue WHERE user_mxid=$1", userID) + if err != nil { + bq.log.Warnfln("Failed to delete backfill queue items for %s: %v", userID, err) + } +} + +func (bq *BackfillQuery) DeleteAllForPortal(userID id.UserID, portalGUID string) { + bq.backfillQueryLock.Lock() + defer bq.backfillQueryLock.Unlock() + _, err := bq.db.Exec(` + DELETE FROM backfill_queue + WHERE user_mxid=$1 + AND portal_guid=$2 + `, userID, portalGUID) + if err != nil { + bq.log.Warnfln("Failed to delete backfill queue items for %s/%s: %v", userID, portalGUID, err) + } +} + +type Backfill struct { + db *Database + log log.Logger + + // Fields + QueueID int + UserID id.UserID + Priority int + PortalGUID string + TimeStart *time.Time + TimeEnd *time.Time + MaxBatchEvents int + MaxTotalEvents int + BatchDelay int + DispatchTime *time.Time + CompletedAt *time.Time +} + +func (b *Backfill) String() string { + return fmt.Sprintf("Backfill{QueueID: %d, UserID: %s, Priority: %d, Portal: %s, TimeStart: %s, TimeEnd: %s, MaxBatchEvents: %d, MaxTotalEvents: %d, BatchDelay: %d, DispatchTime: %s, CompletedAt: %s}", + b.QueueID, b.UserID, b.Priority, b.PortalGUID, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.CompletedAt, b.DispatchTime, + ) +} + +func (b *Backfill) Scan(row dbutil.Scannable) *Backfill { + var maxTotalEvents, batchDelay sql.NullInt32 + err := row.Scan(&b.QueueID, &b.UserID, &b.Priority, &b.PortalGUID, &b.TimeStart, &b.TimeEnd, &b.MaxBatchEvents, &maxTotalEvents, &batchDelay) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + b.log.Errorln("Database scan failed:", err) + } + return nil + } + b.MaxTotalEvents = int(maxTotalEvents.Int32) + b.BatchDelay = int(batchDelay.Int32) + return b +} + +func (b *Backfill) Insert(txn dbutil.Execable) { + b.db.Backfill.backfillQueryLock.Lock() + defer b.db.Backfill.backfillQueryLock.Unlock() + + if txn == nil { + txn = b.db + } + rows, err := txn.Query(` + INSERT INTO backfill_queue + (user_mxid, priority, portal_guid, time_start, time_end, max_batch_events, max_total_events, batch_delay, dispatch_time, completed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + RETURNING queue_id + `, b.UserID, b.Priority, b.PortalGUID, b.TimeStart, b.TimeEnd, b.MaxBatchEvents, b.MaxTotalEvents, b.BatchDelay, b.DispatchTime, b.CompletedAt) + defer rows.Close() + if err != nil || !rows.Next() { + b.log.Warnfln("Failed to insert backfill for %s with priority %d: %v", b.PortalGUID, b.Priority, err) + return + } + err = rows.Scan(&b.QueueID) + if err != nil { + b.log.Warnfln("Failed to insert backfill for %s with priority %d: %v", b.PortalGUID, b.Priority, err) + } +} + +func (b *Backfill) MarkDispatched() { + b.db.Backfill.backfillQueryLock.Lock() + defer b.db.Backfill.backfillQueryLock.Unlock() + + if b.QueueID == 0 { + b.log.Errorfln("Cannot mark backfill as dispatched without queue_id. Maybe it wasn't actually inserted in the database?") + return + } + _, err := b.db.Exec("UPDATE backfill_queue SET dispatch_time=$1 WHERE queue_id=$2", time.Now(), b.QueueID) + if err != nil { + b.log.Warnfln("Failed to mark backfill with priority %d as dispatched: %v", b.Priority, err) + } +} + +func (b *Backfill) MarkDone() { + b.db.Backfill.backfillQueryLock.Lock() + defer b.db.Backfill.backfillQueryLock.Unlock() + + if b.QueueID == 0 { + b.log.Errorfln("Cannot mark backfill done without queue_id. Maybe it wasn't actually inserted in the database?") + return + } + _, err := b.db.Exec("UPDATE backfill_queue SET completed_at=$1 WHERE queue_id=$2", time.Now(), b.QueueID) + if err != nil { + b.log.Warnfln("Failed to mark backfill with priority %d as complete: %v", b.Priority, err) + } +} + +func (bq *BackfillQuery) NewBackfillState(userID id.UserID, portalGUID string) *BackfillState { + return &BackfillState{ + db: bq.db, + log: bq.log, + UserID: userID, + PortalGUID: portalGUID, + } +} + +const ( + getBackfillState = ` + SELECT user_mxid, portal_guid, processing_batch, backfill_complete, first_expected_ts + FROM backfill_state + WHERE user_mxid=$1 + AND portal_guid=$2 + ` +) + +type BackfillState struct { + db *Database + log log.Logger + + // Fields + UserID id.UserID + PortalGUID string + ProcessingBatch bool + BackfillComplete bool + FirstExpectedTimestamp uint64 +} + +func (b *BackfillState) Scan(row dbutil.Scannable) *BackfillState { + err := row.Scan(&b.UserID, &b.PortalGUID, &b.ProcessingBatch, &b.BackfillComplete, &b.FirstExpectedTimestamp) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + b.log.Errorln("Database scan failed:", err) + } + return nil + } + return b +} + +func (b *BackfillState) Upsert() { + _, err := b.db.Exec(` + INSERT INTO backfill_state + (user_mxid, portal_guid, processing_batch, backfill_complete, first_expected_ts) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (user_mxid, portal_guid) + DO UPDATE SET + processing_batch=EXCLUDED.processing_batch, + backfill_complete=EXCLUDED.backfill_complete, + first_expected_ts=EXCLUDED.first_expected_ts`, + b.UserID, b.PortalGUID, b.ProcessingBatch, b.BackfillComplete, b.FirstExpectedTimestamp) + if err != nil { + b.log.Warnfln("Failed to insert backfill state for %s: %v", b.PortalGUID, err) + } +} + +func (b *BackfillState) SetProcessingBatch(processing bool) { + b.ProcessingBatch = processing + b.Upsert() +} + +func (bq *BackfillQuery) GetBackfillState(userID id.UserID, portalGUID string) (backfillState *BackfillState) { + rows, err := bq.db.Query(getBackfillState, userID, portalGUID) + if err != nil || rows == nil { + bq.log.Error(err) + return + } + defer rows.Close() + if rows.Next() { + backfillState = bq.NewBackfillState(userID, portalGUID).Scan(rows) + } + return +} diff --git a/database/database.go b/database/database.go index 53a48df7..981a87a4 100644 --- a/database/database.go +++ b/database/database.go @@ -35,6 +35,7 @@ type Database struct { Tapback *TapbackQuery KV *KeyValueQuery MergedChat *MergedChatQuery + Backfill *BackfillQuery } func New(parent *dbutil.Database, log maulogger.Logger) *Database { @@ -71,5 +72,9 @@ func New(parent *dbutil.Database, log maulogger.Logger) *Database { db: db, log: log.Sub("MergedChat"), } + db.Backfill = &BackfillQuery{ + db: db, + log: log.Sub("Backfill"), + } return db } diff --git a/database/message.go b/database/message.go index 1be6afa2..e1777309 100644 --- a/database/message.go +++ b/database/message.go @@ -92,6 +92,16 @@ func (mq *MessageQuery) GetLastInChat(chat string) *Message { return msg } +func (mq *MessageQuery) GetFirstInChat(chat string) *Message { + msg := mq.get("SELECT portal_guid, guid, part, mxid, sender_guid, handle_guid, timestamp "+ + "FROM message WHERE portal_guid=$1 ORDER BY timestamp ASC LIMIT 1", chat) + if msg == nil || msg.Timestamp == 0 { + // Old db, we don't know what the first message is. + return nil + } + return msg +} + func (mq *MessageQuery) GetEarliestTimestampInChat(chat string) (int64, error) { row := mq.db.QueryRow("SELECT MIN(timestamp) FROM message WHERE portal_guid=$1", chat) var timestamp int64 diff --git a/database/upgrades/00-latest-schema.sql b/database/upgrades/00-latest-schema.sql index 12ef0094..7b622542 100644 --- a/database/upgrades/00-latest-schema.sql +++ b/database/upgrades/00-latest-schema.sql @@ -1,4 +1,4 @@ --- v0 -> v20 (compatible with v18+): Latest schema +-- v0 -> v21 (compatible with v18+): Latest schema CREATE TABLE portal ( guid TEXT PRIMARY KEY, @@ -76,3 +76,34 @@ END; CREATE TRIGGER on_merge_delete_portal AFTER INSERT ON merged_chat WHEN NEW.source_guid<>NEW.target_guid BEGIN DELETE FROM portal WHERE guid=NEW.source_guid; END; + +CREATE TABLE backfill_queue ( + queue_id INTEGER PRIMARY KEY + -- only: postgres + GENERATED ALWAYS AS IDENTITY + , + user_mxid TEXT, + priority INTEGER NOT NULL, + portal_guid TEXT, + time_start TIMESTAMP, + time_end TIMESTAMP, + dispatch_time TIMESTAMP, + completed_at TIMESTAMP, + batch_delay INTEGER, + max_batch_events INTEGER NOT NULL, + max_total_events INTEGER, + + FOREIGN KEY (user_mxid) REFERENCES "user" (mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_guid) REFERENCES portal (guid) ON DELETE CASCADE +); + +CREATE TABLE backfill_state ( + user_mxid TEXT, + portal_guid TEXT, + processing_batch BOOLEAN, + backfill_complete BOOLEAN, + first_expected_ts BIGINT, + PRIMARY KEY (user_mxid, portal_guid), + FOREIGN KEY (user_mxid) REFERENCES "user" (mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_guid) REFERENCES portal (guid) ON DELETE CASCADE +); diff --git a/database/upgrades/21-prioritized-backfill.sql b/database/upgrades/21-prioritized-backfill.sql new file mode 100644 index 00000000..dfd19550 --- /dev/null +++ b/database/upgrades/21-prioritized-backfill.sql @@ -0,0 +1,32 @@ +-- v21 (compatible with v18+): Add backfill queue + +CREATE TABLE backfill_queue ( + queue_id INTEGER PRIMARY KEY + -- only: postgres + GENERATED ALWAYS AS IDENTITY + , + user_mxid TEXT, + priority INTEGER NOT NULL, + portal_guid TEXT, + time_start TIMESTAMP, + time_end TIMESTAMP, + dispatch_time TIMESTAMP, + completed_at TIMESTAMP, + batch_delay INTEGER, + max_batch_events INTEGER NOT NULL, + max_total_events INTEGER, + + FOREIGN KEY (user_mxid) REFERENCES "user" (mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_guid) REFERENCES portal (guid) ON DELETE CASCADE +); + +CREATE TABLE backfill_state ( + user_mxid TEXT, + portal_guid TEXT, + processing_batch BOOLEAN, + backfill_complete BOOLEAN, + first_expected_ts BIGINT, + PRIMARY KEY (user_mxid, portal_guid), + FOREIGN KEY (user_mxid) REFERENCES "user" (mxid) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (portal_guid) REFERENCES portal (guid) ON DELETE CASCADE +); From 298320c65d22d8f9ace117b3ce09c90b8a4b5a40 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Mon, 31 Jul 2023 15:13:03 -0600 Subject: [PATCH 03/13] api: add GetMessagesBetween and GetMessagesBeforeWithLimit and implement for mac Signed-off-by: Sumner Evans --- imessage/interface.go | 2 ++ imessage/ios/ipc.go | 8 +++++++ imessage/mac/database.go | 38 +++++++++++++++-------------- imessage/mac/messages.go | 52 +++++++++++++++++++++++++++++++++++----- 4 files changed, 76 insertions(+), 24 deletions(-) diff --git a/imessage/interface.go b/imessage/interface.go index a3d8bd80..cc4971a1 100644 --- a/imessage/interface.go +++ b/imessage/interface.go @@ -48,6 +48,8 @@ type API interface { Start(readyCallback func()) error Stop() GetMessagesSinceDate(chatID string, minDate time.Time, backfillID string) ([]*Message, error) + GetMessagesBetween(chatID string, minDate, maxDate time.Time) ([]*Message, error) + GetMessagesBeforeWithLimit(chatID string, before time.Time, limit int) ([]*Message, error) GetMessagesWithLimit(chatID string, limit int, backfillID string) ([]*Message, error) GetChatsWithMessagesAfter(minDate time.Time) ([]ChatIdentifier, error) GetMessage(guid string) (*Message, error) diff --git a/imessage/ios/ipc.go b/imessage/ios/ipc.go index e4b6b4f3..23ce903b 100644 --- a/imessage/ios/ipc.go +++ b/imessage/ios/ipc.go @@ -394,6 +394,14 @@ func (ios *iOSConnector) GetMessagesSinceDate(chatID string, minDate time.Time, return resp, err } +func (ios *iOSConnector) GetMessagesBetween(chatID string, minDate, maxDate time.Time) ([]*imessage.Message, error) { + panic("not implemented") +} + +func (ios *iOSConnector) GetMessagesBeforeWithLimit(chatID string, before time.Time, limit int) ([]*imessage.Message, error) { + panic("not implemented") +} + func (ios *iOSConnector) GetMessagesWithLimit(chatID string, limit int, backfillID string) ([]*imessage.Message, error) { resp := make([]*imessage.Message, 0) err := ios.IPC.Request(context.Background(), ReqGetRecentMessages, &GetRecentMessagesRequest{ diff --git a/imessage/mac/database.go b/imessage/mac/database.go index 6ac32baa..1a682205 100644 --- a/imessage/mac/database.go +++ b/imessage/mac/database.go @@ -33,24 +33,26 @@ type macOSDatabase struct { log log.Logger bridge imessage.Bridge - chatDBPath string - chatDB *sql.DB - messagesQuery *sql.Stmt - singleMessageQuery *sql.Stmt - limitedMessagesQuery *sql.Stmt - newMessagesQuery *sql.Stmt - newReceiptsQuery *sql.Stmt - attachmentsQuery *sql.Stmt - chatQuery *sql.Stmt - chatGUIDQuery *sql.Stmt - groupActionQuery *sql.Stmt - recentChatsQuery *sql.Stmt - groupMemberQuery *sql.Stmt - Messages chan *imessage.Message - ReadReceipts chan *imessage.ReadReceipt - stopWakeupDetecting chan struct{} - stopWatching chan struct{} - stopWait sync.WaitGroup + chatDBPath string + chatDB *sql.DB + messagesAfterQuery *sql.Stmt + messagesBetweenQuery *sql.Stmt + messagesBeforeWithLimitQuery *sql.Stmt + singleMessageQuery *sql.Stmt + limitedMessagesQuery *sql.Stmt + newMessagesQuery *sql.Stmt + newReceiptsQuery *sql.Stmt + attachmentsQuery *sql.Stmt + chatQuery *sql.Stmt + chatGUIDQuery *sql.Stmt + groupActionQuery *sql.Stmt + recentChatsQuery *sql.Stmt + groupMemberQuery *sql.Stmt + Messages chan *imessage.Message + ReadReceipts chan *imessage.ReadReceipt + stopWakeupDetecting chan struct{} + stopWatching chan struct{} + stopWait sync.WaitGroup *ContactStore } diff --git a/imessage/mac/messages.go b/imessage/mac/messages.go index 81b2c753..d85fcfeb 100644 --- a/imessage/mac/messages.go +++ b/imessage/mac/messages.go @@ -63,11 +63,22 @@ var singleMessageQuery = baseMessagesQuery + ` WHERE message.guid = $1 ` -var messagesQuery = baseMessagesQuery + ` +var messagesAfterQuery = baseMessagesQuery + ` WHERE (chat.guid=$1 OR $1='') AND message.date>$2 ORDER BY message.date ASC ` +var messagesBetweenQuery = baseMessagesQuery + ` +WHERE (chat.guid=$1 OR $1='') AND message.date>$2 AND message.date<$3 +ORDER BY message.date DESC +` + +var messagesBeforeWithLimitQuery = baseMessagesQuery + ` +WHERE (chat.guid=$1 OR $1='') AND message.date<$2 +ORDER BY message.date DESC +LIMIT $3 +` + var limitedMessagesQuery = baseMessagesQuery + ` WHERE (chat.guid=$1 OR $1='') ORDER BY message.date DESC @@ -147,24 +158,35 @@ func (mac *macOSDatabase) prepareMessages() error { return err } if !columnExists(mac.chatDB, "message", "thread_originator_guid") { - messagesQuery = strings.ReplaceAll(messagesQuery, "COALESCE(message.thread_originator_guid, '')", "''") + messagesAfterQuery = strings.ReplaceAll(messagesAfterQuery, "COALESCE(message.thread_originator_guid, '')", "''") + messagesBetweenQuery = strings.ReplaceAll(messagesBetweenQuery, "COALESCE(message.thread_originator_guid, '')", "''") limitedMessagesQuery = strings.ReplaceAll(limitedMessagesQuery, "COALESCE(message.thread_originator_guid, '')", "''") newMessagesQuery = strings.ReplaceAll(newMessagesQuery, "COALESCE(message.thread_originator_guid, '')", "''") singleMessageQuery = strings.ReplaceAll(singleMessageQuery, "COALESCE(message.thread_originator_guid, '')", "''") } if !columnExists(mac.chatDB, "message", "thread_originator_part") { - messagesQuery = strings.ReplaceAll(messagesQuery, "COALESCE(message.thread_originator_part, '')", "''") + messagesAfterQuery = strings.ReplaceAll(messagesAfterQuery, "COALESCE(message.thread_originator_part, '')", "''") + messagesBetweenQuery = strings.ReplaceAll(messagesBetweenQuery, "COALESCE(message.thread_originator_part, '')", "''") limitedMessagesQuery = strings.ReplaceAll(limitedMessagesQuery, "COALESCE(message.thread_originator_part, '')", "''") newMessagesQuery = strings.ReplaceAll(newMessagesQuery, "COALESCE(message.thread_originator_part, '')", "''") singleMessageQuery = strings.ReplaceAll(singleMessageQuery, "COALESCE(message.thread_originator_part, '')", "''") } if !columnExists(mac.chatDB, "message", "group_action_type") { - messagesQuery = strings.ReplaceAll(messagesQuery, "message.group_action_type", "0") + messagesAfterQuery = strings.ReplaceAll(messagesAfterQuery, "message.group_action_type", "0") + messagesBetweenQuery = strings.ReplaceAll(messagesBetweenQuery, "message.group_action_type", "0") limitedMessagesQuery = strings.ReplaceAll(limitedMessagesQuery, "message.group_action_type", "0") newMessagesQuery = strings.ReplaceAll(newMessagesQuery, "message.group_action_type", "0") singleMessageQuery = strings.ReplaceAll(singleMessageQuery, "message.group_action_type", "0") } - mac.messagesQuery, err = mac.chatDB.Prepare(messagesQuery) + mac.messagesAfterQuery, err = mac.chatDB.Prepare(messagesAfterQuery) + if err != nil { + return fmt.Errorf("failed to prepare message query: %w", err) + } + mac.messagesBetweenQuery, err = mac.chatDB.Prepare(messagesBetweenQuery) + if err != nil { + return fmt.Errorf("failed to prepare message query: %w", err) + } + mac.messagesBeforeWithLimitQuery, err = mac.chatDB.Prepare(messagesBeforeWithLimitQuery) if err != nil { return fmt.Errorf("failed to prepare message query: %w", err) } @@ -319,13 +341,31 @@ func (mac *macOSDatabase) GetMessagesWithLimit(chatID string, limit int, backfil } func (mac *macOSDatabase) GetMessagesSinceDate(chatID string, minDate time.Time, _ string) ([]*imessage.Message, error) { - res, err := mac.messagesQuery.Query(chatID, minDate.UnixNano()-imessage.AppleEpoch.UnixNano()) + res, err := mac.messagesAfterQuery.Query(chatID, minDate.UnixNano()-imessage.AppleEpoch.UnixNano()) if err != nil { return nil, fmt.Errorf("error querying messages after date: %w", err) } return mac.scanMessages(res) } +func (mac *macOSDatabase) GetMessagesBetween(chatID string, minDate time.Time, maxDate time.Time) ([]*imessage.Message, error) { + res, err := mac.messagesBetweenQuery.Query(chatID, + minDate.UnixNano()-imessage.AppleEpoch.UnixNano(), + maxDate.UnixNano()-imessage.AppleEpoch.UnixNano()) + if err != nil { + return nil, fmt.Errorf("error querying messages between dates: %w", err) + } + return mac.scanMessages(res) +} + +func (mac *macOSDatabase) GetMessagesBeforeWithLimit(chatID string, before time.Time, limit int) ([]*imessage.Message, error) { + res, err := mac.messagesBeforeWithLimitQuery.Query(chatID, before.UnixNano()-imessage.AppleEpoch.UnixNano(), limit) + if err != nil { + return nil, fmt.Errorf("error querying messages before date with limit: %w", err) + } + return mac.scanMessages(res) +} + func (mac *macOSDatabase) GetMessage(guid string) (*imessage.Message, error) { res, err := mac.singleMessageQuery.Query(guid) if err != nil { From bbc777bd113e3c4d19e732add19ed0a7289bc998 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 1 Aug 2023 14:12:41 -0600 Subject: [PATCH 04/13] ws commands: split backfill room into create and poll Signed-off-by: Sumner Evans --- matrix.go | 112 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 86 insertions(+), 26 deletions(-) diff --git a/matrix.go b/matrix.go index 3ad1e87e..09549dcb 100644 --- a/matrix.go +++ b/matrix.go @@ -43,6 +43,8 @@ type WebsocketCommandHandler struct { lastSyncProxyError time.Time syncProxyBackoff time.Duration syncProxyWaiting int64 + + createRoomsForBackfillError error } func NewWebsocketCommandHandler(br *IMBridge) *WebsocketCommandHandler { @@ -63,6 +65,7 @@ func NewWebsocketCommandHandler(br *IMBridge) *WebsocketCommandHandler { br.AS.SetWebsocketCommandHandler("edit_ghost", handler.handleWSEditGhost) br.AS.SetWebsocketCommandHandler("do_hacky_test", handler.handleWSHackyTest) br.AS.SetWebsocketCommandHandler("create_rooms_for_backfill", handler.handleCreateRoomsForBackfill) + br.AS.SetWebsocketCommandHandler("get_room_info_for_backfill", handler.handleGetRoomInfoForBackfill) return handler } @@ -348,48 +351,105 @@ type NewRoomBackfillRequest struct { Chats []*imessage.ChatInfo `json:"chats"` } -type NewRoomForBackfill struct { - RoomID id.RoomID - EarliestBridgedTimestamp int64 +func (mx *WebsocketCommandHandler) handleCreateRoomsForBackfill(cmd appservice.WebsocketCommand) (bool, any) { + var req NewRoomBackfillRequest + if err := json.Unmarshal(cmd.Data, &req); err != nil { + return false, fmt.Errorf("failed to parse request: %w", err) + } + + mx.log.Debugfln("Got request to create %d rooms for backfill", len(req.Chats)) + go func() { + mx.createRoomsForBackfillError = nil + for _, info := range req.Chats { + info.Identifier = imessage.ParseIdentifier(info.JSONChatGUID) + portals := mx.bridge.FindPortalsByThreadID(info.ThreadID) + var portal *Portal + if len(portals) > 1 { + mx.log.Warnfln("Found multiple portals with thread ID %s (message chat guid: %s)", info.ThreadID, info.Identifier.String()) + continue + } else if len(portals) == 1 { + portal = portals[0] + } else { + // This will create the new portal + portal = mx.bridge.GetPortalByGUID(info.Identifier.String()) + } + + if len(portal.MXID) == 0 { + portal.zlog.Info().Msg("Creating Matrix room with latest chat info") + err := portal.CreateMatrixRoom(info, nil) + if err != nil { + mx.createRoomsForBackfillError = err + return + } + } else { + portal.zlog.Info().Msg("Syncing Matrix room with latest chat info") + portal.SyncWithInfo(info) + } + + mx.log.Debugfln("Room %s created for backfilling %s", portal.MXID, info.JSONChatGUID) + } + }() + return true, struct{}{} } -type NewRoomBackfillResponse map[string]NewRoomForBackfill +type RoomInfoForBackfillRequest struct { + ChatGUIDs []string `json:"chats_guids"` +} -func (mx *WebsocketCommandHandler) handleCreateRoomsForBackfill(cmd appservice.WebsocketCommand) (bool, any) { - var req NewRoomBackfillRequest +type RoomInfoForBackfill struct { + RoomID id.RoomID `json:"room_id"` + EarliestBridgedTimestamp int64 `json:"earliest_bridged_timestamp"` +} + +type RoomCreationForBackfillStatus string + +const ( + RoomCreationForBackfillStatusInProgress RoomCreationForBackfillStatus = "in-progress" + RoomCreationForBackfillStatusDone RoomCreationForBackfillStatus = "done" + RoomCreationForBackfillStatusError RoomCreationForBackfillStatus = "error" +) + +type RoomInfoForBackfillResponse struct { + Status RoomCreationForBackfillStatus `json:"status"` + Error string `json:"error,omitempty"` + Rooms map[string]RoomInfoForBackfill `json:"rooms,omitempty"` +} + +func (mx *WebsocketCommandHandler) handleGetRoomInfoForBackfill(cmd appservice.WebsocketCommand) (bool, any) { + if mx.createRoomsForBackfillError != nil { + return true, RoomInfoForBackfillResponse{ + Status: RoomCreationForBackfillStatusError, + Error: mx.createRoomsForBackfillError.Error(), + } + } + + var req RoomInfoForBackfillRequest if err := json.Unmarshal(cmd.Data, &req); err != nil { return false, fmt.Errorf("failed to parse request: %w", err) } - createdRooms := NewRoomBackfillResponse{} + resp := RoomInfoForBackfillResponse{ + Status: RoomCreationForBackfillStatusDone, + Rooms: map[string]RoomInfoForBackfill{}, + } now := time.Now().UnixMilli() - for _, info := range req.Chats { - info.Identifier = imessage.ParseIdentifier(info.JSONChatGUID) - portal := mx.bridge.GetPortalByGUID(info.Identifier.String()) + mx.log.Debugfln("Got request to get room info for backfills") + for _, chatGUID := range req.ChatGUIDs { + portal := mx.bridge.GetPortalByGUID(chatGUID) - if len(portal.MXID) > 0 { - portal.zlog.Info().Msg("Syncing Matrix room with latest chat info") - portal.SyncWithInfo(info) - } else { - portal.zlog.Info().Msg("Creating Matrix room with latest chat info") - err := portal.CreateMatrixRoom(info, nil) - if err != nil { - return false, fmt.Errorf("failed to create portal: %w", err) - } + if len(portal.MXID) == 0 { + return true, RoomInfoForBackfillResponse{Status: RoomCreationForBackfillStatusInProgress} } - timestamp, err := mx.bridge.DB.Message.GetEarliestTimestampInChat(info.Identifier.String()) - if err != nil { - return false, fmt.Errorf("failed to get earliest timestamp in chat: %w", err) - } else if timestamp < 0 { + timestamp, err := mx.bridge.DB.Message.GetEarliestTimestampInChat(chatGUID) + if err != nil || timestamp < 0 { timestamp = now } - - createdRooms[portal.GUID] = NewRoomForBackfill{ + resp.Rooms[portal.GUID] = RoomInfoForBackfill{ RoomID: portal.MXID, EarliestBridgedTimestamp: timestamp, } } - return true, createdRooms + return true, resp } From 68c3116c6e2acf28fba33aa00b1c31fdbe60fbfd Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 2 Aug 2023 09:51:50 -0600 Subject: [PATCH 05/13] portal: add transaction version of GetPortalByGUID Signed-off-by: Sumner Evans --- portal.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/portal.go b/portal.go index d3db2883..ae2677d9 100644 --- a/portal.go +++ b/portal.go @@ -61,18 +61,22 @@ func (br *IMBridge) GetPortalByMXID(mxid id.RoomID) *Portal { } func (br *IMBridge) GetPortalByGUID(guid string) *Portal { + return br.GetPortalByGUIDWithTransaction(nil, guid) +} + +func (br *IMBridge) GetPortalByGUIDWithTransaction(txn dbutil.Execable, guid string) *Portal { br.portalsLock.Lock() defer br.portalsLock.Unlock() - return br.maybeGetPortalByGUID(guid, true) + return br.maybeGetPortalByGUID(txn, guid, true) } func (br *IMBridge) GetPortalByGUIDIfExists(guid string) *Portal { br.portalsLock.Lock() defer br.portalsLock.Unlock() - return br.maybeGetPortalByGUID(guid, false) + return br.maybeGetPortalByGUID(nil, guid, false) } -func (br *IMBridge) maybeGetPortalByGUID(guid string, createIfNotExist bool) *Portal { +func (br *IMBridge) maybeGetPortalByGUID(txn dbutil.Execable, guid string, createIfNotExist bool) *Portal { if br.Config.Bridge.DisableSMSPortals && strings.HasPrefix(guid, "SMS;-;") { parsed := imessage.ParseIdentifier(guid) if !parsed.IsGroup && parsed.Service == "SMS" { @@ -86,7 +90,7 @@ func (br *IMBridge) maybeGetPortalByGUID(guid string, createIfNotExist bool) *Po } portal, ok := br.portalsByGUID[guid] if !ok { - return br.loadDBPortal(nil, br.DB.Portal.GetByGUID(guid), fallbackGUID) + return br.loadDBPortal(txn, br.DB.Portal.GetByGUID(guid), fallbackGUID) } return portal } @@ -98,7 +102,7 @@ func (br *IMBridge) GetMessagesSince(chatGUID string, since time.Time) (out []st func (br *IMBridge) ReIDPortal(oldGUID, newGUID string, mergeExisting bool) bool { br.portalsLock.Lock() defer br.portalsLock.Unlock() - portal := br.maybeGetPortalByGUID(oldGUID, false) + portal := br.maybeGetPortalByGUID(nil, oldGUID, false) if portal == nil { br.Log.Debugfln("Ignoring chat ID change %s->%s, no portal with old ID found", oldGUID, newGUID) return false @@ -114,7 +118,7 @@ func (portal *Portal) reIDInto(newGUID string, newPortal *Portal, lock, mergeExi defer br.portalsLock.Unlock() } if newPortal == nil { - newPortal = br.maybeGetPortalByGUID(newGUID, false) + newPortal = br.maybeGetPortalByGUID(nil, newGUID, false) } if newPortal != nil { if mergeExisting && portal.MXID != "" && newPortal.MXID != "" && br.Config.Homeserver.Software == bridgeconfig.SoftwareHungry { From 52c6238a2221ff82d0e9ece0787ce76eefff38e3 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 2 Aug 2023 12:24:33 -0600 Subject: [PATCH 06/13] mac connector: sort recent chats by most recent first Signed-off-by: Sumner Evans --- imessage/mac/messages.go | 1 + 1 file changed, 1 insertion(+) diff --git a/imessage/mac/messages.go b/imessage/mac/messages.go index d85fcfeb..8681d751 100644 --- a/imessage/mac/messages.go +++ b/imessage/mac/messages.go @@ -116,6 +116,7 @@ SELECT DISTINCT chat.guid, chat.group_id FROM message JOIN chat_message_join ON chat_message_join.message_id = message.ROWID JOIN chat ON chat_message_join.chat_id = chat.ROWID WHERE message.date>$1 +ORDER BY message.date DESC ` const newReceiptsQuery = ` From 29d62db83144a9b1945941cb7e20ece445475872 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 3 Aug 2023 07:38:58 -0600 Subject: [PATCH 07/13] crypto: set different ProtocolName for only_backfill mode Signed-off-by: Sumner Evans --- main.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/main.go b/main.go index 8743e59d..c6bda364 100644 --- a/main.go +++ b/main.go @@ -219,6 +219,10 @@ func (br *IMBridge) Init() { br.Bridge.BeeperServiceName = "imessage" } + if br.Config.Bridge.Backfill.OnlyBackfill { + br.ProtocolName = "iMessage (Backfill)" + } + br.IMHandler = NewiMessageHandler(br) br.WebsocketHandler = NewWebsocketCommandHandler(br) br.wsOnConnectWait.Add(1) From 24651214c1efbed63db80a86c4242cfa8a1c6bd8 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 3 Aug 2023 08:48:51 -0600 Subject: [PATCH 08/13] beeper service name: set to imessagecloud in only_backfill mode Signed-off-by: Sumner Evans --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index c6bda364..c1dad2a1 100644 --- a/main.go +++ b/main.go @@ -211,7 +211,7 @@ func (br *IMBridge) Init() { }) br.Bridge.BeeperNetworkName = "androidsms" br.Bridge.BeeperServiceName = "androidsms" - } else if br.Config.IMessage.Platform == "mac-nosip" { + } else if br.Config.IMessage.Platform == "mac-nosip" || br.Config.Bridge.Backfill.OnlyBackfill { br.Bridge.BeeperNetworkName = "imessage" br.Bridge.BeeperServiceName = "imessagecloud" } else { From 093686d4abae006031400199e7530a7ad738e926 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 3 Aug 2023 14:42:52 -0600 Subject: [PATCH 09/13] deps/mautrix: upgrade to get ShareOneTimeKeys function Signed-off-by: Sumner Evans --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index aa5dc879..a6b31b06 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( golang.org/x/image v0.7.0 maunium.net/go/mauflag v1.0.0 maunium.net/go/maulogger/v2 v2.4.1 - maunium.net/go/mautrix v0.15.5-0.20230729114956-ff77fa8ec663 + maunium.net/go/mautrix v0.15.5-0.20230803204106-2212b451f1ca ) require ( diff --git a/go.sum b/go.sum index ac422e22..677d03d7 100644 --- a/go.sum +++ b/go.sum @@ -94,5 +94,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= maunium.net/go/maulogger/v2 v2.4.1 h1:N7zSdd0mZkB2m2JtFUsiGTQQAdP0YeFWT7YMc80yAL8= maunium.net/go/maulogger/v2 v2.4.1/go.mod h1:omPuYwYBILeVQobz8uO3XC8DIRuEb5rXYlQSuqrbCho= -maunium.net/go/mautrix v0.15.5-0.20230729114956-ff77fa8ec663 h1:tsXiuaWnBe9Xa/ms8tUhsJ4x7tPbQJ94fugqBIC0vsw= -maunium.net/go/mautrix v0.15.5-0.20230729114956-ff77fa8ec663/go.mod h1:dBaDmsnOOBM4a+gKcgefXH73pHGXm+MCJzCs1dXFgrw= +maunium.net/go/mautrix v0.15.5-0.20230803204106-2212b451f1ca h1:ZV6m+VfhxKnIoChAlV0eiJLNzVH5eklAGWSBEYrwRqw= +maunium.net/go/mautrix v0.15.5-0.20230803204106-2212b451f1ca/go.mod h1:dBaDmsnOOBM4a+gKcgefXH73pHGXm+MCJzCs1dXFgrw= From fd687c055d6d9dc2b2a7db9a02607457c99af227 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Mon, 31 Jul 2023 15:11:12 -0600 Subject: [PATCH 10/13] backfill: implement loop Signed-off-by: Sumner Evans --- backfillqueue.go | 332 ++++++++++++++++++++++++++++++++++++++ database/backfillqueue.go | 17 -- go.mod | 2 +- go.sum | 4 +- historysync.go | 159 ++++++++++++++++-- imessage.go | 2 +- main.go | 19 ++- portal.go | 15 +- user.go | 4 + 9 files changed, 513 insertions(+), 41 deletions(-) create mode 100644 backfillqueue.go diff --git a/backfillqueue.go b/backfillqueue.go new file mode 100644 index 00000000..b6898831 --- /dev/null +++ b/backfillqueue.go @@ -0,0 +1,332 @@ +// mautrix-imessage - A Matrix-iMessage puppeting bridge. +// Copyright (C) 2023 Tulir Asokan, Sumner Evans +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/rs/zerolog" + log "maunium.net/go/maulogger/v2" + "maunium.net/go/mautrix" + "maunium.net/go/mautrix/id" + "maunium.net/go/mautrix/util/dbutil" + + "go.mau.fi/mautrix-imessage/database" + "go.mau.fi/mautrix-imessage/imessage" +) + +type BackfillQueue struct { + BackfillQuery *database.BackfillQuery + reCheckChannel chan bool + log log.Logger +} + +func (bq *BackfillQueue) ReCheck() { + bq.log.Infofln("Sending re-checks to channel") + go func() { + bq.reCheckChannel <- true + }() +} + +func (bq *BackfillQueue) GetNextBackfill(userID id.UserID) *database.Backfill { + for { + if backfill := bq.BackfillQuery.GetNext(userID); backfill != nil { + backfill.MarkDispatched() + return backfill + } + + select { + case <-bq.reCheckChannel: + case <-time.After(time.Minute): + } + } +} + +func (user *User) HandleBackfillRequestsLoop(ctx context.Context) { + log := zerolog.Ctx(ctx).With().Str("component", "backfill_requests_loop").Logger() + + for { + if count, err := user.bridge.DB.Backfill.Count(ctx, user.MXID); err != nil { + user.setBackfillError(log, err, "Failed to get the number of backfills") + return + } else if incompleteCount, err := user.bridge.DB.Backfill.IncompleteCount(ctx, user.MXID); err != nil { + user.setBackfillError(log, err, "Failed to get the number of incomplete backfills") + return + } else if count > 0 && incompleteCount == 0 { + log.Info(). + Int("num_backfills", count). + Msg("No incomplete backfills, setting status to done") + user.setBackfillDone(log) + time.Sleep(5 * time.Second) + continue + } + + log.Info().Msg("Getting backfill request") + req := user.BackfillQueue.GetNextBackfill(user.MXID) + log.Info().Interface("req", req).Msg("Handling backfill request") + + portal := user.bridge.GetPortalByGUID(req.PortalGUID) + user.backfillInChunks(req, portal) + req.MarkDone() + } +} + +func (user *User) EnqueueImmediateBackfill(txn dbutil.Execable, priority int, portal *Portal, earliestBridgedTimestamp time.Time) { + maxMessages := user.bridge.Config.Bridge.Backfill.Immediate.MaxEvents + initialBackfill := user.bridge.DB.Backfill.NewWithValues(user.MXID, priority, portal.GUID, nil, &earliestBridgedTimestamp, maxMessages, maxMessages, 0) + initialBackfill.Insert(txn) +} + +func (user *User) EnqueueDeferredBackfills(txn dbutil.Execable, portals []*Portal, startIdx int) { + now := time.Now() + numPortals := len(portals) + for stageIdx, backfillStage := range user.bridge.Config.Bridge.Backfill.Deferred { + for portalIdx, portal := range portals { + var startDate *time.Time = nil + if backfillStage.StartDaysAgo > 0 { + startDaysAgo := now.AddDate(0, 0, -backfillStage.StartDaysAgo) + startDate = &startDaysAgo + } + backfillMessages := user.bridge.DB.Backfill.NewWithValues( + user.MXID, startIdx+stageIdx*numPortals+portalIdx, portal.GUID, startDate, nil, backfillStage.MaxBatchEvents, -1, backfillStage.BatchDelay) + backfillMessages.Insert(txn) + } + } +} + +type BackfillStatus string + +const ( + BackfillStatusUnknown BackfillStatus = "unknown" + BackfillStatusLockedByAnotherDevice BackfillStatus = "locked_by_another_device" + BackfillStatusRunning BackfillStatus = "running" + BackfillStatusError BackfillStatus = "error" + BackfillStatusDone BackfillStatus = "done" +) + +type BackfillInfo struct { + Status BackfillStatus `json:"status"` + Error string `json:"error,omitempty"` +} + +func (user *User) GetBackfillInfo() BackfillInfo { + backfillInfo := BackfillInfo{Status: BackfillStatusUnknown} + if user.backfillStatus != "" { + backfillInfo.Status = user.backfillStatus + } + if user.backfillError != nil { + backfillInfo.Error = user.backfillError.Error() + } + return backfillInfo +} + +func (user *User) setBackfillError(log zerolog.Logger, err error, msg string) { + log.Err(err).Msg(msg) + user.backfillStatus = BackfillStatusError + user.backfillError = fmt.Errorf("%s: %w", msg, err) +} + +type BackfillStateAccountData struct { + DeviceID id.DeviceID `json:"device_id"` + Done bool `json:"done"` +} + +func (user *User) setBackfillDone(log zerolog.Logger) { + log.Info(). + Str("device_id", user.bridge.Config.Bridge.DeviceID). + Msg("Setting backfill state account data to done") + err := user.bridge.Bot.SetAccountData("fi.mau.imessage.backfill_state", &BackfillStateAccountData{ + DeviceID: id.DeviceID(user.bridge.Config.Bridge.DeviceID), + Done: true, + }) + if err != nil { + user.setBackfillError(log, err, "failed to set backfill state account data") + return + } + user.backfillStatus = BackfillStatusDone +} + +func (user *User) runOnlyBackfillMode() { + log := user.bridge.ZLog.With().Str("mode", "backfill_only").Logger() + ctx := log.WithContext(context.Background()) + + // Start the backfill queue. We always want this running so that the + // desktop app can request the backfill status. + user.handleHistorySyncsLoop(ctx) + + if !user.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending) { + user.setBackfillError(log, nil, "The homeserver does not support Beeper's batch send endpoint") + return + } + + if user.bridge.Config.Bridge.DeviceID == "" { + user.setBackfillError(log, nil, "No device ID set in the config") + return + } + + var backfillState BackfillStateAccountData + err := user.bridge.Bot.GetAccountData("fi.mau.imessage.backfill_state", &backfillState) + if err != nil { + if !errors.Is(err, mautrix.MNotFound) { + user.setBackfillError(log, err, "Error fetching backfill state account data") + return + } + } else if backfillState.DeviceID.String() != user.bridge.Config.Bridge.DeviceID { + user.backfillStatus = BackfillStatusLockedByAnotherDevice + log.Warn(). + Str("device_id", backfillState.DeviceID.String()). + Msg("Backfill already locked for a different device") + return + } else if backfillState.Done { + log.Info(). + Str("device_id", backfillState.DeviceID.String()). + Msg("Backfill already completed") + user.backfillStatus = BackfillStatusDone + return + } + + if count, err := user.bridge.DB.Backfill.Count(ctx, user.MXID); err != nil { + user.setBackfillError(log, err, "Failed to get the number of backfills") + return + } else if incompleteCount, err := user.bridge.DB.Backfill.IncompleteCount(ctx, user.MXID); err != nil { + user.setBackfillError(log, err, "Failed to get the number of incomplete backfills") + return + } else if count > 0 && incompleteCount == 0 { + log.Info(). + Int("num_backfills", count). + Msg("No incomplete backfills, setting status to done") + user.setBackfillDone(log) + return + } else { + err = user.bridge.Crypto.ShareOneTimeKeys(context.Background()) + if err != nil { + user.setBackfillError(log, err, "Error sharing keys") + } + + err = user.bridge.Bot.SetAccountData("fi.mau.imessage.backfill_state", &BackfillStateAccountData{ + DeviceID: id.DeviceID(user.bridge.Config.Bridge.DeviceID), + Done: false, + }) + if err != nil { + user.setBackfillError(log, err, "failed to set backfill state account data") + return + } + user.backfillStatus = BackfillStatusRunning + + if count == 0 { + log.Info().Msg("Starting backfill") + user.getRoomsForBackfillAndEnqueue(ctx) + } else { + log.Info(). + Int("num_backfills", count). + Int("num_incomplete_backfills", incompleteCount). + Msg("Resuming backfill") + // No need to do anything else because the history syncs loop is + // already running + } + } +} + +func (user *User) getRoomsForBackfillAndEnqueue(ctx context.Context) { + log := zerolog.Ctx(ctx).With().Str("method", "getRoomsForBackfillAndEnqueue").Logger() + + // Get every chat from the database + chats, err := user.bridge.IM.GetChatsWithMessagesAfter(imessage.AppleEpoch) + if err != nil { + user.setBackfillError(log, err, "Error retrieving all chats") + return + } + + chatGUIDs := make([]string, len(chats)) + chatInfos := make([]*imessage.ChatInfo, len(chats)) + for i, chat := range chats { + chatGUIDs[i] = chat.ChatGUID + chatInfos[i], err = user.bridge.IM.GetChatInfo(chat.ChatGUID, chat.ThreadID) + if err != nil { + user.setBackfillError(log, err, + fmt.Sprintf("Error getting chat info for %s from database", chat.ChatGUID)) + return + } + chatInfos[i].JSONChatGUID = chatInfos[i].Identifier.String() + } + + // Ask the cloud bridge to create room IDs for every one of the chats. + client := user.CustomIntent().Client + url := client.BuildURL(mautrix.BaseURLPath{ + "_matrix", "asmux", "mxauth", "appservice", user.MXID.Localpart(), "imessagecloud", + "exec", "create_rooms_for_backfill"}) + _, err = client.MakeRequest("POST", url, NewRoomBackfillRequest{ + Chats: chatInfos, + }, nil) + if err != nil { + user.setBackfillError(log, err, "Error starting creation of backfill rooms") + return + } + + // Wait for the rooms to be created. + var roomInfoResp RoomInfoForBackfillResponse + for { + url = client.BuildURL(mautrix.BaseURLPath{ + "_matrix", "asmux", "mxauth", "appservice", user.MXID.Localpart(), "imessagecloud", + "exec", "get_room_info_for_backfill"}) + _, err = client.MakeRequest("POST", url, RoomInfoForBackfillRequest{ + ChatGUIDs: chatGUIDs, + }, &roomInfoResp) + if err != nil { + user.setBackfillError(log, err, "Error requesting backfill room info") + return + } + + if roomInfoResp.Status == RoomCreationForBackfillStatusDone { + break + } else if roomInfoResp.Status == RoomCreationForBackfillStatusError { + user.setBackfillError(log, fmt.Errorf(roomInfoResp.Error), "Error requesting backfill room IDs") + return + } else if roomInfoResp.Status == RoomCreationForBackfillStatusInProgress { + log.Info().Msg("Backfill room creation still in progress, waiting 5 seconds") + time.Sleep(5 * time.Second) + } else { + user.setBackfillError(log, fmt.Errorf("Unknown status %s", roomInfoResp.Status), "Error requesting backfill room IDs") + return + } + } + + // Create all of the portals locally and enqueue backfill requests for + // all of them. + txn, err := user.bridge.DB.Begin() + { + portals := []*Portal{} + var i int + for _, chatIdentifier := range chats { + roomInfo := roomInfoResp.Rooms[chatIdentifier.ChatGUID] + portal := user.bridge.GetPortalByGUIDWithTransaction(txn, chatIdentifier.ChatGUID) + portal.MXID = roomInfo.RoomID + portal.Update(txn) + portals = append(portals, portal) + user.EnqueueImmediateBackfill(txn, i, portal, time.UnixMilli(roomInfo.EarliestBridgedTimestamp)) + i++ + } + user.EnqueueDeferredBackfills(txn, portals, i) + } + if err = txn.Commit(); err != nil { + user.setBackfillError(log, err, "Error committing backfill room IDs") + return + } +} diff --git a/database/backfillqueue.go b/database/backfillqueue.go index 0e664a74..1699d113 100644 --- a/database/backfillqueue.go +++ b/database/backfillqueue.go @@ -127,23 +127,6 @@ func (bq *BackfillQuery) Count(ctx context.Context, userID id.UserID) (count int return } -func (bq *BackfillQuery) HasUnstartedOrInFlight(userID id.UserID) bool { - bq.backfillQueryLock.Lock() - defer bq.backfillQueryLock.Unlock() - - rows, err := bq.db.Query(getUnstartedOrInFlightQuery, userID) - if err != nil || rows == nil { - if err != nil && !errors.Is(err, sql.ErrNoRows) { - bq.log.Warnfln("Failed to query backfill queue jobs: %v", err) - } - // No rows means that there are no unstarted or in flight backfill - // requests. - return false - } - defer rows.Close() - return rows.Next() -} - func (bq *BackfillQuery) DeleteAll(userID id.UserID) { bq.backfillQueryLock.Lock() defer bq.backfillQueryLock.Unlock() diff --git a/go.mod b/go.mod index a6b31b06..31331a76 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/strukturag/libheif v1.16.2 github.com/tidwall/gjson v1.14.4 golang.org/x/crypto v0.11.0 + golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b golang.org/x/image v0.7.0 maunium.net/go/mauflag v1.0.0 maunium.net/go/maulogger/v2 v2.4.1 @@ -28,7 +29,6 @@ require ( github.com/tidwall/sjson v1.2.5 // indirect github.com/yuin/goldmark v1.5.4 // indirect go.mau.fi/zeroconfig v0.1.2 // indirect - golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect diff --git a/go.sum b/go.sum index 677d03d7..2d60b55d 100644 --- a/go.sum +++ b/go.sum @@ -45,8 +45,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= -golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= -golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI= +golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.7.0 h1:gzS29xtG1J5ybQlv0PuyfE3nmc6R4qB73m6LUUmvFuw= golang.org/x/image v0.7.0/go.mod h1:nd/q4ef1AKKYl/4kft7g+6UyGbdiqWqTP1ZAbRoV7Rg= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= diff --git a/historysync.go b/historysync.go index d43785f6..5e36e8a3 100644 --- a/historysync.go +++ b/historysync.go @@ -17,12 +17,14 @@ package main import ( + "context" "crypto/sha256" "encoding/base64" "fmt" "runtime/debug" "time" + "golang.org/x/exp/slices" "maunium.net/go/mautrix" "maunium.net/go/mautrix/appservice" "maunium.net/go/mautrix/bridge/bridgeconfig" @@ -34,7 +36,28 @@ import ( "go.mau.fi/mautrix-imessage/imessage" ) -var PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType} +var ( + PortalCreationDummyEvent = event.Type{Type: "fi.mau.dummy.portal_created", Class: event.MessageEventType} + BackfillStatusEvent = event.Type{Type: "com.beeper.backfill_status", Class: event.StateEventType} +) + +func (user *User) handleHistorySyncsLoop(ctx context.Context) { + if !user.bridge.Config.Bridge.Backfill.OnlyBackfill || !user.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending) { + user.log.Infofln("Not backfilling history since OnlyBackfill is disabled") + return + } + + // Start the backfill queue. + user.BackfillQueue = &BackfillQueue{ + BackfillQuery: user.bridge.DB.Backfill, + reCheckChannel: make(chan bool), + log: user.log.Sub("BackfillQueue"), + } + + // Handle all backfills in the same loop. Since new chats will not need to + // be handled by this loop, priority is all that is needed. + go user.HandleBackfillRequestsLoop(ctx) +} func (portal *Portal) lockBackfill() { portal.backfillLock.Lock() @@ -96,7 +119,7 @@ func (portal *Portal) forwardBackfill() { } else if len(messages) == 0 || allSkipped { portal.log.Debugln("Nothing to backfill") } else { - portal.sendBackfill(backfillID, messages, true) + portal.sendBackfill(backfillID, messages, true, false, false) } } @@ -175,7 +198,7 @@ func (portal *Portal) convertBackfill(messages []*imessage.Message) ([]*event.Ev return events, metas, metaIndexes, isRead, nil } -func (portal *Portal) sendBackfill(backfillID string, messages []*imessage.Message, forward bool) (success bool) { +func (portal *Portal) sendBackfill(backfillID string, messages []*imessage.Message, forward, forwardIfNoMessages, markAsRead bool) (success bool) { idMap := make(map[string][]id.EventID, len(messages)) for _, msg := range messages { idMap[msg.GUID] = []id.EventID{} @@ -189,8 +212,8 @@ func (portal *Portal) sendBackfill(backfillID string, messages []*imessage.Messa portal.bridge.IM.SendBackfillResult(portal.GUID, backfillID, success, idMap) }() batchSending := portal.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending) - if !batchSending && !forward { - portal.log.Debugfln("Dropping non-forward backfill %s as Beeper batch sending is not supported", backfillID) + if !batchSending { + portal.log.Debugfln("Dropping backfill %s as Beeper batch sending is not supported", backfillID) return true } events, metas, metaIndexes, isRead, err := portal.convertBackfill(messages) @@ -205,10 +228,11 @@ func (portal *Portal) sendBackfill(backfillID string, messages []*imessage.Messa var eventIDs []id.EventID if batchSending { req := &mautrix.ReqBeeperBatchSend{ - Events: events, - Forward: forward, + Events: events, + Forward: forward, + ForwardIfNoMessages: forwardIfNoMessages, } - if isRead { + if isRead || markAsRead { req.MarkReadBy = portal.bridge.user.MXID } resp, err := portal.MainIntent().BeeperBatchSend(portal.MXID, req) @@ -235,7 +259,7 @@ func (portal *Portal) sendBackfill(backfillID string, messages []*imessage.Messa } eventIDs[i] = resp.EventID } - if isRead && portal.bridge.user.DoublePuppetIntent != nil { + if (isRead || markAsRead) && portal.bridge.user.DoublePuppetIntent != nil { lastReadEvent := eventIDs[len(eventIDs)-1] err := portal.markRead(portal.bridge.user.DoublePuppetIntent, lastReadEvent, time.Time{}) if err != nil { @@ -291,3 +315,120 @@ func (portal *Portal) finishBackfill(txn dbutil.Transaction, eventIDs []id.Event } } } + +func (user *User) backfillInChunks(req *database.Backfill, portal *Portal) { + if len(portal.MXID) == 0 { + user.log.Errorfln("Portal %s has no room ID, but backfill was requested", portal.GUID) + return + } + portal.Sync(false) + + backfillState := user.bridge.DB.Backfill.GetBackfillState(user.MXID, portal.GUID) + if backfillState == nil { + backfillState = user.bridge.DB.Backfill.NewBackfillState(user.MXID, portal.GUID) + } + backfillState.SetProcessingBatch(true) + defer backfillState.SetProcessingBatch(false) + portal.updateBackfillStatus(backfillState) + + var timeStart = imessage.AppleEpoch + if req.TimeStart != nil { + timeStart = *req.TimeStart + user.log.Debugfln("Limiting backfill to start at %v", timeStart) + } + + var timeEnd time.Time + var forwardIfNoMessages, shouldMarkAsRead bool + if req.TimeEnd != nil { + timeEnd = *req.TimeEnd + user.log.Debugfln("Limiting backfill to end at %v", req.TimeEnd) + forwardIfNoMessages = true + } else { + firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.GUID) + if firstMessage != nil { + timeEnd = firstMessage.Time().Add(-1 * time.Millisecond) + user.log.Debugfln("Limiting backfill to end at %v", timeEnd) + } else { + // Portal is empty, but no TimeEnd was set. + user.log.Errorln("Portal %s is empty, but no TimeEnd was set", portal.MXID) + return + } + } + + backfillID := fmt.Sprintf("bridge-chunk-%s::%s-%s::%d", portal.GUID, timeStart, timeEnd, time.Now().UnixMilli()) + + // If the message was before the unread hours threshold, mark it as + // read. + lastMessages, err := user.bridge.IM.GetMessagesWithLimit(portal.GUID, 1, backfillID) + if err != nil { + user.log.Errorfln("Failed to get last message from database") + return + } else if len(lastMessages) == 1 { + shouldMarkAsRead = user.bridge.Config.Bridge.Backfill.UnreadHoursThreshold > 0 && + lastMessages[0].Time.Before(time.Now().Add(time.Duration(-user.bridge.Config.Bridge.Backfill.UnreadHoursThreshold)*time.Hour)) + } + + var allMsgs []*imessage.Message + if req.MaxTotalEvents >= 0 { + allMsgs, err = user.bridge.IM.GetMessagesBeforeWithLimit(portal.GUID, timeEnd, req.MaxTotalEvents) + } else { + allMsgs, err = user.bridge.IM.GetMessagesBetween(portal.GUID, timeStart, timeEnd) + } + if err != nil { + user.log.Errorfln("Failed to get messages between %v and %v: %v", req.TimeStart, timeEnd, err) + return + } + + if len(allMsgs) == 0 { + user.log.Debugfln("Not backfilling %s (%v - %v): no bridgeable messages found", portal.GUID, timeStart, timeEnd) + return + } + + user.log.Infofln("Backfilling %d messages in %s, %d messages at a time (queue ID: %d)", len(allMsgs), portal.GUID, req.MaxBatchEvents, req.QueueID) + toBackfill := allMsgs[0:] + for len(toBackfill) > 0 { + var msgs []*imessage.Message + if len(toBackfill) <= req.MaxBatchEvents || req.MaxBatchEvents < 0 { + msgs = toBackfill + toBackfill = nil + } else { + msgs = toBackfill[:req.MaxBatchEvents] + toBackfill = toBackfill[req.MaxBatchEvents:] + } + + if len(msgs) > 0 { + time.Sleep(time.Duration(req.BatchDelay) * time.Second) + user.log.Debugfln("Backfilling %d messages in %s (queue ID: %d)", len(msgs), portal.GUID, req.QueueID) + + // The sendBackfill function wants the messages in order, but the + // queries give it in reversed order. + slices.Reverse(msgs) + portal.sendBackfill(backfillID, msgs, false, forwardIfNoMessages, shouldMarkAsRead) + } + } + user.log.Debugfln("Finished backfilling %d messages in %s (queue ID: %d)", len(allMsgs), portal.GUID, req.QueueID) + + if req.TimeStart == nil && req.TimeEnd == nil { + // If both the start time and end time are nil, then this is the max + // history backfill, so there is no more history to backfill. + backfillState.BackfillComplete = true + backfillState.FirstExpectedTimestamp = uint64(allMsgs[len(allMsgs)-1].Time.UnixMilli()) + backfillState.Upsert() + portal.updateBackfillStatus(backfillState) + } +} + +func (portal *Portal) updateBackfillStatus(backfillState *database.BackfillState) { + backfillStatus := "backfilling" + if backfillState.BackfillComplete { + backfillStatus = "complete" + } + + _, err := portal.bridge.Bot.SendStateEvent(portal.MXID, BackfillStatusEvent, "", map[string]any{ + "status": backfillStatus, + "first_timestamp": backfillState.FirstExpectedTimestamp * 1000, + }) + if err != nil { + portal.log.Errorln("Error sending backfill status event:", err) + } +} diff --git a/imessage.go b/imessage.go index 3a9d8752..65661cef 100644 --- a/imessage.go +++ b/imessage.go @@ -291,7 +291,7 @@ func (imh *iMessageHandler) HandleBackfillTask(task *imessage.BackfillTask) { return } portal.log.Debugfln("Running backfill %s in background", task.BackfillID) - go portal.sendBackfill(task.BackfillID, task.Messages, false) + go portal.sendBackfill(task.BackfillID, task.Messages, false, false, false) } func (imh *iMessageHandler) HandleContact(contact *imessage.Contact) { diff --git a/main.go b/main.go index c1dad2a1..0a78ed82 100644 --- a/main.go +++ b/main.go @@ -465,6 +465,17 @@ func (br *IMBridge) isWarmingUp() bool { } func (br *IMBridge) Start() { + br.Log.Debugln("Finding bridge user") + br.user = br.loadDBUser() + br.user.initDoublePuppet() + + // If this bridge is in OnlyBackfill mode, then only run the backfill + // queue, and not the new message listeners. + if br.Config.Bridge.Backfill.OnlyBackfill { + br.user.runOnlyBackfillMode() + return + } + if br.Config.Bridge.MessageStatusEvents { sendStatusStart := br.DB.KV.Get(database.KVSendStatusStart) if len(sendStatusStart) > 0 { @@ -488,9 +499,6 @@ func (br *IMBridge) Start() { needsPortalFinding := br.Config.Bridge.FindPortalsIfEmpty && br.DB.Portal.Count() == 0 && br.DB.KV.Get(database.KVLookedForPortals) != "true" - br.Log.Debugln("Finding bridge user") - br.user = br.loadDBUser() - br.user.initDoublePuppet() var startupGroup sync.WaitGroup startupGroup.Add(1) br.Log.Debugln("Connecting to iMessage") @@ -633,7 +641,10 @@ func (br *IMBridge) ipcStop(_ json.RawMessage) interface{} { } func (br *IMBridge) Stop() { - br.Log.Debugln("Stopping iMessage connector") + br.ZLog.Debug().Msg("Stopping iMessage connector") + if br.Config.Bridge.Backfill.OnlyBackfill { + return + } br.IM.Stop() br.IMHandler.Stop() } diff --git a/portal.go b/portal.go index ae2677d9..5b609200 100644 --- a/portal.go +++ b/portal.go @@ -243,13 +243,14 @@ type Portal struct { SecondaryGUIDs []string - Messages chan *imessage.Message - ReadReceipts chan *imessage.ReadReceipt - MessageStatuses chan *imessage.SendMessageStatus - MatrixMessages chan *event.Event - backfillStart chan struct{} - backfillWait sync.WaitGroup - backfillLock sync.Mutex + Messages chan *imessage.Message + ReadReceipts chan *imessage.ReadReceipt + MessageStatuses chan *imessage.SendMessageStatus + MatrixMessages chan *event.Event + backfillStart chan struct{} + backfillWait sync.WaitGroup + backfillLock sync.Mutex + roomCreateLock sync.Mutex messageDedup map[string]SentMessage messageDedupLock sync.Mutex diff --git a/user.go b/user.go index e861a862..17b5227d 100644 --- a/user.go +++ b/user.go @@ -47,6 +47,10 @@ type User struct { mgmtCreateLock sync.Mutex spaceMembershipChecked bool + + BackfillQueue *BackfillQueue + backfillStatus BackfillStatus + backfillError error } var _ bridge.User = (*User)(nil) From 4a2976d69de7a62c489120461fa1586887a89de5 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Fri, 4 Aug 2023 09:17:56 -0600 Subject: [PATCH 11/13] ipc: add backfill-status command Signed-off-by: Sumner Evans --- main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/main.go b/main.go index 0a78ed82..93dcd770 100644 --- a/main.go +++ b/main.go @@ -196,6 +196,7 @@ func (br *IMBridge) Init() { br.IPC.SetHandler("merge-rooms", br.ipcMergeRooms) br.IPC.SetHandler("split-rooms", br.ipcSplitRooms) br.IPC.SetHandler("do-auto-merge", br.ipcDoAutoMerge) + br.IPC.SetHandler("backfill-status", br.ipcBackfillStatus) br.Log.Debugln("Initializing iMessage connector") var err error @@ -320,6 +321,10 @@ func (br *IMBridge) ipcDoAutoMerge(_ json.RawMessage) any { return struct{}{} } +func (br *IMBridge) ipcBackfillStatus(_ json.RawMessage) any { + return br.user.GetBackfillInfo() +} + type StartSyncRequest struct { AccessToken string `json:"access_token"` DeviceID id.DeviceID `json:"device_id"` From 30d0ebe6b43f5e2f652c4b2f6dc97e5d290f23b4 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Sun, 6 Aug 2023 09:52:27 -0600 Subject: [PATCH 12/13] db/message: fix GetEarliestTimestampInChat Signed-off-by: Sumner Evans --- database/message.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/database/message.go b/database/message.go index e1777309..abe66a1b 100644 --- a/database/message.go +++ b/database/message.go @@ -104,11 +104,14 @@ func (mq *MessageQuery) GetFirstInChat(chat string) *Message { func (mq *MessageQuery) GetEarliestTimestampInChat(chat string) (int64, error) { row := mq.db.QueryRow("SELECT MIN(timestamp) FROM message WHERE portal_guid=$1", chat) - var timestamp int64 + var timestamp sql.NullInt64 if err := row.Scan(×tamp); err != nil { return -1, err + } else if !timestamp.Valid { + return -1, nil + } else { + return timestamp.Int64, nil } - return timestamp, nil } func (mq *MessageQuery) MergePortalGUID(txn dbutil.Execable, to string, from ...string) int64 { From 1d92cde0e6f8a8458912d945542217a08b1fba52 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Sun, 6 Aug 2023 14:03:10 -0600 Subject: [PATCH 13/13] bridge/start: actually run IPC loop Signed-off-by: Sumner Evans --- main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 93dcd770..e2c19a30 100644 --- a/main.go +++ b/main.go @@ -475,8 +475,10 @@ func (br *IMBridge) Start() { br.user.initDoublePuppet() // If this bridge is in OnlyBackfill mode, then only run the backfill - // queue, and not the new message listeners. + // queue and the IPC listener, and not the new message listeners. if br.Config.Bridge.Backfill.OnlyBackfill { + br.ZLog.Debug().Msg("Starting IPC loop") + go br.IPC.Loop() br.user.runOnlyBackfillMode() return }