From bf7836ce7ba85e89ca3d87482e69e7e3f7c20f7f Mon Sep 17 00:00:00 2001 From: Squeaky Date: Tue, 12 Nov 2024 16:22:09 +0100 Subject: [PATCH] Revert "Batch and optimize large deletions (#971)" This reverts commit 229e42879be6b2b26153b7a0d99e41c0a8387b74. --- inbox/mailsync/backends/gmail.py | 6 +- inbox/mailsync/backends/imap/common.py | 203 +++++++++---------------- inbox/models/backends/imap.py | 22 +-- tests/imap/test_delete_handling.py | 6 +- tests/imap/test_labels.py | 34 +---- tests/imap/test_update_metadata.py | 8 +- 6 files changed, 89 insertions(+), 190 deletions(-) diff --git a/inbox/mailsync/backends/gmail.py b/inbox/mailsync/backends/gmail.py index 58e58047f..f0fbd97f6 100644 --- a/inbox/mailsync/backends/gmail.py +++ b/inbox/mailsync/backends/gmail.py @@ -281,11 +281,7 @@ def __deduplicate_message_object_creation(self, db_session, raw_messages, accoun uid.update_flags(raw_message.flags) uid.update_labels(raw_message.g_labels) common.update_message_metadata( - db_session, - account.id, - account.category_type, - message_obj, - uid.is_draft, + db_session, account, message_obj, uid.is_draft ) db_session.commit() diff --git a/inbox/mailsync/backends/imap/common.py b/inbox/mailsync/backends/imap/common.py index 917cd9051..27ca3a419 100644 --- a/inbox/mailsync/backends/imap/common.py +++ b/inbox/mailsync/backends/imap/common.py @@ -12,12 +12,11 @@ """ -from collections import defaultdict from datetime import datetime -from typing import List, Set, assert_never +from typing import List, Set from sqlalchemy import bindparam, desc -from sqlalchemy.orm import Query, Session, joinedload +from sqlalchemy.orm import Query, Session from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.sql.expression import func @@ -25,14 +24,11 @@ from inbox.crispin import RawMessage from inbox.logging import get_logger from inbox.models import Account, ActionLog, Folder, Message, MessageCategory -from inbox.models.account import CategoryType -from inbox.models.backends.imap import ImapFolderInfo, ImapUid, LabelItem +from inbox.models.backends.imap import ImapFolderInfo, ImapUid from inbox.models.category import Category -from inbox.models.label import Label from inbox.models.session import session_scope from inbox.models.util import reconcile_message from inbox.sqlalchemy_ext.util import get_db_api_cursor_with_query -from inbox.util.itert import chunk log = get_logger() @@ -81,35 +77,20 @@ def lastseenuid(account_id, session, folder_id): def update_message_metadata( - session: Session, - account_id: int, - category_type: CategoryType, - message: Message, - is_draft: bool, + session: Session, account: Account, message: Message, is_draft: bool ) -> None: """Update the message's metadata""" - # We call get_categories on each ImapUid to get the categories for the message. - # To avoid N+1 queries, we use joinedload to eagerly load the categories. - options = [joinedload(ImapUid.folder).joinedload(Folder.category)] - if category_type == "label": - options.append( - joinedload("labelitems") - .joinedload(LabelItem.label) - .joinedload(Label.category) - ) - # Sort imapuids in a way that the ones that were added later come first. # There are non-conforming IMAP servers that can list the same message thousands of times # in the same folder. This is a workaround to limit the memory pressure caused by such # servers. The metadata is meaningless for such messages anyway. latest_imapuids = ( imapuids_for_message_query( - account_id=account_id, + account_id=account.id, message_id=message.id, only_latest=IMAPUID_PER_MESSAGE_SANITY_LIMIT, ) .with_session(session) - .options(options) .all() ) @@ -118,13 +99,11 @@ def update_message_metadata( message.is_draft = is_draft latest_categories: List[Category] = [ - category - for imapuid in latest_imapuids - for category in imapuid.get_categories(category_type) + category for imapuid in latest_imapuids for category in imapuid.categories ] categories: Set[Category] - if category_type == "folder": + if account.category_type == "folder": # For generic IMAP we want to deterministically select the last category. # A message will always be in a single folder but it seems that for some # on-prem servers we are not able to reliably detect when a message is moved @@ -134,10 +113,10 @@ def update_message_metadata( # from the database. This makes it deterministic and more-correct because a message # is likely in a folder (and category) it was added to last. categories = {latest_categories[0]} if latest_categories else set() - elif category_type == "label": + elif account.category_type == "label": categories = set(latest_categories) else: - assert_never(category_type) + raise AssertionError("Unreachable") # Use a consistent time across creating categories, message updated_at # and the subsequent transaction that may be created. @@ -225,9 +204,7 @@ def update_metadata(account_id, folder_id, folder_role, new_flags, session): if changed: change_count += 1 is_draft = item.is_draft and folder_role in ["drafts", "all"] - update_message_metadata( - session, account.id, account.category_type, item.message, is_draft - ) + update_message_metadata(session, account, item.message, is_draft) session.commit() log.info("Updated UID metadata", changed=change_count, out_of=len(new_flags)) @@ -244,7 +221,7 @@ def imapuids_for_message_query( return query -def remove_deleted_uids(account_id: int, folder_id: int, uids: set[int]) -> None: +def remove_deleted_uids(account_id, folder_id, uids): """ Make sure you're holding a db write lock on the account. (We don't try to grab the lock in here in case the caller needs to put higher-level @@ -253,109 +230,69 @@ def remove_deleted_uids(account_id: int, folder_id: int, uids: set[int]) -> None """ if not uids: return - - # First, get the category type i.e. folder or label for the account - # and message_id corresponding to the uids. - with session_scope(account_id) as db_session: - category_type = Account.get(account_id, db_session).category_type - - message_ids_and_uid = ( - db_session.query(ImapUid.message_id, ImapUid.msg_uid) - .filter( - ImapUid.account_id == account_id, - ImapUid.folder_id == folder_id, - ImapUid.msg_uid.in_(uids), - ) - .with_hint( - ImapUid, "FORCE INDEX (ix_imapuid_account_id_folder_id_msg_uid_desc)" - ) - .all() - ) - - # Group uids by message_id. - uids_by_message_id = defaultdict(list) - for message_id, message_uid in message_ids_and_uid: - uids_by_message_id[message_id].append(message_uid) - - for message_id, message_uids in uids_by_message_id.items(): - # Delete the uids in batches of 1000. - # This eases the load on MySQL if we are processing a large number of uids. - # Some non-conforming IMAP servers can list the same message thousands of times. - for message_uid_batch in chunk(message_uids, 1000): - with session_scope(account_id) as db_session: - db_session.query(ImapUid).filter( + deleted_uid_count = 0 + for uid in uids: + # We do this one-uid-at-a-time because issuing many deletes within a + # single database transaction is problematic. But loading many + # objects into a session and then frequently calling commit() is also + # bad, because expiring objects and checking for revisions is O(number + # of objects in session), resulting in quadratic runtimes. + # Performance could perhaps be additionally improved by choosing a + # sane balance, e.g., operating on 10 or 100 uids or something at once. + with session_scope(account_id) as db_session: + imapuid = ( + db_session.query(ImapUid) + .filter( ImapUid.account_id == account_id, ImapUid.folder_id == folder_id, - ImapUid.msg_uid.in_(message_uid_batch), - ).with_hint( + ImapUid.msg_uid == uid, + ) + .with_hint( ImapUid, "FORCE INDEX (ix_imapuid_account_id_folder_id_msg_uid_desc)", - ).delete( - synchronize_session=False ) - db_session.commit() - - update_message_after_uid_deletion( - account_id=account_id, message_id=message_id, category_type=category_type - ) - - log.info( - "Deleted expunged UIDs", - uid_count=len(uids), - message_count=len(uids_by_message_id), - ) - - -def update_message_after_uid_deletion( - *, account_id: int, message_id: int, category_type: CategoryType -) -> None: - """ - Update the message after deleting UIDs. - - If the message has no more UIDs, and it's a draft, delete it immediately. - If the message has no more UIDs, and it's not a draft, mark it for deletion. - - Args: - account_id: The account ID. - message_id: The message ID. - category_type: The category type for the account i.e. folder or label. - """ - with session_scope(account_id) as db_session: - message = db_session.query(Message).get(message_id) - if message is None: - return - - message_imapuids_exist = db_session.query( - imapuids_for_message_query( - account_id=account_id, message_id=message.id - ).exists() - ).scalar() - - if not message_imapuids_exist and message.is_draft: - # Synchronously delete drafts. - thread = message.thread - if thread is not None: - thread.messages.remove(message) - # Thread.messages relationship is versioned i.e. extra - # logic gets executed on remove call. - # This early flush is needed so the configure_versioning logic - # in inbox.model.sessions can work reliably on newer versions of - # SQLAlchemy. - db_session.flush() - db_session.delete(message) - if thread is not None and not thread.messages: - db_session.delete(thread) - else: - update_message_metadata( - db_session, account_id, category_type, message, message.is_draft + .first() ) - if not message_imapuids_exist: - # But don't outright delete messages. Just mark them as - # 'deleted' and wait for the asynchronous - # dangling-message-collector to delete them. - message.mark_for_deletion() - - db_session.commit() + if imapuid is None: + continue + deleted_uid_count += 1 + message = imapuid.message + + db_session.delete(imapuid) + + if message is not None: + message_imapuids_exist = db_session.query( + imapuids_for_message_query( + account_id=account_id, message_id=message.id + ).exists() + ).scalar() + + if not message_imapuids_exist and message.is_draft: + # Synchronously delete drafts. + thread = message.thread + if thread is not None: + thread.messages.remove(message) + # Thread.messages relationship is versioned i.e. extra + # logic gets executed on remove call. + # This early flush is needed so the configure_versioning logic + # in inbox.model.sessions can work reliably on newer versions of + # SQLAlchemy. + db_session.flush() + db_session.delete(message) + if thread is not None and not thread.messages: + db_session.delete(thread) + else: + account = Account.get(account_id, db_session) + update_message_metadata( + db_session, account, message, message.is_draft + ) + if not message_imapuids_exist: + # But don't outright delete messages. Just mark them as + # 'deleted' and wait for the asynchronous + # dangling-message-collector to delete them. + message.mark_for_deletion() + db_session.commit() + log.info("Deleted expunged UIDs", count=deleted_uid_count) def get_folder_info(account_id, session, folder_name): @@ -416,9 +353,7 @@ def create_imap_message( is_draft = imapuid.is_draft and ( folder.canonical_name == "drafts" or folder.canonical_name == "all" ) - update_message_metadata( - db_session, account.id, account.category_type, new_message, is_draft - ) + update_message_metadata(db_session, account, new_message, is_draft) update_contacts_from_message(db_session, new_message, account.namespace.id) diff --git a/inbox/models/backends/imap.py b/inbox/models/backends/imap.py index 3dcd32cb6..a5bf29515 100644 --- a/inbox/models/backends/imap.py +++ b/inbox/models/backends/imap.py @@ -1,6 +1,6 @@ import json from datetime import datetime -from typing import List, assert_never +from typing import List, Set from sqlalchemy import ( BigInteger, @@ -20,7 +20,7 @@ from sqlalchemy.sql.expression import false from inbox.logging import get_logger -from inbox.models.account import Account, CategoryType +from inbox.models.account import Account from inbox.models.base import MailSyncBase from inbox.models.category import Category from inbox.models.folder import Folder @@ -209,19 +209,11 @@ def update_labels(self, new_labels: List[str]) -> None: def namespace(self): return self.imapaccount.namespace - def get_categories(self, category_type: CategoryType) -> set[Category]: - """ - Returns the categories this uid belongs to. - - Args: - category_type: The category type for the account this uid belongs to. - """ - if category_type == "label": - return {label.category for label in self.labels} | {self.folder.category} - elif category_type == "folder": - return {self.folder.category} - else: - assert_never(category_type) + @property + def categories(self) -> Set[Category]: + categories = {label.category for label in self.labels} + categories.add(self.folder.category) + return categories __table_args__ = ( UniqueConstraint("folder_id", "msg_uid", "account_id"), diff --git a/tests/imap/test_delete_handling.py b/tests/imap/test_delete_handling.py index ac9f90194..9c2610ecd 100644 --- a/tests/imap/test_delete_handling.py +++ b/tests/imap/test_delete_handling.py @@ -37,7 +37,7 @@ def test_messages_deleted_asynchronously( db.session, ) assert "label" in [cat.display_name for cat in message.categories] - remove_deleted_uids(default_account.id, folder.id, {msg_uid}) + remove_deleted_uids(default_account.id, folder.id, [msg_uid]) db.session.expire_all() assert abs((message.deleted_at - datetime.utcnow()).total_seconds()) < 2 # Check that message categories do get updated synchronously. @@ -50,7 +50,7 @@ def test_drafts_deleted_synchronously( message.is_draft = True db.session.commit() msg_uid = imapuid.msg_uid - remove_deleted_uids(default_account.id, folder.id, {msg_uid}) + remove_deleted_uids(default_account.id, folder.id, [msg_uid]) db.session.expire_all() with pytest.raises(ObjectDeletedError): message.id @@ -72,7 +72,7 @@ def test_deleting_from_a_message_with_multiple_uids( assert len(message.imapuids) == 2 - remove_deleted_uids(default_account.id, inbox_folder.id, {2222}) + remove_deleted_uids(default_account.id, inbox_folder.id, [2222]) db.session.expire_all() assert ( diff --git a/tests/imap/test_labels.py b/tests/imap/test_labels.py index 3d9d8713c..99d36d641 100644 --- a/tests/imap/test_labels.py +++ b/tests/imap/test_labels.py @@ -32,13 +32,7 @@ def folder_and_message_maps(db, default_account): # Create a message in the folder message = add_fake_message(db.session, default_account.namespace.id, thread) add_fake_imapuid(db.session, default_account.id, message, folder, 13) - update_message_metadata( - db.session, - default_account.id, - default_account.category_type, - message, - False, - ) + update_message_metadata(db.session, default_account, message, False) db.session.commit() folder_map[name] = folder message_map[name] = message @@ -48,18 +42,11 @@ def folder_and_message_maps(db, default_account): def add_inbox_label(db, default_account, message): assert len(message.imapuids) == 1 imapuid = message.imapuids[0] - assert {c.name for c in imapuid.get_categories(default_account.category_type)} == { - "all" - } + assert {c.name for c in imapuid.categories} == {"all"} imapuid.update_labels(["\\Inbox"]) db.session.commit() - assert {c.name for c in imapuid.get_categories(default_account.category_type)} == { - "all", - "inbox", - } - update_message_metadata( - db.session, default_account.id, default_account.category_type, message, False - ) + assert {c.name for c in imapuid.categories} == {"all", "inbox"} + update_message_metadata(db.session, default_account, message, False) db.session.commit() return message @@ -67,18 +54,11 @@ def add_inbox_label(db, default_account, message): def add_custom_label(db, default_account, message): assert len(message.imapuids) == 1 imapuid = message.imapuids[0] - existing = [c.name for c in imapuid.get_categories(default_account.category_type)][ - 0 - ] + existing = [c.name for c in imapuid.categories][0] imapuid.update_labels(["<3"]) db.session.commit() - assert {c.name for c in imapuid.get_categories(default_account.category_type)} == { - existing, - "", - } - update_message_metadata( - db.session, default_account.id, default_account.category_type, message, False - ) + assert {c.name for c in imapuid.categories} == {existing, ""} + update_message_metadata(db.session, default_account, message, False) db.session.commit() return message diff --git a/tests/imap/test_update_metadata.py b/tests/imap/test_update_metadata.py index 5d9434c8c..1e3226c89 100644 --- a/tests/imap/test_update_metadata.py +++ b/tests/imap/test_update_metadata.py @@ -68,9 +68,7 @@ def test_update_categories_when_actionlog_entry_missing( ): message.categories_changes = True db.session.commit() - update_message_metadata( - db.session, imapuid.account.id, imapuid.account.category_type, message, False - ) + update_message_metadata(db.session, imapuid.account, message, False) assert message.categories == {imapuid.folder.category} @@ -114,9 +112,7 @@ def test_categories_from_multiple_imap_folders( imapuid.updated_at = imapuid.updated_at + datetime.timedelta(seconds=delay) db.session.commit() - update_message_metadata( - db.session, generic_account.id, generic_account.category_type, message, False - ) + update_message_metadata(db.session, generic_account, message, False) assert {category.name for category in message.categories} == categories delete_imapuids(db.session)