-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Fix the resuming of MongoDB change streams in log-based replication mode #31
base: main
Are you sure you want to change the base?
Conversation
…-mongodb into change-stream-improvements
@@ -179,7 +170,7 @@ def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMess | |||
Record message objects. | |||
""" | |||
extracted_at: DateTime = record.pop("_sdc_extracted_at", utc_now()) | |||
pop_deselected_record_properties(record, self.schema, self.mask, self.logger) | |||
pop_deselected_record_properties(record, self.schema, self.mask) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updating the SDK version requires this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about this change, I was confident this was in practice a private API 😞
@@ -197,174 +188,190 @@ def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMess | |||
) | |||
yield record_message | |||
|
|||
def get_records(self, context: dict | None) -> Iterable[dict]: | |||
"""Return a generator of record-type dictionary objects.""" | |||
def _get_records_incremental( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactored slightly to split up the giant method, now have incremental and log-based get_records
implementation methods.
self.logger.critical(exception) | ||
raise exception | ||
|
||
while change_stream.alive and keep_open: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's hard to see in this diff with the other refactors, but note the with change_stream
in https://github.com/MeltanoLabs/tap-mongodb/pull/31/files#diff-e276f9374d5b67980ff79f28e37aa6417f4f456c8bdfbf49d709acd30ebb3f7aL289 that is absent here.
) from operation_failure | ||
elif ( | ||
self._connector.version < (4, 2) | ||
resume_strategy == ResumeStrategy.RESUME_AFTER | ||
and operation_failure.code == 286 | ||
and "as the resume point may no longer be in the oplog." in operation_failure.details["errmsg"] | ||
): | ||
self.logger.warning("Unable to resume change stream from resume token. Resetting resume token.") | ||
change_stream_options.pop("resume_after", None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The diff with the refactor makes this hard to see, but lines 291–293 here are new - these three lines combined with the removal of the with change_stream
line (see above comment) are the meat of this fix. These lines allow the tap to gracefully re-open a change stream if the resume token from the state is not valid.
# fullDocument key is not present on delete events - if it is missing, fall back to documentKey | ||
# instead. If that is missing, pass None/null to avoid raising an error. | ||
document = record.get("fullDocument", record.get("documentKey", None)) | ||
object_id: Optional[ObjectId] = document["_id"] if document and "_id" in document else None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this issue is fixed with the if document
check. Compare:
object_id: Optional[ObjectId] = document["_id"] if "_id" in document else None
to
object_id: Optional[ObjectId] = document["_id"] if document and "_id" in document else None
pyproject.toml
Outdated
@@ -28,7 +29,7 @@ black = "^23.1.0" | |||
pyupgrade = "^3.3.1" | |||
mypy = "^1.0.0" | |||
isort = "^5.11.5" | |||
singer-sdk = { version = "^0.31.1", extras = ["testing"] } | |||
singer-sdk = { version = "^0.35.0", extras = ["testing"] } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixes #27
@@ -43,7 +35,7 @@ def __init__( # pylint: disable=too-many-arguments | |||
self._datetime_conversion: str = datetime_conversion.upper() | |||
self._prefix: Optional[str] = prefix | |||
self._logger: Logger = getLogger(__name__) | |||
self._version: Optional[MongoVersion] = None | |||
self._version: MongoVersion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might make sense to move this annotation to the class level, ie
class MongoDBConnector:
_version: MongoVersion
@@ -179,7 +170,7 @@ def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMess | |||
Record message objects. | |||
""" | |||
extracted_at: DateTime = record.pop("_sdc_extracted_at", utc_now()) | |||
pop_deselected_record_properties(record, self.schema, self.mask, self.logger) | |||
pop_deselected_record_properties(record, self.schema, self.mask) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about this change, I was confident this was in practice a private API 😞
|
||
from pymongo import MongoClient | ||
from pymongo.database import Database | ||
from pymongo.errors import PyMongoError | ||
from singer_sdk._singerlib.catalog import CatalogEntry, MetadataMapping, Schema | ||
|
||
from tap_mongodb.schema import SCHEMA | ||
from tap_mongodb.types import MongoVersion | ||
|
||
if sys.version_info[:2] < (3, 8): | ||
from backports.cached_property import cached_property | ||
else: | ||
from functools import cached_property |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're dropping support for Python 3.7 I think this can be simplified
@@ -102,7 +96,6 @@ def primary_keys(self) -> Optional[list[str]]: | |||
def primary_keys(self, new_value: list[str]) -> None: | |||
"""Set primary keys for the stream.""" | |||
self._primary_keys = new_value | |||
return self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com>
Moving this PR to Draft status - I'm still working on it. |
…-mongodb into change-stream-improvements
👍 @menzenski thanks for this PR! Ping me when it's ready :) |
@edgarrmondragon will do! I think I have a way to run a MongoDB container in docker as a replica set, which hopefully will let me add real actual tests of the log-based behavior. |
…mmy record scenario
Fixes #27
Fixes #29
Fixes #28
Fixes #30