Skip to content

Commit

Permalink
Add a 'lastUsed' option for resumeConsumption.consumeFrom (apache#12200)
Browse files Browse the repository at this point in the history
* Add a 'lastUsed' option for resumeConsumption.consumeFrom
* Renamed 'lastUsed' as 'lastConsumed'
  • Loading branch information
gortiz authored Jan 4, 2024
1 parent d1cc17c commit 8f5fa80
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,26 @@ public Response pauseConsumption(
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Resume consumption of a realtime table", notes =
"Resume the consumption for a realtime table. ConsumeFrom parameter indicates from which offsets "
+ "consumption should resume. If consumeFrom parameter is not provided, consumption continues based on the "
+ "offsets in segment ZK metadata, and in case the offsets are already gone, the first available offsets are "
+ "picked to minimize the data loss.")
+ "consumption should resume. Recommended value is 'lastConsumed', which indicates consumption should "
+ "continue based on the offsets in segment ZK metadata, and in case the offsets are already gone, the first "
+ "available offsets are picked to minimize the data loss.")
public Response resumeConsumption(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "smallest | largest") @QueryParam("consumeFrom") String consumeFrom) {
@ApiParam(
value = "lastConsumed (safer) | smallest (repeat rows) | largest (miss rows)",
allowableValues = "lastConsumed, smallest, largest",
defaultValue = "lastConsumed"
)
@QueryParam("consumeFrom") String consumeFrom) {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
validateTable(tableNameWithType);
if ("lastConsumed".equalsIgnoreCase(consumeFrom)) {
consumeFrom = null;
}
if (consumeFrom != null && !consumeFrom.equalsIgnoreCase("smallest") && !consumeFrom.equalsIgnoreCase("largest")) {
throw new ControllerApplicationException(LOGGER,
String.format("consumeFrom param '%s' is not valid.", consumeFrom), Response.Status.BAD_REQUEST);
String.format("consumeFrom param '%s' is not valid. Valid values are 'lastConsumed', 'smallest' and "
+ "'largest'.", consumeFrom), Response.Status.BAD_REQUEST);
}
try {
return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType, consumeFrom)).build();
Expand Down

0 comments on commit 8f5fa80

Please sign in to comment.