Skip to content

Commit

Permalink
Makes requestId manipulations idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
egalpin committed Oct 17, 2024
1 parent 4d621fa commit 41aa2e1
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pinot.broker.requesthandler;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.pinot.common.BrokerRequestIdConstants;
import org.apache.pinot.common.utils.request.BrokerRequestIdUtils;


/**
Expand All @@ -46,8 +46,8 @@ public BrokerRequestIdGenerator(String brokerId) {

public long get() {
long normalized =
((_incrementingId.getAndIncrement() & Long.MAX_VALUE) % (OFFSET / BrokerRequestIdConstants.TABLE_TYPE_OFFSET))
* BrokerRequestIdConstants.TABLE_TYPE_OFFSET;
return _mask + normalized;
((_incrementingId.getAndIncrement() & Long.MAX_VALUE) % (OFFSET / BrokerRequestIdUtils.TABLE_TYPE_OFFSET))
* BrokerRequestIdUtils.TABLE_TYPE_OFFSET;
return BrokerRequestIdUtils.getCanonicalRequestId(_mask + normalized);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pinot.broker.requesthandler;

import org.apache.pinot.common.BrokerRequestIdConstants;
import org.apache.pinot.common.utils.request.BrokerRequestIdUtils;
import org.testng.annotations.Test;

import static org.testng.Assert.*;
Expand All @@ -30,11 +30,11 @@ public class BrokerRequestIdGeneratorTest {
public void testGet() {
BrokerRequestIdGenerator gen = new BrokerRequestIdGenerator("foo");
long id = gen.get();
assertEquals(id % BrokerRequestIdConstants.TABLE_TYPE_OFFSET, 0);
assertEquals(id / BrokerRequestIdConstants.TABLE_TYPE_OFFSET % BrokerRequestIdConstants.TABLE_TYPE_OFFSET, 0);
assertEquals(id % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 0);
assertEquals(id / BrokerRequestIdUtils.TABLE_TYPE_OFFSET % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 0);

id = gen.get();
assertEquals(id % BrokerRequestIdConstants.TABLE_TYPE_OFFSET, 0);
assertEquals(id / BrokerRequestIdConstants.TABLE_TYPE_OFFSET % BrokerRequestIdConstants.TABLE_TYPE_OFFSET, 1);
assertEquals(id % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 0);
assertEquals(id / BrokerRequestIdUtils.TABLE_TYPE_OFFSET % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.BrokerRequestIdConstants;
import org.apache.pinot.common.utils.request.BrokerRequestIdUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.config.table.TableType;

Expand Down Expand Up @@ -84,7 +84,7 @@ public static String decodeString(ByteBuffer buffer)
public static TableType inferTableType(DataTable dataTable) {
TableType tableType;
long requestId = Long.parseLong(dataTable.getMetadata().get(DataTable.MetadataKey.REQUEST_ID.getName()));
if ((requestId & 0x1) == BrokerRequestIdConstants.REALTIME_TABLE_DIGIT) {
if (requestId == BrokerRequestIdUtils.getRealtimeRequestId(requestId)) {
tableType = TableType.REALTIME;
} else {
tableType = TableType.OFFLINE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common;
package org.apache.pinot.common.utils.request;

public class BrokerRequestIdConstants {
public class BrokerRequestIdUtils {
public static final int TABLE_TYPE_OFFSET = 10;
public static final int OFFLINE_TABLE_DIGIT = 0;
public static final int REALTIME_TABLE_DIGIT = 1;
private BrokerRequestIdConstants() {

private BrokerRequestIdUtils() {
}

public static long getOfflineRequestId(long requestId) {
return getCanonicalRequestId(requestId);
}

public static long getRealtimeRequestId(long requestId) {
return getCanonicalRequestId(requestId) | 0x1;
}

public static long getCanonicalRequestId(long requestId) {
return requestId / TABLE_TYPE_OFFSET * TABLE_TYPE_OFFSET;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.apache.pinot.common.utils.request;

import org.testng.Assert;
import org.testng.annotations.Test;


public class BrokerRequestIdUtilsTest {
@Test
public void testGetOfflineRequestId() {
Assert.assertEquals(BrokerRequestIdUtils.getOfflineRequestId(123), 120);
}

@Test
public void testGetRealtimeRequestId() {
Assert.assertEquals(BrokerRequestIdUtils.getRealtimeRequestId(123), 121);
}

@Test
public void testGetCanonicalRequestId() {
Assert.assertEquals(BrokerRequestIdUtils.getCanonicalRequestId(123), 120);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class AsyncQueryResponse implements QueryResponse {

// TODO(egalpin): Remove the concept of truncated request ID in next major release after all servers are expected
// to send back tableName in DataTable metadata
private final long _truncatedRequestId;
private final long _canonicalRequestId;
private final AtomicReference<Status> _status = new AtomicReference<>(Status.IN_PROGRESS);
private final AtomicInteger _numServersResponded = new AtomicInteger();
private final ConcurrentHashMap<ServerRoutingInstance, ConcurrentHashMap<String, ServerResponse>> _responses;
Expand All @@ -59,11 +59,11 @@ public class AsyncQueryResponse implements QueryResponse {
private volatile ServerRoutingInstance _failedServer;
private volatile Exception _exception;

public AsyncQueryResponse(QueryRouter queryRouter, long truncatedRequestId,
public AsyncQueryResponse(QueryRouter queryRouter, long canonicalRequestId,
Map<ServerRoutingInstance, List<InstanceRequest>> requestMap, long startTimeMs, long timeoutMs,
ServerRoutingStatsManager serverRoutingStatsManager) {
_queryRouter = queryRouter;
_truncatedRequestId = truncatedRequestId;
_canonicalRequestId = canonicalRequestId;
_responses = new ConcurrentHashMap<>();
_serverRoutingStatsManager = serverRoutingStatsManager;
int numQueriesIssued = 0;
Expand All @@ -72,7 +72,7 @@ public AsyncQueryResponse(QueryRouter queryRouter, long truncatedRequestId,
// Record stats related to query submission just before sending the request. Otherwise, if the response is
// received immediately, there's a possibility of updating query response stats before updating query
// submission stats.
_serverRoutingStatsManager.recordStatsForQuerySubmission(_truncatedRequestId,
_serverRoutingStatsManager.recordStatsForQuerySubmission(_canonicalRequestId,
serverRequests.getKey().getInstanceId());

_responses.computeIfAbsent(serverRequests.getKey(), k -> new ConcurrentHashMap<>())
Expand Down Expand Up @@ -136,11 +136,11 @@ public Map<ServerRoutingInstance, List<ServerResponse>> getFinalResponses()
} else {
latency = response.getResponseDelayMs();
}
_serverRoutingStatsManager.recordStatsUponResponseArrival(_truncatedRequestId,
_serverRoutingStatsManager.recordStatsUponResponseArrival(_canonicalRequestId,
serverResponses.getKey().getInstanceId(), latency);
}
}
_queryRouter.markQueryDone(_truncatedRequestId);
_queryRouter.markQueryDone(_canonicalRequestId);
}
}

Expand Down Expand Up @@ -206,7 +206,7 @@ public Exception getException() {

@Override
public long getRequestId() {
return _truncatedRequestId;
return _canonicalRequestId;
}

@Override
Expand Down Expand Up @@ -272,8 +272,6 @@ void markQueryFailed(ServerRoutingInstance serverRoutingInstance, Exception exce
* server hasn't responded yet.
*/
void markServerDown(ServerRoutingInstance serverRoutingInstance, Exception exception) {
// TODO(egalpin): how to mark servers down under the assumption that multiple queries
// to the same server are valid?
Map<String, ServerResponse> serverResponses = _responses.get(serverRoutingInstance);
for (ServerResponse serverResponse : serverResponses.values()) {
if (serverResponse != null && serverResponse.getDataTable() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.BrokerRequestIdConstants;
import org.apache.pinot.common.utils.request.BrokerRequestIdUtils;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datatable.DataTable;
Expand Down Expand Up @@ -105,12 +105,12 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
for (ServerQueryRoutingContext serverQueryRoutingContext : entry.getValue()) {
ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(preferTls);

// TODO(egalpin): inject value into request ID to indicate REALTIME Vs OFFLINE table type?
long tableTypeDigit = serverQueryRoutingContext.getTableType().equals(TableType.OFFLINE)
? BrokerRequestIdConstants.OFFLINE_TABLE_DIGIT : BrokerRequestIdConstants.REALTIME_TABLE_DIGIT;
long requestIdWithType = serverQueryRoutingContext.getTableType().equals(TableType.OFFLINE)
? BrokerRequestIdUtils.getOfflineRequestId(requestId)
: BrokerRequestIdUtils.getRealtimeRequestId(requestId);

InstanceRequest instanceRequest =
getInstanceRequest(requestId + tableTypeDigit, serverQueryRoutingContext.getBrokerRequest(),
getInstanceRequest(requestIdWithType, serverQueryRoutingContext.getBrokerRequest(),
serverQueryRoutingContext.getSegmentsToQuery());
if (!skipUnavailableServers && QueryOptionsUtils.isSkipUnavailableServers(
serverQueryRoutingContext.getBrokerRequest().getPinotQuery().getQueryOptions())) {
Expand All @@ -123,16 +123,13 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
}
}

// Create the asynchronous query response with the request map
long truncatedRequestId = requestId / BrokerRequestIdConstants.TABLE_TYPE_OFFSET;

// Map the request using all but the last digit. This way, servers will return request IDs where the last digit
// (injected above) will indicate table type, but both will be able to be divided by the TABLE_TYPE_OFFSET in
// order to properly map back to this originating request
AsyncQueryResponse asyncQueryResponse =
new AsyncQueryResponse(this, truncatedRequestId, requestMap, System.currentTimeMillis(), timeoutMs,
new AsyncQueryResponse(this, requestId, requestMap, System.currentTimeMillis(), timeoutMs,
_serverRoutingStatsManager);
_asyncQueryResponseMap.put(truncatedRequestId, asyncQueryResponse);
_asyncQueryResponseMap.put(requestId, asyncQueryResponse);
for (Map.Entry<ServerRoutingInstance, List<InstanceRequest>> serverRequests : requestMap.entrySet()) {
for (InstanceRequest request : serverRequests.getValue()) {

Expand All @@ -145,14 +142,14 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
if (ServerChannels.CHANNEL_LOCK_TIMEOUT_MSG.equals(e.getMessage())) {
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS, 1);
}
markQueryFailed(truncatedRequestId, serverRoutingInstance, asyncQueryResponse, e);
markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e);
break;
} catch (Exception e) {
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_SEND_EXCEPTIONS, 1);
if (skipUnavailableServers) {
asyncQueryResponse.skipServerResponse();
} else {
markQueryFailed(truncatedRequestId, serverRoutingInstance, asyncQueryResponse, e);
markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e);
break;
}
}
Expand Down Expand Up @@ -193,13 +190,10 @@ public void shutDown() {
void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable dataTable, int responseSize,
int deserializationTimeMs) {
long requestId = Long.parseLong(dataTable.getMetadata().get(MetadataKey.REQUEST_ID.getName()));
// TODO(egalpin): parse out table type based on last digit of request ID?

// TODO(egalpin): How can we handle rolling out brokers expecting query_hash to be present, but while old server
// versions are still running and not yet setting the query hash from their side? If possible, deploying servers
// first would work.
// TODO(egalpin): remove use of canonical request ID in later major release
AsyncQueryResponse asyncQueryResponse =
_asyncQueryResponseMap.get(requestId / BrokerRequestIdConstants.TABLE_TYPE_OFFSET);
_asyncQueryResponseMap.get(BrokerRequestIdUtils.getCanonicalRequestId(requestId));

// Query future might be null if the query is already done (maybe due to failure)
if (asyncQueryResponse != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.BrokerRequestIdConstants;
import org.apache.pinot.common.utils.request.BrokerRequestIdUtils;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.datatable.DataTableUtils;
Expand Down Expand Up @@ -161,15 +161,15 @@ public void testValidResponse()
throws Exception {
long requestId = 1230;
DataTable offlineDataTable = DataTableBuilderFactory.getEmptyDataTable();
offlineDataTable.getMetadata()
.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId + BrokerRequestIdConstants.OFFLINE_TABLE_DIGIT));
offlineDataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(BrokerRequestIdUtils.getOfflineRequestId(requestId)));
offlineDataTable.getMetadata()
.put(MetadataKey.TABLE.getName(), OFFLINE_BROKER_REQUEST.getQuerySource().getTableName());
byte[] offlineResponseBytes = offlineDataTable.toBytes();

DataTable realtimeDataTable = DataTableBuilderFactory.getEmptyDataTable();
realtimeDataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(),
Long.toString(requestId + BrokerRequestIdConstants.REALTIME_TABLE_DIGIT));
Long.toString(BrokerRequestIdUtils.getRealtimeRequestId(requestId)));
realtimeDataTable.getMetadata()
.put(MetadataKey.TABLE.getName(), REALTIME_BROKER_REQUEST.getQuerySource().getTableName());
byte[] realtimeResponseBytes = realtimeDataTable.toBytes();
Expand Down

0 comments on commit 41aa2e1

Please sign in to comment.