diff --git a/container/features/src/main/resources/features-sentinel.xml b/container/features/src/main/resources/features-sentinel.xml index d79751cfecc2..48b11d431c16 100644 --- a/container/features/src/main/resources/features-sentinel.xml +++ b/container/features/src/main/resources/features-sentinel.xml @@ -139,6 +139,7 @@ wrap:mvn:org.freemarker/freemarker/${freemarkerVersion}$Bundle-SymbolicName=org.freemarker&Bundle-Version=${freemarkerVersion} mvn:org.opennms.core/org.opennms.core.cache/${project.version} mvn:org.opennms.features.telemetry.protocols.netflow/org.opennms.features.telemetry.protocols.netflow.transport/${project.version} + mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.cache/${project.version} mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.common/${project.version} mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.flows/${project.version} mvn:org.opennms.features.telemetry.protocols.netflow/org.opennms.features.telemetry.protocols.netflow.adapter/${project.version} diff --git a/container/features/src/main/resources/features.xml b/container/features/src/main/resources/features.xml index e79bfcb71608..bf661b65558e 100644 --- a/container/features/src/main/resources/features.xml +++ b/container/features/src/main/resources/features.xml @@ -1151,6 +1151,7 @@ --> mvn:com.google.protobuf/protobuf-java/${protobufVersion} wrap:mvn:com.google.protobuf/protobuf-java-util/${protobufVersion}$overwrite=merge&Import-Package=javax.annotation;version=!,* + mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.cache/${project.version} mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.common/${project.version} mvn:org.opennms.features.telemetry.protocols/org.opennms.features.telemetry.protocols.flows/${project.version} mvn:org.opennms.features.telemetry.protocols.netflow/org.opennms.features.telemetry.protocols.netflow.parser/${project.version} diff --git a/features/flows/elastic/src/main/java/org/opennms/netmgt/flows/elastic/FlowDocument.java b/features/flows/elastic/src/main/java/org/opennms/netmgt/flows/elastic/FlowDocument.java index a5618e267fc5..f00fb9c77ce1 100644 --- a/features/flows/elastic/src/main/java/org/opennms/netmgt/flows/elastic/FlowDocument.java +++ b/features/flows/elastic/src/main/java/org/opennms/netmgt/flows/elastic/FlowDocument.java @@ -29,13 +29,10 @@ package org.opennms.netmgt.flows.elastic; import java.util.LinkedHashSet; -import java.util.List; import java.util.Objects; import java.util.Set; import org.opennms.integration.api.v1.flows.Flow; -import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow; -import org.opennms.netmgt.flows.processing.enrichment.NodeInfo; import com.google.gson.annotations.SerializedName; diff --git a/features/flows/elastic/src/test/java/org/opennms/netmgt/flows/elastic/FlowDocumentTest.java b/features/flows/elastic/src/test/java/org/opennms/netmgt/flows/elastic/FlowDocumentTest.java index 81cb0f30451f..f11ba4a1a99d 100644 --- a/features/flows/elastic/src/test/java/org/opennms/netmgt/flows/elastic/FlowDocumentTest.java +++ b/features/flows/elastic/src/test/java/org/opennms/netmgt/flows/elastic/FlowDocumentTest.java @@ -37,7 +37,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.time.Instant; import java.util.List; import java.util.Optional; @@ -45,7 +44,7 @@ import org.opennms.core.test.xml.JsonTest; import org.opennms.netmgt.flows.api.Flow; import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow; -import org.opennms.netmgt.flows.processing.enrichment.NodeInfo; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo; import com.google.common.io.Resources; import com.google.gson.Gson; diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/AggregatedFlowQueryIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/AggregatedFlowQueryIT.java index ff2e5006a375..9bf904bf27b0 100644 --- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/AggregatedFlowQueryIT.java +++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/AggregatedFlowQueryIT.java @@ -93,13 +93,15 @@ import org.opennms.netmgt.flows.classification.persistence.api.RuleBuilder; import org.opennms.netmgt.flows.elastic.agg.AggregatedFlowQueryService; import org.opennms.netmgt.flows.processing.impl.DocumentEnricherImpl; -import org.opennms.netmgt.flows.processing.enrichment.NodeInfo; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo; import org.opennms.netmgt.flows.filter.api.Filter; import org.opennms.netmgt.flows.filter.api.SnmpInterfaceIdFilter; import org.opennms.netmgt.flows.filter.api.TimeRangeFilter; import org.opennms.netmgt.flows.persistence.FlowDocumentBuilder; import org.opennms.netmgt.flows.processing.FlowBuilder; import org.opennms.netmgt.flows.processing.impl.DocumentMangler; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCacheImpl; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableSet; @@ -168,18 +170,24 @@ public void setUp() throws MalformedURLException, ExecutionException, Interrupte new RuleBuilder().withName("https").withSrcPort("443").withProtocol("tcp,udp").build()), FilterService.NOOP); - documentEnricher = new DocumentEnricherImpl(metricRegistry, - new MockNodeDao(), - new MockIpInterfaceDao(), - new MockInterfaceToNodeCache(), - new MockSessionUtils(), + final NodeInfoCache nodeInfoCache = new NodeInfoCacheImpl( + new CacheConfigBuilder() + .withName("nodeInfoCache") + .withMaximumSize(1000) + .withExpireAfterWrite(300) + .withExpireAfterRead(300) + .build(), + true, + new MetricRegistry(), + new MockNodeDao(), + new MockIpInterfaceDao(), + new MockInterfaceToNodeCache() + ); + documentEnricher = new DocumentEnricherImpl(new MockSessionUtils(), classificationEngine, - new CacheConfigBuilder() - .withName("flows.node") - .withMaximumSize(1000) - .withExpireAfterWrite(300) - .build(), 0, - new DocumentMangler(new ScriptEngineManager())); + 0, + new DocumentMangler(new ScriptEngineManager()), + nodeInfoCache); // The repository should be empty assertThat(smartQueryService.getFlowCount(Collections.singletonList(new TimeRangeFilter(0, System.currentTimeMillis()))).get(), equalTo(0L)); diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/FlowQueryIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/FlowQueryIT.java index 2537eb65d412..e71cc237559d 100644 --- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/FlowQueryIT.java +++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/FlowQueryIT.java @@ -101,6 +101,8 @@ import org.opennms.netmgt.flows.filter.api.SnmpInterfaceIdFilter; import org.opennms.netmgt.flows.filter.api.TimeRangeFilter; import org.opennms.netmgt.flows.processing.impl.DocumentMangler; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCacheImpl; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableSet; @@ -143,19 +145,25 @@ public void setUp() throws MalformedURLException, ExecutionException, Interrupte new RuleBuilder().withName("http").withSrcPort("80").withProtocol("tcp,udp").build(), new RuleBuilder().withName("https").withSrcPort("443").withProtocol("tcp,udp").build()), FilterService.NOOP); + final NodeInfoCache nodeInfoCache = new NodeInfoCacheImpl( + new CacheConfigBuilder() + .withName("nodeInfoCache") + .withMaximumSize(1000) + .withExpireAfterWrite(300) + .withExpireAfterRead(300) + .build(), + true, + new MetricRegistry(), + new MockNodeDao(), + new MockIpInterfaceDao(), + new MockInterfaceToNodeCache() + ); - documentEnricher = new DocumentEnricherImpl(metricRegistry, - new MockNodeDao(), - new MockIpInterfaceDao(), - new MockInterfaceToNodeCache(), - new MockSessionUtils(), + documentEnricher = new DocumentEnricherImpl(new MockSessionUtils(), classificationEngine, - new CacheConfigBuilder() - .withName("flows.node") - .withMaximumSize(1000) - .withExpireAfterWrite(300) - .build(), 0, - new DocumentMangler(new ScriptEngineManager())); + 0, + new DocumentMangler(new ScriptEngineManager()), + nodeInfoCache); final RawIndexInitializer initializer = new RawIndexInitializer(client, settings); diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/KafkaFlowForwarderIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/KafkaFlowForwarderIT.java index 843586a61329..12625cbe5ca7 100644 --- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/KafkaFlowForwarderIT.java +++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/KafkaFlowForwarderIT.java @@ -61,7 +61,7 @@ import org.opennms.netmgt.flows.persistence.KafkaFlowForwarder; import org.opennms.netmgt.flows.persistence.model.FlowDocument; import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow; -import org.opennms.netmgt.flows.processing.enrichment.NodeInfo; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo; import org.osgi.service.cm.ConfigurationAdmin; import com.codahale.metrics.MetricRegistry; diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/MarkerCacheIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/MarkerCacheIT.java index e3cff0c4cdb0..9b2d09bcce79 100644 --- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/MarkerCacheIT.java +++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/MarkerCacheIT.java @@ -59,7 +59,7 @@ import org.opennms.netmgt.dao.api.SessionUtils; import org.opennms.netmgt.dao.api.SnmpInterfaceDao; import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow; -import org.opennms.netmgt.flows.processing.enrichment.NodeInfo; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo; import org.opennms.netmgt.flows.processing.impl.InterfaceMarkerImpl; import org.opennms.netmgt.model.OnmsSnmpInterface; import org.opennms.test.JUnitConfigurationEnvironment; diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/NodeIdentificationIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/NodeIdentificationIT.java index 6e05b61d9e74..87ab55af67c2 100644 --- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/NodeIdentificationIT.java +++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/NodeIdentificationIT.java @@ -62,6 +62,8 @@ import org.opennms.netmgt.model.NetworkBuilder; import org.opennms.netmgt.model.OnmsMetaData; import org.opennms.netmgt.model.OnmsNode; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCacheImpl; import org.opennms.test.JUnitConfigurationEnvironment; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; @@ -115,16 +117,26 @@ public void before() { @Test public void testSomething() throws InterruptedException { final ClassificationEngine classificationEngine = new DefaultClassificationEngine(() -> Collections.emptyList(), FilterService.NOOP); - final DocumentEnricherImpl documentEnricher = new DocumentEnricherImpl( - new MetricRegistry(), - databasePopulator.getNodeDao(), databasePopulator.getIpInterfaceDao(), - interfaceToNodeCache, sessionUtils, classificationEngine, + final NodeInfoCache nodeInfoCache = new NodeInfoCacheImpl( new CacheConfigBuilder() - .withName("flows.node") + .withName("nodeInfoCache") .withMaximumSize(1000) .withExpireAfterWrite(300) - .build(), 0, - new DocumentMangler(new ScriptEngineManager())); + .withExpireAfterRead(300) + .build(), + true, + new MetricRegistry(), + databasePopulator.getNodeDao(), + databasePopulator.getIpInterfaceDao(), + interfaceToNodeCache + ); + + final DocumentEnricherImpl documentEnricher = new DocumentEnricherImpl( + sessionUtils, + classificationEngine, + 0, + new DocumentMangler(new ScriptEngineManager()), + nodeInfoCache); final TestFlow testFlow = new TestFlow(); testFlow.setSrcAddr("1.1.1.1"); diff --git a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/ThresholdingIT.java b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/ThresholdingIT.java index 7abbcd3b1500..a642a3cb9ec0 100644 --- a/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/ThresholdingIT.java +++ b/features/flows/itests/src/test/java/org/opennms/netmgt/flows/elastic/ThresholdingIT.java @@ -65,7 +65,7 @@ import org.opennms.netmgt.filter.api.FilterDao; import org.opennms.integration.api.v1.flows.Flow; import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow; -import org.opennms.netmgt.flows.processing.enrichment.NodeInfo; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo; import org.opennms.netmgt.flows.api.FlowSource; import org.opennms.netmgt.flows.classification.ClassificationEngine; import org.opennms.netmgt.flows.classification.ClassificationRuleProvider; diff --git a/features/flows/processing/pom.xml b/features/flows/processing/pom.xml index a1ad1170c857..5d2a1298d576 100644 --- a/features/flows/processing/pom.xml +++ b/features/flows/processing/pom.xml @@ -72,6 +72,16 @@ org.opennms.features.telemetry.config.api ${project.version} + + org.opennms.features.telemetry.protocols + org.opennms.features.telemetry.protocols.common + ${project.version} + + + org.opennms.features.telemetry.protocols + org.opennms.features.telemetry.protocols.cache + ${project.version} + org.opennms.dependencies oia-dependencies diff --git a/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/impl/DocumentEnricherImpl.java b/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/impl/DocumentEnricherImpl.java index d63f3b658f97..accc567c73ca 100644 --- a/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/impl/DocumentEnricherImpl.java +++ b/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/impl/DocumentEnricherImpl.java @@ -34,20 +34,10 @@ import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.opennms.core.cache.Cache; -import org.opennms.core.cache.CacheBuilder; -import org.opennms.core.cache.CacheConfig; -import org.opennms.core.cache.CacheConfigBuilder; -import org.opennms.core.mate.api.ContextKey; import org.opennms.core.utils.InetAddressUtils; -import org.opennms.netmgt.dao.api.InterfaceToNodeCache; -import org.opennms.netmgt.dao.api.IpInterfaceDao; -import org.opennms.netmgt.dao.api.NodeDao; import org.opennms.netmgt.dao.api.SessionUtils; import org.opennms.netmgt.flows.api.Flow; import org.opennms.netmgt.flows.api.FlowSource; @@ -55,83 +45,32 @@ import org.opennms.netmgt.flows.classification.ClassificationRequest; import org.opennms.netmgt.flows.classification.persistence.api.Protocols; import org.opennms.netmgt.flows.processing.enrichment.EnrichedFlow; -import org.opennms.netmgt.flows.processing.enrichment.NodeInfo; -import org.opennms.netmgt.model.OnmsCategory; -import org.opennms.netmgt.model.OnmsIpInterface; -import org.opennms.netmgt.model.OnmsNode; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import com.google.common.base.Strings; -import com.google.common.cache.CacheLoader; - public class DocumentEnricherImpl { private static final Logger LOG = LoggerFactory.getLogger(DocumentEnricherImpl.class); - private static final String NODE_METADATA_CACHE = "flows.node.metadata"; - - private final NodeDao nodeDao; - - private final IpInterfaceDao ipInterfaceDao; - - private final InterfaceToNodeCache interfaceToNodeCache; - private final SessionUtils sessionUtils; private final ClassificationEngine classificationEngine; - // Caches NodeDocument data for a given node Id. - private final Cache> nodeInfoCache; - - // Caches NodeDocument data for a given node metadata. - private final Cache> nodeMetadataCache; - - private final Timer nodeLoadTimer; + private final NodeInfoCache nodeInfoCache; private final long clockSkewCorrectionThreshold; private final DocumentMangler mangler; - public DocumentEnricherImpl(final MetricRegistry metricRegistry, - final NodeDao nodeDao, - final IpInterfaceDao ipInterfaceDao, - final InterfaceToNodeCache interfaceToNodeCache, - final SessionUtils sessionUtils, + public DocumentEnricherImpl(final SessionUtils sessionUtils, final ClassificationEngine classificationEngine, - final CacheConfig cacheConfig, final long clockSkewCorrectionThreshold, - final DocumentMangler mangler) { - this.nodeDao = Objects.requireNonNull(nodeDao); - this.ipInterfaceDao = Objects.requireNonNull(ipInterfaceDao); - this.interfaceToNodeCache = Objects.requireNonNull(interfaceToNodeCache); + final DocumentMangler mangler, + final NodeInfoCache nodeInfoCache) { this.sessionUtils = Objects.requireNonNull(sessionUtils); this.classificationEngine = Objects.requireNonNull(classificationEngine); - - this.nodeInfoCache = new CacheBuilder() - .withConfig(cacheConfig) - .withCacheLoader(new CacheLoader>() { - @Override - public Optional load(InterfaceToNodeCache.Entry entry) { - return getNodeInfo(entry); - } - }).build(); - - final CacheConfig nodeMetadataCacheConfig = buildMetadataCacheConfig(cacheConfig); - this.nodeMetadataCache = new CacheBuilder() - .withConfig(nodeMetadataCacheConfig) - .withCacheLoader(new CacheLoader>() { - @Override - public Optional load(NodeMetadataKey key) { - return getNodeInfoFromMetadataContext(key.contextKey, key.value); - } - }).build(); - - this.nodeLoadTimer = metricRegistry.timer("nodeLoadTime"); - + this.nodeInfoCache = Objects.requireNonNull(nodeInfoCache); this.clockSkewCorrectionThreshold = clockSkewCorrectionThreshold; - this.mangler = Objects.requireNonNull(mangler); } @@ -152,12 +91,12 @@ public List enrich(final Collection flows, final FlowSource document.setLocation(source.getLocation()); // Node data - getNodeInfoFromCache(source.getLocation(), source.getSourceAddress(), source.getContextKey(), flow.getNodeIdentifier()).ifPresent(document::setExporterNodeInfo); + nodeInfoCache.getNodeInfoFromCache(source.getLocation(), source.getSourceAddress(), source.getContextKey(), flow.getNodeIdentifier()).ifPresent(document::setExporterNodeInfo); if (flow.getDstAddr() != null) { - getNodeInfoFromCache(source.getLocation(), flow.getDstAddr(), null, null).ifPresent(document::setSrcNodeInfo); + nodeInfoCache.getNodeInfoFromCache(source.getLocation(), flow.getDstAddr(), null, null).ifPresent(document::setSrcNodeInfo); } if (flow.getSrcAddr() != null) { - getNodeInfoFromCache(source.getLocation(), flow.getSrcAddr(), null, null).ifPresent(document::setDstNodeInfo); + nodeInfoCache.getNodeInfoFromCache(source.getLocation(), flow.getSrcAddr(), null, null).ifPresent(document::setDstNodeInfo); } // Locality @@ -207,107 +146,7 @@ private static boolean isPrivateAddress(String ipAddress) { return inetAddress.isLoopbackAddress() || inetAddress.isLinkLocalAddress() || inetAddress.isSiteLocalAddress(); } - private Optional getNodeInfoFromCache(final String location, final String ipAddress, final ContextKey contextKey, final String value) { - Optional nodeDocument = Optional.empty(); - if (contextKey != null && !Strings.isNullOrEmpty(value)) { - final NodeMetadataKey metadataKey = new NodeMetadataKey(contextKey, value); - try { - nodeDocument = this.nodeMetadataCache.get(metadataKey); - } catch (ExecutionException e) { - LOG.error("Error while retrieving NodeDocument from NodeMetadataCache: {}.", e.getMessage(), e); - throw new RuntimeException(e); - } - if(nodeDocument.isPresent()) { - return nodeDocument; - } - } - - final var entry = this.interfaceToNodeCache.getFirst(location, InetAddressUtils.addr(ipAddress)); - if(entry.isPresent()) { - try { - return this.nodeInfoCache.get(entry.get()); - } catch (ExecutionException e) { - LOG.error("Error while retrieving NodeDocument from NodeInfoCache: {}.", e.getMessage(), e); - throw new RuntimeException(e); - } - } - return nodeDocument; - } - - // Key class, which is used to cache NodeInfo for a given node metadata. - private static class NodeMetadataKey { - - public final ContextKey contextKey; - - public final String value; - - private NodeMetadataKey(final ContextKey contextKey, final String value) { - this.contextKey = contextKey; - this.value = value; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - final NodeMetadataKey that = (NodeMetadataKey) o; - return Objects.equals(contextKey, that.contextKey) && - Objects.equals(value, that.value); - } - - @Override - public int hashCode() { - return Objects.hash(contextKey, value); - } - } - - private Optional getNodeInfoFromMetadataContext(ContextKey contextKey, String value) { - // First, try to find interface - final List ifaces; - try (Timer.Context ctx = this.nodeLoadTimer.time()) { - ifaces = this.ipInterfaceDao.findInterfacesWithMetadata(contextKey.getContext(), contextKey.getKey(), value); - } - if (!ifaces.isEmpty()) { - final var iface = ifaces.get(0); - return mapOnmsNodeToNodeDocument(iface.getNode(), iface.getId()); - } - - // Alternatively, try to find node and chose primary interface - final List nodes; - try (Timer.Context ctx = this.nodeLoadTimer.time()) { - nodes = this.nodeDao.findNodeWithMetaData(contextKey.getContext(), contextKey.getKey(), value); - } - if(!nodes.isEmpty()) { - final var node = nodes.get(0); - return mapOnmsNodeToNodeDocument(node, node.getPrimaryInterface().getId()); - } - - return Optional.empty(); - } - - private Optional getNodeInfo(final InterfaceToNodeCache.Entry entry) { - final OnmsNode onmsNode; - try (Timer.Context ctx = this.nodeLoadTimer.time()) { - onmsNode = this.nodeDao.get(entry.nodeId); - } - - return mapOnmsNodeToNodeDocument(onmsNode, entry.interfaceId); - } - - private Optional mapOnmsNodeToNodeDocument(final OnmsNode onmsNode, final int interfaceId) { - if(onmsNode != null) { - final NodeInfo nodeDocument = new NodeInfo(); - nodeDocument.setForeignSource(onmsNode.getForeignSource()); - nodeDocument.setForeignId(onmsNode.getForeignId()); - nodeDocument.setNodeId(onmsNode.getId()); - nodeDocument.setInterfaceId(interfaceId); - nodeDocument.setCategories(onmsNode.getCategories().stream().map(OnmsCategory::getName).collect(Collectors.toList())); - - return Optional.of(nodeDocument); - } - return Optional.empty(); - } public static ClassificationRequest createClassificationRequest(EnrichedFlow document) { final ClassificationRequest request = new ClassificationRequest(); @@ -321,16 +160,4 @@ public static ClassificationRequest createClassificationRequest(EnrichedFlow doc return request; } - - private CacheConfig buildMetadataCacheConfig(CacheConfig cacheConfig) { - // Use existing config for the nodes with a new name for node metadata cache. - final CacheConfig metadataCacheConfig = new CacheConfigBuilder() - .withName(NODE_METADATA_CACHE) - .withMaximumSize(cacheConfig.getMaximumSize()) - .withExpireAfterWrite(cacheConfig.getExpireAfterWrite()) - .build(); - cacheConfig.setRecordStats(true); - cacheConfig.setMetricRegistry(cacheConfig.getMetricRegistry()); - return metadataCacheConfig; - } } diff --git a/features/flows/processing/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/features/flows/processing/src/main/resources/OSGI-INF/blueprint/blueprint.xml index f7bcef8c6d20..dd6cc7f0412b 100644 --- a/features/flows/processing/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/features/flows/processing/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -2,18 +2,12 @@ xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0" xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd - http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0 http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.1.0.xsd"> - - - - - @@ -25,16 +19,6 @@ - - - - - - - - - - @@ -73,6 +57,7 @@ + @@ -81,15 +66,11 @@ - - - - - - + + diff --git a/features/flows/processing/src/test/java/org/opennms/netmgt/flows/processing/enrichment/MockDocumentEnricherFactory.java b/features/flows/processing/src/test/java/org/opennms/netmgt/flows/processing/enrichment/MockDocumentEnricherFactory.java index f3d72e8e57e4..904035e4ffd8 100644 --- a/features/flows/processing/src/test/java/org/opennms/netmgt/flows/processing/enrichment/MockDocumentEnricherFactory.java +++ b/features/flows/processing/src/test/java/org/opennms/netmgt/flows/processing/enrichment/MockDocumentEnricherFactory.java @@ -55,6 +55,8 @@ import org.opennms.netmgt.flows.classification.persistence.api.RuleBuilder; import org.opennms.netmgt.flows.processing.impl.DocumentEnricherImpl; import org.opennms.netmgt.flows.processing.impl.DocumentMangler; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCacheImpl; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.Lists; @@ -88,16 +90,23 @@ public MockDocumentEnricherFactory(final long clockSkewCorrectionThreshold) thro new RuleBuilder().withName("http").withSrcPort("80").withProtocol("tcp,udp").build(), new RuleBuilder().withName("https").withSrcPort("443").withProtocol("tcp,udp").build() ), FilterService.NOOP); - enricher = new DocumentEnricherImpl( - new MetricRegistry(), - nodeDao, ipInterfaceDao, - interfaceToNodeCache, new MockSessionUtils(), classificationEngine, + final NodeInfoCache nodeInfoCache = new NodeInfoCacheImpl( new CacheConfigBuilder() - .withName("flows.node") - .withMaximumSize(1000) - .withExpireAfterWrite(300) - .build(), clockSkewCorrectionThreshold, - new DocumentMangler(new ScriptEngineManager())); + .withName("nodeInfoCache") + .withMaximumSize(1000) + .withExpireAfterWrite(300) + .withExpireAfterRead(300) + .build(), + true, + new MetricRegistry(), + nodeDao, + ipInterfaceDao, + interfaceToNodeCache + ); + enricher = new DocumentEnricherImpl( + new MockSessionUtils(), classificationEngine, + clockSkewCorrectionThreshold, + new DocumentMangler(new ScriptEngineManager()), nodeInfoCache); // Required for mock node dao addServiceRegistry(nodeDao); diff --git a/features/telemetry/protocols/adapters/pom.xml b/features/telemetry/protocols/adapters/pom.xml index 5fd4e594d910..65615ac593df 100644 --- a/features/telemetry/protocols/adapters/pom.xml +++ b/features/telemetry/protocols/adapters/pom.xml @@ -53,6 +53,11 @@ org.opennms opennms-dao-api + + org.opennms.features.telemetry.protocols + org.opennms.features.telemetry.protocols.cache + ${project.version} + org.osgi osgi.core diff --git a/features/telemetry/protocols/cache/pom.xml b/features/telemetry/protocols/cache/pom.xml new file mode 100644 index 000000000000..38274f17f393 --- /dev/null +++ b/features/telemetry/protocols/cache/pom.xml @@ -0,0 +1,78 @@ + + + + org.opennms.features.telemetry + org.opennms.features.telemetry.protocols + 31.0.3-SNAPSHOT + + 4.0.0 + org.opennms.features.telemetry.protocols + org.opennms.features.telemetry.protocols.cache + OpenNMS :: Features :: Telemetry :: Protocols :: Cache + bundle + + + + org.apache.felix + maven-bundle-plugin + true + + + JavaSE-1.8 + ${project.artifactId} + ${project.version} + + + + + + + + org.opennms.features.telemetry + org.opennms.features.telemetry.api + ${project.version} + + + org.opennms.features.telemetry.config + org.opennms.features.telemetry.config.api + ${project.version} + + + org.opennms.features.telemetry + org.opennms.features.telemetry.listeners + ${project.version} + + + org.opennms.core + org.opennms.core.lib + + + org.opennms.core.ipc.sink + org.opennms.core.ipc.sink.api + + + org.opennms.dependencies + spring-dependencies + pom + + + org.mongodb + bson + + + org.opennms.core.mate + org.opennms.core.mate.api + ${project.version} + + + org.opennms.core + org.opennms.core.cache + ${project.version} + + + org.opennms.features.flows + org.opennms.features.flows.api + ${project.version} + + + diff --git a/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/enrichment/NodeInfo.java b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfo.java similarity index 97% rename from features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/enrichment/NodeInfo.java rename to features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfo.java index c84f71c4d845..eef765232c2f 100644 --- a/features/flows/processing/src/main/java/org/opennms/netmgt/flows/processing/enrichment/NodeInfo.java +++ b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfo.java @@ -26,7 +26,7 @@ * http://www.opennms.com/ *******************************************************************************/ -package org.opennms.netmgt.flows.processing.enrichment; +package org.opennms.netmgt.telemetry.protocols.cache; import java.util.List; diff --git a/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCache.java b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCache.java new file mode 100644 index 000000000000..a47e4bca1a95 --- /dev/null +++ b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCache.java @@ -0,0 +1,37 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2024 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2024 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ +package org.opennms.netmgt.telemetry.protocols.cache; + +import java.util.Optional; + +import org.opennms.core.mate.api.ContextKey; + +public interface NodeInfoCache { + Optional getNodeInfoFromCache(final String location, final String ipAddress, final ContextKey contextKey, final String value); + +} diff --git a/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCacheImpl.java b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCacheImpl.java new file mode 100644 index 000000000000..a4dc792c6422 --- /dev/null +++ b/features/telemetry/protocols/cache/src/main/java/org/opennms/netmgt/telemetry/protocols/cache/NodeInfoCacheImpl.java @@ -0,0 +1,197 @@ +/******************************************************************************* + * This file is part of OpenNMS(R). + * + * Copyright (C) 2024 The OpenNMS Group, Inc. + * OpenNMS(R) is Copyright (C) 1999-2024 The OpenNMS Group, Inc. + * + * OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc. + * + * OpenNMS(R) is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * OpenNMS(R) is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with OpenNMS(R). If not, see: + * http://www.gnu.org/licenses/ + * + * For more information contact: + * OpenNMS(R) Licensing + * http://www.opennms.org/ + * http://www.opennms.com/ + *******************************************************************************/ +package org.opennms.netmgt.telemetry.protocols.cache; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import org.opennms.core.cache.Cache; +import org.opennms.core.cache.CacheBuilder; +import org.opennms.core.cache.CacheConfig; +import org.opennms.core.cache.CacheConfigBuilder; +import org.opennms.core.mate.api.ContextKey; +import org.opennms.core.utils.InetAddressUtils; +import org.opennms.netmgt.dao.api.InterfaceToNodeCache; +import org.opennms.netmgt.dao.api.IpInterfaceDao; +import org.opennms.netmgt.dao.api.NodeDao; +import org.opennms.netmgt.model.OnmsCategory; +import org.opennms.netmgt.model.OnmsIpInterface; +import org.opennms.netmgt.model.OnmsNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.google.common.base.Strings; +import com.google.common.cache.CacheLoader; + +public class NodeInfoCacheImpl implements NodeInfoCache { + private static final Logger LOG = LoggerFactory.getLogger(NodeInfoCacheImpl.class); + + private static class NodeMetadataKey { + + public final ContextKey contextKey; + + public final String value; + + private NodeMetadataKey(final ContextKey contextKey, final String value) { + this.contextKey = contextKey; + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final NodeMetadataKey that = (NodeMetadataKey) o; + return Objects.equals(contextKey, that.contextKey) && + Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(contextKey, value); + } + } + private final NodeDao nodeDao; + private final IpInterfaceDao ipInterfaceDao; + private final InterfaceToNodeCache interfaceToNodeCache; + private final Cache> nodeMetadataCache; + private final Cache> nodeInfoCache; + private final Timer nodeLoadTimer; + + public NodeInfoCacheImpl(final CacheConfig nodeInfoCacheConfig, final boolean nodeMetadataEnabled, final MetricRegistry metricRegistry, final NodeDao nodeDao, final IpInterfaceDao ipInterfaceDao, final InterfaceToNodeCache interfaceToNodeCache) { + this.nodeDao = Objects.requireNonNull(nodeDao); + this.ipInterfaceDao = Objects.requireNonNull(ipInterfaceDao); + this.interfaceToNodeCache = Objects.requireNonNull(interfaceToNodeCache); + + final CacheConfig nodeMetadataCacheConfig = new CacheConfigBuilder() + .withName("nodeMetadataCache") + .withExpireAfterRead(nodeInfoCacheConfig.getExpireAfterRead()) + .withExpireAfterWrite(nodeInfoCacheConfig.getExpireAfterWrite()) + .withMaximumSize(nodeInfoCacheConfig.getMaximumSize()) + .build(); + + this.nodeInfoCache = new CacheBuilder() + .withConfig(Objects.requireNonNull(nodeInfoCacheConfig)) + .withCacheLoader(new CacheLoader>() { + @Override + public Optional load(InterfaceToNodeCache.Entry entry) { + return getNodeInfo(entry); + } + }).build(); + + this.nodeMetadataCache = new CacheBuilder() + .withConfig(nodeMetadataCacheConfig) + .withCacheLoader(new CacheLoader>() { + @Override + public Optional load(NodeMetadataKey key) { + return getNodeInfoFromMetadataContext(key.contextKey, key.value); + } + }).build(); + + this.nodeLoadTimer = Objects.requireNonNull(metricRegistry).timer("nodeLoadTime"); + } + + public Optional getNodeInfoFromCache(final String location, final String ipAddress, final ContextKey contextKey, final String value) { + Optional nodeDocument = Optional.empty(); + if (contextKey != null && !Strings.isNullOrEmpty(value)) { + final NodeMetadataKey metadataKey = new NodeMetadataKey(contextKey, value); + try { + nodeDocument = this.nodeMetadataCache.get(metadataKey); + } catch (ExecutionException e) { + LOG.error("Error while retrieving NodeDocument from NodeMetadataCache: {}.", e.getMessage(), e); + throw new RuntimeException(e); + } + if(nodeDocument.isPresent()) { + return nodeDocument; + } + } + + final var entry = this.interfaceToNodeCache.getFirst(location, InetAddressUtils.addr(ipAddress)); + if(entry.isPresent()) { + try { + return this.nodeInfoCache.get(entry.get()); + } catch (ExecutionException e) { + LOG.error("Error while retrieving NodeDocument from NodeInfoCache: {}.", e.getMessage(), e); + throw new RuntimeException(e); + } + } + return nodeDocument; + } + + private Optional getNodeInfoFromMetadataContext(ContextKey contextKey, String value) { + // First, try to find interface + final List ifaces; + try (Timer.Context ctx = this.nodeLoadTimer.time()) { + ifaces = this.ipInterfaceDao.findInterfacesWithMetadata(contextKey.getContext(), contextKey.getKey(), value); + } + if (!ifaces.isEmpty()) { + final var iface = ifaces.get(0); + return mapOnmsNodeToNodeDocument(iface.getNode(), iface.getId()); + } + + // Alternatively, try to find node and chose primary interface + final List nodes; + try (Timer.Context ctx = this.nodeLoadTimer.time()) { + nodes = this.nodeDao.findNodeWithMetaData(contextKey.getContext(), contextKey.getKey(), value); + } + if(!nodes.isEmpty()) { + final var node = nodes.get(0); + return mapOnmsNodeToNodeDocument(node, node.getPrimaryInterface().getId()); + } + + return Optional.empty(); + } + + private Optional getNodeInfo(final InterfaceToNodeCache.Entry entry) { + final OnmsNode onmsNode; + try (Timer.Context ctx = this.nodeLoadTimer.time()) { + onmsNode = this.nodeDao.get(entry.nodeId); + } + + return mapOnmsNodeToNodeDocument(onmsNode, entry.interfaceId); + } + + private Optional mapOnmsNodeToNodeDocument(final OnmsNode onmsNode, final int interfaceId) { + if(onmsNode != null) { + final NodeInfo nodeDocument = new NodeInfo(); + nodeDocument.setForeignSource(onmsNode.getForeignSource()); + nodeDocument.setForeignId(onmsNode.getForeignId()); + nodeDocument.setNodeId(onmsNode.getId()); + nodeDocument.setInterfaceId(interfaceId); + nodeDocument.setCategories(onmsNode.getCategories().stream().map(OnmsCategory::getName).collect(Collectors.toList())); + + return Optional.of(nodeDocument); + } + return Optional.empty(); + } +} diff --git a/features/telemetry/protocols/cache/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/features/telemetry/protocols/cache/src/main/resources/OSGI-INF/blueprint/blueprint.xml new file mode 100644 index 000000000000..00e22060c7fb --- /dev/null +++ b/features/telemetry/protocols/cache/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -0,0 +1,74 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/common/NetflowTelemetryAdapter.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/common/NetflowTelemetryAdapter.java index 899ca7ee887d..22ae356a548d 100644 --- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/common/NetflowTelemetryAdapter.java +++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/common/NetflowTelemetryAdapter.java @@ -37,27 +37,36 @@ import javax.script.ScriptException; +import org.opennms.core.mate.api.ContextKey; import org.opennms.netmgt.collection.api.CollectionAgent; import org.opennms.netmgt.collection.api.CollectionAgentFactory; import org.opennms.netmgt.dao.api.InterfaceToNodeCache; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfo; import org.opennms.netmgt.telemetry.api.adapter.TelemetryMessageLog; import org.opennms.netmgt.telemetry.api.adapter.TelemetryMessageLogEntry; import org.opennms.netmgt.telemetry.config.api.AdapterDefinition; import org.opennms.netmgt.telemetry.protocols.collection.AbstractScriptedCollectionAdapter; import org.opennms.netmgt.telemetry.protocols.collection.CollectionSetWithAgent; import org.opennms.netmgt.telemetry.protocols.collection.ScriptedCollectionSetBuilder; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; import org.opennms.netmgt.telemetry.protocols.netflow.transport.FlowMessage; import org.opennms.netmgt.telemetry.protocols.netflow.transport.Value; import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Strings; import com.google.protobuf.InvalidProtocolBufferException; public class NetflowTelemetryAdapter extends AbstractScriptedCollectionAdapter { private InterfaceToNodeCache interfaceToNodeCache; private CollectionAgentFactory collectionAgentFactory; - protected NetflowTelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry) { + private ContextKey contextKey; + private String metaDataNodeLookup; + private final NodeInfoCache nodeInfoCache; + + protected NetflowTelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry, final NodeInfoCache nodeInfoCache) { super(adapterConfig, metricRegistry); + this.nodeInfoCache = nodeInfoCache; } @Override @@ -83,11 +92,11 @@ public Stream handleCollectionMessage(final TelemetryMes return Stream.empty(); } - final Optional nodeId = interfaceToNodeCache.getFirstNodeId(messageLog.getLocation(), inetAddress); + final Optional nodeInfo = nodeInfoCache.getNodeInfoFromCache(messageLog.getLocation(), messageLog.getSourceAddress(), contextKey, flowMessage.getNodeIdentifier()); final CollectionAgent agent; - if (nodeId.isPresent()) { - agent = collectionAgentFactory.createCollectionAgent(Integer.toString(nodeId.get()), inetAddress); + if (nodeInfo.isPresent()) { + agent = collectionAgentFactory.createCollectionAgent(Integer.toString(nodeInfo.get().getNodeId()), inetAddress); } else { LOG.warn("Unable to find node and interface for agent address: {}", address); @@ -154,4 +163,22 @@ public void setCollectionAgentFactory(final CollectionAgentFactory collectionAge public void setInterfaceToNodeCache(final InterfaceToNodeCache interfaceToNodeCache) { this.interfaceToNodeCache = interfaceToNodeCache; } + + public ContextKey getContextKey() { + return contextKey; + } + + public String getMetaDataNodeLookup() { + return metaDataNodeLookup; + } + + public void setMetaDataNodeLookup(String metaDataNodeLookup) { + this.metaDataNodeLookup = metaDataNodeLookup; + + if (!Strings.isNullOrEmpty(this.metaDataNodeLookup)) { + this.contextKey = new ContextKey(metaDataNodeLookup); + } else { + this.contextKey = null; + } + } } diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapter.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapter.java index 7cf18d426912..6a992538083e 100644 --- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapter.java +++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapter.java @@ -28,12 +28,13 @@ package org.opennms.netmgt.telemetry.protocols.netflow.adapter.ipfix; import org.opennms.netmgt.telemetry.config.api.AdapterDefinition; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; import org.opennms.netmgt.telemetry.protocols.netflow.adapter.common.NetflowTelemetryAdapter; import com.codahale.metrics.MetricRegistry; public class IpfixTelemetryAdapter extends NetflowTelemetryAdapter { - public IpfixTelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry) { - super(adapterConfig, metricRegistry); + public IpfixTelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry, final NodeInfoCache nodeMetadataCache) { + super(adapterConfig, metricRegistry, nodeMetadataCache); } } diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapterFactory.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapterFactory.java index adb9f1d8bd28..89600cf93402 100644 --- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapterFactory.java +++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/ipfix/IpfixTelemetryAdapterFactory.java @@ -30,10 +30,13 @@ import org.opennms.netmgt.telemetry.api.adapter.Adapter; import org.opennms.netmgt.telemetry.config.api.AdapterDefinition; import org.opennms.netmgt.telemetry.protocols.collection.AbstractCollectionAdapterFactory; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; import org.osgi.framework.BundleContext; public class IpfixTelemetryAdapterFactory extends AbstractCollectionAdapterFactory { + private NodeInfoCache nodeInfoCache; + public IpfixTelemetryAdapterFactory() { super(null); } @@ -49,7 +52,7 @@ public Class getBeanClass() { @Override public Adapter createBean(final AdapterDefinition adapterConfig) { - final IpfixTelemetryAdapter adapter = new IpfixTelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry()); + final IpfixTelemetryAdapter adapter = new IpfixTelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry(), nodeInfoCache); adapter.setCollectionAgentFactory(getCollectionAgentFactory()); adapter.setPersisterFactory(getPersisterFactory()); adapter.setFilterDao(getFilterDao()); @@ -57,7 +60,12 @@ public Adapter createBean(final AdapterDefinition adapterConfig) { adapter.setInterfaceToNodeCache(getInterfaceToNodeCache()); adapter.setThresholdingService(getThresholdingService()); adapter.setBundleContext(getBundleContext()); + adapter.setMetaDataNodeLookup(adapterConfig.getParameterMap().get("metaDataNodeLookup")); return adapter; } + + public void setNodeInfoCache(NodeInfoCache nodeInfoCache) { + this.nodeInfoCache = nodeInfoCache; + } } diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapter.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapter.java index 73ab9457bef2..28c369be1646 100644 --- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapter.java +++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapter.java @@ -28,12 +28,13 @@ package org.opennms.netmgt.telemetry.protocols.netflow.adapter.netflow5; import org.opennms.netmgt.telemetry.config.api.AdapterDefinition; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; import org.opennms.netmgt.telemetry.protocols.netflow.adapter.common.NetflowTelemetryAdapter; import com.codahale.metrics.MetricRegistry; public class Netflow5TelemetryAdapter extends NetflowTelemetryAdapter { - public Netflow5TelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry) { - super(adapterConfig, metricRegistry); + public Netflow5TelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry, final NodeInfoCache nodeMetadataCache) { + super(adapterConfig, metricRegistry, nodeMetadataCache); } } \ No newline at end of file diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapterFactory.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapterFactory.java index 9bc0f64ecd8d..a48454563c60 100644 --- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapterFactory.java +++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow5/Netflow5TelemetryAdapterFactory.java @@ -30,10 +30,13 @@ import org.opennms.netmgt.telemetry.api.adapter.Adapter; import org.opennms.netmgt.telemetry.config.api.AdapterDefinition; import org.opennms.netmgt.telemetry.protocols.collection.AbstractCollectionAdapterFactory; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; import org.osgi.framework.BundleContext; public class Netflow5TelemetryAdapterFactory extends AbstractCollectionAdapterFactory { + private NodeInfoCache nodeInfoCache; + public Netflow5TelemetryAdapterFactory() { super(null); } @@ -49,7 +52,7 @@ public Class getBeanClass() { @Override public Adapter createBean(final AdapterDefinition adapterConfig) { - final Netflow5TelemetryAdapter adapter = new Netflow5TelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry()); + final Netflow5TelemetryAdapter adapter = new Netflow5TelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry(), nodeInfoCache); adapter.setCollectionAgentFactory(getCollectionAgentFactory()); adapter.setPersisterFactory(getPersisterFactory()); adapter.setFilterDao(getFilterDao()); @@ -57,8 +60,13 @@ public Adapter createBean(final AdapterDefinition adapterConfig) { adapter.setInterfaceToNodeCache(getInterfaceToNodeCache()); adapter.setThresholdingService(getThresholdingService()); adapter.setBundleContext(getBundleContext()); + adapter.setMetaDataNodeLookup(adapterConfig.getParameterMap().get("metaDataNodeLookup")); return adapter; } + + public void setNodeInfoCache(NodeInfoCache nodeInfoCache) { + this.nodeInfoCache = nodeInfoCache; + } } diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapter.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapter.java index 662112f13382..2bf85676b904 100644 --- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapter.java +++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapter.java @@ -28,12 +28,13 @@ package org.opennms.netmgt.telemetry.protocols.netflow.adapter.netflow9; import org.opennms.netmgt.telemetry.config.api.AdapterDefinition; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; import org.opennms.netmgt.telemetry.protocols.netflow.adapter.common.NetflowTelemetryAdapter; import com.codahale.metrics.MetricRegistry; public class Netflow9TelemetryAdapter extends NetflowTelemetryAdapter { - public Netflow9TelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry) { - super(adapterConfig, metricRegistry); + public Netflow9TelemetryAdapter(final AdapterDefinition adapterConfig, final MetricRegistry metricRegistry, final NodeInfoCache nodeMetadataCache) { + super(adapterConfig, metricRegistry, nodeMetadataCache); } } \ No newline at end of file diff --git a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapterFactory.java b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapterFactory.java index efc2bf22ae8b..b6ca2794059e 100644 --- a/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapterFactory.java +++ b/features/telemetry/protocols/netflow/adapter/src/main/java/org/opennms/netmgt/telemetry/protocols/netflow/adapter/netflow9/Netflow9TelemetryAdapterFactory.java @@ -30,10 +30,13 @@ import org.opennms.netmgt.telemetry.api.adapter.Adapter; import org.opennms.netmgt.telemetry.config.api.AdapterDefinition; import org.opennms.netmgt.telemetry.protocols.collection.AbstractCollectionAdapterFactory; +import org.opennms.netmgt.telemetry.protocols.cache.NodeInfoCache; import org.osgi.framework.BundleContext; public class Netflow9TelemetryAdapterFactory extends AbstractCollectionAdapterFactory { + private NodeInfoCache nodeInfoCache; + public Netflow9TelemetryAdapterFactory() { super(null); } @@ -49,7 +52,7 @@ public Class getBeanClass() { @Override public Adapter createBean(final AdapterDefinition adapterConfig) { - final Netflow9TelemetryAdapter adapter = new Netflow9TelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry()); + final Netflow9TelemetryAdapter adapter = new Netflow9TelemetryAdapter(adapterConfig, getTelemetryRegistry().getMetricRegistry(), nodeInfoCache); adapter.setCollectionAgentFactory(getCollectionAgentFactory()); adapter.setPersisterFactory(getPersisterFactory()); adapter.setFilterDao(getFilterDao()); @@ -57,8 +60,13 @@ public Adapter createBean(final AdapterDefinition adapterConfig) { adapter.setInterfaceToNodeCache(getInterfaceToNodeCache()); adapter.setThresholdingService(getThresholdingService()); adapter.setBundleContext(getBundleContext()); + adapter.setMetaDataNodeLookup(adapterConfig.getParameterMap().get("metaDataNodeLookup")); return adapter; } + + public void setNodeInfoCache(NodeInfoCache nodeInfoCache) { + this.nodeInfoCache = nodeInfoCache; + } } diff --git a/features/telemetry/protocols/netflow/adapter/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/features/telemetry/protocols/netflow/adapter/src/main/resources/OSGI-INF/blueprint/blueprint.xml index cd6ecb6d8d4f..71a749136ac1 100644 --- a/features/telemetry/protocols/netflow/adapter/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/features/telemetry/protocols/netflow/adapter/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -18,6 +18,7 @@ + @@ -41,6 +42,7 @@ + @@ -70,6 +72,7 @@ + @@ -99,6 +102,7 @@ + diff --git a/features/telemetry/protocols/pom.xml b/features/telemetry/protocols/pom.xml index 63d8da21dec8..2e89ca5433b6 100644 --- a/features/telemetry/protocols/pom.xml +++ b/features/telemetry/protocols/pom.xml @@ -13,6 +13,7 @@ adapters bmp + cache common flows graphite