Skip to content

Commit

Permalink
Edit
Browse files Browse the repository at this point in the history
  • Loading branch information
FilahAnas committed Mar 28, 2024
1 parent a948c98 commit 9a1bad7
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 39 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/mongodb-source-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
password: ${{ secrets.DOCKERHUB_TOKEN }}

- name: Tag Docker image
run: docker tag airbyte/source-mongodb-v2:dev sibengineering/airbyte-source-mongodb:1.3.0.8
run: docker tag airbyte/source-mongodb-v2:dev sibengineering/airbyte-source-mongodb:1.3.0.9

- name: Push Docker image to Docker Hub
run: docker push sibengineering/airbyte-source-mongodb:1.3.0.8
run: docker push sibengineering/airbyte-source-mongodb:1.3.0.9
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

package io.airbyte.integrations.source.mongodb;

import com.mongodb.ReadConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.*;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
Expand Down Expand Up @@ -51,7 +49,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
.stream()
.map(airbyteStream -> {
final var collectionName = airbyteStream.getStream().getName();
final var collection = database.getCollection(collectionName).withReadConcern(ReadConcern.LOCAL);
final var collection = database.getCollection(collectionName);
final var fields = Projections.fields(Projections.include(CatalogHelpers.getTopLevelFieldNames(airbyteStream).stream().toList()));

final var idTypes = aggregateIdField(collection);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.ConnectionString;
import com.mongodb.MongoConfigurationException;
import com.mongodb.ReadConcern;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoIterable;
import io.airbyte.cdk.db.AbstractDatabase;
import io.airbyte.cdk.db.mongodb.MongoUtils;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.util.MoreIterators;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wrapper around a MongoDB {@link MongoClient} bound to a single named database.
 *
 * <p>
 * The client is created in the constructor from a connection string and released by
 * {@link #close()}, so instances are meant to be used with try-with-resources.
 */
public class MongoDatabase extends AbstractDatabase implements AutoCloseable {

  private static final Logger LOGGER = LoggerFactory.getLogger(MongoDatabase.class);
  /** Batch size passed to the find cursor opened by {@link #read}. */
  private static final int BATCH_SIZE = 1000;
  /** Collections whose name starts with this prefix are hidden by {@link #getCollectionNames()}. */
  private static final String MONGO_RESERVED_COLLECTION_PREFIX = "system.";

  private final ConnectionString connectionString;
  private final com.mongodb.client.MongoDatabase database;
  private final MongoClient mongoClient;

  /**
   * Creates a client for the given connection string and selects the given database.
   *
   * @param connectionString MongoDB connection string
   * @param databaseName name of the database all collection operations are executed against
   * @throws ConnectionErrorException if the driver reports a {@link MongoConfigurationException}
   * @throws RuntimeException wrapping any other failure raised while creating the client
   */
  public MongoDatabase(final String connectionString, final String databaseName) {
    try {
      this.connectionString = new ConnectionString(connectionString);
      mongoClient = MongoClients.create(this.connectionString);
      database = mongoClient.getDatabase(databaseName);
    } catch (final MongoConfigurationException e) {
      LOGGER.error(e.getMessage(), e);
      throw new ConnectionErrorException(String.valueOf(e.getCode()), e.getMessage(), e);
    } catch (final Exception e) {
      LOGGER.error(e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Closes the underlying {@link MongoClient}.
   */
  @Override
  public void close() throws Exception {
    mongoClient.close();
  }

  /**
   * @return the underlying driver database handle
   */
  public com.mongodb.client.MongoDatabase getDatabase() {
    return database;
  }

  /**
   * @return the names of the databases as listed by the client
   */
  public MongoIterable<String> getDatabaseNames() {
    return mongoClient.listDatabaseNames();
  }

  /**
   * Lists the collections of this database, excluding those whose name starts with
   * {@value #MONGO_RESERVED_COLLECTION_PREFIX}.
   *
   * @return the filtered collection names; an empty set if the driver returns no iterable
   */
  public Set<String> getCollectionNames() {
    final MongoIterable<String> collectionNames = database.listCollectionNames();
    if (collectionNames == null) {
      return Collections.emptySet();
    }
    // Reuse the iterable that was just null-checked instead of querying the server a second time.
    return MoreIterators.toSet(collectionNames.iterator()).stream()
        .filter(name -> !name.startsWith(MONGO_RESERVED_COLLECTION_PREFIX))
        .collect(Collectors.toSet());
  }

  /**
   * Returns a handle to the named collection configured with {@link ReadConcern#MAJORITY}.
   *
   * @param collectionName name of the collection
   * @return the collection handle with the majority read concern applied
   */
  public MongoCollection<Document> getCollection(final String collectionName) {
    return database.getCollection(collectionName)
        .withReadConcern(ReadConcern.MAJORITY);
  }

  /**
   * Returns the named collection, creating it first if it is not among the currently listed
   * collections.
   *
   * <p>
   * NOTE(review): the returned handle comes straight from the driver database and therefore does
   * not carry the read concern applied by {@link #getCollection(String)} - confirm this is
   * intended.
   *
   * @param collectionName name of the collection
   * @return a handle to the existing or newly created collection
   */
  public MongoCollection<Document> getOrCreateNewCollection(final String collectionName) {
    final Set<String> collectionNames = MoreIterators.toSet(database.listCollectionNames().iterator());
    if (!collectionNames.contains(collectionName)) {
      database.createCollection(collectionName);
    }
    return database.getCollection(collectionName);
  }

  /**
   * Creates the named collection unconditionally and returns a handle to it.
   *
   * @param name name of the collection to create
   * @return a handle to the created collection
   */
  @VisibleForTesting
  public MongoCollection<Document> createCollection(final String name) {
    database.createCollection(name);
    return database.getCollection(name);
  }

  /**
   * @return the name of the wrapped database
   */
  @VisibleForTesting
  public String getName() {
    return database.getName();
  }

  /**
   * Streams the documents of a collection converted to {@link JsonNode}s.
   *
   * <p>
   * The returned stream owns an open cursor; callers must close the stream to release it.
   *
   * <p>
   * NOTE(review): the collection is obtained directly from the driver database, so the read
   * concern applied by {@link #getCollection(String)} is not used here - confirm this is intended.
   *
   * @param collectionName name of the collection to read
   * @param columnNames column names handed to {@link MongoUtils#toJsonNode} for each document
   * @param filter optional query filter; when absent an empty filter document is used
   * @return a sequential stream of converted documents
   * @throws RuntimeException wrapping any failure raised while opening the cursor
   */
  public Stream<JsonNode> read(final String collectionName, final List<String> columnNames, final Optional<Bson> filter) {
    try {
      final MongoCollection<Document> collection = database.getCollection(collectionName);
      final MongoCursor<Document> cursor = collection
          .find(filter.orElse(new BsonDocument()))
          .batchSize(BATCH_SIZE)
          .cursor();

      return getStream(cursor, (document) -> MongoUtils.toJsonNode(document, columnNames))
          .onClose(() -> {
            try {
              cursor.close();
            } catch (final Exception e) {
              throw new RuntimeException(e.getMessage(), e);
            }
          });

    } catch (final Exception e) {
      // Pass the exception as the trailing argument so the stack trace is logged as well.
      LOGGER.error("Exception attempting to read data from collection: {}, {}", collectionName, e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Adapts a cursor to an ordered, sequential stream, mapping every document with {@code mapper}.
   *
   * @param cursor cursor to pull documents from
   * @param mapper conversion applied to each document; checked exceptions are wrapped in
   *        {@link RuntimeException}
   * @return a stream that ends as soon as {@code cursor.tryNext()} returns {@code null}
   */
  private Stream<JsonNode> getStream(final MongoCursor<Document> cursor, final CheckedFunction<Document, JsonNode, Exception> mapper) {
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {

      @Override
      public boolean tryAdvance(final Consumer<? super JsonNode> action) {
        try {
          final Document document = cursor.tryNext();
          // A null result is treated as the end of the stream.
          if (document == null) {
            return false;
          }
          action.accept(mapper.apply(document));
          return true;
        } catch (final Exception e) {
          throw new RuntimeException(e);
        }
      }

    }, false);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.ReadConcern;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
Expand Down Expand Up @@ -179,7 +178,7 @@ public static Optional<CollectionStatistics> getCollectionStatistics(final Mongo
try {
final Map<String, Object> collStats = Map.of(MongoConstants.STORAGE_STATS_KEY, Map.of(), MongoConstants.COUNT_KEY, Map.of());
final MongoDatabase mongoDatabase = mongoClient.getDatabase(stream.getStream().getNamespace());
final MongoCollection<Document> collection = mongoDatabase.getCollection(stream.getStream().getName()).withReadConcern(ReadConcern.LOCAL);
final MongoCollection<Document> collection = mongoDatabase.getCollection(stream.getStream().getName());
final AggregateIterable<Document> output = collection.aggregate(List.of(new Document("$collStats", collStats)));

try (final MongoCursor<Document> cursor = output.allowDiskUse(true).cursor()) {
Expand Down Expand Up @@ -282,7 +281,7 @@ private static Optional<AirbyteStream> discoverFields(final String collectionNam
* This is an attempt to "survey" the documents in the collection for variance in the schema keys.
*/
final Set<Field> discoveredFields;
final MongoCollection<Document> mongoCollection = mongoClient.getDatabase(databaseName).getCollection(collectionName).withReadConcern(ReadConcern.LOCAL);
final MongoCollection<Document> mongoCollection = mongoClient.getDatabase(databaseName).getCollection(collectionName);
if (isSchemaEnforced) {
discoveredFields = new HashSet<>(getFieldsInCollection(mongoCollection, sampleSize));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Iterables;
import com.mongodb.ReadConcern;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Updates;
Expand Down Expand Up @@ -125,16 +125,16 @@ protected void setupEnvironment(final TestDestinationEnv testEnv) throws Excepti
}

private void createTestCollections(final MongoClient mongoClient) {
mongoClient.getDatabase(databaseName).getCollection(collectionName).withReadConcern(ReadConcern.LOCAL).drop();
mongoClient.getDatabase(databaseName).getCollection(otherCollection1Name).withReadConcern(ReadConcern.LOCAL).drop();
mongoClient.getDatabase(databaseName).getCollection(otherCollection2Name).withReadConcern(ReadConcern.LOCAL).drop();
mongoClient.getDatabase(databaseName).getCollection(collectionName).drop();
mongoClient.getDatabase(databaseName).getCollection(otherCollection1Name).drop();
mongoClient.getDatabase(databaseName).getCollection(otherCollection2Name).drop();
mongoClient.getDatabase(databaseName).createCollection(collectionName);
mongoClient.getDatabase(databaseName).createCollection(otherCollection1Name);
mongoClient.getDatabase(databaseName).createCollection(otherCollection2Name);
}

private void insertTestData(final MongoClient mongoClient) {
final MongoCollection<Document> collection = mongoClient.getDatabase(databaseName).getCollection(collectionName).withReadConcern(ReadConcern.LOCAL);
final MongoCollection<Document> collection = mongoClient.getDatabase(databaseName).getCollection(collectionName);
final var objectDocument =
new Document("testObject", new Document(NAME_FIELD, "subName").append("testField1", "testField1").append(INT_TEST_FIELD, 10)
.append("thirdLevelDocument", new Document("data", "someData").append("intData", 1)));
Expand All @@ -159,9 +159,9 @@ private void insertTestData(final MongoClient mongoClient) {

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
mongoClient.getDatabase(databaseName).getCollection(collectionName).withReadConcern(ReadConcern.LOCAL).drop();
mongoClient.getDatabase(databaseName).getCollection(otherCollection1Name).withReadConcern(ReadConcern.LOCAL).drop();
mongoClient.getDatabase(databaseName).getCollection(otherCollection2Name).withReadConcern(ReadConcern.LOCAL).drop();
mongoClient.getDatabase(databaseName).getCollection(collectionName).drop();
mongoClient.getDatabase(databaseName).getCollection(otherCollection1Name).drop();
mongoClient.getDatabase(databaseName).getCollection(otherCollection2Name).drop();
mongoClient.getDatabase(databaseName).drop();
mongoClient.close();
recordCount = 0;
Expand Down Expand Up @@ -350,7 +350,7 @@ void testInsertUpdateDeleteIncrementalSync() throws Exception {
validateAllStreamsComplete(stateMessages, List.of(
new StreamDescriptor().withName(collectionName).withNamespace(databaseName)));

final var result = mongoClient.getDatabase(databaseName).getCollection(collectionName).withReadConcern(ReadConcern.LOCAL).insertOne(createDocument(1));
final var result = mongoClient.getDatabase(databaseName).getCollection(collectionName).insertOne(createDocument(1));
final var insertedId = result.getInsertedId();

// Start another sync that finds the insert change
Expand All @@ -369,7 +369,7 @@ void testInsertUpdateDeleteIncrementalSync() throws Exception {
new StreamDescriptor().withName(collectionName).withNamespace(databaseName)));

final var idFilter = new Document(DOCUMENT_ID_FIELD, insertedId);
mongoClient.getDatabase(databaseName).getCollection(collectionName).withReadConcern(ReadConcern.LOCAL).updateOne(idFilter, Updates.combine(Updates.set("newField", "new")));
mongoClient.getDatabase(databaseName).getCollection(collectionName).updateOne(idFilter, Updates.combine(Updates.set("newField", "new")));

// Start another sync that finds the update change
final List<AirbyteMessage> messages3 = runRead(configuredCatalog, Jsons.jsonNode(List.of(lastStateMessage2)));
Expand All @@ -386,7 +386,7 @@ void testInsertUpdateDeleteIncrementalSync() throws Exception {
validateAllStreamsComplete(stateMessages3, List.of(
new StreamDescriptor().withName(collectionName).withNamespace(databaseName)));

mongoClient.getDatabase(databaseName).getCollection(collectionName).withReadConcern(ReadConcern.LOCAL).deleteOne(idFilter);
mongoClient.getDatabase(databaseName).getCollection(collectionName).deleteOne(idFilter);

// Start another sync that finds the delete change
final List<AirbyteMessage> messages4 = runRead(configuredCatalog, Jsons.jsonNode(List.of(lastStateMessage3)));
Expand Down Expand Up @@ -590,9 +590,9 @@ private List<AirbyteStateMessage> filterStateMessages(final List<AirbyteMessage>
}

private void insertData(final String databaseName, final String collectionName, final int numberOfDocuments) {
mongoClient.getDatabase(databaseName).getCollection(collectionName).withReadConcern(ReadConcern.LOCAL).drop();
mongoClient.getDatabase(databaseName).getCollection(collectionName).drop();
mongoClient.getDatabase(databaseName).createCollection(collectionName);
final MongoCollection<Document> collection = mongoClient.getDatabase(databaseName).getCollection(collectionName).withReadConcern(ReadConcern.LOCAL);
final MongoCollection<Document> collection = mongoClient.getDatabase(databaseName).getCollection(collectionName);
collection
.insertMany(IntStream.range(0, numberOfDocuments).boxed().map(this::createDocument).toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ object MongoDbInsertClient {
}
mongoClient
.getDatabase(databaseName)
.getCollection(collectionName).withReadConcern(ReadConcern.LOCAL)
.getCollection(collectionName)
.insertMany(documents)
documents.clear()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableSet;
import com.mongodb.ReadConcern;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
Expand Down Expand Up @@ -358,7 +358,7 @@ private void assertConfiguredFieldsEqualsRecordDataFields(final Set<String> conf
}

private void insertDocuments(final String collectionName, final List<Document> documents) {
final MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(collectionName).withReadConcern(ReadConcern.LOCAL);
final MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(collectionName);
collection.insertMany(documents);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.mongodb.MongoCredential;
import com.mongodb.MongoSecurityException;
import com.mongodb.ReadConcern;

import com.mongodb.client.*;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterType;
Expand Down Expand Up @@ -190,7 +190,7 @@ void testDiscoverOperation() throws IOException {
when(cursor.next()).thenReturn(schemaDiscoveryResponses.get(0), schemaDiscoveryResponses.get(1));
when(aggregateIterable.cursor()).thenReturn(cursor);
when(mongoCollection.aggregate(any())).thenReturn(aggregateIterable);
when(mongoDatabase.getCollection(any()).withReadConcern(ReadConcern.LOCAL)).thenReturn(mongoCollection);
when(mongoDatabase.getCollection(any())).thenReturn(mongoCollection);
when(mongoDatabase.runCommand(any())).thenReturn(authorizedCollectionsResponse);
when(mongoClient.getDatabase(any())).thenReturn(mongoDatabase);
when(aggregateIterable.allowDiskUse(anyBoolean())).thenReturn(aggregateIterable);
Expand Down Expand Up @@ -247,7 +247,7 @@ void testDiscoverOperationWithMissingConfiguration() throws IOException {
when(cursor.next()).thenReturn(schemaDiscoveryResponses.get(0), schemaDiscoveryResponses.get(1));
when(aggregateIterable.cursor()).thenReturn(cursor);
when(mongoCollection.aggregate(any())).thenReturn(aggregateIterable);
when(mongoDatabase.getCollection(any()).withReadConcern(ReadConcern.LOCAL)).thenReturn(mongoCollection);
when(mongoDatabase.getCollection(any())).thenReturn(mongoCollection);
when(mongoDatabase.runCommand(any())).thenReturn(authorizedCollectionsResponse);
when(mongoClient.getDatabase(any())).thenReturn(mongoDatabase);

Expand Down
Loading

0 comments on commit 9a1bad7

Please sign in to comment.