Robin Moffatt <robin@confluent.io> v1.00, September 20, 2018
KSQL is the SQL streaming engine for Apache Kafka. Using standard SQL statements, you can build powerful stream processing applications. In this article we’ll see how to troubleshoot some of the common issues that people encounter with KSQL. More advanced topics, such as digging deeper into KSQL’s internals, will be covered in a subsequent article.
You can use this article for reference, or you can follow along with the code examples to try them out as we go. Using Docker and Docker Compose, we can easily provision an environment in which to explore and try out the different techniques and tools. The environment includes a data generator for a continuous stream of events into a Kafka topic that we will use for testing. You’ll find all the necessary code on GitHub.
So now let’s dive in and start exploring what to do when things aren’t working…
Probably the most common question in the Confluent Community Slack group's #ksql channel is:
Why is my KSQL query not returning data?
That is, you’ve run a CREATE STREAM, but when you go to query it…
ksql> SELECT * FROM MY_FIRST_KSQL_STREAM;
…nothing happens. And because KSQL queries are continuous, your KSQL session appears to "hang". That’s because KSQL is continuing to wait for any new messages to show you. So if you run a KSQL SELECT and get no results back, what could be the reasons for that?
The answer usually comes down to one of five main reasons (the first four of which are variations on a theme):

- No data in the source topic
- No new data arriving in the topic
- KSQL consuming from an offset later than that for which there is data
- Data is read from the topic, but none of it matches the predicate (WHERE clause) specified
- Deserialization errors in reading the data
Let’s look at each of these in turn and how to diagnose them.
Let’s work this through from the start. We believe we’ve got data in a topic called ratingz, so we register it with KSQL as a STREAM:
ksql> CREATE STREAM RATINGS (rating_id BIGINT, \
user_id BIGINT, \
stars INT, \
route_id BIGINT, \
rating_time BIGINT, \
channel VARCHAR, \
message varchar) \
WITH (KAFKA_TOPIC='ratingz', \
VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql>
And then we query the new stream:
SELECT rating_id, user_id, message FROM RATINGS;
No data comes back—so let’s now do some detective work.
The first thing is to confirm which Kafka topic we’re using to drive the stream. We think we know (we just ran the CREATE STREAM, right?), but as with any good troubleshooting approach, the key is methodically working through the possibilities.
Using the DESCRIBE EXTENDED command, we can verify the topic from which the stream is sourced:
ksql> DESCRIBE EXTENDED RATINGS;
Name : RATINGS
Type : STREAM
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : ratingz (partitions: 1, replication: 1)   (1)
[...]
(1) The Kafka topic used by this stream
So our source topic is ratingz. Now we will step away from KSQL and use another Kafka consumer to verify whether there is data in the topic. My preferred tool here is kafkacat, but you can use other tools, such as kafka-console-consumer, if you like (an equivalent invocation is shown below). Invoking kafkacat shows:
$ docker run --network ksql-troubleshooting_default --tty --interactive --rm \
confluentinc/cp-kafkacat \
kafkacat -b kafka:29092 \
-C -t ratingz \
-o beginning
% Reached end of topic ratingz [0] at offset 0
The -o beginning flag tells it to go back to the beginning of the topic, and -C to read the messages. Reached end of topic shows that there’s no data to read. No data means that KSQL isn’t going to show anything in the output of a SELECT!
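For reference, here’s an equivalent check using kafka-console-consumer instead. This is a sketch, assuming the same Docker network and broker address as above, and using the confluentinc/cp-kafka image, which bundles the standard console tools:
$ docker run --network ksql-troubleshooting_default --tty --interactive --rm \
    confluentinc/cp-kafka \
    kafka-console-consumer --bootstrap-server kafka:29092 \
                           --topic ratingz \
                           --from-beginning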
Note: So in this case, there’s no data in the source topic. Turns out we mistook the topic name, and used ratingz instead of ratings! D’oh!
Let’s fix our stream, and use the ratings topic (with an s this time). Register it with KSQL, dropping the previous version first:
ksql> DROP STREAM RATINGS;
Message
------------------------------
Source RATINGS was dropped.
------------------------------
ksql> CREATE STREAM RATINGS (rating_id BIGINT, user_id BIGINT, stars INT, route_id BIGINT, rating_time BIGINT, channel VARCHAR, message varchar) WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql>
As before, try to query the stream, and find there’s no data being returned:
SELECT rating_id, user_id, message FROM RATINGS;
[ no output returned from KSQL ]
Let’s do as before, and first verify the source topic for the stream that we’re querying:
ksql> DESCRIBE EXTENDED RATINGS;
[...]
Kafka topic : ratings (partitions: 1, replication: 1)
and use kafkacat to check if there’s any data in it:
$ docker run --network ksql-troubleshooting_default --tty --interactive --rm \
confluentinc/cp-kafkacat \
kafkacat -b kafka:29092 \
-C -t ratings \
-o beginning
{"rating_id":1,"user_id":2,"stars":1,"route_id":2350,"rating_time":1537182554356,"channel":"web","message":"thank you for the most friendly, helpful experience today at your new lounge"}
{"rating_id":2,"user_id":10,"stars":3,"route_id":4161,"rating_time":1537182555220,"channel":"web","message":"more peanuts please"}
[...]
Turns out there are thousands of messages in the topic! But, by default, KSQL reads from the end of a topic, and no new messages were being written to the topic. As soon as new messages were sent to it, the SELECT returned results:
ksql> SELECT rating_id, user_id, message FROM RATINGS;
1 | 8 | (expletive deleted)
2 | 19 | more peanuts please
3 | 8 | meh
[...]
Note: So here we just needed to feed the topic more data. What if you want to look at data already in the topic? That’s what we’ll look at in the next section.
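Incidentally, if nothing is currently producing to the topic and you just want to confirm that your stream is wired up correctly, you can feed it a test message yourself. Here’s a sketch using kafkacat as a producer (the payload values are invented):
$ docker run --interactive --rm --network ksql-troubleshooting_default \
    confluentinc/cp-kafkacat \
    kafkacat -b kafka:29092 -t ratings -P <<EOF
{"rating_id":999,"user_id":1,"stars":5,"route_id":1,"rating_time":1537182600000,"channel":"test","message":"just testing"}
EOF
With the SELECT running in another KSQL session, the test row should appear as soon as the message lands on the topic.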
Kafka is an immutable log of events, and data is persisted according to the retention settings. When an application reads data from a Kafka topic, the data remains in place; what gets recorded is the offset in the log up to which that particular application has read. Another application can read the same data from the same topic, completely independently of the first. The main thing is that there is a log of data, and consuming applications choose the point in the log at which they want to read.
When KSQL reads data from a topic, it defaults to reading from the latest offset; that is, it will only read new messages arriving in the topic after the topic is registered in KSQL.
You can verify the offset setting using LIST PROPERTIES:
ksql> LIST PROPERTIES;
Property | Value
------------------------------------------------------------------------------------------------------------------------
[...]
ksql.streams.auto.offset.reset | latest
[...]
Often—and particularly in testing and development—you’ll want to read the data that already exists in a topic. To tell KSQL to do this, you change the offset property:
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql>
Now when you run a SELECT, KSQL will return the data from the beginning of the topic. The SELECT will still run continuously, so if new data is arriving you’ll see that too; if there isn’t any, the SELECT will just hang and wait for new data (or for you to cancel the query).
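If you only want a finite sample of rows rather than a continuous feed, KSQL also supports a LIMIT clause. The query terminates on its own once the given number of rows has been returned, for example:
ksql> SELECT rating_id, user_id, message FROM RATINGS LIMIT 3;
1 | 8 | (expletive deleted)
2 | 19 | more peanuts please
3 | 8 | meh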
Data in Kafka is just bytes. It’s up to the producer how it serialises the source message, and the consumer (which is KSQL here) needs to deserialise it using the same method. Common serialisation formats include Avro, JSON, etc.
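For illustration, here is how one and the same logical record might look on the wire in two of the text-based formats KSQL supports (made-up values):
JSON:      {"rating_id":1,"stars":4}
Delimited: 1,4
A consumer expecting one of these but receiving the other (or receiving Avro’s binary encoding) has no way to make sense of the bytes, which is exactly the failure mode we’ll see below.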
If KSQL cannot deserialise message data, it will not write anything to the SELECT results. If this happens, you could have checked the three situations above and ruled them out, but still not have any data returned to your SELECT.
Here’s a simple example, using one of the existing internal topics, called _confluent-metrics. Let’s register it using a fictional schema that we believe to be correct for the purposes of this example, and declare the serialisation format of the message values to be JSON (tip: it’s not JSON!):
CREATE STREAM METRICS (col1 int, col2 int, col3 varchar) \
WITH (KAFKA_TOPIC='_confluent-metrics', VALUE_FORMAT='JSON');
Taking the lesson from above, set the offset to earliest so that we definitely pull all the messages, and run a SELECT:
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'
ksql> SELECT * FROM METRICS;
So…no results coming back. Let’s go through the checklist (although we can check off the offset already, as we’ve specifically set that):
- What topic are we querying?

ksql> DESCRIBE EXTENDED METRICS;
[...]
Kafka topic : _confluent-metrics (partitions: 12, replication: 1)

- Is there any data in it?

$ docker run --network ksql-troubleshooting_default --tty --interactive --rm \
    confluentinc/cp-kafkacat \
    kafkacat -b kafka:29092 \
             -C -t _confluent-metrics \
             -o beginning -c 1   (1)
���,� kafka.logSizeLog"$partition.9.topic.__consumer_offsets*Akafka.log:type=Log,name=Size,topic=__consumer_offsets,partition=90�
[...]
(1) The -c 1 argument tells kafkacat to return just the one message and then exit
So: there is data, we’re querying the correct topic, and we’ve set the offset back to the beginning…why isn’t KSQL returning data?
Well, the data we can see from the output of kafkacat clearly isn’t JSON, which is what we declared in the CREATE STREAM command. If we go to the KSQL server log file, we’ll see a whole bunch of these deserialisation errors:
[2018-09-17 12:29:09,929] WARN task [0_10] Skipping record due to deserialization error. topic=[_confluent-metrics] partition=[10] offset=[70] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: _confluent-metrics
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ((CTRL-CHAR, code 127)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: (byte[])�����,�
�
[...] [truncated 1544 bytes]; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:669)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:567)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2624)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:88)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:77)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:45)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
[...]
You can see from the stack trace that it’s using the JSON deserialiser (as you’d expect, given our VALUE_FORMAT configuration), and you can also see from the sample message shown in the log output ([Source: (byte[])�����,� �) that it clearly isn’t JSON.
If you hit this problem, then you need to synchronise your serialisation and deserialisation formats. KSQL supports delimited (CSV), JSON, and Avro. If you’re using Protobuf, then check out KLIP-0, which proposes adding it to KSQL.
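For example, if the data on a topic is actually Avro, you declare that at registration time. Here’s a sketch (the topic name is hypothetical; with VALUE_FORMAT='AVRO', KSQL retrieves the schema from the Confluent Schema Registry, so no column list is needed):
ksql> CREATE STREAM METRICS_AVRO \
      WITH (KAFKA_TOPIC='some_avro_topic', VALUE_FORMAT='AVRO');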
Following on from the above example of no messages being returned, you may also see cases where only some of the messages are shown. The root cause could be the same (serialisation) but with a different slant: instead of getting your serialisation format wrong outright, maybe you chose the right format, but there are some malformed messages on the topic.
Let’s see a simple example. We’ll put some data onto a new topic, using JSON but with some malformed messages:
docker run --interactive --rm --network ksql-troubleshooting_default \
confluentinc/cp-kafkacat \
kafkacat -b kafka:29092 \
-t dummy_topic \
-P <<EOF
{"col1":1,"col2":16000}
{"col1":2,"col2:42000}
{"col1":3,"col2":94000}
EOF
Note that the second message is invalid JSON, as it’s missing a " after the field name (col2).
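Incidentally, one quick way to find malformed records like this without involving KSQL at all is to replay the topic through a JSON parser such as jq. This is a sketch, assuming jq is available on the host; kafkacat’s -e flag makes it exit at the end of the topic rather than wait for new messages:
$ docker run --network ksql-troubleshooting_default --interactive --rm \
    confluentinc/cp-kafkacat \
    kafkacat -b kafka:29092 -C -t dummy_topic -o beginning -e | jq -c '.'
jq echoes each valid record and aborts with a parse error when it reaches the malformed one.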
Register the topic in KSQL:
ksql> CREATE STREAM DUMMY (COL1 INT, COL2 VARCHAR) \
WITH (KAFKA_TOPIC='dummy_topic', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
And now, remembering the lesson from above, set the offset to earliest so that we definitely pull all the messages, and run a SELECT:
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'none' to 'earliest'
ksql> SELECT * FROM DUMMY;
1537186945005 | null | 1 | 16000
1537186945005 | null | 3 | 94000
Note that we only get two messages, even though there are three on the topic.
If you check out the KSQL server log, you’ll see:
[2018-09-17 13:03:13,662] WARN task [0_0] Skipping record due to deserialization error. topic=[dummy_topic] partition=[0] offset=[1] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: dummy_topic
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name
at [Source: (byte[])"{"col1":2,"col2:42000}"; line: 1, column: 45]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:594)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.parseEscapedName(UTF8StreamJsonParser.java:1956)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.slowParseName(UTF8StreamJsonParser.java:1861)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1645)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:999)
at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:247)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
Note the partition and offset shown in the error message (partition=[0] offset=[1]). Head back to the ever-versatile kafkacat and run:
docker run --network ksql-troubleshooting_default \
--tty --interactive --rm \
confluentinc/cp-kafkacat \
kafkacat -b kafka:29092 -C -K: \
-f '\nKey: %k\t\nValue: %s\nPartition: %p\tOffset: %o\n--\n' \ (1)
-t dummy_topic \
-o 1 \ (2)
-p 0 \ (3)
-c 1 (4)
(1) -f formats the output and shows some nice metadata
(2) -o 1 starts at offset 1
(3) -p 0 reads from partition 0
(4) -c 1 consumes just one message
The output of this is:
Key:
Value: {"col1":2,"col2:42000}
Partition: 0 Offset: 1
--
This shows us, if we were in any doubt, that the message value is not valid JSON, and thus can’t be consumed by KSQL.
KSQL writes most of its logs to stdout by default. If you’re running KSQL using Docker, then you’ll find the output in the container logs themselves. For example:
- docker logs 483b1958efc4
- docker-compose logs ksql-server
Using the Confluent CLI, you can run:

- confluent log ksql-server
If you’ve installed Confluent Platform using rpm/deb, then you should find the logs under /var/log/confluent/.
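Wherever the log output ends up, a quick way to check for the deserialisation problems discussed above is to filter for the warning message we saw earlier. For example, with the Docker Compose setup used in this article (a sketch; adjust the service name to match your own environment):
$ docker-compose logs ksql-server | grep 'Skipping record due to deserialization error'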
So you’re still stuck, and you need more help? There are several places to turn:
- KSQL is supported as part of the Confluent Enterprise platform: contact us for details
- Community support for KSQL is available:
  - Confluent Community Slack #ksql channel
  - Search for similar issues on GitHub, or raise a new issue if one doesn’t exist
- Other articles in this series:
  - Part 2: What’s happening under the covers? A dive into KSQL internals, and a look at tools we can use for monitoring and examining the behaviour of running queries and KSQL itself