Skip to content

Commit

Permalink
Support migrating legacy collections automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesChenX committed Nov 3, 2024
1 parent cea5bd3 commit 01b09eb
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package im.turms.server.common.storage.mongo;

import org.bson.BsonArray;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.types.MaxKey;
Expand All @@ -38,5 +39,6 @@ private BsonPool() {
public static final BsonString BSON_STRING_EMPTY = new BsonString("");
public static final MaxKey MAX_KEY = new MaxKey();
public static final MinKey MIN_KEY = new MinKey();
public static final BsonArray BSON_ARRAY_EMPTY = new BsonArray();

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import lombok.Getter;
import org.bson.BsonDocument;
import org.bson.codecs.Codec;
import org.bson.codecs.configuration.CodecRegistry;
import org.jctools.maps.NonBlockingIdentityHashMap;
Expand Down Expand Up @@ -139,6 +140,10 @@ public <T> MongoCollection<T> getCollection(Class<T> entityClass) {
return collection;
}

public MongoCollection<BsonDocument> getCollection(String collectionName) {
return database.getCollection(collectionName, BsonDocument.class);
}

public List<MongoEntity<?>> getEntities() {
return new ArrayList<>(classToEntity.values());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@

package im.turms.server.common.storage.mongo.exception;

import jakarta.annotation.Nullable;

/**
* @author James Chen
*/
public class CorruptedDocumentException extends RuntimeException {

public CorruptedDocumentException(String message, Throwable cause) {
public CorruptedDocumentException(String message) {
super(message);
}

public CorruptedDocumentException(String message, @Nullable Throwable cause) {
super(message, cause);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ public interface MongoOperationsSupport {

<T> Mono<T> findOne(Class<T> clazz, Filter filter, @Nullable QueryOptions options);

Flux<BsonDocument> findMany(String collectionName, Filter filter);

Flux<BsonDocument> findMany(
String collectionName,
Filter filter,
@Nullable QueryOptions options);

<T> Flux<T> findMany(Class<T> clazz, Filter filter);

<T> Flux<T> findMany(Class<T> clazz, Filter filter, @Nullable QueryOptions options);
Expand Down Expand Up @@ -162,6 +169,8 @@ Mono<Void> updateZoneKeyRange(

Flux<String> listCollectionNames();

Mono<Void> renameCollection(String oldCollectionName, String newCollectionName);

/**
* @return whether the collection has already existed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import jakarta.annotation.Nullable;

import com.mongodb.ClientSessionOptions;
import com.mongodb.MongoNamespace;
import com.mongodb.TransactionOptions;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
Expand Down Expand Up @@ -188,6 +189,21 @@ public <T> Mono<T> findOne(Class<T> clazz, Filter filter, @Nullable QueryOptions
return Mono.from(source);
}

@Override
public Flux<BsonDocument> findMany(String collectionName, Filter filter) {
return findMany(collectionName, filter, null);
}

@Override
public Flux<BsonDocument> findMany(
String collectionName,
Filter filter,
@Nullable QueryOptions options) {
MongoCollection<BsonDocument> collection = context.getCollection(collectionName);
FindPublisher<BsonDocument> source = find(collection, filter, options);
return Flux.from(source);
}

@Override
public <T> Flux<T> findMany(Class<T> clazz, Filter filter) {
return findMany(clazz, filter, null);
Expand Down Expand Up @@ -795,6 +811,17 @@ public Flux<String> listCollectionNames() {
.listCollectionNames());
}

@Override
public Mono<Void> renameCollection(String oldCollectionName, String newCollectionName) {
MongoCollection<Document> collection = context.getDatabase()
.getCollection(oldCollectionName);
MongoNamespace newNamespace = new MongoNamespace(
collection.getNamespace()
.getDatabaseName(),
newCollectionName);
return Mono.from(collection.renameCollection(newNamespace));
}

@Override
public Mono<Boolean> createCollectionIfNotExists(Class<?> clazz) {
MongoEntity<?> entity = context.getEntity(clazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.bson.BsonDateTime;
import org.bson.BsonDocument;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.BsonValue;

import im.turms.server.common.access.client.dto.constant.RequestStatus;
Expand Down Expand Up @@ -216,6 +217,11 @@ public Filter or(Filter... filters) {
return this;
}

public Filter type(String id, String string) {
document.append(id, new BsonDocument("$type", new BsonString(string)));
return this;
}

// Expiration Support

public Filter isExpired(String creationDateFieldName, @Nullable Date expirationDate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class MongoCollectionInitializer implements IMongoCollectionInitializer {
private final List<TurmsMongoClient> clients;

private final TurmsApplicationContext context;
private final MongoCollectionMigrator collectionMigrator;
private final MongoFakeDataGenerator fakeDataGenerator;
private final TieredStorageProperties messageTieredStorageProperties;
private final MongoGroupProperties mongoGroupProperties;
Expand Down Expand Up @@ -144,6 +145,7 @@ public MongoCollectionInitializer(
this.propertiesManager = propertiesManager;
ServiceProperties serviceProperties = propertiesManager.getLocalProperties()
.getService();
collectionMigrator = new MongoCollectionMigrator(adminMongoClient, userMongoClient);
fakeDataGenerator = new MongoFakeDataGenerator(
serviceProperties.getFake(),
passwordManager,
Expand Down Expand Up @@ -188,19 +190,22 @@ private boolean isQualifiedToRotateZones() {
}

private void initCollections() {
boolean migrateCollections;
if (!context.isProduction() && fakeDataGenerator.isClearAllCollectionsBeforeFaking()) {
LOGGER.info("Start dropping databases");
LOGGER.warn("Start dropping databases");
try {
dropAllDatabases().block(DurationConst.ONE_MINUTE);
} catch (Exception e) {
throw new MongoInitializationException(
"Caught an error while dropping databases",
e);
}
LOGGER.info("All collections are cleared");
LOGGER.warn("All databases are dropped");
migrateCollections = false;
} else {
migrateCollections = true;
}
LOGGER.info("Start creating collections");
Mono<Void> createCollections = createCollectionsIfNotExist()
Mono<Void> createCollections = createCollectionsIfNotExist(migrateCollections)
.onErrorMap(
t -> new MongoInitializationException("Failed to create collections", t))
.doOnSuccess(ignored -> LOGGER.info("All collections are created"))
Expand Down Expand Up @@ -228,68 +233,76 @@ private void initCollections() {
/**
* @return True if all collections have existed
*/
private Mono<Boolean> createCollectionsIfNotExist() {
private Mono<Boolean> createCollectionsIfNotExist(boolean migrateCollections) {
return adminMongoClient.listCollectionNames()
.collect(CollectorUtil.toSet(32))
.flatMap(existingCollectionNames -> PublisherUtil.areAllTrue(
adminMongoClient.createCollectionIfNotExists(Admin.class,
existingCollectionNames),
adminMongoClient.createCollectionIfNotExists(AdminRole.class,
existingCollectionNames),
.flatMap(existingCollectionNames -> {
if (migrateCollections) {
return collectionMigrator.migrate(existingCollectionNames)
.then(Mono.defer(() -> {
LOGGER.info("Start creating collections");
return createCollectionsIfNotExist(existingCollectionNames);
}));
}
LOGGER.info("Start creating collections");
return createCollectionsIfNotExist(existingCollectionNames);
});
}

private Mono<Boolean> createCollectionsIfNotExist(Set<String> existingCollectionNames) {
return PublisherUtil.areAllTrue(
adminMongoClient.createCollectionIfNotExists(Admin.class, existingCollectionNames),
adminMongoClient.createCollectionIfNotExists(AdminRole.class,
existingCollectionNames),

groupMongoClient.createCollectionIfNotExists(Group.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupBlockedUser.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupInvitation.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupJoinQuestion.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupJoinRequest.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupMember.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupType.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupVersion.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(Group.class, existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupBlockedUser.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupInvitation.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupJoinQuestion.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupJoinRequest.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupMember.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupType.class,
existingCollectionNames),
groupMongoClient.createCollectionIfNotExists(GroupVersion.class,
existingCollectionNames),

conferenceMongoClient.createCollectionIfNotExists(Meeting.class,
existingCollectionNames),
conferenceMongoClient.createCollectionIfNotExists(Meeting.class,
existingCollectionNames),

messageMongoClient.createCollectionIfNotExists(Message.class,
existingCollectionNames),
messageMongoClient.createCollectionIfNotExists(Message.class,
existingCollectionNames),

conversationMongoClient.createCollectionIfNotExists(
ConversationSettings.class,
existingCollectionNames),
conversationMongoClient.createCollectionIfNotExists(GroupConversation.class,
existingCollectionNames),
conversationMongoClient.createCollectionIfNotExists(
PrivateConversation.class,
existingCollectionNames),
conversationMongoClient.createCollectionIfNotExists(ConversationSettings.class,
existingCollectionNames),
conversationMongoClient.createCollectionIfNotExists(GroupConversation.class,
existingCollectionNames),
conversationMongoClient.createCollectionIfNotExists(PrivateConversation.class,
existingCollectionNames),

// ElasticsearchManager has its own logic to create collections dynamically,
// so we don't need to create collections for it.
// mongoClient.createCollectionIfNotExists(SyncLog.class),
// ElasticsearchManager has its own logic to create collections dynamically,
// so we don't need to create collections for it.
// mongoClient.createCollectionIfNotExists(SyncLog.class),

userMongoClient.createCollectionIfNotExists(User.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserFriendRequest.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserRelationship.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserRelationshipGroup.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(
UserRelationshipGroupMember.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserRole.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserSettings.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserVersion.class,
existingCollectionNames)));
userMongoClient.createCollectionIfNotExists(User.class, existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserFriendRequest.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserRelationship.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserRelationshipGroup.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserRelationshipGroupMember.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserRole.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserSettings.class,
existingCollectionNames),
userMongoClient.createCollectionIfNotExists(UserVersion.class,
existingCollectionNames));
}

private Mono<Void> dropAllDatabases() {
Expand Down
Loading

0 comments on commit 01b09eb

Please sign in to comment.