diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java index eedd458e3c2..1faffcaa905 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGenerator.java @@ -19,7 +19,7 @@ package org.apache.pinot.broker.requesthandler; import java.util.concurrent.atomic.AtomicLong; -import org.apache.pinot.common.BrokerRequestIdConstants; +import org.apache.pinot.common.utils.request.BrokerRequestIdUtils; /** @@ -46,8 +46,8 @@ public BrokerRequestIdGenerator(String brokerId) { public long get() { long normalized = - ((_incrementingId.getAndIncrement() & Long.MAX_VALUE) % (OFFSET / BrokerRequestIdConstants.TABLE_TYPE_OFFSET)) - * BrokerRequestIdConstants.TABLE_TYPE_OFFSET; - return _mask + normalized; + ((_incrementingId.getAndIncrement() & Long.MAX_VALUE) % (OFFSET / BrokerRequestIdUtils.TABLE_TYPE_OFFSET)) + * BrokerRequestIdUtils.TABLE_TYPE_OFFSET; + return BrokerRequestIdUtils.getCanonicalRequestId(_mask + normalized); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGeneratorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGeneratorTest.java index 3340ed4c0cf..c4bea7cc2aa 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGeneratorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestIdGeneratorTest.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.broker.requesthandler; -import org.apache.pinot.common.BrokerRequestIdConstants; +import org.apache.pinot.common.utils.request.BrokerRequestIdUtils; import org.testng.annotations.Test; import static org.testng.Assert.*; @@ -30,11 +30,11 @@ public class BrokerRequestIdGeneratorTest { public void testGet() { BrokerRequestIdGenerator gen = new BrokerRequestIdGenerator("foo"); long id = gen.get(); - assertEquals(id % BrokerRequestIdConstants.TABLE_TYPE_OFFSET, 0); - assertEquals(id / BrokerRequestIdConstants.TABLE_TYPE_OFFSET % BrokerRequestIdConstants.TABLE_TYPE_OFFSET, 0); + assertEquals(id % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 0); + assertEquals(id / BrokerRequestIdUtils.TABLE_TYPE_OFFSET % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 0); id = gen.get(); - assertEquals(id % BrokerRequestIdConstants.TABLE_TYPE_OFFSET, 0); - assertEquals(id / BrokerRequestIdConstants.TABLE_TYPE_OFFSET % BrokerRequestIdConstants.TABLE_TYPE_OFFSET, 1); + assertEquals(id % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 0); + assertEquals(id / BrokerRequestIdUtils.TABLE_TYPE_OFFSET % BrokerRequestIdUtils.TABLE_TYPE_OFFSET, 1); } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java index 74ac37f8d0e..a91958fd18f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTableUtils.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.commons.lang3.StringUtils; -import org.apache.pinot.common.BrokerRequestIdConstants; +import org.apache.pinot.common.utils.request.BrokerRequestIdUtils; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.spi.config.table.TableType; @@ -84,7 +84,7 @@ public static String decodeString(ByteBuffer buffer) public static TableType inferTableType(DataTable dataTable) { TableType tableType; long requestId = Long.parseLong(dataTable.getMetadata().get(DataTable.MetadataKey.REQUEST_ID.getName())); - if ((requestId & 0x1) == BrokerRequestIdConstants.REALTIME_TABLE_DIGIT) { + if (requestId == BrokerRequestIdUtils.getRealtimeRequestId(requestId)) { tableType = TableType.REALTIME; } else { tableType = TableType.OFFLINE; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/BrokerRequestIdConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtils.java similarity index 64% rename from pinot-common/src/main/java/org/apache/pinot/common/BrokerRequestIdConstants.java rename to pinot-common/src/main/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtils.java index c14940f69e4..a01a2a8798d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/BrokerRequestIdConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtils.java @@ -16,12 +16,23 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.common; +package org.apache.pinot.common.utils.request; -public class BrokerRequestIdConstants { +public class BrokerRequestIdUtils { public static final int TABLE_TYPE_OFFSET = 10; - public static final int OFFLINE_TABLE_DIGIT = 0; - public static final int REALTIME_TABLE_DIGIT = 1; - private BrokerRequestIdConstants() { + + private BrokerRequestIdUtils() { + } + + public static long getOfflineRequestId(long requestId) { + return getCanonicalRequestId(requestId); + } + + public static long getRealtimeRequestId(long requestId) { + return getCanonicalRequestId(requestId) | 0x1; + } + + public static long getCanonicalRequestId(long requestId) { + return requestId / TABLE_TYPE_OFFSET * TABLE_TYPE_OFFSET; } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtilsTest.java new file mode 100644 index 00000000000..c80f167443c --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/request/BrokerRequestIdUtilsTest.java @@ -0,0 +1,22 @@ +package org.apache.pinot.common.utils.request; + +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class BrokerRequestIdUtilsTest { + @Test + public void testGetOfflineRequestId() { + Assert.assertEquals(BrokerRequestIdUtils.getOfflineRequestId(123), 120); + } + + @Test + public void testGetRealtimeRequestId() { + Assert.assertEquals(BrokerRequestIdUtils.getRealtimeRequestId(123), 121); + } + + @Test + public void testGetCanonicalRequestId() { + Assert.assertEquals(BrokerRequestIdUtils.getCanonicalRequestId(123), 120); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java index e01fa84f1fb..6a78162347c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java @@ -47,7 +47,7 @@ public class AsyncQueryResponse implements QueryResponse { // TODO(egalpin): Remove the concept of truncated request ID in next major release after all servers are expected // to send back tableName in DataTable metadata - private final long _truncatedRequestId; + private final long _canonicalRequestId; private final AtomicReference _status = new AtomicReference<>(Status.IN_PROGRESS); private final AtomicInteger _numServersResponded = new AtomicInteger(); private final ConcurrentHashMap> _responses; @@ -59,11 +59,11 @@ public class AsyncQueryResponse implements QueryResponse { private volatile ServerRoutingInstance _failedServer; private volatile Exception _exception; - public AsyncQueryResponse(QueryRouter queryRouter, long truncatedRequestId, + public AsyncQueryResponse(QueryRouter queryRouter, long canonicalRequestId, Map> requestMap, long startTimeMs, long timeoutMs, ServerRoutingStatsManager serverRoutingStatsManager) { _queryRouter = queryRouter; - _truncatedRequestId = truncatedRequestId; + _canonicalRequestId = canonicalRequestId; _responses = new ConcurrentHashMap<>(); _serverRoutingStatsManager = serverRoutingStatsManager; int numQueriesIssued = 0; @@ -72,7 +72,7 @@ public AsyncQueryResponse(QueryRouter queryRouter, long truncatedRequestId, // Record stats related to query submission just before sending the request. Otherwise, if the response is // received immediately, there's a possibility of updating query response stats before updating query // submission stats. - _serverRoutingStatsManager.recordStatsForQuerySubmission(_truncatedRequestId, + _serverRoutingStatsManager.recordStatsForQuerySubmission(_canonicalRequestId, serverRequests.getKey().getInstanceId()); _responses.computeIfAbsent(serverRequests.getKey(), k -> new ConcurrentHashMap<>()) @@ -136,11 +136,11 @@ public Map> getFinalResponses() } else { latency = response.getResponseDelayMs(); } - _serverRoutingStatsManager.recordStatsUponResponseArrival(_truncatedRequestId, + _serverRoutingStatsManager.recordStatsUponResponseArrival(_canonicalRequestId, serverResponses.getKey().getInstanceId(), latency); } } - _queryRouter.markQueryDone(_truncatedRequestId); + _queryRouter.markQueryDone(_canonicalRequestId); } } @@ -206,7 +206,7 @@ public Exception getException() { @Override public long getRequestId() { - return _truncatedRequestId; + return _canonicalRequestId; } @Override @@ -272,8 +272,6 @@ void markQueryFailed(ServerRoutingInstance serverRoutingInstance, Exception exce * server hasn't responded yet. */ void markServerDown(ServerRoutingInstance serverRoutingInstance, Exception exception) { - // TODO(egalpin): how to mark servers down under the assumption that multiple queries - // to the same server are valid? Map serverResponses = _responses.get(serverRoutingInstance); for (ServerResponse serverResponse : serverResponses.values()) { if (serverResponse != null && serverResponse.getDataTable() == null) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index 261855bb5d0..d26440169b0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -28,7 +28,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pinot.common.BrokerRequestIdConstants; +import org.apache.pinot.common.utils.request.BrokerRequestIdUtils; import org.apache.pinot.common.config.NettyConfig; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.datatable.DataTable; @@ -105,12 +105,12 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, for (ServerQueryRoutingContext serverQueryRoutingContext : entry.getValue()) { ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(preferTls); - // TODO(egalpin): inject value into request ID to indicate REALTIME Vs OFFLINE table type? - long tableTypeDigit = serverQueryRoutingContext.getTableType().equals(TableType.OFFLINE) - ? BrokerRequestIdConstants.OFFLINE_TABLE_DIGIT : BrokerRequestIdConstants.REALTIME_TABLE_DIGIT; + long requestIdWithType = serverQueryRoutingContext.getTableType().equals(TableType.OFFLINE) + ? BrokerRequestIdUtils.getOfflineRequestId(requestId) + : BrokerRequestIdUtils.getRealtimeRequestId(requestId); InstanceRequest instanceRequest = - getInstanceRequest(requestId + tableTypeDigit, serverQueryRoutingContext.getBrokerRequest(), + getInstanceRequest(requestIdWithType, serverQueryRoutingContext.getBrokerRequest(), serverQueryRoutingContext.getSegmentsToQuery()); if (!skipUnavailableServers && QueryOptionsUtils.isSkipUnavailableServers( serverQueryRoutingContext.getBrokerRequest().getPinotQuery().getQueryOptions())) { @@ -123,16 +123,13 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, } } - // Create the asynchronous query response with the request map - long truncatedRequestId = requestId / BrokerRequestIdConstants.TABLE_TYPE_OFFSET; - // Map the request using all but the last digit. This way, servers will return request IDs where the last digit // (injected above) will indicate table type, but both will be able to be divided by the TABLE_TYPE_OFFSET in // order to properly map back to this originating request AsyncQueryResponse asyncQueryResponse = - new AsyncQueryResponse(this, truncatedRequestId, requestMap, System.currentTimeMillis(), timeoutMs, + new AsyncQueryResponse(this, requestId, requestMap, System.currentTimeMillis(), timeoutMs, _serverRoutingStatsManager); - _asyncQueryResponseMap.put(truncatedRequestId, asyncQueryResponse); + _asyncQueryResponseMap.put(requestId, asyncQueryResponse); for (Map.Entry> serverRequests : requestMap.entrySet()) { for (InstanceRequest request : serverRequests.getValue()) { @@ -145,14 +142,14 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, if (ServerChannels.CHANNEL_LOCK_TIMEOUT_MSG.equals(e.getMessage())) { _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_CHANNEL_LOCK_TIMEOUT_EXCEPTIONS, 1); } - markQueryFailed(truncatedRequestId, serverRoutingInstance, asyncQueryResponse, e); + markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e); break; } catch (Exception e) { _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_SEND_EXCEPTIONS, 1); if (skipUnavailableServers) { asyncQueryResponse.skipServerResponse(); } else { - markQueryFailed(truncatedRequestId, serverRoutingInstance, asyncQueryResponse, e); + markQueryFailed(requestId, serverRoutingInstance, asyncQueryResponse, e); break; } } @@ -193,13 +190,10 @@ public void shutDown() { void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable dataTable, int responseSize, int deserializationTimeMs) { long requestId = Long.parseLong(dataTable.getMetadata().get(MetadataKey.REQUEST_ID.getName())); - // TODO(egalpin): parse out table type based on last digit of request ID? - // TODO(egalpin): How can we handle rolling out brokers expecting query_hash to be present, but while old server - // versions are still running and not yet setting the query hash from their side? If possible, deploying servers - // first would work. + // TODO(egalpin): remove use of canonical request ID in later major release AsyncQueryResponse asyncQueryResponse = - _asyncQueryResponseMap.get(requestId / BrokerRequestIdConstants.TABLE_TYPE_OFFSET); + _asyncQueryResponseMap.get(BrokerRequestIdUtils.getCanonicalRequestId(requestId)); // Query future might be null if the query is already done (maybe due to failure) if (asyncQueryResponse != null) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index 5d291e73d9f..22dd2807dfa 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pinot.common.BrokerRequestIdConstants; +import org.apache.pinot.common.utils.request.BrokerRequestIdUtils; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.datatable.DataTableUtils; @@ -161,15 +161,15 @@ public void testValidResponse() throws Exception { long requestId = 1230; DataTable offlineDataTable = DataTableBuilderFactory.getEmptyDataTable(); - offlineDataTable.getMetadata() - .put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId + BrokerRequestIdConstants.OFFLINE_TABLE_DIGIT)); + offlineDataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), + Long.toString(BrokerRequestIdUtils.getOfflineRequestId(requestId))); offlineDataTable.getMetadata() .put(MetadataKey.TABLE.getName(), OFFLINE_BROKER_REQUEST.getQuerySource().getTableName()); byte[] offlineResponseBytes = offlineDataTable.toBytes(); DataTable realtimeDataTable = DataTableBuilderFactory.getEmptyDataTable(); realtimeDataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), - Long.toString(requestId + BrokerRequestIdConstants.REALTIME_TABLE_DIGIT)); + Long.toString(BrokerRequestIdUtils.getRealtimeRequestId(requestId))); realtimeDataTable.getMetadata() .put(MetadataKey.TABLE.getName(), REALTIME_BROKER_REQUEST.getQuerySource().getTableName()); byte[] realtimeResponseBytes = realtimeDataTable.toBytes();