Allows multiple requests per server per request ID #13742
base: master
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@             Coverage Diff              @@
##            master   #13742       +/-   ##
==============================================
+ Coverage     61.75%  100.00%    +38.24%
+ Complexity      207        6       -201
==============================================
  Files          2436        3      -2433
  Lines        133233        6    -133227
  Branches      20636        0     -20636
==============================================
- Hits          82274        6     -82268
+ Misses        44911        0     -44911
+ Partials       6048        0      -6048
@siddharthteotia @jackjlli @vvivekiyer I'd love to get some guidance on the regression suites. I believe I understand why the failures exist: depending on which components get upgraded first, brokers and servers may be incompatible across versions with this change as currently proposed. If servers were guaranteed to be upgraded first, upgrades should be smooth. However, if brokers were upgraded first, then server responses of data tables would lack the expected metadata field. What's the preferred approach for this kind of change? It may be possible to create intentional throw-away code that would provide a compatibility bridge, but that would imply that getting to the final state requires multiple upgrades (possibly spanning multiple minor versions, which feels very drawn out).
@@ -771,15 +756,14 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
      // - Compile time function invocation
      // - Literal only queries
      // - Any rewrites
-     if (pinotQuery.isExplain()) {
+     // if (pinotQuery.isExplain()) {
nit: why is it commented out?
@@ -49,12 +50,12 @@ enum Status {
  /**
   * Returns the current server responses without blocking.
   */
- Map<ServerRoutingInstance, ServerResponse> getCurrentResponses();
+ Map<ServerRoutingInstance, List<ServerResponse>> getCurrentResponses();
I don't think the code change in this PR is backward compatible. We definitely need to keep the existing signatures as is. Can we introduce new signatures and mark the existing ones @Deprecated, so that the old signatures can be cleaned up in the next official release?
I fully recognize that I may be missing some important details here, but can you help me understand the implications of changing this interface? I see that the interface QueryResponse and its only implementation AsyncQueryResponse are defined in the org.apache.pinot.core.transport package, but all usages are isolated to org.apache.pinot.broker.

I agree that the change needs to be backward compatible with the data sent from Pinot servers to the brokers, so that mismatched versions of code running on brokers vs. servers can interoperate during an upgrade. But I'm not yet certain that this interface needs to remain unchanged in order to accomplish that.
@jackjlli I believe I've found a way to maintain compatibility such that brokers with this version could roll out and communicate with servers on a prior version, while still gathering the information required. This is done by borrowing a single digit of the request ID (the least significant digit, so that it does not increase the probability of a broker ID hash collision) and using it to identify whether a query pertained to a REALTIME or OFFLINE table.

Because in this version servers will only receive 1 or 2 queries, always targeting the same table "base" (i.e. foo of foo_OFFLINE), we can infer from the table type REALTIME or OFFLINE which request the server response pertains to. This scheme will not work in the future, when a server will return responses for many different tables, but as soon as we have server versions deployed which always return the table name in the dataTable metadata, we can fully dismantle this requestId hijacking. Using the request ID to convey table type only needs to exist for the window (minutes or hours, depending on the size of the cluster) where brokers expect the table name in dataTable metadata but servers are not yet running the version which returns it.

I feel like this provides a good bridge for moving forward with the subsequent Logical Table implementation.
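The digit-borrowing scheme could be sketched roughly as follows. This is a hypothetical illustration, not Pinot's actual API: the class and method names are invented, and it assumes the base request ID is shifted one decimal place so the last digit can carry the table type.

```java
// Hypothetical sketch of the request-ID digit borrowing described above.
// Assumption: the base request ID is multiplied by 10 so its least
// significant decimal digit is free to encode REALTIME vs OFFLINE.
public final class RequestIdTableType {
  private static final int OFFLINE_DIGIT = 0;
  private static final int REALTIME_DIGIT = 1;

  private RequestIdTableType() {
  }

  /** Encode the table type into the last decimal digit of the request ID. */
  public static long encode(long baseRequestId, boolean realtime) {
    return baseRequestId * 10 + (realtime ? REALTIME_DIGIT : OFFLINE_DIGIT);
  }

  /** Infer which table type the server response pertains to. */
  public static boolean isRealtime(long encodedRequestId) {
    return encodedRequestId % 10 == REALTIME_DIGIT;
  }

  /** Recover the original base request ID. */
  public static long baseRequestId(long encodedRequestId) {
    return encodedRequestId / 10;
  }
}
```

With this shape, an old-version server simply echoes back the encoded ID it received, and the broker can still attribute the response to the OFFLINE or REALTIME leg of the query.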
These changes specifically can be viewed like so (a subset of overall commits): https://github.com/apache/pinot/pull/13742/files/2ffd65d8f869f5f707af708f1680cbb45fb3a2ea..734189bc7da58705f6090102abbec76758ab84ee
Also wanted to add that the server response format is not changing; servers do not start returning a list. It's only that all responses from the same server are grouped into a list upon receipt within the broker.
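A minimal sketch of that broker-side grouping, using stand-in types for illustration (the real ServerRoutingInstance and ServerResponse classes have more fields and different constructors):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Stand-ins for the real Pinot types; only the grouping logic is the point.
record ServerRoutingInstance(String hostname, int port) {}
record ServerResponse(String tableNameWithType) {}

class ResponseGrouper {
  private final Map<ServerRoutingInstance, List<ServerResponse>> _responseMap =
      new ConcurrentHashMap<>();

  // Append rather than overwrite: one server may now answer several requests
  // (e.g. foo_OFFLINE and foo_REALTIME) for the same broker request.
  void receive(ServerRoutingInstance server, ServerResponse response) {
    _responseMap.computeIfAbsent(server, k -> new CopyOnWriteArrayList<>()).add(response);
  }

  Map<ServerRoutingInstance, List<ServerResponse>> getCurrentResponses() {
    return _responseMap;
  }
}
```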
 */
@ThreadSafe
public final class ServerRoutingInstance {
  private static final String SHORT_OFFLINE_SUFFIX = "_O";
I think with this change we lose the ability to know how many servers are queried and respond for each table type. I understand that the number of queries hitting the same server can be reduced, but is it possible to include the table type information in the request and response, so that we don't lose this stat?
Linking comments: https://github.com/apache/pinot/pull/13742/files#r1716084206
    // instance within the same query
-   return 31 * 31 * _hostname.hashCode() + 31 * Integer.hashCode(_port) + _tableType.hashCode();
+   return 31 * _hostname.hashCode() + Integer.hashCode(_port);
This will make the hashcode inconsistent between two different versions of brokers and servers. Can you double-check whether this has any side effects?
This will have one major intentional implication: we would no longer open two channels from each broker to each server (one for REALTIME, one for OFFLINE). In turn, the peak network traffic on a single channel could at worst double, assuming all queries target hybrid tables. I'm not certain whether that presents an issue.
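To illustrate why the channels collapse, here is a simplified stand-in for the routing key after the change (assumed fields only; the real class carries more state): once _tableType no longer participates in hashCode/equals, the OFFLINE and REALTIME routing instances for the same host:port behave as the same map key, so the broker keeps one channel per server.

```java
// Simplified stand-in for ServerRoutingInstance after the change:
// identity is host:port only, so table type no longer splits channels.
final class ServerKey {
  private final String _hostname;
  private final int _port;

  ServerKey(String hostname, int port) {
    _hostname = hostname;
    _port = port;
  }

  @Override
  public int hashCode() {
    // Matches the new form in the diff above: no _tableType term.
    return 31 * _hostname.hashCode() + Integer.hashCode(_port);
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof ServerKey other
        && _hostname.equals(other._hostname) && _port == other._port;
  }
}
```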
Probably worth stating that the motivation for doing so is to allow a single broker request to fan out to an arbitrary number of requests to any single server involved in query processing. This unblocks a major hurdle in the ability to support Logical Tables [1].
[1] #10712
-   return _responseMap.get(serverRoutingInstance).getResponseDelayMs();
+   // TODO(egalpin): How to get query hash here?
+   return -1L;
+   // return _responseMap.get(serverRoutingInstance).getResponseDelayMs();
This needs to be uncommented, right?
    // TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables
    if (realtimeBrokerRequest != null && (!pinotQuery.isExplain() || offlineBrokerRequest != null)) {
      // Don't send explain queries to realtime for OFFLINE or HYBRID tables
@jackjlli I'll remove lines 759-766 as it's handled here now instead.
@siddharthteotia @jackjlli @vvivekiyer Is there a specific order in which different Pinot components must be restarted when performing an upgrade? I can think of a smooth way to introduce these changes if servers are updated first (servers start returning additional, initially unused metadata with data table responses), but it's much more challenging as written if brokers must be updated first. Thoughts?
The common way to roll out any new release is controller -> broker -> server -> minion. In this case, it's always preferred to roll out pinot-brokers before pinot-servers. You can check the logic in
Relates to #10712
This PR proposes to remove the concept of separate "servers" for OFFLINE and REALTIME query handling. Instead, queries are uniquely identified based on the physical table that they target in the actual query (myTable_OFFLINE or myTable_REALTIME). The hashcode of the Thrift PinotQuery object is unique per-server, per-physical-table.
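As a small illustration of the per-physical-table identity (using plain query strings as a stand-in for the Thrift PinotQuery object, which is an assumption for clarity): the OFFLINE and REALTIME variants of a hybrid-table query reference different physical table names, so they hash differently and can be tracked as distinct in-flight requests to the same server.

```java
// Stand-in illustration: the two physical-table variants of the same
// logical query are distinct objects with distinct hashes.
public final class QueryHashDemo {
  private QueryHashDemo() {
  }

  public static boolean distinctHashes(String rawTableName) {
    String offlineQuery = "SELECT COUNT(*) FROM " + rawTableName + "_OFFLINE";
    String realtimeQuery = "SELECT COUNT(*) FROM " + rawTableName + "_REALTIME";
    return offlineQuery.hashCode() != realtimeQuery.hashCode();
  }
}
```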
This change helps pave the way for Logical Table support by allowing a single broker request to more easily "fanout" into arbitrarily many requests issued to each required server.
There may be a few rough edges here and there, but I'd like to open this for feedback on the concept and current implementation.