Skip to content

Commit

Permalink
[INLONG-11605][Audit] Added audit data legitimacy verification (#11610)
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi authored Dec 18, 2024
1 parent 45b8394 commit d31c887
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.audit.protocol;

import lombok.Data;

@Data
public class AuditData {

private String ip;
Expand All @@ -33,115 +36,4 @@ public class AuditData {
private long size;
private long delay;
private long auditVersion;

public long getAuditVersion() {
return auditVersion;
}

public void setAuditVersion(long auditVersion) {
this.auditVersion = auditVersion;
}

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}

public String getDockerId() {
return dockerId;
}

public void setDockerId(String dockerId) {
this.dockerId = dockerId;
}

public String getThreadId() {
return threadId;
}

public void setThreadId(String threadId) {
this.threadId = threadId;
}

public long getSdkTs() {
return sdkTs;
}

public void setSdkTs(long sdkTs) {
this.sdkTs = sdkTs;
}

public long getPacketId() {
return packetId;
}

public void setPacketId(long packetId) {
this.packetId = packetId;
}

public long getLogTs() {
return logTs;
}

public void setLogTs(long logTs) {
this.logTs = logTs;
}

public String getInlongGroupId() {
return inlongGroupId;
}

public void setInlongGroupId(String inlongGroupId) {
this.inlongGroupId = inlongGroupId;
}

public String getInlongStreamId() {
return inlongStreamId;
}

public void setInlongStreamId(String inlongStreamId) {
this.inlongStreamId = inlongStreamId;
}

public String getAuditId() {
return auditId;
}

public void setAuditId(String auditId) {
this.auditId = auditId;
}

public String getAuditTag() {
return auditTag;
}

public void setAuditTag(String auditTag) {
this.auditTag = auditTag;
}
public long getCount() {
return count;
}

public void setCount(long count) {
this.count = count;
}

public long getSize() {
return size;
}

public void setSize(long size) {
this.size = size;
}

public long getDelay() {
return delay;
}

public void setDelay(long delay) {
this.delay = delay;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.utils;

import org.apache.commons.lang3.StringUtils;

public class DataUtils {

private static final int MAX_AUDIT_ITEM_LENGTH = 256;

/**
* Checks if the timestamp is within the specified deviation range.
*
* @param dataTime The timestamp to check.
* @param deviation The allowed time deviation.
* @return true if the timestamp is within the deviation range, false otherwise.
*/
public static boolean isDataTimeValid(long dataTime, long deviation) {
long currentTime = System.currentTimeMillis();
long timeDiff = Math.abs(currentTime - dataTime);
return timeDiff <= deviation;
}

/**
* Checks if the audit item is valid.
*
* @param auditItem The audit item to check.
* @return true if the audit item is blank or its length is less than the maximum length, false otherwise.
*/
public static boolean isAuditItemValid(String auditItem) {
return StringUtils.isBlank(auditItem) || auditItem.length() < MAX_AUDIT_ITEM_LENGTH;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.utils;

import org.junit.Test;

import java.util.Random;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class DataUtilsTest {

@Test
public void testIsDataTimeValid() {
long deviation = 604800000;
long dataTime = System.currentTimeMillis();
boolean valid = DataUtils.isDataTimeValid(dataTime, deviation);
assertTrue(valid);

dataTime = System.currentTimeMillis() - 1000 * 60 * 60 * 24 * 2;
valid = DataUtils.isDataTimeValid(dataTime, deviation);
assertTrue(valid);

dataTime = System.currentTimeMillis() + 1000 * 60 * 60 * 24 * 2;
valid = DataUtils.isDataTimeValid(dataTime, deviation);
assertTrue(valid);

dataTime = System.currentTimeMillis() - 1000 * 60 * 60 * 24 * 8;
valid = DataUtils.isDataTimeValid(dataTime, deviation);
assertFalse(valid);

dataTime = System.currentTimeMillis() + 1000 * 60 * 60 * 24 * 8;
valid = DataUtils.isDataTimeValid(dataTime, deviation);
assertFalse(valid);

dataTime = 1734356619540000L;
valid = DataUtils.isDataTimeValid(dataTime, deviation);
assertFalse(valid);

dataTime = 1L;
valid = DataUtils.isDataTimeValid(dataTime, deviation);
assertFalse(valid);
}

@Test
public void testIsAuditItemValid() {
String auditItem = null;
boolean valid = DataUtils.isAuditItemValid(auditItem);
assertTrue(valid);

auditItem = "";
valid = DataUtils.isAuditItemValid(auditItem);
assertTrue(valid);

auditItem = "1@dff";
valid = DataUtils.isAuditItemValid(auditItem);
assertTrue(valid);

auditItem = "fb320c7e51";
valid = DataUtils.isAuditItemValid(auditItem);
assertTrue(valid);

auditItem = "127.0.0.1";
valid = DataUtils.isAuditItemValid(auditItem);
assertTrue(valid);

Random random = new Random();
StringBuilder stringBuilder128 = new StringBuilder(128);
for (int i = 0; i < 128; i++) {
char c = (char) (random.nextInt(26) + 'a');
stringBuilder128.append(c);
}
valid = DataUtils.isAuditItemValid(stringBuilder128.toString());
assertTrue(valid);

StringBuilder stringBuilder256 = new StringBuilder(256);
for (int i = 0; i < 256; i++) {
char c = (char) (random.nextInt(26) + 'a');
stringBuilder256.append(c);
}
valid = DataUtils.isAuditItemValid(stringBuilder256.toString());
assertFalse(valid);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,9 @@ private AuditReply handleRequest(AuditRequest auditRequest) throws Exception {
LOGGER.debug("Receive message count: {}", auditRequest.getMsgBodyCount());
for (AuditMessageBody auditMessageBody : bodyList) {
long msgDays = messageDays(auditMessageBody.getLogTs());
if (msgDays >= this.msgValidThresholdDays) {
LOGGER.debug("Discard the data as it is from {} days ago, only the data with a log timestamp"
+ " less than {} days is valid", msgDays, this.msgValidThresholdDays);

if (Math.abs(msgDays) >= this.msgValidThresholdDays) {
LOGGER.debug("Discard the invalid audit data: {}", auditMessageBody);
MetricsManager.getInstance().addReceiveCountExpired(1);

continue;
}
AuditData auditData = new AuditData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ public class JdbcConfig {
private int processIntervalMs;
@Value("${audit.store.data.queue.size:1000000}")
private int dataQueueSize;
@Value("${audit.store.valid.datatime.range.ms:604800000}")
private long validDataTimeRangeMs;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public enum MetricDimension {
RECEIVE_FAILED("receiveFailed"),
SEND_COUNT_SUCCESS("sendCountSuccess"),
SEND_COUNT_FAILED("sendCountFailed"),
SEND_DURATION("sendDuration");
SEND_DURATION("sendDuration"),
INVALID_DATA("invalidData");

private final String key;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ public class MetricItem {
private AtomicLong sendCountSuccess = new AtomicLong(0);
private AtomicLong sendCountFailed = new AtomicLong(0);
private AtomicLong sendDuration = new AtomicLong(0);
private AtomicLong invalidData = new AtomicLong(0);
public void resetAllMetrics() {
receiveCountSuccess.set(0);
receiveFailed.set(0);
sendCountSuccess.set(0);
sendCountFailed.set(0);
sendDuration.set(0);
invalidData.set(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ public void addSendFailed(long count, long duration) {
metricItem.getSendCountFailed().addAndGet(count);
metricItem.getSendDuration().addAndGet(duration);
}
public void addInvalidData() {
metricItem.getInvalidData().addAndGet(1);
}

public void shutdown() {
timer.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public List<MetricFamilySamples> collect() {
createSample(MetricDimension.RECEIVE_FAILED, metricItem.getReceiveFailed().doubleValue()),
createSample(MetricDimension.SEND_COUNT_SUCCESS, metricItem.getSendCountSuccess().doubleValue()),
createSample(MetricDimension.SEND_COUNT_FAILED, metricItem.getSendCountFailed().doubleValue()),
createSample(MetricDimension.SEND_DURATION, metricItem.getSendDuration().doubleValue()));
createSample(MetricDimension.SEND_DURATION, metricItem.getSendDuration().doubleValue()),
createSample(MetricDimension.INVALID_DATA, metricItem.getInvalidData().doubleValue()));

MetricFamilySamples metricFamilySamples =
new MetricFamilySamples(AUDIT_STORE_SERVER_NAME, Type.GAUGE, HELP_DESCRIPTION, samples);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.inlong.audit.store.config.JdbcConfig;
import org.apache.inlong.audit.store.entities.JdbcDataPo;
import org.apache.inlong.audit.store.metric.MetricsManager;
import org.apache.inlong.audit.store.utils.PulsarUtils;
import org.apache.inlong.audit.utils.DataUtils;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -185,6 +187,12 @@ public void insert(AuditData msgBody) {

@Override
public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId messageId) {
if (!isAuditDataValid(msgBody)) {
MetricsManager.getInstance().addInvalidData();
PulsarUtils.acknowledge(consumer, messageId);
LOG.error("Invalid audit data: {} ", msgBody);
return;
}
JdbcDataPo data = new JdbcDataPo();
data.setConsumer(consumer);
data.setMessageId(messageId);
Expand Down Expand Up @@ -230,4 +238,29 @@ private void acknowledge(List<JdbcDataPo> dataList) {
}
}
}

private boolean isAuditDataValid(AuditData auditData) {
// Check if any of the timestamp fields are within the valid range
if (!isDataTimeValid(auditData)) {
return false;
}
// Check if any of the audit items are valid
return isAuditItemValid(auditData);
}

private boolean isDataTimeValid(AuditData auditData) {
long validDataTimeRangeMs = jdbcConfig.getValidDataTimeRangeMs();
return DataUtils.isDataTimeValid(auditData.getLogTs(), validDataTimeRangeMs) &&
DataUtils.isDataTimeValid(auditData.getSdkTs(), validDataTimeRangeMs);
}

private boolean isAuditItemValid(AuditData auditData) {
return DataUtils.isAuditItemValid(auditData.getInlongGroupId()) &&
DataUtils.isAuditItemValid(auditData.getInlongStreamId()) &&
DataUtils.isAuditItemValid(auditData.getAuditId()) &&
DataUtils.isAuditItemValid(auditData.getAuditTag()) &&
DataUtils.isAuditItemValid(auditData.getIp()) &&
DataUtils.isAuditItemValid(auditData.getDockerId()) &&
DataUtils.isAuditItemValid(auditData.getThreadId());
}
}
Loading

0 comments on commit d31c887

Please sign in to comment.