From 8f5fa804cd6e6fc9c029815488589cca370e7aea Mon Sep 17 00:00:00 2001 From: Gonzalo Ortiz Jaureguizar Date: Thu, 4 Jan 2024 18:05:34 +0100 Subject: [PATCH] Add a 'lastUsed' option for resumeConsumption.consumeFrom (#12200) * Add a 'lastUsed' option for resumeConsumption.consumeFrom * Renamed 'lastUsed' as 'lastConsumed' --- .../resources/PinotRealtimeTableResource.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java index af7f45334fe..ddbf2f398a7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java @@ -109,17 +109,26 @@ public Response pauseConsumption( @Produces(MediaType.APPLICATION_JSON) @ApiOperation(value = "Resume consumption of a realtime table", notes = "Resume the consumption for a realtime table. ConsumeFrom parameter indicates from which offsets " - + "consumption should resume. If consumeFrom parameter is not provided, consumption continues based on the " - + "offsets in segment ZK metadata, and in case the offsets are already gone, the first available offsets are " - + "picked to minimize the data loss.") + + "consumption should resume. Recommended value is 'lastConsumed', which indicates consumption should " + + "continue based on the offsets in segment ZK metadata, and in case the offsets are already gone, the first " + + "available offsets are picked to minimize the data loss.") public Response resumeConsumption( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, - @ApiParam(value = "smallest | largest") @QueryParam("consumeFrom") String consumeFrom) { + @ApiParam( + value = "lastConsumed (safer) | smallest (repeat rows) | largest (miss rows)", + allowableValues = "lastConsumed, smallest, largest", + defaultValue = "lastConsumed" + ) + @QueryParam("consumeFrom") String consumeFrom) { String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); validateTable(tableNameWithType); + if ("lastConsumed".equalsIgnoreCase(consumeFrom)) { + consumeFrom = null; + } if (consumeFrom != null && !consumeFrom.equalsIgnoreCase("smallest") && !consumeFrom.equalsIgnoreCase("largest")) { throw new ControllerApplicationException(LOGGER, - String.format("consumeFrom param '%s' is not valid.", consumeFrom), Response.Status.BAD_REQUEST); + String.format("consumeFrom param '%s' is not valid. Valid values are 'lastConsumed', 'smallest' and " + + "'largest'.", consumeFrom), Response.Status.BAD_REQUEST); } try { return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType, consumeFrom)).build();