Skip to content

Commit

Permalink
Instantiate plugin threads after deserialiser
Browse files Browse the repository at this point in the history
This moves the instantiation of the plugin thread objects after the
creation of the deserialiser. Previously, the threads were trying to
access the deserialiser which wasn't initialised yet and were trying to
work with a null pointer.

Resolves: OpenTSDB#10
  • Loading branch information
muffix committed Dec 3, 2018
1 parent ebaa8af commit 4f3a8a7
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,6 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) {
: KafkaRpcPluginConfig.DEFAULT_CONSUMER_THREADS;
kafka_consumers = new ArrayList<KafkaRpcPluginThread>(num_threads);

for (int i = 0; i < num_threads; i++) {
kafka_consumers.add(new KafkaRpcPluginThread(this, i, topics));
}

timer.newTimeout(this, config.threadCheckInterval(), TimeUnit.MILLISECONDS);

final String deser_class = config.getString(
KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + groupID + ".deserializer");
if (Strings.isNullOrEmpty(deser_class)) {
Expand Down Expand Up @@ -179,6 +173,12 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) {
throw new IllegalArgumentException("Unable to find a deserializer "
+ "for class [" + deser_class + "]");
}

for (int i = 0; i < num_threads; i++) {
kafka_consumers.add(new KafkaRpcPluginThread(this, i, topics));
}

timer.newTimeout(this, config.threadCheckInterval(), TimeUnit.MILLISECONDS);
}

@Override
Expand Down

0 comments on commit 4f3a8a7

Please sign in to comment.