Skip to content

Commit

Permalink
Refactoring of code (#342)
Browse files Browse the repository at this point in the history
1. Changed class name from Ip2GeoCache to Ip2GeoCachedDao
2. Moved the Ip2GeoCachedDao from cache to dao package

Signed-off-by: Heemin Kim <heemin@amazon.com>
  • Loading branch information
heemin32 authored Jun 26, 2023
1 parent 79347de commit eb5abf3
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;
package org.opensearch.geospatial.ip2geo.dao;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -18,18 +18,20 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.ShardId;

/**
* Data access object for Datasource and GeoIP data with added caching layer
*/
@Log4j2
public class Ip2GeoCache implements IndexingOperationListener {
public class Ip2GeoCachedDao implements IndexingOperationListener {
private final DatasourceDao datasourceDao;
private Map<String, DatasourceMetadata> data;

public Ip2GeoCache(final DatasourceDao datasourceDao) {
public Ip2GeoCachedDao(final DatasourceDao datasourceDao) {
this.datasourceDao = datasourceDao;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.opensearch.action.ActionListener;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.cache.Ip2GeoCache;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoCachedDao;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.IngestService;
Expand Down Expand Up @@ -57,7 +57,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
private final ClusterSettings clusterSettings;
private final DatasourceDao datasourceDao;
private final GeoIpDataDao geoIpDataDao;
private final Ip2GeoCache ip2GeoCache;
private final Ip2GeoCachedDao ip2GeoCachedDao;

/**
* Ip2Geo processor type
Expand All @@ -76,7 +76,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
* @param clusterSettings the cluster settings
* @param datasourceDao the datasource facade
* @param geoIpDataDao the geoip data facade
* @param ip2GeoCache the cache
* @param ip2GeoCachedDao the cache
*/
public Ip2GeoProcessor(
final String tag,
Expand All @@ -89,7 +89,7 @@ public Ip2GeoProcessor(
final ClusterSettings clusterSettings,
final DatasourceDao datasourceDao,
final GeoIpDataDao geoIpDataDao,
final Ip2GeoCache ip2GeoCache
final Ip2GeoCachedDao ip2GeoCachedDao
) {
super(tag, description);
this.field = field;
Expand All @@ -100,7 +100,7 @@ public Ip2GeoProcessor(
this.clusterSettings = clusterSettings;
this.datasourceDao = datasourceDao;
this.geoIpDataDao = geoIpDataDao;
this.ip2GeoCache = ip2GeoCache;
this.ip2GeoCachedDao = ip2GeoCachedDao;
}

/**
Expand Down Expand Up @@ -154,8 +154,8 @@ protected void executeInternal(
final String ip
) {
validateDatasourceIsInAvailableState(datasourceName);
String indexName = ip2GeoCache.getIndexName(datasourceName);
if (ip2GeoCache.isExpired(datasourceName) || indexName == null) {
String indexName = ip2GeoCachedDao.getIndexName(datasourceName);
if (ip2GeoCachedDao.isExpired(datasourceName) || indexName == null) {
handleExpiredData(ingestDocument, handler);
return;
}
Expand Down Expand Up @@ -209,11 +209,11 @@ private List<Map<String, Object>> filteredGeoData(final List<Map<String, Object>
}

private void validateDatasourceIsInAvailableState(final String datasourceName) {
if (ip2GeoCache.has(datasourceName) == false) {
if (ip2GeoCachedDao.has(datasourceName) == false) {
throw new IllegalStateException("datasource does not exist");
}

if (DatasourceState.AVAILABLE.equals(ip2GeoCache.getState(datasourceName)) == false) {
if (DatasourceState.AVAILABLE.equals(ip2GeoCachedDao.getState(datasourceName)) == false) {
throw new IllegalStateException("datasource is not in an available state");
}
}
Expand Down Expand Up @@ -243,8 +243,8 @@ protected void executeInternal(
}

validateDatasourceIsInAvailableState(datasourceName);
String indexName = ip2GeoCache.getIndexName(datasourceName);
if (ip2GeoCache.isExpired(datasourceName) || indexName == null) {
String indexName = ip2GeoCachedDao.getIndexName(datasourceName);
if (ip2GeoCachedDao.isExpired(datasourceName) || indexName == null) {
handleExpiredData(ingestDocument, handler);
return;
}
Expand Down Expand Up @@ -290,7 +290,7 @@ public static final class Factory implements Processor.Factory {
private final IngestService ingestService;
private final DatasourceDao datasourceDao;
private final GeoIpDataDao geoIpDataDao;
private final Ip2GeoCache ip2GeoCache;
private final Ip2GeoCachedDao ip2GeoCachedDao;

/**
* Within this method, blocking request cannot be called because this method is executed in a transport thread.
Expand Down Expand Up @@ -320,7 +320,7 @@ public Ip2GeoProcessor create(
ingestService.getClusterService().getClusterSettings(),
datasourceDao,
geoIpDataDao,
ip2GeoCache
ip2GeoCachedDao
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@
import org.opensearch.geospatial.ip2geo.action.RestUpdateDatasourceHandler;
import org.opensearch.geospatial.ip2geo.action.UpdateDatasourceAction;
import org.opensearch.geospatial.ip2geo.action.UpdateDatasourceTransportAction;
import org.opensearch.geospatial.ip2geo.cache.Ip2GeoCache;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoCachedDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceRunner;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
Expand Down Expand Up @@ -94,7 +94,7 @@
*/
@Log4j2
public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin {
private Ip2GeoCache ip2GeoCache;
private Ip2GeoCachedDao ip2GeoCachedDao;
private DatasourceDao datasourceDao;
private GeoIpDataDao geoIpDataDao;

Expand All @@ -107,17 +107,17 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
this.datasourceDao = new DatasourceDao(parameters.client, parameters.ingestService.getClusterService());
this.geoIpDataDao = new GeoIpDataDao(parameters.ingestService.getClusterService(), parameters.client);
this.ip2GeoCache = new Ip2GeoCache(datasourceDao);
this.ip2GeoCachedDao = new Ip2GeoCachedDao(datasourceDao);
return MapBuilder.<String, Processor.Factory>newMapBuilder()
.put(FeatureProcessor.TYPE, new FeatureProcessor.Factory())
.put(Ip2GeoProcessor.TYPE, new Ip2GeoProcessor.Factory(parameters.ingestService, datasourceDao, geoIpDataDao, ip2GeoCache))
.put(Ip2GeoProcessor.TYPE, new Ip2GeoProcessor.Factory(parameters.ingestService, datasourceDao, geoIpDataDao, ip2GeoCachedDao))
.immutableMap();
}

@Override
public void onIndexModule(IndexModule indexModule) {
if (DatasourceExtension.JOB_INDEX_NAME.equals(indexModule.getIndex().getName())) {
indexModule.addIndexOperationListener(ip2GeoCache);
indexModule.addIndexOperationListener(ip2GeoCachedDao);
log.info("Ip2GeoListener started listening to operations on index {}", DatasourceExtension.JOB_INDEX_NAME);
}
}
Expand Down Expand Up @@ -170,7 +170,7 @@ public Collection<Object> createComponents(
ip2GeoExecutor,
geoIpDataDao,
ip2GeoLockService,
ip2GeoCache
ip2GeoCachedDao
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.cache.Ip2GeoCache;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoCachedDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoProcessorDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
Expand Down Expand Up @@ -79,7 +79,7 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase {
@Mock
protected GeoIpDataDao geoIpDataDao;
@Mock
protected Ip2GeoCache ip2GeoCache;
protected Ip2GeoCachedDao ip2GeoCachedDao;
@Mock
protected ClusterState clusterState;
@Mock
Expand Down Expand Up @@ -266,7 +266,7 @@ protected Ip2GeoProcessor randomIp2GeoProcessor(String datasourceName) {
clusterSettings,
datasourceDao,
geoIpDataDao,
ip2GeoCache
ip2GeoCachedDao
);
return ip2GeoProcessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;
package org.opensearch.geospatial.ip2geo.dao;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -23,20 +23,20 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.ShardId;

public class Ip2GeoCacheTests extends Ip2GeoTestCase {
private Ip2GeoCache ip2GeoCache;
public class Ip2GeoCachedDaoTests extends Ip2GeoTestCase {
private Ip2GeoCachedDao ip2GeoCachedDao;

@Before
public void init() {
ip2GeoCache = new Ip2GeoCache(datasourceDao);
ip2GeoCachedDao = new Ip2GeoCachedDao(datasourceDao);
}

public void testGetIndexName_whenCalled_thenReturnIndexName() {
Datasource datasource = randomDatasource();
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

// Run
String indexName = ip2GeoCache.getIndexName(datasource.getName());
String indexName = ip2GeoCachedDao.getIndexName(datasource.getName());

// Verify
assertEquals(datasource.currentIndexName(), indexName);
Expand All @@ -49,7 +49,7 @@ public void testIsExpired_whenExpired_thenReturnTrue() {
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

// Run
boolean isExpired = ip2GeoCache.isExpired(datasource.getName());
boolean isExpired = ip2GeoCachedDao.isExpired(datasource.getName());

// Verify
assertTrue(isExpired);
Expand All @@ -62,7 +62,7 @@ public void testIsExpired_whenNotExpired_thenReturnFalse() {
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

// Run
boolean isExpired = ip2GeoCache.isExpired(datasource.getName());
boolean isExpired = ip2GeoCachedDao.isExpired(datasource.getName());

// Verify
assertFalse(isExpired);
Expand All @@ -73,7 +73,7 @@ public void testHas_whenHasDatasource_thenReturnTrue() {
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

// Run
boolean hasDatasource = ip2GeoCache.has(datasource.getName());
boolean hasDatasource = ip2GeoCachedDao.has(datasource.getName());

// Verify
assertTrue(hasDatasource);
Expand All @@ -85,7 +85,7 @@ public void testHas_whenNoDatasource_thenReturnFalse() {

String datasourceName = GeospatialTestHelper.randomLowerCaseString();
// Run
boolean hasDatasource = ip2GeoCache.has(datasourceName);
boolean hasDatasource = ip2GeoCachedDao.has(datasourceName);

// Verify
assertFalse(hasDatasource);
Expand All @@ -96,7 +96,7 @@ public void testGetState_whenCalled_thenReturnState() {
when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList(datasource));

// Run
DatasourceState state = ip2GeoCache.getState(datasource.getName());
DatasourceState state = ip2GeoCachedDao.getState(datasource.getName());

// Verify
assertEquals(datasource.getState(), state);
Expand All @@ -115,13 +115,13 @@ public void testPostIndex_whenFailed_thenNoUpdate() {
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);

// Run
ip2GeoCache.postIndex(shardId, index, result);
ip2GeoCachedDao.postIndex(shardId, index, result);

// Verify
assertFalse(ip2GeoCache.has(datasource.getName()));
assertTrue(ip2GeoCache.isExpired(datasource.getName()));
assertNull(ip2GeoCache.getIndexName(datasource.getName()));
assertNull(ip2GeoCache.getState(datasource.getName()));
assertFalse(ip2GeoCachedDao.has(datasource.getName()));
assertTrue(ip2GeoCachedDao.isExpired(datasource.getName()));
assertNull(ip2GeoCachedDao.getIndexName(datasource.getName()));
assertNull(ip2GeoCachedDao.getState(datasource.getName()));
}

@SneakyThrows
Expand All @@ -137,13 +137,13 @@ public void testPostIndex_whenSucceed_thenUpdate() {
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);

// Run
ip2GeoCache.postIndex(shardId, index, result);
ip2GeoCachedDao.postIndex(shardId, index, result);

// Verify
assertTrue(ip2GeoCache.has(datasource.getName()));
assertFalse(ip2GeoCache.isExpired(datasource.getName()));
assertEquals(datasource.currentIndexName(), ip2GeoCache.getIndexName(datasource.getName()));
assertEquals(datasource.getState(), ip2GeoCache.getState(datasource.getName()));
assertTrue(ip2GeoCachedDao.has(datasource.getName()));
assertFalse(ip2GeoCachedDao.isExpired(datasource.getName()));
assertEquals(datasource.currentIndexName(), ip2GeoCachedDao.getIndexName(datasource.getName()));
assertEquals(datasource.getState(), ip2GeoCachedDao.getState(datasource.getName()));
}

public void testPostDelete_whenFailed_thenNoUpdate() {
Expand All @@ -156,10 +156,10 @@ public void testPostDelete_whenFailed_thenNoUpdate() {
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);

// Run
ip2GeoCache.postDelete(shardId, index, result);
ip2GeoCachedDao.postDelete(shardId, index, result);

// Verify
assertTrue(ip2GeoCache.has(datasource.getName()));
assertTrue(ip2GeoCachedDao.has(datasource.getName()));
}

public void testPostDelete_whenSucceed_thenUpdate() {
Expand All @@ -173,9 +173,9 @@ public void testPostDelete_whenSucceed_thenUpdate() {
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);

// Run
ip2GeoCache.postDelete(shardId, index, result);
ip2GeoCachedDao.postDelete(shardId, index, result);

// Verify
assertFalse(ip2GeoCache.has(datasource.getName()));
assertFalse(ip2GeoCachedDao.has(datasource.getName()));
}
}
Loading

0 comments on commit eb5abf3

Please sign in to comment.