Skip to content

Commit

Permalink
Add periodic check for elasticsearch indices.
Browse files Browse the repository at this point in the history
Add option to restrict index maintenance actions to specific indices
Let index maintenance actions run on the master, if cluster coordinator
is used
  • Loading branch information
npomaroli committed Oct 15, 2021
1 parent 7d472b0 commit d66f85f
Show file tree
Hide file tree
Showing 51 changed files with 1,185 additions and 99 deletions.
18 changes: 17 additions & 1 deletion LTS-CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,24 @@ include::content/docs/variables.adoc-include[]
The LTS changelog lists releases which are only accessible via a commercial subscription.
All fixes and changes in LTS releases will be released the next minor release. Changes from LTS 1.4.x will be included in release 1.5.0.

[[v1.6.22]]
== 1.6.22 (TBD)

icon:check[] Search: If ElasticSearch indices, which were originally created by Mesh were dropped, they would probably be re-created with the default mapping when Mesh
stored a document in the index. This could cause the index to be not completely filled and also the mapping to be incorrect, so that search queries would fail to find
documents. In order to detect and repair such situations, a periodic check has been added to Mesh, which will check existence and correctness of all required ElasticSearch
indices. Missing indices will be created and incorrect indices will be dropped and re-created. Afterwards a full sync for the affected indices will be triggered. The check
period can be configured with the new configuration setting `search.indexCheckInterval`, which defaults to 60_000 milliseconds. Setting the interval to 0 will disable the
periodic check (not recommended).

icon:plus[] Search: The index maintenance endpoints `/api/v1/search/sync` and `/api/v1/search/clear` have been extended with the query parameter `index`, which can be used
to restrict the synchronized/cleared indices by a regular expression.

icon:check[] Search: The index maintenance actions triggered via `/api/v1/search/sync` and `/api/v1/search/clear` will now be redirected to the current master instance,
if the cluster coordinator is used.

[[v1.6.21]]
== 1.6.21 (TBD)
== 1.6.21 (12.10.2021)

icon:check[] Search: When a full synchronization of the search indices was triggered, language specific indices were unnecessarily removed first. This has been changed,
language specific indices will now be treated like all other indices during a full synchronization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class ElasticSearchOptions implements Option {

public static final boolean DEFAULT_HOSTNAME_VERIFICATION = true;

public static final long DEFAULT_INDEX_CHECK_INTERVAL = 60 * 1000;
public static final long DEFAULT_INDEX_MAPPING_CACHE_TIMEOUT = 60 * 60 * 1000;

public static final String MESH_ELASTICSEARCH_URL_ENV = "MESH_ELASTICSEARCH_URL";
public static final String MESH_ELASTICSEARCH_USERNAME_ENV = "MESH_ELASTICSEARCH_USERNAME";
public static final String MESH_ELASTICSEARCH_PASSWORD_ENV = "MESH_ELASTICSEARCH_PASSWORD";
Expand All @@ -66,6 +69,9 @@ public class ElasticSearchOptions implements Option {
public static final String MESH_ELASTICSEARCH_HOSTNAME_VERIFICATION_ENV = "MESH_ELASTICSEARCH_HOSTNAME_VERIFICATION";
public static final String MESH_ELASTICSEARCH_INCLUDE_BINARY_FIELDS_ENV = "MESH_ELASTICSEARCH_INCLUDE_BINARY_FIELDS";

public static final String MESH_ELASTICSEARCH_INDEX_CHECK_INTERVAL_ENV = "MESH_ELASTICSEARCH_INDEX_CHECK_INTERVAL";
public static final String MESH_ELASTICSEARCH_INDEX_MAPPING_CACHE_TIMEOUT_ENV = "MESH_ELASTICSEARCH_INDEX_MAPPING_CACHE_TIMEOUT";

@JsonProperty(required = false)
@JsonPropertyDescription("Elasticsearch connection url to be used. Set this setting to null will disable the Elasticsearch support.")
@EnvironmentVariable(name = MESH_ELASTICSEARCH_URL_ENV, description = "Override the configured elasticsearch server url. The value can be set to null in order to disable the Elasticsearch support.")
Expand Down Expand Up @@ -189,6 +195,16 @@ public class ElasticSearchOptions implements Option {
@EnvironmentVariable(name = MESH_ELASTICSEARCH_SYNC_BATCH_SIZE_ENV, description = "Override the search sync batch size")
private int syncBatchSize = DEFAULT_SYNC_BATCH_SIZE;

@JsonProperty(required = false)
@JsonPropertyDescription("Set the interval of index checks in ms. Default: " + DEFAULT_INDEX_CHECK_INTERVAL)
@EnvironmentVariable(name = MESH_ELASTICSEARCH_INDEX_CHECK_INTERVAL_ENV, description = "Override the interval for index checks")
private long indexCheckInterval = DEFAULT_INDEX_CHECK_INTERVAL;

@JsonProperty(required = false)
@JsonPropertyDescription("Set the timeout for the cache of index mappings in ms. Default: " + DEFAULT_INDEX_MAPPING_CACHE_TIMEOUT)
@EnvironmentVariable(name = MESH_ELASTICSEARCH_INDEX_MAPPING_CACHE_TIMEOUT_ENV, description = "Override the timeout for the cache if index mappings")
private long indexMappingCacheTimeout = DEFAULT_INDEX_MAPPING_CACHE_TIMEOUT;

public ElasticSearchOptions() {

}
Expand Down Expand Up @@ -442,4 +458,36 @@ public int getSyncBatchSize() {
public void setSyncBatchSize(int batchSize) {
this.syncBatchSize = batchSize;
}

/**
* Index check interval in ms
* @return interval
*/
public long getIndexCheckInterval() {
return indexCheckInterval;
}

/**
* Set the index check interval in ms
* @param indexCheckInterval interval
*/
public void setIndexCheckInterval(long indexCheckInterval) {
this.indexCheckInterval = indexCheckInterval;
}

/**
* Timeout for the cache of index mappings in ms
* @return timeout
*/
public long getIndexMappingCacheTimeout() {
return indexMappingCacheTimeout;
}

/**
* Set the timeout for the cache of index mappings
* @param indexMappingCacheTimeout timeout
*/
public void setIndexMappingCacheTimeout(long indexMappingCacheTimeout) {
this.indexMappingCacheTimeout = indexMappingCacheTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,18 @@ public class EventAwareCacheImpl<K, V> implements EventAwareCache<K, V> {
private final Counter missCounter;
private final Counter hitCounter;

public EventAwareCacheImpl(String name, long maxSize, Duration expireAfter, Vertx vertx, MeshOptions options, MetricsService metricsService, Predicate<Message<JsonObject>> filter,
BiConsumer<Message<JsonObject>, EventAwareCache<K, V>> onNext,
MeshEvent... events) {
public EventAwareCacheImpl(String name, long maxSize, Duration expireAfter, Duration expireAfterAccess, Vertx vertx, MeshOptions options, MetricsService metricsService,
Predicate<Message<JsonObject>> filter,
BiConsumer<Message<JsonObject>, EventAwareCache<K, V>> onNext, MeshEvent... events) {
this.vertx = vertx;
this.options = options;
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder().maximumSize(maxSize);
if (expireAfter != null) {
cacheBuilder = cacheBuilder.expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS);
}
if (expireAfterAccess != null) {
cacheBuilder = cacheBuilder.expireAfterAccess(expireAfterAccess.getSeconds(), TimeUnit.SECONDS);
}
this.cache = cacheBuilder.build();
this.filter = filter;
this.onNext = onNext;
Expand Down Expand Up @@ -185,6 +188,7 @@ public static class Builder<K, V> {
private MeshEvent[] events = null;
private Vertx vertx;
private Duration expireAfter;
private Duration expireAfterAccess;
private String name;
private MeshOptions options;
private MetricsService metricsService;
Expand All @@ -193,7 +197,7 @@ public EventAwareCache<K, V> build() {
Objects.requireNonNull(events, "No events for the cache have been set");
Objects.requireNonNull(vertx, "No Vert.x instance has been set");
Objects.requireNonNull(name, "No name has been set");
EventAwareCacheImpl<K, V> c = new EventAwareCacheImpl<>(name, maxSize, expireAfter, vertx, options, metricsService, filter, onNext, events);
EventAwareCacheImpl<K, V> c = new EventAwareCacheImpl<>(name, maxSize, expireAfter, expireAfterAccess, vertx, options, metricsService, filter, onNext, events);
if (disabled) {
c.disable();
}
Expand Down Expand Up @@ -296,6 +300,18 @@ public Builder<K, V> expireAfter(long amount, TemporalUnit unit) {
return this;
}

/**
* Define when the cache should automatically expire after last access
*
* @param amount
* @param unit
* @return Fluent API
*/
public Builder<K, V> expireAfterAccess(long amount, TemporalUnit unit) {
this.expireAfterAccess = Duration.of(amount, unit);
return this;
}

/**
* Sets the name for the cache. This is used for caching metrics.
* @param name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.gentics.mesh.core.data.search;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import com.gentics.mesh.context.InternalActionContext;
Expand All @@ -27,6 +29,7 @@
* @param <T>
*/
public interface IndexHandler<T extends MeshCoreVertex<?, T>> {
public final static Pattern MATCH_ALL = Pattern.compile(".*");

/**
* Initialise the search index by creating the index first and setting the mapping afterwards.
Expand Down Expand Up @@ -82,9 +85,10 @@ public interface IndexHandler<T extends MeshCoreVertex<?, T>> {
/**
* Diff the elements within all indices that are handled by the index handler and synchronize the data.
*
* @param indexPattern optional index pattern to restrict synchronized indices
* @return
*/
Flowable<SearchRequest> syncIndices();
Flowable<SearchRequest> syncIndices(Optional<Pattern> indexPattern);

/**
* Filter the given list and return only indices which match the type of the handler but are no longer in use or unknown.
Expand Down Expand Up @@ -167,4 +171,9 @@ default GraphPermission getReadPermission(InternalActionContext ac) {
*/
Observable<UpdateBulkEntry> updatePermissionForBulk(UpdateDocumentEntry entry);

/**
* Check indices handled by this handler for existence and correctness (mapping)
* @return completable
*/
Completable check();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public class DistributionUtils {

private static final Set<Pattern> readOnlyPathPatternSet = createReadOnlyPatternSet();

/**
* Set of black-listed path patterns (paths matching any of these patterns is supposed to be "modifying" and should be redirected to the master)
*/
private static final Set<Pattern> blackListPathPatternSet = createBlackListPatternSet();

/**
* Check whether the request is a read request.
*
Expand All @@ -25,6 +30,10 @@ public class DistributionUtils {
* @return
*/
public static boolean isReadRequest(HttpMethod method, String path) {
if (isBlackListed(path)) {
return false;
}

switch (method) {
case CONNECT:
case OPTIONS:
Expand Down Expand Up @@ -59,6 +68,21 @@ public static boolean isReadOnly(String path) {
return false;
}

/**
* Check whether the given path matches one of the known black-listed paths.
* @param path path
* @return true if the request to the given path is black-listed (is supposed to be "modifying")
*/
public static boolean isBlackListed(String path) {
for (Pattern pattern : blackListPathPatternSet) {
Matcher m = pattern.matcher(path);
if (m.matches()) {
return true;
}
}
return false;
}

/**
* Create the set of read-only patterns
* @return pattern set
Expand All @@ -76,4 +100,19 @@ private static Set<Pattern> createReadOnlyPatternSet() {
patterns.add(Pattern.compile("/api/v[0-9]+/utilities/validateMicroschema/?"));
return patterns;
}

/**
* Create the set of blacklisted path patterns
* @return pattern set
*/
private static Set<Pattern> createBlackListPatternSet() {
Set<Pattern> patterns = new HashSet<>();
// clearing the search indices should only be done on the Master
patterns.add(Pattern.compile("/api/v[0-9]+/search/clear"));
// index sync should only be done on the Master
patterns.add(Pattern.compile("/api/v[0-9]+/search/sync"));
// search index operation status should only be fetched from the Master (which is doing the index operations)
patterns.add(Pattern.compile("/api/v[0-9]+/search/status"));
return patterns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ public interface RequestDelegator extends Handler<RoutingContext> {
* @param routingContext
*/
void redirectToMaster(RoutingContext routingContext);

/**
* Returns true when this instance is the master
* @return true for master
*/
boolean isMaster();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.gentics.mesh.parameter.impl.DeleteParametersImpl;
import com.gentics.mesh.parameter.impl.GenericParametersImpl;
import com.gentics.mesh.parameter.impl.ImageManipulationParametersImpl;
import com.gentics.mesh.parameter.impl.IndexMaintenanceParametersImpl;
import com.gentics.mesh.parameter.impl.NodeParametersImpl;
import com.gentics.mesh.parameter.impl.PagingParametersImpl;
import com.gentics.mesh.parameter.impl.ProjectPurgeParametersImpl;
Expand Down Expand Up @@ -71,4 +72,8 @@ default SearchParameters getSearchParameters() {
default BackupParameters getBackupParameters() {
return new BackupParametersImpl(this);
}

default IndexMaintenanceParameters getIndexMaintenanceParameters() {
return new IndexMaintenanceParametersImpl(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.gentics.mesh.parameter.impl;

import java.util.HashMap;
import java.util.Map;

import org.raml.model.ParamType;
import org.raml.model.parameter.QueryParameter;

import com.gentics.mesh.handler.ActionContext;
import com.gentics.mesh.parameter.AbstractParameters;
import com.gentics.mesh.parameter.IndexMaintenanceParameters;

/**
* Parameter implementation for index maintenance parameters
*/
public class IndexMaintenanceParametersImpl extends AbstractParameters implements IndexMaintenanceParameters {
/**
* Create empty instance
*/
public IndexMaintenanceParametersImpl() {
}

/**
* Create instance with parameters filled from the action context
* @param ac action context
*/
public IndexMaintenanceParametersImpl(ActionContext ac) {
super(ac);
}

@Override
public String getName() {
return "Index Maintenance Parameters";
}

@Override
public Map<? extends String, ? extends QueryParameter> getRAMLParameters() {
Map<String, QueryParameter> parameters = new HashMap<>();

// index parameter
QueryParameter indexParameter = new QueryParameter();
indexParameter.setDescription("Index pattern to handle");
indexParameter.setExample("node-.*");
indexParameter.setRequired(false);
indexParameter.setType(ParamType.STRING);
parameters.put(INDEX_PARAMETER_KEY, indexParameter);
return parameters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void reset() {
}

@Override
public Completable clear() {
public Completable clear(String indexPattern) {
return Completable.complete();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.gentics.mesh.search;

import com.gentics.mesh.cache.MeshCache;

import io.vertx.core.json.JsonObject;

/**
* Interface for the cache of expected search mappings
*/
public interface SearchMappingsCache extends MeshCache<String, JsonObject> {
/**
* Put the value into the cache
* @param key cache key
* @param value cached value
*/
void put(String key, JsonObject value);
}
Loading

0 comments on commit d66f85f

Please sign in to comment.