Skip to content

Commit

Permalink
[INLONG-9384][Sort] Fix pulsar audit data loss when restarting (#9386)
Browse files Browse the repository at this point in the history
  • Loading branch information
EMsnap authored Dec 1, 2023
1 parent 912dce3 commit 65f9214
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,16 @@ public void outputMetrics(long rowCountSize, long rowDataSize, long fetchDelay,
}
}

/**
* flush audit data
* usually call this method in close method or when checkpointing
*/
public void flushAuditData() {
if (auditOperator != null) {
auditOperator.send();
}
}

public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime) {
outputDefaultMetrics(rowCountSize, rowDataSize);
if (auditOperator != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,9 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
log.debug("snapshotState() called on closed source");
} else {

flushAudit();

unionOffsetStates.clear();

PulsarFetcher<T> fetcher = this.pulsarFetcher;
Expand Down Expand Up @@ -925,6 +928,13 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
}
}

// flush audit data first to avoid audit data loss
private void flushAudit() {
if (sourceMetricData != null) {
sourceMetricData.flushAuditData();
}
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (!running) {
Expand Down

0 comments on commit 65f9214

Please sign in to comment.