diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index 26074eca8c56..8af600561bea 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -51,7 +51,7 @@ dependencies { dataGeneratorImplementation ('com.github.javafaker:javafaker:1.0.2') { exclude module: 'snakeyaml' } dataGeneratorImplementation 'io.github.oshai:kotlin-logging-jvm:5.1.0' dataGeneratorImplementation 'org.jetbrains.kotlinx:kotlinx-cli-jvm:0.3.5' - dataGeneratorImplementation 'org.mongodb:mongodb-driver-sync:3.12.10' + dataGeneratorImplementation 'org.mongodb:mongodb-driver-sync:4.10.2' debeziumTestImplementation 'io.debezium:debezium-embedded:2.4.0.Final' debeziumTestImplementation 'io.debezium:debezium-connector-mongodb:2.4.0.Final' diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java index 61c826f8e28e..4930fb43b352 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/InitialSnapshotHandler.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.source.mongodb; import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.*; import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator; import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency; diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDatabase.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDatabase.java deleted file mode 100644 index 5a1459c639a9..000000000000 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDatabase.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.source.mongodb; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.annotations.VisibleForTesting; -import com.mongodb.ConnectionString; -import com.mongodb.MongoConfigurationException; -import com.mongodb.ReadConcern; -import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoClients; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoCursor; -import com.mongodb.client.MongoIterable; -import io.airbyte.cdk.db.AbstractDatabase; -import io.airbyte.cdk.db.mongodb.MongoUtils; -import io.airbyte.commons.exceptions.ConnectionErrorException; -import io.airbyte.commons.functional.CheckedFunction; -import io.airbyte.commons.util.MoreIterators; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; -import org.bson.BsonDocument; -import org.bson.Document; -import org.bson.conversions.Bson; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MongoDatabase extends AbstractDatabase implements AutoCloseable { - - private static final Logger LOGGER = LoggerFactory.getLogger(MongoDatabase.class); - private static final int BATCH_SIZE = 1000; - private static final String MONGO_RESERVED_COLLECTION_PREFIX = "system."; - - private final ConnectionString connectionString; - private final com.mongodb.client.MongoDatabase database; - private final MongoClient mongoClient; - - public MongoDatabase(final String connectionString, final String databaseName) { - try { - this.connectionString = new ConnectionString(connectionString); - mongoClient = MongoClients.create(this.connectionString); - database = mongoClient.getDatabase(databaseName); - } catch (final MongoConfigurationException e) { - LOGGER.error(e.getMessage(), e); - throw new ConnectionErrorException(String.valueOf(e.getCode()), e.getMessage(), e); - } catch (final Exception e) { - LOGGER.error(e.getMessage(), e); - throw new RuntimeException(e); - } - } - - @Override - public void close() throws Exception { - mongoClient.close(); - } - - public com.mongodb.client.MongoDatabase getDatabase() { - return database; - } - - public MongoIterable getDatabaseNames() { - return mongoClient.listDatabaseNames(); - } - - public Set getCollectionNames() { - final MongoIterable collectionNames = database.listCollectionNames(); - if (collectionNames == null) { - return Collections.EMPTY_SET; - } - return MoreIterators.toSet(database.listCollectionNames().iterator()).stream() - .filter(c -> !c.startsWith(MONGO_RESERVED_COLLECTION_PREFIX)).collect(Collectors.toSet()); - } - - public MongoCollection getCollection(final String collectionName) { - return database.getCollection(collectionName) - .withReadConcern(ReadConcern.MAJORITY); - } - - public MongoCollection getOrCreateNewCollection(final String collectionName) { - final Set collectionNames = MoreIterators.toSet(database.listCollectionNames().iterator()); - if (!collectionNames.contains(collectionName)) { - database.createCollection(collectionName); - } - return database.getCollection(collectionName); - } - - @VisibleForTesting - public MongoCollection createCollection(final String name) { - database.createCollection(name); - return database.getCollection(name); - } - - @VisibleForTesting - public String getName() { - return database.getName(); - } - - public Stream read(final String collectionName, final List columnNames, final Optional filter) { - try { - final MongoCollection collection = database.getCollection(collectionName); - final MongoCursor cursor = collection - .find(filter.orElse(new BsonDocument())) - .batchSize(BATCH_SIZE) - .cursor(); - - return getStream(cursor, (document) -> MongoUtils.toJsonNode(document, columnNames)) - .onClose(() -> { - try { - cursor.close(); - } catch (final Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - }); - - } catch (final Exception e) { - LOGGER.error("Exception attempting to read data from collection: {}, {}", collectionName, e.getMessage()); - throw new RuntimeException(e); - } - } - - private Stream getStream(final MongoCursor cursor, final CheckedFunction mapper) { - return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) { - - @Override - public boolean tryAdvance(final Consumer action) { - try { - final Document document = cursor.tryNext(); - if (document == null) { - return false; - } - action.accept(mapper.apply(document)); - return true; - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - }, false); - } - - public Object runCommand(Object any) { - return null; - } -} diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoUtil.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoUtil.java index 6c2c74fb4e6a..6becadb3225d 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoUtil.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoUtil.java @@ -17,6 +17,7 @@ import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoIterable; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Projections; @@ -176,7 +177,7 @@ public static int getDebeziumEventQueueSize(final MongoDbSourceConfig config) { public static Optional getCollectionStatistics(final MongoClient mongoClient, final ConfiguredAirbyteStream stream) { try { final Map collStats = Map.of(MongoConstants.STORAGE_STATS_KEY, Map.of(), MongoConstants.COUNT_KEY, Map.of()); - final MongoDatabase mongoDatabase = (MongoDatabase) mongoClient.getDatabase(stream.getStream().getNamespace()); + final MongoDatabase mongoDatabase = mongoClient.getDatabase(stream.getStream().getNamespace()); final MongoCollection collection = mongoDatabase.getCollection(stream.getStream().getName()); final AggregateIterable output = collection.aggregate(List.of(new Document("$collStats", collStats))); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java index 487da0b00a7c..75104c5e5126 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceTest.java @@ -15,7 +15,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.mongodb.MongoCredential; import com.mongodb.MongoSecurityException; - import com.mongodb.client.*; import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ClusterType;