Allows multiple requests per server per request ID #13742

Open · egalpin wants to merge 51 commits into master
Conversation

@egalpin (Member) commented Aug 3, 2024

Relates to #10712

This PR proposes to remove the concept of separate "servers" for OFFLINE and REALTIME query handling. Instead, queries are uniquely identified based on the physical table that they target in the actual query (myTable_OFFLINE or myTable_REALTIME). The hashcode of the Thrift PinotQuery object is unique per-server, per-physical-table.

This change helps pave the way for Logical Table support by allowing a single broker request to more easily fan out into arbitrarily many requests issued to each required server.
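To illustrate the idea, here is a hypothetical sketch (the class and method names below are not from this PR; the real change keys on the hashcode of the Thrift PinotQuery object): a query against a hybrid table fans out into one request per physical table, and the broker tracks expected responses by per-table query hash rather than by an OFFLINE/REALTIME server alias.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only: the real PR identifies queries by PinotQuery.hashCode();
// the names below are illustrative, not Pinot classes.
public final class FanoutSketch {
  static final class PhysicalQuery {
    final String physicalTable;  // e.g. myTable_OFFLINE or myTable_REALTIME
    final String sql;

    PhysicalQuery(String physicalTable, String sql) {
      this.physicalTable = physicalTable;
      this.sql = sql;
    }

    int queryHash() {
      // Stand-in for PinotQuery.hashCode(): distinct per physical table because the
      // physical table name is part of the query object sent to the server.
      return (physicalTable + "|" + sql).hashCode();
    }
  }

  public static void main(String[] args) {
    // One broker request against a hybrid table fans out into one query per physical table.
    List<PhysicalQuery> fanout = Arrays.asList(
        new PhysicalQuery("myTable_OFFLINE", "SELECT COUNT(*) FROM myTable"),
        new PhysicalQuery("myTable_REALTIME", "SELECT COUNT(*) FROM myTable"));

    // Both queries may be routed to the same server instance; the broker matches
    // responses back to queries by hash rather than by an OFFLINE/REALTIME server alias.
    Map<Integer, PhysicalQuery> expectedByHash = new HashMap<>();
    for (PhysicalQuery q : fanout) {
      expectedByHash.put(q.queryHash(), q);
    }
    System.out.println("Expecting " + expectedByHash.size() + " responses per routed server");
  }
}
```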

There may be a few rough edges here and there, but I'd like to open this for feedback on the concept and current implementation.

@codecov-commenter commented Aug 7, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 100.00%. Comparing base (59551e4) to head (c660fcb).
Report is 1203 commits behind head on master.

Additional details and impacted files
@@              Coverage Diff               @@
##             master    #13742       +/-   ##
==============================================
+ Coverage     61.75%   100.00%   +38.24%     
+ Complexity      207         6      -201     
==============================================
  Files          2436         3     -2433     
  Lines        133233         6   -133227     
  Branches      20636         0    -20636     
==============================================
- Hits          82274         6    -82268     
+ Misses        44911         0    -44911     
+ Partials       6048         0     -6048     
Flag                     Coverage Δ
custom-integration1      100.00% <ø> (+99.99%) ⬆️
integration              100.00% <ø> (+99.99%) ⬆️
integration1             100.00% <ø> (+99.99%) ⬆️
integration2             0.00% <ø> (ø)
java-11                  100.00% <ø> (+38.29%) ⬆️
java-21                  100.00% <ø> (+38.37%) ⬆️
skip-bytebuffers-false   100.00% <ø> (+38.25%) ⬆️
skip-bytebuffers-true    0.00% <ø> (-27.73%) ⬇️
temurin                  100.00% <ø> (+38.24%) ⬆️
unittests                ?
unittests1               ?
unittests2               ?

Flags with carried forward coverage won't be shown.


@egalpin (Member, Author) commented Aug 13, 2024

@siddharthteotia @jackjlli @vvivekiyer I'd love some guidance on the regression suites. I believe I understand why the failures exist: depending on the order in which components are upgraded, brokers and servers may be incompatible across versions with this change as currently proposed.

If servers were guaranteed to be upgraded first, upgrades should be smooth. However, if brokers were upgraded first, data-table responses from servers would lack the expected QUERY_HASH metadata field.

What's the preferred approach for this kind of change? It may be possible to write intentionally throw-away code as a compatibility bridge, but that would mean getting to the final state requires multiple upgrades (possibly spanning multiple minor versions, which feels very drawn out).

@@ -771,15 +756,14 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
// - Compile time function invocation
// - Literal only queries
// - Any rewrites
if (pinotQuery.isExplain()) {
// if (pinotQuery.isExplain()) {
Reviewer (Member):

nit: why is it commented out?


@@ -49,12 +50,12 @@ enum Status {
/**
* Returns the current server responses without blocking.
*/
Map<ServerRoutingInstance, ServerResponse> getCurrentResponses();
Map<ServerRoutingInstance, List<ServerResponse>> getCurrentResponses();
Reviewer (Member):

I don't think the code change in this PR is backward compatible. We definitely need to keep the existing signatures as is. Can we introduce new signatures and mark the existing ones Deprecated, so that the old signatures can be cleaned up in the next official release?
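One conventional shape for that kind of bridge, as a sketch only (the placeholder types and the default-method delegation below are illustrative assumptions, not the code in this PR): keep the existing single-response method, deprecated, alongside the new list-based signature so old callers keep compiling until the next official release.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a backward-compatible interface evolution; illustrative only.
public class QueryResponseCompatSketch {
  // Placeholder stand-ins for the org.apache.pinot.core.transport types.
  static class ServerRoutingInstance {}
  static class ServerResponse {}

  interface QueryResponse {
    /** New signature: all responses received so far, grouped per server. */
    Map<ServerRoutingInstance, List<ServerResponse>> getCurrentResponsesByServer();

    /** Existing signature, retained and deprecated; delegates to the new one. */
    @Deprecated
    default Map<ServerRoutingInstance, ServerResponse> getCurrentResponses() {
      Map<ServerRoutingInstance, ServerResponse> flattened = new HashMap<>();
      getCurrentResponsesByServer().forEach((server, responses) -> {
        if (!responses.isEmpty()) {
          // Arbitrary choice for the bridge: surface the most recent response per server.
          flattened.put(server, responses.get(responses.size() - 1));
        }
      });
      return flattened;
    }
  }
}
```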

@egalpin (Member, Author):

I fully recognize that I may be missing some important details here, but can you help me understand the implications of changing this interface? I see that the interface QueryResponse and its only implementation AsyncQueryResponse are defined in the org.apache.pinot.core.transport package, but all usages are isolated to org.apache.pinot.broker.

I agree that the change needs to be backward compatible with the data sent from Pinot servers to the brokers, so that mismatched versions of code running on brokers vs. servers can interoperate during upgrade. But I'm not yet certain that this interface needs to remain unchanged in order to accomplish that.

@egalpin (Member, Author):

@jackjlli I believe I've found a way to maintain compatibility such that brokers with this version could roll out and communicate with servers on a prior version, while still gathering the information required. This is done by borrowing a single digit of the request ID (the least significant digit, so that it does not increase the probability of a broker ID hash collision) and using that digit to identify whether a query pertained to a REALTIME or OFFLINE table.

Because in this version servers will only receive 1 or 2 queries, always targeting the same table "base" (i.e. foo for foo_OFFLINE), we can infer from the table type (REALTIME or OFFLINE) which request a server response pertains to. This scheme will not work in the future, when a server may return responses for many different tables, but as soon as server versions are deployed that always return the table name in the dataTable metadata, we can fully dismantle this requestId hijacking. Using the request ID to convey table type only needs to exist for the window (minutes or hours, depending on cluster size) where brokers expect the table name in dataTable metadata but servers are not yet running the version that returns it.

I feel like this provides a good bridge to be able to move forward with subsequent Logical Table implementation.
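A minimal sketch of the request-ID bridge described above, assuming a decimal-digit encoding (the exact encoding and constants here are illustrative, not necessarily what the PR implements): the least significant digit of the request ID temporarily carries the table type, so a new broker can still attribute responses from an older server that does not yet return the table name in the dataTable metadata.

```java
// Illustrative only: the exact encoding is an assumption made for this sketch.
public final class RequestIdTableTypeSketch {
  private static final long OFFLINE_DIGIT = 0L;
  private static final long REALTIME_DIGIT = 1L;

  static long encode(long baseRequestId, boolean realtime) {
    // Reserve the last decimal digit for the table type marker.
    return baseRequestId * 10 + (realtime ? REALTIME_DIGIT : OFFLINE_DIGIT);
  }

  static boolean isRealtime(long requestId) {
    return requestId % 10 == REALTIME_DIGIT;
  }

  public static void main(String[] args) {
    long offlineId = encode(42, false);   // 420
    long realtimeId = encode(42, true);   // 421
    System.out.println(isRealtime(offlineId) + " " + isRealtime(realtimeId));  // false true
  }
}
```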

@egalpin (Member, Author):

Also wanted to add that the way servers respond is not changing; servers do not start returning a list, so brokers do not need to change what they expect on the wire. It's only that all responses from the same server are grouped into a list upon receipt within the brokers.
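As a sketch of what that receipt-side grouping looks like (plain strings stand in for ServerRoutingInstance and ServerResponse; this is not the broker's actual code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Sketch of the receipt-side grouping only; the bytes sent by each server are unchanged.
public final class ReceiptGroupingSketch {
  public static void main(String[] args) {
    Map<String, List<String>> responsesByServer = new HashMap<>();
    BiConsumer<String, String> receive = (server, response) ->
        responsesByServer.computeIfAbsent(server, k -> new ArrayList<>()).add(response);

    // The same server instance answers twice for one broker request (OFFLINE + REALTIME).
    receive.accept("Server_host1_8098", "dataTable-offline");
    receive.accept("Server_host1_8098", "dataTable-realtime");
    System.out.println(responsesByServer);  // {Server_host1_8098=[dataTable-offline, dataTable-realtime]}
  }
}
```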

*/
@ThreadSafe
public final class ServerRoutingInstance {
private static final String SHORT_OFFLINE_SUFFIX = "_O";
Reviewer (Member):

I think with this change we kind of lose the way of knowing how many servers get queried/respond for each table type. I understand that the number of queries sent to the same server can be reduced, but is it possible to include this table type information in the request and response, so that we don't lose this stat?


// instance within the same query
return 31 * 31 * _hostname.hashCode() + 31 * Integer.hashCode(_port) + _tableType.hashCode();
return 31 * _hostname.hashCode() + Integer.hashCode(_port);
Reviewer (Member):

This will make the hashcode inconsistent between two different versions of broker and servers. Can you double check if this has any side effects?

@egalpin (Member, Author):

This will have one major intentional implication, which is that we would no longer open 2 channels from each broker to each server (1 for REALTIME, 1 for OFFLINE). In turn, that would cause the peak network traffic on a channel to at worst double, assuming all queries were to hybrid tables. I'm not certain whether that presents an issue.
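To make the implication concrete, here is a hypothetical stand-in (not the real ServerRoutingInstance class): once the table type no longer participates in equals/hashCode, the OFFLINE and REALTIME routing entries for the same host and port collapse into a single map key, and hence a single channel per server.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in showing why dropping table type from equals/hashCode
// collapses the two per-table-type channels into one per (hostname, port).
public final class RoutingKeySketch {
  static final class Key {
    final String hostname;
    final int port;

    Key(String hostname, int port) {
      this.hostname = hostname;
      this.port = port;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return hostname.equals(other.hostname) && port == other.port;
    }

    @Override
    public int hashCode() {
      return Objects.hash(hostname, port);  // table type intentionally excluded
    }
  }

  public static void main(String[] args) {
    Map<Key, String> channels = new HashMap<>();
    channels.put(new Key("host1", 8098), "channel-A");  // previously the OFFLINE channel
    channels.put(new Key("host1", 8098), "channel-B");  // previously the REALTIME channel
    System.out.println(channels.size());  // 1 -> a single shared channel per server
  }
}
```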

@egalpin (Member, Author):

Probably worth stating that the motivation for doing so is to allow a single broker request to fan out into an arbitrary number of requests to any single server involved in query processing. This removes a major hurdle on the way to supporting Logical Tables [1].

[1] #10712

return _responseMap.get(serverRoutingInstance).getResponseDelayMs();
// TODO(egalpin): How to get query hash here?
return -1L;
// return _responseMap.get(serverRoutingInstance).getResponseDelayMs();
Reviewer (Member):

This needs to be uncommented, right?


// TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables
if (realtimeBrokerRequest != null && (!pinotQuery.isExplain() || offlineBrokerRequest != null)) {
// Don't send explain queries to realtime for OFFLINE or HYBRID tables
@egalpin (Member, Author):

@jackjlli I'll remove lines 759-766, as that logic is handled here now instead.

@egalpin (Member, Author) commented Sep 3, 2024

@siddharthteotia @jackjlli @vvivekiyer Is there a specific order in which different pinot components must be restarted when performing an upgrade?

I can think of a smooth way to introduce these changes if servers are updated first (servers start returning additional and initially unused metadata with datatable responses), but it's much more challenging as written if brokers must be updated first. Thoughts?

@jackjlli (Member) commented Sep 4, 2024

> @siddharthteotia @jackjlli @vvivekiyer Is there a specific order in which different pinot components must be restarted when performing an upgrade?
>
> I can think of a smooth way to introduce these changes if servers are updated first (servers start returning additional and initially unused metadata with datatable responses), but it's much more challenging as written if brokers must be updated first. Thoughts?

The common way to roll out any new release is controller -> broker -> server -> minion. In this case, it's always preferred to roll out pinot-brokers before pinot-servers. You can check the logic in pinot_compatibility_tests.yml, which mimics this rollout behavior. So it'd be good for the broker to have logic that recognizes both the old and new signatures (i.e. making it backward compatible).

@egalpin requested a review from jackjlli on October 15, 2024 at 22:50
@egalpin (Member, Author) commented Oct 17, 2024

From what I can tell from the logs, the failing tests are coming from pinot-controller. However, I do believe they are flaky, as the e2e tests succeed for me locally via mvn test -T 16 -pl 'pinot-controller':

[Screenshot of the local passing test run, 2024-10-17]
