From 65f92147e2dce57c28b9e6ea8c42c4ed903939b3 Mon Sep 17 00:00:00 2001 From: Sting Date: Fri, 1 Dec 2023 18:54:25 +0800 Subject: [PATCH] [INLONG-9384][Sort] Fix pulsar audit data loss when restarting (#9386) --- .../inlong/sort/base/metric/SourceMetricData.java | 10 ++++++++++ .../inlong/sort/pulsar/internal/FlinkPulsarSource.java | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 0a0cad5d6ee..1e1a6247621 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -298,6 +298,16 @@ public void outputMetrics(long rowCountSize, long rowDataSize, long fetchDelay, } } + /** + * flush audit data + * usually call this method in close method or when checkpointing + */ + public void flushAuditData() { + if (auditOperator != null) { + auditOperator.send(); + } + } + public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime) { outputDefaultMetrics(rowCountSize, rowDataSize); if (auditOperator != null) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java index b193a5af1a2..a963edf02a9 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java @@ -886,6 +886,9 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { if (!running) { log.debug("snapshotState() called on closed source"); } else { + + flushAudit(); + unionOffsetStates.clear(); PulsarFetcher fetcher = this.pulsarFetcher; @@ -925,6 +928,13 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { } } + // flush audit data first to avoid audit data loss + private void flushAudit() { + if (sourceMetricData != null) { + sourceMetricData.flushAuditData(); + } + } + @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { if (!running) {