diff --git a/container/features/src/main/resources/features-sentinel.xml b/container/features/src/main/resources/features-sentinel.xml
index d79751cfecc2..48b11d431c16 100644
--- a/container/features/src/main/resources/features-sentinel.xml
+++ b/container/features/src/main/resources/features-sentinel.xml
@@ -139,6 +139,7 @@
wrap:mvn:org.freemarker/freemarker/${freemarkerVersion}$Bundle-SymbolicName=org.freemarker&Bundle-Version=${freemarkerVersion}
mvn:org.opennms.core/org.opennms.core.cache/${project.version}
mvn:org.opennms.features.telemetry.protocols.netflow/org.opennms.features.telemetry.protocols.netflow.transport/${project.version}
+ mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.cache/${project.version}
mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.common/${project.version}
mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.flows/${project.version}
mvn:org.opennms.features.telemetry.protocols.netflow/org.opennms.features.telemetry.protocols.netflow.adapter/${project.version}
diff --git a/container/features/src/main/resources/features.xml b/container/features/src/main/resources/features.xml
index e79bfcb71608..bf661b65558e 100644
--- a/container/features/src/main/resources/features.xml
+++ b/container/features/src/main/resources/features.xml
@@ -1151,6 +1151,7 @@
-->
mvn:com.google.protobuf/protobuf-java/${protobufVersion}
wrap:mvn:com.google.protobuf/protobuf-java-util/${protobufVersion}$overwrite=merge&Import-Package=javax.annotation;version=!,*
+ mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.cache/${project.version}
mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.common/${project.version}
mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.flows/${project.version}
mvn:org.opennms.features.telemetry.protocols.netflow/org.opennms.features.telemetry.protocols.netflow.parser/${project.version}
diff --git a/features/flows/elastic/src/main/java/org/opennms/netmgt/flows/elastic/FlowDocument.java b/features/flows/elastic/src/main/java/org/opennms/netmgt/flows/elastic/FlowDocument.java
index a5618e267fc5..f00fb9c77ce1 100644
--- a/features/flows/elastic/src/main/java/org/opennms/netmgt/flows/elastic/FlowDocument.java
+++ b/features/flows/elastic/src/main/java/org/opennms/netmgt/flows/elastic/FlowDocument.java
@@ -29,13 +29,10 @@
package org.opennms.netmgt.flows.elastic;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.opennms.integration.api.v1.flows.Flow;
-import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow;
-import org.opennms.netmgt.flows.processing.enrichment.NodeInfo;
import com.google.gson.annotations.SerializedName;
diff --git a/features/flows/elastic/src/test/java/org/opennms/netmgt/flows/elastic/FlowDocumentTest.java b/features/flows/elastic/src/test/java/org/opennms/netmgt/flows/elastic/FlowDocumentTest.java
index 81cb0f30451f..f11ba4a1a99d 100644
--- a/features/flows/elastic/src/test/java/org/opennms/netmgt/flows/elastic/FlowDocumentTest.java
+++ b/features/flows/elastic/src/test/java/org/opennms/netmgt/flows/elastic/FlowDocumentTest.java
@@ -37,7 +37,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.time.Instant;
import java.util.List;
import java.util.Optional;
@@ -45,7 +44,7 @@
import org.opennms.core.test.xml.JsonTest;
import org.opennms.netmgt.flows.api.Flow;
import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow;
-import org.opennms.netmgt.flows.processing.enrichment.NodeInfo;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo;
import com.google.common.io.Resources;
import com.google.gson.Gson;
diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/AggregatedFlowQueryIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/AggregatedFlowQueryIT.java
index ff2e5006a375..9bf904bf27b0 100644
--- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/AggregatedFlowQueryIT.java
+++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/AggregatedFlowQueryIT.java
@@ -93,13 +93,15 @@
import org.opennms.netmgt.flows.classification.persistence.api.RuleBuilder;
import org.opennms.netmgt.flows.elastic.agg.AggregatedFlowQueryService;
import org.opennms.netmgt.flows.processing.impl.DocumentEnricherImpl;
-import org.opennms.netmgt.flows.processing.enrichment.NodeInfo;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo;
import org.opennms.netmgt.flows.filter.api.Filter;
import org.opennms.netmgt.flows.filter.api.SnmpInterfaceIdFilter;
import org.opennms.netmgt.flows.filter.api.TimeRangeFilter;
import org.opennms.netmgt.flows.persistence.FlowDocumentBuilder;
import org.opennms.netmgt.flows.processing.FlowBuilder;
import org.opennms.netmgt.flows.processing.impl.DocumentMangler;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCacheImpl;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableSet;
@@ -168,18 +170,24 @@ public void setUp() throws MalformedURLException, ExecutionException, Interrupte
new RuleBuilder().withName("https").withSrcPort("443").withProtocol("tcp,udp").build()),
FilterService.NOOP);
- documentEnricher = new DocumentEnricherImpl(metricRegistry,
- new MockNodeDao(),
- new MockIpInterfaceDao(),
- new MockInterfaceToNodeCache(),
- new MockSessionUtils(),
+ final NodeInfoCache nodeInfoCache = new NodeInfoCacheImpl(
+ new CacheConfigBuilder()
+ .withName("nodeInfoCache")
+ .withMaximumSize(1000)
+ .withExpireAfterWrite(300)
+ .withExpireAfterRead(300)
+ .build(),
+ true,
+ new MetricRegistry(),
+ new MockNodeDao(),
+ new MockIpInterfaceDao(),
+ new MockInterfaceToNodeCache()
+ );
+ documentEnricher = new DocumentEnricherImpl(new MockSessionUtils(),
classificationEngine,
- new CacheConfigBuilder()
- .withName("flows.node")
- .withMaximumSize(1000)
- .withExpireAfterWrite(300)
- .build(), 0,
- new DocumentMangler(new ScriptEngineManager()));
+ 0,
+ new DocumentMangler(new ScriptEngineManager()),
+ nodeInfoCache);
// The repository should be empty
assertThat(smartQueryService.getFlowCount(Collections.singletonList(new TimeRangeFilter(0, System.currentTimeMillis()))).get(), equalTo(0L));
diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/FlowQueryIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/FlowQueryIT.java
index 2537eb65d412..e71cc237559d 100644
--- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/FlowQueryIT.java
+++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/FlowQueryIT.java
@@ -101,6 +101,8 @@
import org.opennms.netmgt.flows.filter.api.SnmpInterfaceIdFilter;
import org.opennms.netmgt.flows.filter.api.TimeRangeFilter;
import org.opennms.netmgt.flows.processing.impl.DocumentMangler;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCacheImpl;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.ImmutableSet;
@@ -143,19 +145,25 @@ public void setUp() throws MalformedURLException, ExecutionException, Interrupte
new RuleBuilder().withName("http").withSrcPort("80").withProtocol("tcp,udp").build(),
new RuleBuilder().withName("https").withSrcPort("443").withProtocol("tcp,udp").build()),
FilterService.NOOP);
+ final NodeInfoCache nodeInfoCache = new NodeInfoCacheImpl(
+ new CacheConfigBuilder()
+ .withName("nodeInfoCache")
+ .withMaximumSize(1000)
+ .withExpireAfterWrite(300)
+ .withExpireAfterRead(300)
+ .build(),
+ true,
+ new MetricRegistry(),
+ new MockNodeDao(),
+ new MockIpInterfaceDao(),
+ new MockInterfaceToNodeCache()
+ );
- documentEnricher = new DocumentEnricherImpl(metricRegistry,
- new MockNodeDao(),
- new MockIpInterfaceDao(),
- new MockInterfaceToNodeCache(),
- new MockSessionUtils(),
+ documentEnricher = new DocumentEnricherImpl(new MockSessionUtils(),
classificationEngine,
- new CacheConfigBuilder()
- .withName("flows.node")
- .withMaximumSize(1000)
- .withExpireAfterWrite(300)
- .build(), 0,
- new DocumentMangler(new ScriptEngineManager()));
+ 0,
+ new DocumentMangler(new ScriptEngineManager()),
+ nodeInfoCache);
final RawIndexInitializer initializer = new RawIndexInitializer(client, settings);
diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/KafkaFlowForwarderIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/KafkaFlowForwarderIT.java
index 843586a61329..12625cbe5ca7 100644
--- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/KafkaFlowForwarderIT.java
+++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/KafkaFlowForwarderIT.java
@@ -61,7 +61,7 @@
import org.opennms.netmgt.flows.persistence.KafkaFlowForwarder;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;
import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow;
-import org.opennms.netmgt.flows.processing.enrichment.NodeInfo;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo;
import org.osgi.service.cm.ConfigurationAdmin;
import com.codahale.metrics.MetricRegistry;
diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/MarkerCacheIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/MarkerCacheIT.java
index e3cff0c4cdb0..9b2d09bcce79 100644
--- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/MarkerCacheIT.java
+++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/MarkerCacheIT.java
@@ -59,7 +59,7 @@
import org.opennms.netmgt.dao.api.SessionUtils;
import org.opennms.netmgt.dao.api.SnmpInterfaceDao;
import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow;
-import org.opennms.netmgt.flows.processing.enrichment.NodeInfo;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo;
import org.opennms.netmgt.flows.processing.impl.InterfaceMarkerImpl;
import org.opennms.netmgt.model.OnmsSnmpInterface;
import org.opennms.test.JUnitConfigurationEnvironment;
diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/NodeIdentificationIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/NodeIdentificationIT.java
index 6e05b61d9e74..87ab55af67c2 100644
--- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/NodeIdentificationIT.java
+++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/NodeIdentificationIT.java
@@ -62,6 +62,8 @@
import org.opennms.netmgt.model.NetworkBuilder;
import org.opennms.netmgt.model.OnmsMetaData;
import org.opennms.netmgt.model.OnmsNode;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCacheImpl;
import org.opennms.test.JUnitConfigurationEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
@@ -115,16 +117,26 @@ public void before() {
@Test
public void testSomething() throws InterruptedException {
final ClassificationEngine classificationEngine = new DefaultClassificationEngine(() -> Collections.emptyList(), FilterService.NOOP);
- final DocumentEnricherImpl documentEnricher = new DocumentEnricherImpl(
- new MetricRegistry(),
- databasePopulator.getNodeDao(), databasePopulator.getIpInterfaceDao(),
- interfaceToNodeCache, sessionUtils, classificationEngine,
+ final NodeInfoCache nodeInfoCache = new NodeInfoCacheImpl(
new CacheConfigBuilder()
- .withName("flows.node")
+ .withName("nodeInfoCache")
.withMaximumSize(1000)
.withExpireAfterWrite(300)
- .build(), 0,
- new DocumentMangler(new ScriptEngineManager()));
+ .withExpireAfterRead(300)
+ .build(),
+ true,
+ new MetricRegistry(),
+ databasePopulator.getNodeDao(),
+ databasePopulator.getIpInterfaceDao(),
+ interfaceToNodeCache
+ );
+
+ final DocumentEnricherImpl documentEnricher = new DocumentEnricherImpl(
+ sessionUtils,
+ classificationEngine,
+ 0,
+ new DocumentMangler(new ScriptEngineManager()),
+ nodeInfoCache);
final TestFlow testFlow = new TestFlow();
testFlow.setSrcAddr("1.1.1.1");
diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/ThresholdingIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/ThresholdingIT.java
index 7abbcd3b1500..a642a3cb9ec0 100644
--- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/ThresholdingIT.java
+++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/ThresholdingIT.java
@@ -65,7 +65,7 @@
import org.opennms.netmgt.filter.api.FilterDao;
import org.opennms.integration.api.v1.flows.Flow;
import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow;
-import org.opennms.netmgt.flows.processing.enrichment.NodeInfo;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo;
import org.opennms.netmgt.flows.api.FlowSource;
import org.opennms.netmgt.flows.classification.ClassificationEngine;
import org.opennms.netmgt.flows.classification.ClassificationRuleProvider;
diff --git a/features/flows/processing/pom.xml b/features/flows/processing/pom.xml
index a1ad1170c857..5d2a1298d576 100644
--- a/features/flows/processing/pom.xml
+++ b/features/flows/processing/pom.xml
@@ -72,6 +72,16 @@
org.opennms.features.telemetry.config.api
${project.version}
+
+ org.opennms.features.telemetry.protocols
+ org.opennms.features.telemetry.protocols.common
+ ${project.version}
+
+
+ org.opennms.features.telemetry.protocols
+ org.opennms.features.telemetry.protocols.cache
+ ${project.version}
+
org.opennms.dependencies
oia-dependencies
diff --git a/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/impl/DocumentEnricherImpl.java b/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/impl/DocumentEnricherImpl.java
index d63f3b658f97..accc567c73ca 100644
--- a/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/impl/DocumentEnricherImpl.java
+++ b/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/impl/DocumentEnricherImpl.java
@@ -34,20 +34,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.opennms.core.cache.Cache;
-import org.opennms.core.cache.CacheBuilder;
-import org.opennms.core.cache.CacheConfig;
-import org.opennms.core.cache.CacheConfigBuilder;
-import org.opennms.core.mate.api.ContextKey;
import org.opennms.core.utils.InetAddressUtils;
-import org.opennms.netmgt.dao.api.InterfaceToNodeCache;
-import org.opennms.netmgt.dao.api.IpInterfaceDao;
-import org.opennms.netmgt.dao.api.NodeDao;
import org.opennms.netmgt.dao.api.SessionUtils;
import org.opennms.netmgt.flows.api.Flow;
import org.opennms.netmgt.flows.api.FlowSource;
@@ -55,83 +45,32 @@
import org.opennms.netmgt.flows.classification.ClassificationRequest;
import org.opennms.netmgt.flows.classification.persistence.api.Protocols;
import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow;
-import org.opennms.netmgt.flows.processing.enrichment.NodeInfo;
-import org.opennms.netmgt.model.OnmsCategory;
-import org.opennms.netmgt.model.OnmsIpInterface;
-import org.opennms.netmgt.model.OnmsNode;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Strings;
-import com.google.common.cache.CacheLoader;
-
public class DocumentEnricherImpl {
private static final Logger LOG = LoggerFactory.getLogger(DocumentEnricherImpl.class);
- private static final String NODE_METADATA_CACHE = "flows.node.metadata";
-
- private final NodeDao nodeDao;
-
- private final IpInterfaceDao ipInterfaceDao;
-
- private final InterfaceToNodeCache interfaceToNodeCache;
-
private final SessionUtils sessionUtils;
private final ClassificationEngine classificationEngine;
- // Caches NodeDocument data for a given node Id.
- private final Cache> nodeInfoCache;
-
- // Caches NodeDocument data for a given node metadata.
- private final Cache> nodeMetadataCache;
-
- private final Timer nodeLoadTimer;
+ private final NodeInfoCache nodeInfoCache;
private final long clockSkewCorrectionThreshold;
private final DocumentMangler mangler;
- public DocumentEnricherImpl(final MetricRegistry metricRegistry,
- final NodeDao nodeDao,
- final IpInterfaceDao ipInterfaceDao,
- final InterfaceToNodeCache interfaceToNodeCache,
- final SessionUtils sessionUtils,
+ public DocumentEnricherImpl(final SessionUtils sessionUtils,
final ClassificationEngine classificationEngine,
- final CacheConfig cacheConfig,
final long clockSkewCorrectionThreshold,
- final DocumentMangler mangler) {
- this.nodeDao = Objects.requireNonNull(nodeDao);
- this.ipInterfaceDao = Objects.requireNonNull(ipInterfaceDao);
- this.interfaceToNodeCache = Objects.requireNonNull(interfaceToNodeCache);
+ final DocumentMangler mangler,
+ final NodeInfoCache nodeInfoCache) {
this.sessionUtils = Objects.requireNonNull(sessionUtils);
this.classificationEngine = Objects.requireNonNull(classificationEngine);
-
- this.nodeInfoCache = new CacheBuilder()
- .withConfig(cacheConfig)
- .withCacheLoader(new CacheLoader>() {
- @Override
- public Optional load(InterfaceToNodeCache.Entry entry) {
- return getNodeInfo(entry);
- }
- }).build();
-
- final CacheConfig nodeMetadataCacheConfig = buildMetadataCacheConfig(cacheConfig);
- this.nodeMetadataCache = new CacheBuilder()
- .withConfig(nodeMetadataCacheConfig)
- .withCacheLoader(new CacheLoader>() {
- @Override
- public Optional load(NodeMetadataKey key) {
- return getNodeInfoFromMetadataContext(key.contextKey, key.value);
- }
- }).build();
-
- this.nodeLoadTimer = metricRegistry.timer("nodeLoadTime");
-
+ this.nodeInfoCache = Objects.requireNonNull(nodeInfoCache);
this.clockSkewCorrectionThreshold = clockSkewCorrectionThreshold;
-
this.mangler = Objects.requireNonNull(mangler);
}
@@ -152,12 +91,12 @@ public List enrich(final Collection flows, final FlowSource
document.setLocation(source.getLocation());
// Node data
- getNodeInfoFromCache(source.getLocation(), source.getSourceAddress(), source.getContextKey(), flow.getNodeIdentifier()).ifPresent(document::setExporterNodeInfo);
+ nodeInfoCache.getNodeInfoFromCache(source.getLocation(), source.getSourceAddress(), source.getContextKey(), flow.getNodeIdentifier()).ifPresent(document::setExporterNodeInfo);
if (flow.getDstAddr() != null) {
- getNodeInfoFromCache(source.getLocation(), flow.getDstAddr(), null, null).ifPresent(document::setSrcNodeInfo);
+ nodeInfoCache.getNodeInfoFromCache(source.getLocation(), flow.getDstAddr(), null, null).ifPresent(document::setSrcNodeInfo);
}
if (flow.getSrcAddr() != null) {
- getNodeInfoFromCache(source.getLocation(), flow.getSrcAddr(), null, null).ifPresent(document::setDstNodeInfo);
+ nodeInfoCache.getNodeInfoFromCache(source.getLocation(), flow.getSrcAddr(), null, null).ifPresent(document::setDstNodeInfo);
}
// Locality
@@ -207,107 +146,7 @@ private static boolean isPrivateAddress(String ipAddress) {
return inetAddress.isLoopbackAddress() || inetAddress.isLinkLocalAddress() || inetAddress.isSiteLocalAddress();
}
- private Optional getNodeInfoFromCache(final String location, final String ipAddress, final ContextKey contextKey, final String value) {
- Optional nodeDocument = Optional.empty();
- if (contextKey != null && !Strings.isNullOrEmpty(value)) {
- final NodeMetadataKey metadataKey = new NodeMetadataKey(contextKey, value);
- try {
- nodeDocument = this.nodeMetadataCache.get(metadataKey);
- } catch (ExecutionException e) {
- LOG.error("Error while retrieving NodeDocument from NodeMetadataCache: {}.", e.getMessage(), e);
- throw new RuntimeException(e);
- }
- if(nodeDocument.isPresent()) {
- return nodeDocument;
- }
- }
-
- final var entry = this.interfaceToNodeCache.getFirst(location, InetAddressUtils.addr(ipAddress));
- if(entry.isPresent()) {
- try {
- return this.nodeInfoCache.get(entry.get());
- } catch (ExecutionException e) {
- LOG.error("Error while retrieving NodeDocument from NodeInfoCache: {}.", e.getMessage(), e);
- throw new RuntimeException(e);
- }
- }
- return nodeDocument;
- }
-
-
// Key class, which is used to cache NodeInfo for a given node metadata.
- private static class NodeMetadataKey {
-
- public final ContextKey contextKey;
-
- public final String value;
-
- private NodeMetadataKey(final ContextKey contextKey, final String value) {
- this.contextKey = contextKey;
- this.value = value;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- final NodeMetadataKey that = (NodeMetadataKey) o;
- return Objects.equals(contextKey, that.contextKey) &&
- Objects.equals(value, that.value);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(contextKey, value);
- }
- }
-
- private Optional getNodeInfoFromMetadataContext(ContextKey contextKey, String value) {
- // First, try to find interface
- final List ifaces;
- try (Timer.Context ctx = this.nodeLoadTimer.time()) {
- ifaces = this.ipInterfaceDao.findInterfacesWithMetadata(contextKey.getContext(), contextKey.getKey(), value);
- }
- if (!ifaces.isEmpty()) {
- final var iface = ifaces.get(0);
- return mapOnmsNodeToNodeDocument(iface.getNode(), iface.getId());
- }
-
- // Alternatively, try to find node and chose primary interface
- final List nodes;
- try (Timer.Context ctx = this.nodeLoadTimer.time()) {
- nodes = this.nodeDao.findNodeWithMetaData(contextKey.getContext(), contextKey.getKey(), value);
- }
- if(!nodes.isEmpty()) {
- final var node = nodes.get(0);
- return mapOnmsNodeToNodeDocument(node, node.getPrimaryInterface().getId());
- }
-
- return Optional.empty();
- }
-
- private Optional getNodeInfo(final InterfaceToNodeCache.Entry entry) {
- final OnmsNode onmsNode;
- try (Timer.Context ctx = this.nodeLoadTimer.time()) {
- onmsNode = this.nodeDao.get(entry.nodeId);
- }
-
- return mapOnmsNodeToNodeDocument(onmsNode, entry.interfaceId);
- }
-
- private Optional mapOnmsNodeToNodeDocument(final OnmsNode onmsNode, final int interfaceId) {
- if(onmsNode != null) {
- final NodeInfo nodeDocument = new NodeInfo();
- nodeDocument.setForeignSource(onmsNode.getForeignSource());
- nodeDocument.setForeignId(onmsNode.getForeignId());
- nodeDocument.setNodeId(onmsNode.getId());
- nodeDocument.setInterfaceId(interfaceId);
- nodeDocument.setCategories(onmsNode.getCategories().stream().map(OnmsCategory::getName).collect(Collectors.toList()));
-
- return Optional.of(nodeDocument);
- }
- return Optional.empty();
- }
public static ClassificationRequest createClassificationRequest(EnrichedFlow document) {
final ClassificationRequest request = new ClassificationRequest();
@@ -321,16 +160,4 @@ public static ClassificationRequest createClassificationRequest(EnrichedFlow doc
return request;
}
-
- private CacheConfig buildMetadataCacheConfig(CacheConfig cacheConfig) {
- // Use existing config for the nodes with a new name for node metadata cache.
- final CacheConfig metadataCacheConfig = new CacheConfigBuilder()
- .withName(NODE_METADATA_CACHE)
- .withMaximumSize(cacheConfig.getMaximumSize())
- .withExpireAfterWrite(cacheConfig.getExpireAfterWrite())
- .build();
- cacheConfig.setRecordStats(true);
- cacheConfig.setMetricRegistry(cacheConfig.getMetricRegistry());
- return metadataCacheConfig;
- }
}
diff --git a/features/flows/processing/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/features/flows/processing/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index f7bcef8c6d20..dd6cc7f0412b 100644
--- a/features/flows/processing/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/features/flows/processing/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -2,18 +2,12 @@
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0
http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
-
http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0
http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.1.0.xsd">
-
-
-
-
-
@@ -25,16 +19,6 @@
-
-
-
-
-
-
-
-
-
-
@@ -73,6 +57,7 @@
+
@@ -81,15 +66,11 @@
-
-
-
-
-
-
+
+
diff --git a/features/flows/processing/src/test/java/org/opennms/netmgt/flows/processing/enrichment/MockDocumentEnricherFactory.java b/features/flows/processing/src/test/java/org/opennms/netmgt/flows/processing/enrichment/MockDocumentEnricherFactory.java
index f3d72e8e57e4..904035e4ffd8 100644
--- a/features/flows/processing/src/test/java/org/opennms/netmgt/flows/processing/enrichment/MockDocumentEnricherFactory.java
+++ b/features/flows/processing/src/test/java/org/opennms/netmgt/flows/processing/enrichment/MockDocumentEnricherFactory.java
@@ -55,6 +55,8 @@
import org.opennms.netmgt.flows.classification.persistence.api.RuleBuilder;
import org.opennms.netmgt.flows.processing.impl.DocumentEnricherImpl;
import org.opennms.netmgt.flows.processing.impl.DocumentMangler;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCacheImpl;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Lists;
@@ -88,16 +90,23 @@ public MockDocumentEnricherFactory(final long clockSkewCorrectionThreshold) thro
new RuleBuilder().withName("http").withSrcPort("80").withProtocol("tcp,udp").build(),
new RuleBuilder().withName("https").withSrcPort("443").withProtocol("tcp,udp").build()
), FilterService.NOOP);
- enricher = new DocumentEnricherImpl(
- new MetricRegistry(),
- nodeDao, ipInterfaceDao,
- interfaceToNodeCache, new MockSessionUtils(), classificationEngine,
+ final NodeInfoCache nodeInfoCache = new NodeInfoCacheImpl(
new CacheConfigBuilder()
- .withName("flows.node")
- .withMaximumSize(1000)
- .withExpireAfterWrite(300)
- .build(), clockSkewCorrectionThreshold,
- new DocumentMangler(new ScriptEngineManager()));
+ .withName("nodeInfoCache")
+ .withMaximumSize(1000)
+ .withExpireAfterWrite(300)
+ .withExpireAfterRead(300)
+ .build(),
+ true,
+ new MetricRegistry(),
+ nodeDao,
+ ipInterfaceDao,
+ interfaceToNodeCache
+ );
+ enricher = new DocumentEnricherImpl(
+ new MockSessionUtils(), classificationEngine,
+ clockSkewCorrectionThreshold,
+ new DocumentMangler(new ScriptEngineManager()), nodeInfoCache);
// Required for mock node dao
addServiceRegistry(nodeDao);
diff --git a/features/telemetry/protocols/adapters/pom.xml b/features/telemetry/protocols/adapters/pom.xml
index 5fd4e594d910..65615ac593df 100644
--- a/features/telemetry/protocols/adapters/pom.xml
+++ b/features/telemetry/protocols/adapters/pom.xml
@@ -53,6 +53,11 @@
org.opennms
opennms-dao-api
+
+ org.opennms.features.telemetry.protocols
+ org.opennms.features.telemetry.protocols.cache
+ ${project.version}
+
org.osgi
osgi.core
diff --git a/features/telemetry/protocols/cache/pom.xml b/features/telemetry/protocols/cache/pom.xml
new file mode 100644
index 000000000000..38274f17f393
--- /dev/null
+++ b/features/telemetry/protocols/cache/pom.xml
@@ -0,0 +1,78 @@
+
+
+
+ org.opennms.features.telemetry
+ org.opennms.features.telemetry.protocols
+ 31.0.3-SNAPSHOT
+
+ 4.0.0
+ org.opennms.features.telemetry.protocols
+ org.opennms.features.telemetry.protocols.cache
+ OpenNMS :: Features :: Telemetry :: Protocols :: Cache
+ bundle
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ true
+
+
+ JavaSE-1.8
+ ${project.artifactId}
+ ${project.version}
+
+
+
+
+
+
+
+ org.opennms.features.telemetry
+ org.opennms.features.telemetry.api
+ ${project.version}
+
+
+ org.opennms.features.telemetry.config
+ org.opennms.features.telemetry.config.api
+ ${project.version}
+
+
+ org.opennms.features.telemetry
+ org.opennms.features.telemetry.listeners
+ ${project.version}
+
+
+ org.opennms.core
+ org.opennms.core.lib
+
+
+ org.opennms.core.ipc.sink
+ org.opennms.core.ipc.sink.api
+
+
+ org.opennms.dependencies
+ spring-dependencies
+ pom
+
+
+ org.mongodb
+ bson
+
+
+ org.opennms.core.mate
+ org.opennms.core.mate.api
+ ${project.version}
+
+
+ org.opennms.core
+ org.opennms.core.cache
+ ${project.version}
+
+
+ org.opennms.features.flows
+ org.opennms.features.flows.api
+ ${project.version}
+
+
+
diff --git a/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/enrichment/NodeInfo.java b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfo.java
similarity index 97%
rename from features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/enrichment/NodeInfo.java
rename to features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfo.java
index c84f71c4d845..eef765232c2f 100644
--- a/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/enrichment/NodeInfo.java
+++ b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfo.java
@@ -26,7 +26,7 @@
* http://www.opennms.com/
*******************************************************************************/
-package org.opennms.netmgt.flows.processing.enrichment;
+package org.opennms.netmgt.telemetry.protocols.cache;
import java.util.List;
diff --git a/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCache.java b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCache.java
new file mode 100644
index 000000000000..a47e4bca1a95
--- /dev/null
+++ b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCache.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * This file is part of OpenNMS(R).
+ *
+ * Copyright (C) 2024 The OpenNMS Group, Inc.
+ * OpenNMS(R) is Copyright (C) 1999-2024 The OpenNMS Group, Inc.
+ *
+ * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
+ *
+ * OpenNMS(R) is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License,
+ * or (at your option) any later version.
+ *
+ * OpenNMS(R) is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with OpenNMS(R). If not, see:
+ * http://www.gnu.org/licenses/
+ *
+ * For more information contact:
+ * OpenNMS(R) Licensing
+ * http://www.opennms.org/
+ * http://www.opennms.com/
+ *******************************************************************************/
+package org.opennms.netmgt.telemetry.protocols.cache;
+
+import java.util.Optional;
+
+import org.opennms.core.mate.api.ContextKey;
+
+public interface NodeInfoCache {
+ Optional getNodeInfoFromCache(final String location, final String ipAddress, final ContextKey contextKey, final String value);
+
+}
diff --git a/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCacheImpl.java b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCacheImpl.java
new file mode 100644
index 000000000000..a4dc792c6422
--- /dev/null
+++ b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCacheImpl.java
@@ -0,0 +1,197 @@
+/*******************************************************************************
+ * This file is part of OpenNMS(R).
+ *
+ * Copyright (C) 2024 The OpenNMS Group, Inc.
+ * OpenNMS(R) is Copyright (C) 1999-2024 The OpenNMS Group, Inc.
+ *
+ * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
+ *
+ * OpenNMS(R) is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License,
+ * or (at your option) any later version.
+ *
+ * OpenNMS(R) is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with OpenNMS(R). If not, see:
+ * http://www.gnu.org/licenses/
+ *
+ * For more information contact:
+ * OpenNMS(R) Licensing
+ * http://www.opennms.org/
+ * http://www.opennms.com/
+ *******************************************************************************/
+package org.opennms.netmgt.telemetry.protocols.cache;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.opennms.core.cache.Cache;
+import org.opennms.core.cache.CacheBuilder;
+import org.opennms.core.cache.CacheConfig;
+import org.opennms.core.cache.CacheConfigBuilder;
+import org.opennms.core.mate.api.ContextKey;
+import org.opennms.core.utils.InetAddressUtils;
+import org.opennms.netmgt.dao.api.InterfaceToNodeCache;
+import org.opennms.netmgt.dao.api.IpInterfaceDao;
+import org.opennms.netmgt.dao.api.NodeDao;
+import org.opennms.netmgt.model.OnmsCategory;
+import org.opennms.netmgt.model.OnmsIpInterface;
+import org.opennms.netmgt.model.OnmsNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Strings;
+import com.google.common.cache.CacheLoader;
+
+public class NodeInfoCacheImpl implements NodeInfoCache {
+ private static final Logger LOG = LoggerFactory.getLogger(NodeInfoCacheImpl.class);
+
+ private static class NodeMetadataKey {
+
+ public final ContextKey contextKey;
+
+ public final String value;
+
+ private NodeMetadataKey(final ContextKey contextKey, final String value) {
+ this.contextKey = contextKey;
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final NodeMetadataKey that = (NodeMetadataKey) o;
+ return Objects.equals(contextKey, that.contextKey) &&
+ Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(contextKey, value);
+ }
+ }
+ private final NodeDao nodeDao;
+ private final IpInterfaceDao ipInterfaceDao;
+ private final InterfaceToNodeCache interfaceToNodeCache;
+ private final Cache> nodeMetadataCache;
+ private final Cache> nodeInfoCache;
+ private final Timer nodeLoadTimer;
+
+ public NodeInfoCacheImpl(final CacheConfig nodeInfoCacheConfig, final boolean nodeMetadataEnabled, final MetricRegistry metricRegistry, final NodeDao nodeDao, final IpInterfaceDao ipInterfaceDao, final InterfaceToNodeCache interfaceToNodeCache) {
+ this.nodeDao = Objects.requireNonNull(nodeDao);
+ this.ipInterfaceDao = Objects.requireNonNull(ipInterfaceDao);
+ this.interfaceToNodeCache = Objects.requireNonNull(interfaceToNodeCache);
+
+ final CacheConfig nodeMetadataCacheConfig = new CacheConfigBuilder()
+ .withName("nodeMetadataCache")
+ .withExpireAfterRead(nodeInfoCacheConfig.getExpireAfterRead())
+ .withExpireAfterWrite(nodeInfoCacheConfig.getExpireAfterWrite())
+ .withMaximumSize(nodeInfoCacheConfig.getMaximumSize())
+ .build();
+
+ this.nodeInfoCache = new CacheBuilder()
+ .withConfig(Objects.requireNonNull(nodeInfoCacheConfig))
+ .withCacheLoader(new CacheLoader>() {
+ @Override
+ public Optional load(InterfaceToNodeCache.Entry entry) {
+ return getNodeInfo(entry);
+ }
+ }).build();
+
+ this.nodeMetadataCache = new CacheBuilder()
+ .withConfig(nodeMetadataCacheConfig)
+ .withCacheLoader(new CacheLoader>() {
+ @Override
+ public Optional load(NodeMetadataKey key) {
+ return getNodeInfoFromMetadataContext(key.contextKey, key.value);
+ }
+ }).build();
+
+ this.nodeLoadTimer = Objects.requireNonNull(metricRegistry).timer("nodeLoadTime");
+ }
+
+ public Optional getNodeInfoFromCache(final String location, final String ipAddress, final ContextKey contextKey, final String value) {
+ Optional nodeDocument = Optional.empty();
+ if (contextKey != null && !Strings.isNullOrEmpty(value)) {
+ final NodeMetadataKey metadataKey = new NodeMetadataKey(contextKey, value);
+ try {
+ nodeDocument = this.nodeMetadataCache.get(metadataKey);
+ } catch (ExecutionException e) {
+ LOG.error("Error while retrieving NodeDocument from NodeMetadataCache: {}.", e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ if(nodeDocument.isPresent()) {
+ return nodeDocument;
+ }
+ }
+
+ final var entry = this.interfaceToNodeCache.getFirst(location, InetAddressUtils.addr(ipAddress));
+ if(entry.isPresent()) {
+ try {
+ return this.nodeInfoCache.get(entry.get());
+ } catch (ExecutionException e) {
+ LOG.error("Error while retrieving NodeDocument from NodeInfoCache: {}.", e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ }
+ return nodeDocument;
+ }
+
+ private Optional getNodeInfoFromMetadataContext(ContextKey contextKey, String value) {
+ // First, try to find interface
+ final List ifaces;
+ try (Timer.Context ctx = this.nodeLoadTimer.time()) {
+ ifaces = this.ipInterfaceDao.findInterfacesWithMetadata(contextKey.getContext(), contextKey.getKey(), value);
+ }
+ if (!ifaces.isEmpty()) {
+ final var iface = ifaces.get(0);
+ return mapOnmsNodeToNodeDocument(iface.getNode(), iface.getId());
+ }
+
+ // Alternatively, try to find node and chose primary interface
+ final List nodes;
+ try (Timer.Context ctx = this.nodeLoadTimer.time()) {
+ nodes = this.nodeDao.findNodeWithMetaData(contextKey.getContext(), contextKey.getKey(), value);
+ }
+ if(!nodes.isEmpty()) {
+ final var node = nodes.get(0);
+ return mapOnmsNodeToNodeDocument(node, node.getPrimaryInterface().getId());
+ }
+
+ return Optional.empty();
+ }
+
+ private Optional getNodeInfo(final InterfaceToNodeCache.Entry entry) {
+ final OnmsNode onmsNode;
+ try (Timer.Context ctx = this.nodeLoadTimer.time()) {
+ onmsNode = this.nodeDao.get(entry.nodeId);
+ }
+
+ return mapOnmsNodeToNodeDocument(onmsNode, entry.interfaceId);
+ }
+
+ private Optional mapOnmsNodeToNodeDocument(final OnmsNode onmsNode, final int interfaceId) {
+ if(onmsNode != null) {
+ final NodeInfo nodeDocument = new NodeInfo();
+ nodeDocument.setForeignSource(onmsNode.getForeignSource());
+ nodeDocument.setForeignId(onmsNode.getForeignId());
+ nodeDocument.setNodeId(onmsNode.getId());
+ nodeDocument.setInterfaceId(interfaceId);
+ nodeDocument.setCategories(onmsNode.getCategories().stream().map(OnmsCategory::getName).collect(Collectors.toList()));
+
+ return Optional.of(nodeDocument);
+ }
+ return Optional.empty();
+ }
+}
diff --git a/features/telemetry/protocols/cache/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/features/telemetry/protocols/cache/src/main/resources/OSGI-INF/blueprint/blueprint.xml
new file mode 100644
index 000000000000..00e22060c7fb
--- /dev/null
+++ b/features/telemetry/protocols/cache/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -0,0 +1,74 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/common/NetflowTelemetryAdapter.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/common/NetflowTelemetryAdapter.java
index 899ca7ee887d..22ae356a548d 100644
--- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/common/NetflowTelemetryAdapter.java
+++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/common/NetflowTelemetryAdapter.java
@@ -37,27 +37,36 @@
import javax.script.ScriptException;
+import org.opennms.core.mate.api.ContextKey;
import org.opennms.netmgt.collection.api.CollectionAgent;
import org.opennms.netmgt.collection.api.CollectionAgentFactory;
import org.opennms.netmgt.dao.api.InterfaceToNodeCache;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo;
import org.opennms.netmgt.telemetry.api.adapter.TelemetryMessageLog;
import org.opennms.netmgt.telemetry.api.adapter.TelemetryMessageLogEntry;
import org.opennms.netmgt.telemetry.config.api.AdapterDefinition;
import org.opennms.netmgt.telemetry.protocols.collection.AbstractScriptedCollectionAdapter;
import org.opennms.netmgt.telemetry.protocols.collection.CollectionSetWithAgent;
import org.opennms.netmgt.telemetry.protocols.collection.ScriptedCollectionSetBuilder;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
import org.opennms.netmgt.telemetry.protocols.netflow.transport.FlowMessage;
import org.opennms.netmgt.telemetry.protocols.netflow.transport.Value;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
public class NetflowTelemetryAdapter extends AbstractScriptedCollectionAdapter {
private InterfaceToNodeCache interfaceToNodeCache;
private CollectionAgentFactory collectionAgentFactory;
- protected NetflowTelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry) {
+ private ContextKey contextKey;
+ private String metaDataNodeLookup;
+ private final NodeInfoCache nodeInfoCache;
+
+ protected NetflowTelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry, final NodeInfoCache nodeInfoCache) {
super(adapterConfig, metricRegistry);
+ this.nodeInfoCache = nodeInfoCache;
}
@Override
@@ -83,11 +92,11 @@ public Stream handleCollectionMessage(final TelemetryMes
return Stream.empty();
}
- final Optional nodeId = interfaceToNodeCache.getFirstNodeId(messageLog.getLocation(), inetAddress);
+ final Optional nodeInfo = nodeInfoCache.getNodeInfoFromCache(messageLog.getLocation(), messageLog.getSourceAddress(), contextKey, flowMessage.getNodeIdentifier());
final CollectionAgent agent;
- if (nodeId.isPresent()) {
- agent = collectionAgentFactory.createCollectionAgent(Integer.toString(nodeId.get()), inetAddress);
+ if (nodeInfo.isPresent()) {
+ agent = collectionAgentFactory.createCollectionAgent(Integer.toString(nodeInfo.get().getNodeId()), inetAddress);
} else {
LOG.warn("Unable to find node and interface for agent address: {}", address);
@@ -154,4 +163,22 @@ public void setCollectionAgentFactory(final CollectionAgentFactory collectionAge
public void setInterfaceToNodeCache(final InterfaceToNodeCache interfaceToNodeCache) {
this.interfaceToNodeCache = interfaceToNodeCache;
}
+
+ public ContextKey getContextKey() {
+ return contextKey;
+ }
+
+ public String getMetaDataNodeLookup() {
+ return metaDataNodeLookup;
+ }
+
+ public void setMetaDataNodeLookup(String metaDataNodeLookup) {
+ this.metaDataNodeLookup = metaDataNodeLookup;
+
+ if (!Strings.isNullOrEmpty(this.metaDataNodeLookup)) {
+ this.contextKey = new ContextKey(metaDataNodeLookup);
+ } else {
+ this.contextKey = null;
+ }
+ }
}
diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapter.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapter.java
index 7cf18d426912..6a992538083e 100644
--- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapter.java
+++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapter.java
@@ -28,12 +28,13 @@
package org.opennms.netmgt.telemetry.protocols.netflow.adapter.ipfix;
import org.opennms.netmgt.telemetry.config.api.AdapterDefinition;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
import org.opennms.netmgt.telemetry.protocols.netflow.adapter.common.NetflowTelemetryAdapter;
import com.codahale.metrics.MetricRegistry;
public class IpfixTelemetryAdapter extends NetflowTelemetryAdapter {
- public IpfixTelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry) {
- super(adapterConfig, metricRegistry);
+ public IpfixTelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry, final NodeInfoCache nodeMetadataCache) {
+ super(adapterConfig, metricRegistry, nodeMetadataCache);
}
}
diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapterFactory.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapterFactory.java
index adb9f1d8bd28..89600cf93402 100644
--- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapterFactory.java
+++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapterFactory.java
@@ -30,10 +30,13 @@
import org.opennms.netmgt.telemetry.api.adapter.Adapter;
import org.opennms.netmgt.telemetry.config.api.AdapterDefinition;
import org.opennms.netmgt.telemetry.protocols.collection.AbstractCollectionAdapterFactory;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
import org.osgi.framework.BundleContext;
public class IpfixTelemetryAdapterFactory extends AbstractCollectionAdapterFactory {
+ private NodeInfoCache nodeInfoCache;
+
public IpfixTelemetryAdapterFactory() {
super(null);
}
@@ -49,7 +52,7 @@ public Class extends Adapter> getBeanClass() {
@Override
public Adapter createBean(final AdapterDefinition adapterConfig) {
- final IpfixTelemetryAdapter adapter = new IpfixTelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry());
+ final IpfixTelemetryAdapter adapter = new IpfixTelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry(), nodeInfoCache);
adapter.setCollectionAgentFactory(getCollectionAgentFactory());
adapter.setPersisterFactory(getPersisterFactory());
adapter.setFilterDao(getFilterDao());
@@ -57,7 +60,12 @@ public Adapter createBean(final AdapterDefinition adapterConfig) {
adapter.setInterfaceToNodeCache(getInterfaceToNodeCache());
adapter.setThresholdingService(getThresholdingService());
adapter.setBundleContext(getBundleContext());
+ adapter.setMetaDataNodeLookup(adapterConfig.getParameterMap().get("metaDataNodeLookup"));
return adapter;
}
+
+ public void setNodeInfoCache(NodeInfoCache nodeInfoCache) {
+ this.nodeInfoCache = nodeInfoCache;
+ }
}
diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapter.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapter.java
index 73ab9457bef2..28c369be1646 100644
--- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapter.java
+++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapter.java
@@ -28,12 +28,13 @@
package org.opennms.netmgt.telemetry.protocols.netflow.adapter.netflow5;
import org.opennms.netmgt.telemetry.config.api.AdapterDefinition;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
import org.opennms.netmgt.telemetry.protocols.netflow.adapter.common.NetflowTelemetryAdapter;
import com.codahale.metrics.MetricRegistry;
public class Netflow5TelemetryAdapter extends NetflowTelemetryAdapter {
- public Netflow5TelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry) {
- super(adapterConfig, metricRegistry);
+ public Netflow5TelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry, final NodeInfoCache nodeMetadataCache) {
+ super(adapterConfig, metricRegistry, nodeMetadataCache);
}
}
\ No newline at end of file
diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapterFactory.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapterFactory.java
index 9bc0f64ecd8d..a48454563c60 100644
--- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapterFactory.java
+++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapterFactory.java
@@ -30,10 +30,13 @@
import org.opennms.netmgt.telemetry.api.adapter.Adapter;
import org.opennms.netmgt.telemetry.config.api.AdapterDefinition;
import org.opennms.netmgt.telemetry.protocols.collection.AbstractCollectionAdapterFactory;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
import org.osgi.framework.BundleContext;
public class Netflow5TelemetryAdapterFactory extends AbstractCollectionAdapterFactory {
+ private NodeInfoCache nodeInfoCache;
+
public Netflow5TelemetryAdapterFactory() {
super(null);
}
@@ -49,7 +52,7 @@ public Class extends Adapter> getBeanClass() {
@Override
public Adapter createBean(final AdapterDefinition adapterConfig) {
- final Netflow5TelemetryAdapter adapter = new Netflow5TelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry());
+ final Netflow5TelemetryAdapter adapter = new Netflow5TelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry(), nodeInfoCache);
adapter.setCollectionAgentFactory(getCollectionAgentFactory());
adapter.setPersisterFactory(getPersisterFactory());
adapter.setFilterDao(getFilterDao());
@@ -57,8 +60,13 @@ public Adapter createBean(final AdapterDefinition adapterConfig) {
adapter.setInterfaceToNodeCache(getInterfaceToNodeCache());
adapter.setThresholdingService(getThresholdingService());
adapter.setBundleContext(getBundleContext());
+ adapter.setMetaDataNodeLookup(adapterConfig.getParameterMap().get("metaDataNodeLookup"));
return adapter;
}
+
+ public void setNodeInfoCache(NodeInfoCache nodeInfoCache) {
+ this.nodeInfoCache = nodeInfoCache;
+ }
}
diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapter.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapter.java
index 662112f13382..2bf85676b904 100644
--- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapter.java
+++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapter.java
@@ -28,12 +28,13 @@
package org.opennms.netmgt.telemetry.protocols.netflow.adapter.netflow9;
import org.opennms.netmgt.telemetry.config.api.AdapterDefinition;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
import org.opennms.netmgt.telemetry.protocols.netflow.adapter.common.NetflowTelemetryAdapter;
import com.codahale.metrics.MetricRegistry;
public class Netflow9TelemetryAdapter extends NetflowTelemetryAdapter {
- public Netflow9TelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry) {
- super(adapterConfig, metricRegistry);
+ public Netflow9TelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry, final NodeInfoCache nodeMetadataCache) {
+ super(adapterConfig, metricRegistry, nodeMetadataCache);
}
}
\ No newline at end of file
diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapterFactory.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapterFactory.java
index efc2bf22ae8b..b6ca2794059e 100644
--- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapterFactory.java
+++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapterFactory.java
@@ -30,10 +30,13 @@
import org.opennms.netmgt.telemetry.api.adapter.Adapter;
import org.opennms.netmgt.telemetry.config.api.AdapterDefinition;
import org.opennms.netmgt.telemetry.protocols.collection.AbstractCollectionAdapterFactory;
+import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache;
import org.osgi.framework.BundleContext;
public class Netflow9TelemetryAdapterFactory extends AbstractCollectionAdapterFactory {
+ private NodeInfoCache nodeInfoCache;
+
public Netflow9TelemetryAdapterFactory() {
super(null);
}
@@ -49,7 +52,7 @@ public Class extends Adapter> getBeanClass() {
@Override
public Adapter createBean(final AdapterDefinition adapterConfig) {
- final Netflow9TelemetryAdapter adapter = new Netflow9TelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry());
+ final Netflow9TelemetryAdapter adapter = new Netflow9TelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry(), nodeInfoCache);
adapter.setCollectionAgentFactory(getCollectionAgentFactory());
adapter.setPersisterFactory(getPersisterFactory());
adapter.setFilterDao(getFilterDao());
@@ -57,8 +60,13 @@ public Adapter createBean(final AdapterDefinition adapterConfig) {
adapter.setInterfaceToNodeCache(getInterfaceToNodeCache());
adapter.setThresholdingService(getThresholdingService());
adapter.setBundleContext(getBundleContext());
+ adapter.setMetaDataNodeLookup(adapterConfig.getParameterMap().get("metaDataNodeLookup"));
return adapter;
}
+
+ public void setNodeInfoCache(NodeInfoCache nodeInfoCache) {
+ this.nodeInfoCache = nodeInfoCache;
+ }
}
diff --git a/features/telemetry/protocols/netflow/adapter/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/features/telemetry/protocols/netflow/adapter/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index cd6ecb6d8d4f..71a749136ac1 100644
--- a/features/telemetry/protocols/netflow/adapter/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/features/telemetry/protocols/netflow/adapter/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -18,6 +18,7 @@
+
@@ -41,6 +42,7 @@
+
@@ -70,6 +72,7 @@
+
@@ -99,6 +102,7 @@
+
diff --git a/features/telemetry/protocols/pom.xml b/features/telemetry/protocols/pom.xml
index 63d8da21dec8..2e89ca5433b6 100644
--- a/features/telemetry/protocols/pom.xml
+++ b/features/telemetry/protocols/pom.xml
@@ -13,6 +13,7 @@
adapters
bmp
+ cache
common
flows
graphite