Skip to content

Commit

Permalink
Merge branch 'apache:master' into hadoop-ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
chrajeshbabu authored Oct 16, 2024
2 parents 9a7e252 + b556b37 commit 0f9e4e8
Show file tree
Hide file tree
Showing 52 changed files with 1,190 additions and 316 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,16 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(queryOptions);
queryTimeoutMs = timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption : _brokerTimeoutMs;
database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
QueryEnvironment queryEnvironment = new QueryEnvironment(database, _tableCache, _workerManager);
boolean inferPartitionHint = _config.getProperty(CommonConstants.Broker.CONFIG_OF_INFER_PARTITION_HINT,
CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT);
//@formatter:off
QueryEnvironment queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder()
.database(database)
.tableCache(_tableCache)
.workerManager(_workerManager)
.defaultInferPartitionHint(inferPartitionHint)
.build());
//@formatter:on
switch (sqlNodeAndOptions.getSqlNode().getKind()) {
case EXPLAIN:
boolean askServers = QueryOptionsUtils.isExplainAskingServers(queryOptions)
Expand Down
9 changes: 9 additions & 0 deletions pinot-clients/pinot-java-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,13 @@
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>pinot-fastdev</id>
<properties>
<shade.phase.prop>none</shade.phase.prop>
</properties>
</profile>
</profiles>
</project>
6 changes: 6 additions & 0 deletions pinot-clients/pinot-jdbc-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,11 @@
<shade.phase.prop>package</shade.phase.prop>
</properties>
</profile>
<profile>
<id>pinot-fastdev</id>
<properties>
<shade.phase.prop>none</shade.phase.prop>
</properties>
</profile>
</profiles>
</project>
6 changes: 6 additions & 0 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@
<shade.phase.prop>package</shade.phase.prop>
</properties>
</profile>
<profile>
<id>pinot-fastdev</id>
<properties>
<shade.phase.prop>none</shade.phase.prop>
</properties>
</profile>
<profile>
<!-- The fmpp-maven-plugin doesn't care about unchanged (re)sources and will always generate.
This causes the maven-compiler-plugin to detect changes and always recompile the Java sources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private static class Entry {
final Function<IdealState, IdealState> _updater;
IdealState _updatedIdealState = null;
AtomicBoolean _sent = new AtomicBoolean(false);
Throwable _exception;

Entry(String resourceName, Function<IdealState, IdealState> updater) {
_resourceName = resourceName;
Expand Down Expand Up @@ -106,8 +107,8 @@ public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNum
* @param updater the idealState updater to be applied
* @return IdealState if the update is successful, null if not
*/
public IdealState commit(HelixManager helixManager, String resourceName,
Function<IdealState, IdealState> updater, RetryPolicy retryPolicy, boolean noChangeOk) {
public IdealState commit(HelixManager helixManager, String resourceName, Function<IdealState, IdealState> updater,
RetryPolicy retryPolicy, boolean noChangeOk) {
Queue queue = getQueue(resourceName);
Entry entry = new Entry(resourceName, updater);

Expand All @@ -120,39 +121,41 @@ public IdealState commit(HelixManager helixManager, String resourceName,
// All pending entries have been processed, the updatedIdealState should be set.
return entry._updatedIdealState;
}
// remove from queue
Entry first = queue._pending.poll();
processed.add(first);
String mergedResourceName = first._resourceName;
HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
PropertyKey idealStateKey = dataAccessor.keyBuilder().idealStates(resourceName);
IdealState idealState = dataAccessor.getProperty(idealStateKey);

// Make a copy of the idealState above to pass it to the updater
// NOTE: new IdealState(idealState.getRecord()) does not work because it's shallow copy for map fields and
// list fields
IdealState idealStateCopy = HelixHelper.cloneIdealState(idealState);

/**
* If the local cache does not contain a value, need to check if there is a
* value in ZK; use it as initial value if exists
*/
IdealState updatedIdealState = first._updater.apply(idealStateCopy);
first._updatedIdealState = updatedIdealState;
Iterator<Entry> it = queue._pending.iterator();
while (it.hasNext()) {
Entry ent = it.next();
if (!ent._resourceName.equals(mergedResourceName)) {
continue;
IdealState response = updateIdealState(helixManager, resourceName, idealState -> {
IdealState updatedIdealState = idealState;
if (!processed.isEmpty()) {
queue._pending.addAll(processed);
processed.clear();
}
Iterator<Entry> it = queue._pending.iterator();
while (it.hasNext()) {
Entry ent = it.next();
if (!ent._resourceName.equals(resourceName)) {
continue;
}
processed.add(ent);
it.remove();
updatedIdealState = ent._updater.apply(updatedIdealState);
ent._updatedIdealState = updatedIdealState;
ent._exception = null;
}
processed.add(ent);
updatedIdealState = ent._updater.apply(idealStateCopy);
ent._updatedIdealState = updatedIdealState;
it.remove();
return updatedIdealState;
}, retryPolicy, noChangeOk);
if (response == null) {
RuntimeException ex = new RuntimeException("Failed to update IdealState");
for (Entry ent : processed) {
ent._exception = ex;
ent._updatedIdealState = null;
}
throw ex;
}
} catch (Throwable e) {
// If there is an exception, set the exception for all processed entries
for (Entry ent : processed) {
ent._exception = e;
ent._updatedIdealState = null;
}
IdealState finalUpdatedIdealState = updatedIdealState;
updateIdealState(helixManager, resourceName, anyIdealState -> finalUpdatedIdealState,
retryPolicy, noChangeOk);
throw e;
} finally {
queue._running.set(null);
for (Entry e : processed) {
Expand All @@ -176,6 +179,10 @@ public IdealState commit(HelixManager helixManager, String resourceName,
}
}
}
if (entry._exception != null) {
throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName,
entry._exception);
}
return entry._updatedIdealState;
}

Expand Down Expand Up @@ -298,7 +305,7 @@ private boolean shouldCompress(IdealState is) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_SUCCESS, 1L);
}
return idealStateWrapper._idealState;
} catch (Exception e) {
} catch (Throwable e) {
if (controllerMetrics != null) {
controllerMetrics.addMeteredValue(resourceName, ControllerMeter.IDEAL_STATE_UPDATE_FAILURE, 1L);
}
Expand Down
62 changes: 39 additions & 23 deletions pinot-connectors/pinot-spark-2-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,45 @@
</properties>

<profiles>
<profile>
<id>build-shaded-jar</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>${shadeBase}.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>pinot-fastdev</id>
<properties>
<shade.phase.prop>none</shade.phase.prop>
</properties>
</profile>
<profile>
<id>scala-2.12</id>
<activation>
Expand Down Expand Up @@ -91,29 +130,6 @@
<build>
<plugins>
<!-- scala build -->
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>${shadeBase}.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
Expand Down
60 changes: 37 additions & 23 deletions pinot-connectors/pinot-spark-3-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,43 @@
</properties>

<profiles>
<profile>
<id>build-shaded-jar</id>
<build>
<plugins>

<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>${shadeBase}.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>pinot-fastdev</id>
<properties>
<shade.phase.prop>none</shade.phase.prop>
</properties>
</profile>
<profile>
<id>scala-2.12</id>
<activation>
Expand Down Expand Up @@ -62,29 +99,6 @@
<build>
<plugins>
<!-- scala build -->
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>${shadeBase}.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
Expand Down
Loading

0 comments on commit 0f9e4e8

Please sign in to comment.