diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java index 85bde1d2d59..1efa200693c 100644 --- a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/protocol/AuditData.java @@ -17,6 +17,9 @@ package org.apache.inlong.audit.protocol; +import lombok.Data; + +@Data public class AuditData { private String ip; @@ -33,115 +36,4 @@ public class AuditData { private long size; private long delay; private long auditVersion; - - public long getAuditVersion() { - return auditVersion; - } - - public void setAuditVersion(long auditVersion) { - this.auditVersion = auditVersion; - } - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public String getDockerId() { - return dockerId; - } - - public void setDockerId(String dockerId) { - this.dockerId = dockerId; - } - - public String getThreadId() { - return threadId; - } - - public void setThreadId(String threadId) { - this.threadId = threadId; - } - - public long getSdkTs() { - return sdkTs; - } - - public void setSdkTs(long sdkTs) { - this.sdkTs = sdkTs; - } - - public long getPacketId() { - return packetId; - } - - public void setPacketId(long packetId) { - this.packetId = packetId; - } - - public long getLogTs() { - return logTs; - } - - public void setLogTs(long logTs) { - this.logTs = logTs; - } - - public String getInlongGroupId() { - return inlongGroupId; - } - - public void setInlongGroupId(String inlongGroupId) { - this.inlongGroupId = inlongGroupId; - } - - public String getInlongStreamId() { - return inlongStreamId; - } - - public void setInlongStreamId(String inlongStreamId) { - this.inlongStreamId = inlongStreamId; - } - - public String getAuditId() { - return auditId; - } - - public void setAuditId(String auditId) { - this.auditId = auditId; - } - - public String getAuditTag() { - return auditTag; - } - - public void setAuditTag(String auditTag) { - this.auditTag = auditTag; - } - public long getCount() { - return count; - } - - public void setCount(long count) { - this.count = count; - } - - public long getSize() { - return size; - } - - public void setSize(long size) { - this.size = size; - } - - public long getDelay() { - return delay; - } - - public void setDelay(long delay) { - this.delay = delay; - } } diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/DataUtils.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/DataUtils.java new file mode 100644 index 00000000000..76f6ee9510c --- /dev/null +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/DataUtils.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.utils; + +import org.apache.commons.lang3.StringUtils; + +public class DataUtils { + + private static final int MAX_AUDIT_ITEM_LENGTH = 256; + + /** + * Checks if the timestamp is within the specified deviation range. + * + * @param dataTime The timestamp to check. + * @param deviation The allowed time deviation. + * @return true if the timestamp is within the deviation range, false otherwise. + */ + public static boolean isDataTimeValid(long dataTime, long deviation) { + long currentTime = System.currentTimeMillis(); + long timeDiff = Math.abs(currentTime - dataTime); + return timeDiff <= deviation; + } + + /** + * Checks if the audit item is valid. + * + * @param auditItem The audit item to check. + * @return true if the audit item is blank or its length is less than the maximum length, false otherwise. + */ + public static boolean isAuditItemValid(String auditItem) { + return StringUtils.isBlank(auditItem) || auditItem.length() < MAX_AUDIT_ITEM_LENGTH; + } +} \ No newline at end of file diff --git a/inlong-audit/audit-common/src/test/java/org/apache/inlong/audit/utils/DataUtilsTest.java b/inlong-audit/audit-common/src/test/java/org/apache/inlong/audit/utils/DataUtilsTest.java new file mode 100644 index 00000000000..6b31bac1a3f --- /dev/null +++ b/inlong-audit/audit-common/src/test/java/org/apache/inlong/audit/utils/DataUtilsTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.utils; + +import org.junit.Test; + +import java.util.Random; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class DataUtilsTest { + + @Test + public void testIsDataTimeValid() { + long deviation = 604800000; + long dataTime = System.currentTimeMillis(); + boolean valid = DataUtils.isDataTimeValid(dataTime, deviation); + assertTrue(valid); + + dataTime = System.currentTimeMillis() - 1000 * 60 * 60 * 24 * 2; + valid = DataUtils.isDataTimeValid(dataTime, deviation); + assertTrue(valid); + + dataTime = System.currentTimeMillis() + 1000 * 60 * 60 * 24 * 2; + valid = DataUtils.isDataTimeValid(dataTime, deviation); + assertTrue(valid); + + dataTime = System.currentTimeMillis() - 1000 * 60 * 60 * 24 * 8; + valid = DataUtils.isDataTimeValid(dataTime, deviation); + assertFalse(valid); + + dataTime = System.currentTimeMillis() + 1000 * 60 * 60 * 24 * 8; + valid = DataUtils.isDataTimeValid(dataTime, deviation); + assertFalse(valid); + + dataTime = 1734356619540000L; + valid = DataUtils.isDataTimeValid(dataTime, deviation); + assertFalse(valid); + + dataTime = 1L; + valid = DataUtils.isDataTimeValid(dataTime, deviation); + assertFalse(valid); + } + + @Test + public void testIsAuditItemValid() { + String auditItem = null; + boolean valid = DataUtils.isAuditItemValid(auditItem); + assertTrue(valid); + + auditItem = ""; + valid = DataUtils.isAuditItemValid(auditItem); + assertTrue(valid); + + auditItem = "1@dff"; + valid = DataUtils.isAuditItemValid(auditItem); + assertTrue(valid); + + auditItem = "fb320c7e51"; + valid = DataUtils.isAuditItemValid(auditItem); + assertTrue(valid); + + auditItem = "127.0.0.1"; + valid = DataUtils.isAuditItemValid(auditItem); + assertTrue(valid); + + Random random = new Random(); + StringBuilder stringBuilder128 = new StringBuilder(128); + for (int i = 0; i < 128; i++) { + char c = (char) (random.nextInt(26) + 'a'); + stringBuilder128.append(c); + } + valid = DataUtils.isAuditItemValid(stringBuilder128.toString()); + assertTrue(valid); + + StringBuilder stringBuilder256 = new StringBuilder(256); + for (int i = 0; i < 256; i++) { + char c = (char) (random.nextInt(26) + 'a'); + stringBuilder256.append(c); + } + valid = DataUtils.isAuditItemValid(stringBuilder256.toString()); + assertFalse(valid); + + } +} \ No newline at end of file diff --git a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java index 7595586dd4c..d792610002d 100644 --- a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java +++ b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java @@ -161,12 +161,9 @@ private AuditReply handleRequest(AuditRequest auditRequest) throws Exception { LOGGER.debug("Receive message count: {}", auditRequest.getMsgBodyCount()); for (AuditMessageBody auditMessageBody : bodyList) { long msgDays = messageDays(auditMessageBody.getLogTs()); - if (msgDays >= this.msgValidThresholdDays) { - LOGGER.debug("Discard the data as it is from {} days ago, only the data with a log timestamp" - + " less than {} days is valid", msgDays, this.msgValidThresholdDays); - + if (Math.abs(msgDays) >= this.msgValidThresholdDays) { + LOGGER.debug("Discard the invalid audit data: {}", auditMessageBody); MetricsManager.getInstance().addReceiveCountExpired(1); - continue; } AuditData auditData = new AuditData(); diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java index 0b6c7c36bb7..7b059817d4f 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/config/JdbcConfig.java @@ -41,4 +41,6 @@ public class JdbcConfig { private int processIntervalMs; @Value("${audit.store.data.queue.size:1000000}") private int dataQueueSize; + @Value("${audit.store.valid.datatime.range.ms:604800000}") + private long validDataTimeRangeMs; } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java index 02c2258dbc4..baa4bf21bb8 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricDimension.java @@ -23,7 +23,8 @@ public enum MetricDimension { RECEIVE_FAILED("receiveFailed"), SEND_COUNT_SUCCESS("sendCountSuccess"), SEND_COUNT_FAILED("sendCountFailed"), - SEND_DURATION("sendDuration"); + SEND_DURATION("sendDuration"), + INVALID_DATA("invalidData"); private final String key; diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java index 0e5dd9ad18b..848f79b5db7 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricItem.java @@ -30,11 +30,13 @@ public class MetricItem { private AtomicLong sendCountSuccess = new AtomicLong(0); private AtomicLong sendCountFailed = new AtomicLong(0); private AtomicLong sendDuration = new AtomicLong(0); + private AtomicLong invalidData = new AtomicLong(0); public void resetAllMetrics() { receiveCountSuccess.set(0); receiveFailed.set(0); sendCountSuccess.set(0); sendCountFailed.set(0); sendDuration.set(0); + invalidData.set(0); } } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java index 68b69609cfa..305c05df351 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/MetricsManager.java @@ -87,6 +87,9 @@ public void addSendFailed(long count, long duration) { metricItem.getSendCountFailed().addAndGet(count); metricItem.getSendDuration().addAndGet(duration); } + public void addInvalidData() { + metricItem.getInvalidData().addAndGet(1); + } public void shutdown() { timer.shutdown(); diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java index bb72b7a1780..8f1ad2d2c17 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/metric/prometheus/StorePrometheusMetric.java @@ -64,7 +64,8 @@ public List collect() { createSample(MetricDimension.RECEIVE_FAILED, metricItem.getReceiveFailed().doubleValue()), createSample(MetricDimension.SEND_COUNT_SUCCESS, metricItem.getSendCountSuccess().doubleValue()), createSample(MetricDimension.SEND_COUNT_FAILED, metricItem.getSendCountFailed().doubleValue()), - createSample(MetricDimension.SEND_DURATION, metricItem.getSendDuration().doubleValue())); + createSample(MetricDimension.SEND_DURATION, metricItem.getSendDuration().doubleValue()), + createSample(MetricDimension.INVALID_DATA, metricItem.getInvalidData().doubleValue())); MetricFamilySamples metricFamilySamples = new MetricFamilySamples(AUDIT_STORE_SERVER_NAME, Type.GAUGE, HELP_DESCRIPTION, samples); diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java index 68ba584d739..fd1314c258c 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/JdbcService.java @@ -21,6 +21,8 @@ import org.apache.inlong.audit.store.config.JdbcConfig; import org.apache.inlong.audit.store.entities.JdbcDataPo; import org.apache.inlong.audit.store.metric.MetricsManager; +import org.apache.inlong.audit.store.utils.PulsarUtils; +import org.apache.inlong.audit.utils.DataUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; @@ -185,6 +187,12 @@ public void insert(AuditData msgBody) { @Override public void insert(AuditData msgBody, Consumer consumer, MessageId messageId) { + if (!isAuditDataValid(msgBody)) { + MetricsManager.getInstance().addInvalidData(); + PulsarUtils.acknowledge(consumer, messageId); + LOG.error("Invalid audit data: {} ", msgBody); + return; + } JdbcDataPo data = new JdbcDataPo(); data.setConsumer(consumer); data.setMessageId(messageId); @@ -230,4 +238,29 @@ private void acknowledge(List dataList) { } } } + + private boolean isAuditDataValid(AuditData auditData) { + // Check if any of the timestamp fields are within the valid range + if (!isDataTimeValid(auditData)) { + return false; + } + // Check if any of the audit items are valid + return isAuditItemValid(auditData); + } + + private boolean isDataTimeValid(AuditData auditData) { + long validDataTimeRangeMs = jdbcConfig.getValidDataTimeRangeMs(); + return DataUtils.isDataTimeValid(auditData.getLogTs(), validDataTimeRangeMs) && + DataUtils.isDataTimeValid(auditData.getSdkTs(), validDataTimeRangeMs); + } + + private boolean isAuditItemValid(AuditData auditData) { + return DataUtils.isAuditItemValid(auditData.getInlongGroupId()) && + DataUtils.isAuditItemValid(auditData.getInlongStreamId()) && + DataUtils.isAuditItemValid(auditData.getAuditId()) && + DataUtils.isAuditItemValid(auditData.getAuditTag()) && + DataUtils.isAuditItemValid(auditData.getIp()) && + DataUtils.isAuditItemValid(auditData.getDockerId()) && + DataUtils.isAuditItemValid(auditData.getThreadId()); + } } diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java index c1a5fe92f25..e90ebfc25f5 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/service/consume/PulsarConsume.java @@ -21,6 +21,7 @@ import org.apache.inlong.audit.store.config.StoreConfig; import org.apache.inlong.audit.store.metric.MetricsManager; import org.apache.inlong.audit.store.service.InsertData; +import org.apache.inlong.audit.store.utils.PulsarUtils; import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -128,29 +129,14 @@ protected Consumer createConsumer(PulsarClient pulsarClient, String topi .messageListener(new MessageListener() { public void received(Consumer consumer, Message msg) { + String body = null; try { - String body = new String(msg.getData(), StandardCharsets.UTF_8); + body = new String(msg.getData(), StandardCharsets.UTF_8); handleMessage(body, consumer, msg.getMessageId()); - } catch (Exception e) { + } catch (Exception exception) { MetricsManager.getInstance().addReceiveFailed(1); - - LOG.error("Consumer has exception topic {}, subName {}, ex {}", - topic, - mqConfig.getPulsarConsumerSubName(), - e); - if (mqConfig.isPulsarConsumerEnableRetry()) { - try { - consumer.reconsumeLater(msg, 10, TimeUnit.SECONDS); - } catch (PulsarClientException pulsarClientException) { - LOG.error("Consumer reconsumeLater has exception " - + "topic {}, subName {}, ex {}", - topic, - mqConfig.getPulsarConsumerSubName(), - pulsarClientException); - } - } else { - consumer.negativeAcknowledge(msg); - } + PulsarUtils.acknowledge(consumer, msg.getMessageId()); + LOG.error("Invalid audit data: {}", body, exception); } } }) diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/utils/PulsarUtils.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/utils/PulsarUtils.java new file mode 100644 index 00000000000..882ee425a05 --- /dev/null +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/store/utils/PulsarUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.store.utils; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PulsarUtils { + + private static final Logger LOG = LoggerFactory.getLogger(PulsarUtils.class); + + public static void acknowledge(Consumer consumer, MessageId messageId) { + if (consumer == null) { + return; + } + try { + consumer.acknowledge(messageId); + } catch (Exception exception) { + LOG.error("Acknowledge topic:{}, consumer name:{}, message id: {} has exception ", + consumer.getTopic(), consumer.getConsumerName(), messageId, exception); + } + } +}