Skip to content

Commit

Permalink
add initial support for changelog & changeloglock
Browse files Browse the repository at this point in the history
this also comes with various supporting infrastructure, e.g. the
executor.

this does not yet allow executing any changesets, but it allows
liquibase to run with an empty changeset which will create the
`changelog` and also manage the lock (which ensures that only one
liquibase operation is running at a time).

note: the `liquibase.nosql` package has been copied from
[`liquibase-mongodb`] and adapted where needed (it is not 100% generic).
no authorship is claimed for this content!

[`liquibase-mongodb`]: https://github.com/liquibase/liquibase-mongodb
  • Loading branch information
rursprung authored and filipelautert committed May 14, 2024
1 parent 7ea8322 commit c98d6f8
Show file tree
Hide file tree
Showing 19 changed files with 1,346 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<dependency>
<groupId>org.opensearch.client</groupId>
<artifactId>opensearch-java</artifactId>
<version>2.10.1</version>
<version>2.10.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down Expand Up @@ -131,6 +131,11 @@
</dependencies>

<build>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.sonarsource.scanner.maven</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package liquibase.ext.opensearch.changelog;

import liquibase.ChecksumVersion;
import liquibase.Scope;
import liquibase.change.CheckSum;
import liquibase.changelog.ChangeSet;
import liquibase.changelog.RanChangeSet;
import liquibase.database.Database;
import liquibase.exception.DatabaseException;
import liquibase.ext.opensearch.database.OpenSearchConnection;
import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase;
import liquibase.logging.Logger;
import liquibase.nosql.changelog.AbstractNoSqlHistoryService;
import lombok.AllArgsConstructor;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.mapping.*;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.opensearch.core.search.Hit;
import org.opensearch.client.opensearch.indices.PutMappingRequest;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class OpenSearchHistoryService extends AbstractNoSqlHistoryService<OpenSearchLiquibaseDatabase> {

private final Logger log = Scope.getCurrentScope().getLog(getClass());

@Override
protected Logger getLogger() {
return log;
}

private OpenSearchClient getOpenSearchClient() {
final var connection = (OpenSearchConnection) this.getNoSqlDatabase().getConnection();
return connection.getOpenSearchClient();
}

@Override
protected boolean existsRepository() throws DatabaseException {
try {
return this.getOpenSearchClient().indices().exists(r -> r.index(this.getDatabaseChangeLogTableName())).value();
} catch (final IOException e) {
throw new DatabaseException(e);
}
}

@Override
protected void createRepository() throws DatabaseException {
// note: the mapping will be created in adjustRepository

try {
this.getOpenSearchClient().indices().create(r -> r.index(this.getDatabaseChangeLogTableName()));
} catch (final IOException e) {
throw new DatabaseException(e);
}
}

@Override
protected void adjustRepository() throws DatabaseException {
// properties must match RanChangeSet
final var request = new PutMappingRequest.Builder()
.index(this.getDatabaseChangeLogTableName())
.properties("id", new Property.Builder().keyword(new KeywordProperty.Builder().build()).build())
.properties("changeLog", new Property.Builder().keyword(new KeywordProperty.Builder().build()).build())
.properties("storedChangeLog", new Property.Builder().keyword(new KeywordProperty.Builder().build()).build())
.properties("author", new Property.Builder().text(new TextProperty.Builder().build()).build())
.properties("lastCheckSum", new Property.Builder().object(
new ObjectProperty.Builder()
.properties("version", new Property.Builder().integer(new IntegerNumberProperty.Builder().build()).build())
.properties("storedCheckSum", new Property.Builder().keyword(new KeywordProperty.Builder().build()).build())
.build())
.build()
)
.properties("dateExecuted", new Property.Builder().date(new DateProperty.Builder().build()).build())
.properties("tag", new Property.Builder().text(new TextProperty.Builder().build()).build())
.properties("execType", new Property.Builder().keyword(new KeywordProperty.Builder().build()).build())
.properties("description", new Property.Builder().text(new TextProperty.Builder().build()).build())
.properties("comments", new Property.Builder().text(new TextProperty.Builder().build()).build())
.properties("orderExecuted", new Property.Builder().integer(new IntegerNumberProperty.Builder().build()).build())
// TODO: contextExpression ?
.properties("labels", new Property.Builder().text(new TextProperty.Builder().build()).build())
.properties("deploymentId", new Property.Builder().text(new TextProperty.Builder().build()).build())
.properties("liquibaseVersion", new Property.Builder().text(new TextProperty.Builder().build()).build())
.build();

try {
this.getOpenSearchClient().indices().putMapping(request);
} catch (final IOException e) {
throw new DatabaseException(e);
}
}

@Override
protected void dropRepository() throws DatabaseException {
try {
this.getOpenSearchClient().indices().delete(r -> r.index(this.getDatabaseChangeLogTableName()));
} catch (final IOException e) {
throw new DatabaseException(e);
}
}

@Override
protected List<RanChangeSet> queryRanChangeSets() throws DatabaseException {
try {
final var response = this.getOpenSearchClient()
.search(s -> s.index(this.getDatabaseChangeLogTableName()), RanChangeSet.class);
return response.hits().hits().stream()
.map(Hit::source)
.collect(Collectors.toList());
} catch (final IOException e) {
throw new DatabaseException(e);
}
}

@Override
protected int generateNextSequence() throws DatabaseException {
final var aggregationName = "max";
final var request = new SearchRequest.Builder()
.index(this.getDatabaseChangeLogTableName())
.aggregations(aggregationName, a -> a.max(m -> m.field("orderExecuted")))
.build();
try {
final var response = this.getOpenSearchClient().search(request, RanChangeSet.class);
return (int) response.aggregations().get(aggregationName).max().value();
} catch (final IOException e) {
throw new DatabaseException(e);
}
}

@Override
protected void markChangeSetRun(final ChangeSet changeSet, final ChangeSet.ExecType execType, final Integer nextSequenceValue) throws DatabaseException {
final var ranChangeSet = new RanChangeSet(changeSet, execType, null, null);

if (execType.ranBefore) {
// TODO: handle updating existing entry!
throw new UnsupportedOperationException();
}

try {
this.getOpenSearchClient()
.create(r -> r.index(this.getDatabaseChangeLogTableName())
.id(ranChangeSet.getId())
.document(ranChangeSet));
} catch (final IOException e) {
throw new DatabaseException(e);
}
}

@Override
protected void removeRanChangeSet(final ChangeSet changeSet) throws DatabaseException {
try {
this.getOpenSearchClient()
.delete(r -> r.index(this.getDatabaseChangeLogTableName()).id(String.valueOf(changeSet.getId())));
} catch (final IOException e) {
throw new DatabaseException(e);
}
}

@Override
public void clearAllCheckSums() throws DatabaseException {
throw new UnsupportedOperationException();
}

@Override
protected long countTags(final String tag) throws DatabaseException {
final var request = new SearchRequest.Builder()
.index(this.getDatabaseChangeLogTableName())
.query(q -> q.match(m -> m.field("tag").query(FieldValue.of(tag))))
.build();
try {
final var response = this.getOpenSearchClient().search(request, RanChangeSet.class);
return response.hits().total().value();
} catch (final IOException e) {
throw new DatabaseException(e);
}
}

@Override
protected void tagLast(final String tagString) throws DatabaseException {
// TODO
}

@Override
protected long countRanChangeSets() throws DatabaseException {
return this.queryRanChangeSets().size();
}

@Override
protected void updateCheckSum(final ChangeSet changeSet) throws DatabaseException {
@AllArgsConstructor
class CheckSumObj {
final CheckSum lastCheckSum;
}
final var currentChecksumVersion = Optional.ofNullable(changeSet.getStoredCheckSum())
.map(cs -> ChecksumVersion.enumFromChecksumVersion(cs.getVersion()))
.orElse(ChecksumVersion.latest());
final var checkSum = changeSet.generateCheckSum(currentChecksumVersion);

try {
this.getOpenSearchClient()
.update(r -> r.index(this.getDatabaseChangeLogTableName())
.id(changeSet.getId())
.doc(new CheckSumObj(checkSum))
, RanChangeSet.class);
} catch (final IOException e) {
throw new DatabaseException(e);
}
}

@Override
public boolean supports(final Database database) {
return OpenSearchLiquibaseDatabase.PRODUCT_NAME.equals(database.getDatabaseProductName());
}

@Override
public boolean isDatabaseChecksumsCompatible() {
return false;
}

}
Loading

0 comments on commit c98d6f8

Please sign in to comment.