Skip to content

Commit

Permalink
Revert "Batch and optimize large deletions (#971)"
Browse files Browse the repository at this point in the history
This reverts commit 229e428.
  • Loading branch information
squeaky-pl committed Nov 12, 2024
1 parent c6cb261 commit bf7836c
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 190 deletions.
6 changes: 1 addition & 5 deletions inbox/mailsync/backends/gmail.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,7 @@ def __deduplicate_message_object_creation(self, db_session, raw_messages, accoun
uid.update_flags(raw_message.flags)
uid.update_labels(raw_message.g_labels)
common.update_message_metadata(
db_session,
account.id,
account.category_type,
message_obj,
uid.is_draft,
db_session, account, message_obj, uid.is_draft
)
db_session.commit()

Expand Down
203 changes: 69 additions & 134 deletions inbox/mailsync/backends/imap/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,23 @@
"""

from collections import defaultdict
from datetime import datetime
from typing import List, Set, assert_never
from typing import List, Set

from sqlalchemy import bindparam, desc
from sqlalchemy.orm import Query, Session, joinedload
from sqlalchemy.orm import Query, Session
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql.expression import func

from inbox.contacts.processing import update_contacts_from_message
from inbox.crispin import RawMessage
from inbox.logging import get_logger
from inbox.models import Account, ActionLog, Folder, Message, MessageCategory
from inbox.models.account import CategoryType
from inbox.models.backends.imap import ImapFolderInfo, ImapUid, LabelItem
from inbox.models.backends.imap import ImapFolderInfo, ImapUid
from inbox.models.category import Category
from inbox.models.label import Label
from inbox.models.session import session_scope
from inbox.models.util import reconcile_message
from inbox.sqlalchemy_ext.util import get_db_api_cursor_with_query
from inbox.util.itert import chunk

log = get_logger()

Expand Down Expand Up @@ -81,35 +77,20 @@ def lastseenuid(account_id, session, folder_id):


def update_message_metadata(
session: Session,
account_id: int,
category_type: CategoryType,
message: Message,
is_draft: bool,
session: Session, account: Account, message: Message, is_draft: bool
) -> None:
"""Update the message's metadata"""
# We call get_categories on each ImapUid to get the categories for the message.
# To avoid N+1 queries, we use joinedload to eagerly load the categories.
options = [joinedload(ImapUid.folder).joinedload(Folder.category)]
if category_type == "label":
options.append(
joinedload("labelitems")
.joinedload(LabelItem.label)
.joinedload(Label.category)
)

# Sort imapuids in a way that the ones that were added later come first.
# There are non-conforming IMAP servers that can list the same message thousands of times
# in the same folder. This is a workaround to limit the memory pressure caused by such
# servers. The metadata is meaningless for such messages anyway.
latest_imapuids = (
imapuids_for_message_query(
account_id=account_id,
account_id=account.id,
message_id=message.id,
only_latest=IMAPUID_PER_MESSAGE_SANITY_LIMIT,
)
.with_session(session)
.options(options)
.all()
)

Expand All @@ -118,13 +99,11 @@ def update_message_metadata(
message.is_draft = is_draft

latest_categories: List[Category] = [
category
for imapuid in latest_imapuids
for category in imapuid.get_categories(category_type)
category for imapuid in latest_imapuids for category in imapuid.categories
]

categories: Set[Category]
if category_type == "folder":
if account.category_type == "folder":
# For generic IMAP we want to deterministically select the last category.
# A message will always be in a single folder but it seems that for some
# on-prem servers we are not able to reliably detect when a message is moved
Expand All @@ -134,10 +113,10 @@ def update_message_metadata(
# from the database. This makes it deterministic and more-correct because a message
# is likely in a folder (and category) it was added to last.
categories = {latest_categories[0]} if latest_categories else set()
elif category_type == "label":
elif account.category_type == "label":
categories = set(latest_categories)
else:
assert_never(category_type)
raise AssertionError("Unreachable")

# Use a consistent time across creating categories, message updated_at
# and the subsequent transaction that may be created.
Expand Down Expand Up @@ -225,9 +204,7 @@ def update_metadata(account_id, folder_id, folder_role, new_flags, session):
if changed:
change_count += 1
is_draft = item.is_draft and folder_role in ["drafts", "all"]
update_message_metadata(
session, account.id, account.category_type, item.message, is_draft
)
update_message_metadata(session, account, item.message, is_draft)
session.commit()
log.info("Updated UID metadata", changed=change_count, out_of=len(new_flags))

Expand All @@ -244,7 +221,7 @@ def imapuids_for_message_query(
return query


def remove_deleted_uids(account_id: int, folder_id: int, uids: set[int]) -> None:
def remove_deleted_uids(account_id, folder_id, uids):
"""
Make sure you're holding a db write lock on the account. (We don't try
to grab the lock in here in case the caller needs to put higher-level
Expand All @@ -253,109 +230,69 @@ def remove_deleted_uids(account_id: int, folder_id: int, uids: set[int]) -> None
"""
if not uids:
return

# First, get the category type i.e. folder or label for the account
# and message_id corresponding to the uids.
with session_scope(account_id) as db_session:
category_type = Account.get(account_id, db_session).category_type

message_ids_and_uid = (
db_session.query(ImapUid.message_id, ImapUid.msg_uid)
.filter(
ImapUid.account_id == account_id,
ImapUid.folder_id == folder_id,
ImapUid.msg_uid.in_(uids),
)
.with_hint(
ImapUid, "FORCE INDEX (ix_imapuid_account_id_folder_id_msg_uid_desc)"
)
.all()
)

# Group uids by message_id.
uids_by_message_id = defaultdict(list)
for message_id, message_uid in message_ids_and_uid:
uids_by_message_id[message_id].append(message_uid)

for message_id, message_uids in uids_by_message_id.items():
# Delete the uids in batches of 1000.
# This eases the load on MySQL if we are processing a large number of uids.
# Some non-conforming IMAP servers can list the same message thousands of times.
for message_uid_batch in chunk(message_uids, 1000):
with session_scope(account_id) as db_session:
db_session.query(ImapUid).filter(
deleted_uid_count = 0
for uid in uids:
# We do this one-uid-at-a-time because issuing many deletes within a
# single database transaction is problematic. But loading many
# objects into a session and then frequently calling commit() is also
# bad, because expiring objects and checking for revisions is O(number
# of objects in session), resulting in quadratic runtimes.
# Performance could perhaps be additionally improved by choosing a
# sane balance, e.g., operating on 10 or 100 uids or something at once.
with session_scope(account_id) as db_session:
imapuid = (
db_session.query(ImapUid)
.filter(
ImapUid.account_id == account_id,
ImapUid.folder_id == folder_id,
ImapUid.msg_uid.in_(message_uid_batch),
).with_hint(
ImapUid.msg_uid == uid,
)
.with_hint(
ImapUid,
"FORCE INDEX (ix_imapuid_account_id_folder_id_msg_uid_desc)",
).delete(
synchronize_session=False
)
db_session.commit()

update_message_after_uid_deletion(
account_id=account_id, message_id=message_id, category_type=category_type
)

log.info(
"Deleted expunged UIDs",
uid_count=len(uids),
message_count=len(uids_by_message_id),
)


def update_message_after_uid_deletion(
*, account_id: int, message_id: int, category_type: CategoryType
) -> None:
"""
Update the message after deleting UIDs.
If the message has no more UIDs, and it's a draft, delete it immediately.
If the message has no more UIDs, and it's not a draft, mark it for deletion.
Args:
account_id: The account ID.
message_id: The message ID.
category_type: The category type for the account i.e. folder or label.
"""
with session_scope(account_id) as db_session:
message = db_session.query(Message).get(message_id)
if message is None:
return

message_imapuids_exist = db_session.query(
imapuids_for_message_query(
account_id=account_id, message_id=message.id
).exists()
).scalar()

if not message_imapuids_exist and message.is_draft:
# Synchronously delete drafts.
thread = message.thread
if thread is not None:
thread.messages.remove(message)
# Thread.messages relationship is versioned i.e. extra
# logic gets executed on remove call.
# This early flush is needed so the configure_versioning logic
# in inbox.model.sessions can work reliably on newer versions of
# SQLAlchemy.
db_session.flush()
db_session.delete(message)
if thread is not None and not thread.messages:
db_session.delete(thread)
else:
update_message_metadata(
db_session, account_id, category_type, message, message.is_draft
.first()
)
if not message_imapuids_exist:
# But don't outright delete messages. Just mark them as
# 'deleted' and wait for the asynchronous
# dangling-message-collector to delete them.
message.mark_for_deletion()

db_session.commit()
if imapuid is None:
continue
deleted_uid_count += 1
message = imapuid.message

db_session.delete(imapuid)

if message is not None:
message_imapuids_exist = db_session.query(
imapuids_for_message_query(
account_id=account_id, message_id=message.id
).exists()
).scalar()

if not message_imapuids_exist and message.is_draft:
# Synchronously delete drafts.
thread = message.thread
if thread is not None:
thread.messages.remove(message)
# Thread.messages relationship is versioned i.e. extra
# logic gets executed on remove call.
# This early flush is needed so the configure_versioning logic
# in inbox.model.sessions can work reliably on newer versions of
# SQLAlchemy.
db_session.flush()
db_session.delete(message)
if thread is not None and not thread.messages:
db_session.delete(thread)
else:
account = Account.get(account_id, db_session)
update_message_metadata(
db_session, account, message, message.is_draft
)
if not message_imapuids_exist:
# But don't outright delete messages. Just mark them as
# 'deleted' and wait for the asynchronous
# dangling-message-collector to delete them.
message.mark_for_deletion()
db_session.commit()
log.info("Deleted expunged UIDs", count=deleted_uid_count)


def get_folder_info(account_id, session, folder_name):
Expand Down Expand Up @@ -416,9 +353,7 @@ def create_imap_message(
is_draft = imapuid.is_draft and (
folder.canonical_name == "drafts" or folder.canonical_name == "all"
)
update_message_metadata(
db_session, account.id, account.category_type, new_message, is_draft
)
update_message_metadata(db_session, account, new_message, is_draft)

update_contacts_from_message(db_session, new_message, account.namespace.id)

Expand Down
22 changes: 7 additions & 15 deletions inbox/models/backends/imap.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from datetime import datetime
from typing import List, assert_never
from typing import List, Set

from sqlalchemy import (
BigInteger,
Expand All @@ -20,7 +20,7 @@
from sqlalchemy.sql.expression import false

from inbox.logging import get_logger
from inbox.models.account import Account, CategoryType
from inbox.models.account import Account
from inbox.models.base import MailSyncBase
from inbox.models.category import Category
from inbox.models.folder import Folder
Expand Down Expand Up @@ -209,19 +209,11 @@ def update_labels(self, new_labels: List[str]) -> None:
def namespace(self):
return self.imapaccount.namespace

def get_categories(self, category_type: CategoryType) -> set[Category]:
"""
Returns the categories this uid belongs to.
Args:
category_type: The category type for the account this uid belongs to.
"""
if category_type == "label":
return {label.category for label in self.labels} | {self.folder.category}
elif category_type == "folder":
return {self.folder.category}
else:
assert_never(category_type)
@property
def categories(self) -> Set[Category]:
categories = {label.category for label in self.labels}
categories.add(self.folder.category)
return categories

__table_args__ = (
UniqueConstraint("folder_id", "msg_uid", "account_id"),
Expand Down
6 changes: 3 additions & 3 deletions tests/imap/test_delete_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_messages_deleted_asynchronously(
db.session,
)
assert "label" in [cat.display_name for cat in message.categories]
remove_deleted_uids(default_account.id, folder.id, {msg_uid})
remove_deleted_uids(default_account.id, folder.id, [msg_uid])
db.session.expire_all()
assert abs((message.deleted_at - datetime.utcnow()).total_seconds()) < 2
# Check that message categories do get updated synchronously.
Expand All @@ -50,7 +50,7 @@ def test_drafts_deleted_synchronously(
message.is_draft = True
db.session.commit()
msg_uid = imapuid.msg_uid
remove_deleted_uids(default_account.id, folder.id, {msg_uid})
remove_deleted_uids(default_account.id, folder.id, [msg_uid])
db.session.expire_all()
with pytest.raises(ObjectDeletedError):
message.id
Expand All @@ -72,7 +72,7 @@ def test_deleting_from_a_message_with_multiple_uids(

assert len(message.imapuids) == 2

remove_deleted_uids(default_account.id, inbox_folder.id, {2222})
remove_deleted_uids(default_account.id, inbox_folder.id, [2222])
db.session.expire_all()

assert (
Expand Down
Loading

0 comments on commit bf7836c

Please sign in to comment.