Start a Container with kcat / kafkacat for Debugging
kubectl -n NAMESPACE run "$(whoami)-debug" -it --rm \
--image=confluentinc/cp-kafkacat:6.1.9 \
--restart=Never \
-- bash
Dockerfile for Kafka Analysis Container with different Tools
With jq, the Kafka console tools, the Schema Registry tools, and kafkacat installed:
FROM confluentinc/cp-kafka:6.2.1 as cp-kafka
FROM confluentinc/cp-schema-registry:6.2.1 as cp-schema-registry
FROM debian:10-slim
ARG DEBIAN_FRONTEND=noninteractive
# Install the necessary tools in a single layer; kafkacat is packaged in
# buster, yq is not (install it via pip); the JRE is needed by the Kafka
# shell wrappers copied in below
RUN apt-get update && apt-get install -y \
curl \
jq \
kafkacat \
python3-pip \
default-jre-headless \
&& pip3 install yq \
&& rm -rf /var/lib/apt/lists/*
# Copy the Kafka / Schema Registry CLI wrappers and the jars they load
COPY --from=cp-kafka /usr/bin/kafka-* /usr/bin/
COPY --from=cp-kafka /usr/share/java/ /usr/share/java/
COPY --from=cp-schema-registry /usr/bin/schema-registry* /usr/bin/
COPY --from=cp-schema-registry /usr/share/java/ /usr/share/java/
# Copy entrypoint script
COPY entrypoint.sh /usr/bin/entrypoint.sh
RUN chmod +x /usr/bin/entrypoint.sh
ENTRYPOINT ["/usr/bin/entrypoint.sh"]
The image uses this entrypoint.sh:
#!/bin/bash
# Default connection settings for the tools in the container
export KAFKA_BROKER_LIST=localhost:9092
export SCHEMA_REGISTRY_URL=http://localhost:8081
# Replace the shell process so the interactive bash becomes PID 1
exec /bin/bash
Useful kcat / kafkacat Commands
Consume 10 Messages from a Topic
kafkacat -b localhost:9092 -t TOPICNAME -C -c 10
Query for Topic Offset by Timestamp
For partition 1 (kcat's -Q mode expects the timestamp in Unix epoch milliseconds; 1674500400000 corresponds to 2023-01-23T20:00:00+0100):
kafkacat -b localhost:9092 -Q -t TOPICNAME:1:1674500400000
Consume Messages between two Timestamps
- -o s@<TIMESTAMP>: start timestamp (Unix epoch, milliseconds)
- -o e@<TIMESTAMP>: end timestamp (Unix epoch, milliseconds)
kafkacat -C -b localhost:9092 -t TOPICNAME -o s@1234567 -o e@12345678
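The s@/e@ offsets (and the -Q queries above) take Unix epoch timestamps in milliseconds. A quick conversion from a human-readable time, assuming GNU date is available (%3N is a GNU extension):

```shell
# Convert a human-readable time to the epoch-millisecond
# timestamp that kcat expects
date -d '2023-01-23T20:00:00+0100' +%s%3N
# prints 1674500400000
```

The result can be plugged directly into -o s@..., -o e@..., or a -Q query.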
Read Avro Messages from a Topic
kafkacat -C -b localhost:9092 -t TOPICNAME -r http://SCHEMAREGISTRYHOST:8081 -p 0 -o -1 -s value=avro -e
Confluent Schema Registry
List existing Subjects
curl -XGET http://localhost:8081/subjects
List Schema for specific Subject
curl -XGET http://localhost:8081/subjects/<SUBJECTNAME>/versions/latest
Replace latest with an existing version number to inspect other versions.
Get Schema Versions for ID
curl -XGET http://localhost:8081/schemas/ids/<id>/versions
Check compatibility of a new Schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" http://localhost:8081/compatibility/subjects/mysubject_v1-value/versions/latest --data 'THE-NEW-SCHEMA'
A quick way is to convert the Avro schema from an .avsc file into the request payload and copy it to the clipboard, ready to paste into the curl command above, or to generate the full curl command in one go.
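One possible sketch of such a one-liner, assuming jq is installed and pbcopy (macOS) is the clipboard tool; the Schema Registry API expects the schema wrapped as {"schema": "<escaped schema JSON>"}:

```shell
# Wrap the raw .avsc content into the payload format the Schema
# Registry API expects and copy it to the clipboard
# (pbcopy is macOS-specific; use xclip or xsel on Linux)
jq -c '{schema: tojson}' twitter.avsc | pbcopy

# Or generate the full compatibility-check curl command and copy it
echo "curl -X POST -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
  http://localhost:8081/compatibility/subjects/mysubject_v1-value/versions/latest \
  --data '$(jq -c '{schema: tojson}' twitter.avsc)'" | pbcopy
```

The subject name and file name are examples; substitute your own.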
Often used CLI Commands
List Topics
kafka-topics --bootstrap-server SERVER:PORT --list
Create Topic
kafka-topics --create --bootstrap-server localhost:9092 --topic sample-topic --partitions 3 --replication-factor 3
Describe Topic
kafka-topics --bootstrap-server localhost:9092 --describe --topic sample-topic
Topic: sample-topic TopicId: dNC5PSxtSfeKhOZoYtUJHw PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: sample-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Offline:
Topic: sample-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Offline:
Topic: sample-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1 Offline:
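The describe output is easy to post-process. For example, a small awk filter (a sketch, not part of the Kafka tooling) extracts the partition-to-leader mapping:

```shell
# Print "<partition> -> <leader>" for every partition line of the
# describe output ($4 is the partition number, $6 the leader id)
kafka-topics --bootstrap-server localhost:9092 --describe --topic sample-topic \
  | awk '/Partition:/ {print $4 " -> " $6}'
```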
Sending Messages to a Topic
kafka-console-producer --bootstrap-server localhost:9092 --topic sample-topic
Listening to a Topic
kafka-console-consumer --bootstrap-server localhost:9092 --topic sample-topic --from-beginning
Read 10 Avro-serialized Records from the Beginning of a Topic
kafka-avro-console-consumer --bootstrap-server kafka:9092 \
--topic my_topic \
--from-beginning \
--max-messages 10 \
--property schema.registry.url=http://schemaregistry:8081
Inspect Consumer Groups and Consumer Lag
kafka-consumer-groups --bootstrap-server SERVER:PORT --describe --all-groups
For a single group, e.g. count_errors:
kafka-consumer-groups --bootstrap-server SERVER:PORT --describe --group count_errors
Inspect Avro Messages with Confluent Schema Registry Tools
- Start a temporary debug container:
kubectl -n NAMESPACE run "$(whoami)-analyse" -it --rm \
  --image=myregistry.domain/confluentinc/cp-schema-registry:6.2.7 \
  --restart=Never \
  -- bash
- Read 10 messages with kafka-avro-console-consumer:
kafka-avro-console-consumer --bootstrap-server server:9092 \
  --topic TOPICNAME \
  --from-beginning \
  --max-messages 10 \
  --property schema.registry.url=http://schemaregistry.domain:8081
Avro
avro-tools
Download from Maven repository here: https://repo1.maven.org/maven2/org/apache/avro/avro-tools/
$ java -jar avro-tools-1.11.0.jar
Version 1.11.0 of Apache Avro
Copyright 2010-2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (https://www.apache.org/).
----------------
Available tools:
canonical Converts an Avro Schema to its canonical form
cat Extracts samples from files
compile Generates Java code for the given schema.
concat Concatenates avro files without re-compressing.
count Counts the records in avro files or folders
fingerprint Returns the fingerprint for the schemas.
fragtojson Renders a binary-encoded Avro datum as JSON.
fromjson Reads JSON records and writes an Avro data file.
fromtext Imports a text file into an avro data file.
getmeta Prints out the metadata of an Avro data file.
getschema Prints out schema of an Avro data file.
idl Generates a JSON schema from an Avro IDL file
idl2schemata Extract JSON schemata of the types from an Avro IDL file
induce Induce schema/protocol from Java class/interface via reflection.
jsontofrag Renders a JSON-encoded Avro datum as binary.
random Creates a file with randomly generated instances of a schema.
recodec Alters the codec of a data file.
repair Recovers data from a corrupt Avro Data file
rpcprotocol Output the protocol of a RPC service
rpcreceive Opens an RPC Server and listens for one message.
rpcsend Sends a single RPC message.
tether Run a tethered mapreduce job.
tojson Dumps an Avro data file as JSON, record per line or pretty.
totext Converts an Avro data file to a text file.
totrevni Converts an Avro data file to a Trevni file.
trevni_meta Dumps a Trevni file's metadata as JSON.
trevni_random Create a Trevni file filled with random instances of a schema.
trevni_tojson Dumps a Trevni file as JSON.
Serialization Formats
- Avro JSON Encoding (https://avro.apache.org/docs/current/spec.html#json_encoding): human-readable text format, mainly useful for debugging
- Avro Data Serialization (https://avro.apache.org/docs/current/spec.html#Data+Serialization): binary format with a header that contains the full schema; this is the format usually used when writing Avro files
- Avro Single Object Encoding (https://avro.apache.org/docs/current/spec.html#single_object_encoding): binary format with a header that contains only the fingerprint/ID of the schema; this is the format used for messages in Kafka
- Avro Binary Encoding (https://avro.apache.org/docs/current/spec.html#binary_encoding): binary format without a header; the most compact format
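Related to the Kafka framing: Confluent's Avro serializers prefix every message with a magic byte 0x00 followed by a 4-byte big-endian schema ID. A small sketch (the file name and sample bytes are made up) extracts that ID from a dumped message:

```shell
# Build a sample message: magic byte 0x00, schema ID 42 big-endian
# (octal escape \052 = 42), followed by the Avro-encoded payload
printf '\000\000\000\000\052payload' > /tmp/msg.bin

# Bytes 1..4 hold the schema ID; combine them big-endian with awk
od -An -tu1 -j1 -N4 /tmp/msg.bin \
  | awk '{for (i = 1; i <= NF; i++) v = v * 256 + $i} END {print "schema id:", v}'
# prints: schema id: 42
```

The printed ID can then be looked up via the Schema Registry /schemas/ids/<id> endpoint.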
Troubleshooting
Inspired by the Datameer article "How to troubleshoot AVRO and JSON files on CLI".
The examples below use sample data from Michael Noll's GitHub repository: https://github.com/miguno/avro-cli-examples
Check File Header with hexdump
hexdump -C -n 200 twitter.avro
00000000 4f 62 6a 01 04 16 61 76 72 6f 2e 73 63 68 65 6d |Obj...avro.schem|
00000010 61 e8 05 7b 22 74 79 70 65 22 3a 22 72 65 63 6f |a..{"type":"reco|
00000020 72 64 22 2c 22 6e 61 6d 65 22 3a 22 74 77 69 74 |rd","name":"twit|
00000030 74 65 72 5f 73 63 68 65 6d 61 22 2c 22 6e 61 6d |ter_schema","nam|
00000040 65 73 70 61 63 65 22 3a 22 63 6f 6d 2e 6d 69 67 |espace":"com.mig|
00000050 75 6e 6f 2e 61 76 72 6f 22 2c 22 66 69 65 6c 64 |uno.avro","field|
00000060 73 22 3a 5b 7b 22 6e 61 6d 65 22 3a 22 75 73 65 |s":[{"name":"use|
00000070 72 6e 61 6d 65 22 2c 22 74 79 70 65 22 3a 22 73 |rname","type":"s|
00000080 74 72 69 6e 67 22 2c 22 64 6f 63 22 3a 22 4e 61 |tring","doc":"Na|
00000090 6d 65 20 6f 66 20 74 68 65 20 75 73 65 72 20 61 |me of the user a|
000000a0 63 63 6f 75 6e 74 20 6f 6e 20 54 77 69 74 74 65 |ccount on Twitte|
000000b0 72 2e 63 6f 6d 22 7d 2c 7b 22 6e 61 6d 65 22 3a |r.com"},{"name":|
000000c0 22 74 77 65 65 74 22 2c |"tweet",|
We can see that a schema is present; so far, so good.
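The header check can also be scripted: every Avro data file starts with the three ASCII bytes "Obj" followed by the format version byte 0x01, as the hexdump above shows. A quick sanity check (reusing the sample file name from above):

```shell
# An Avro data file must begin with the 3 ASCII bytes "Obj",
# followed by the format version byte 0x01
if [ "$(head -c 3 twitter.avro)" = "Obj" ]; then
  echo "looks like an Avro data file"
else
  echo "missing Avro magic header"
fi
```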
Inspect Metadata
java -jar avro-tools-1.11.0.jar getmeta twitter.avro
avro.schema {"type":"record","name":"twitter_schema","namespace":"com.miguno.avro","fields":[{"name":"username","type":"string","doc":"Name of the user account on Twitter.com"},{"name":"tweet","type":"string","doc":"The content of the user's Twitter message"},{"name":"timestamp","type":"long","doc":"Unix epoch time in seconds"}],"doc:":"A basic schema for storing Twitter messages"}
Get the Schema
java -jar avro-tools-1.11.0.jar getschema twitter.avro > twitter.avsc
cat twitter.avsc
{
"type" : "record",
"name" : "twitter_schema",
"namespace" : "com.miguno.avro",
"fields" : [ {
"name" : "username",
"type" : "string",
"doc" : "Name of the user account on Twitter.com"
}, {
"name" : "tweet",
"type" : "string",
"doc" : "The content of the user's Twitter message"
}, {
"name" : "timestamp",
"type" : "long",
"doc" : "Unix epoch time in seconds"
} ],
"doc:" : "A basic schema for storing Twitter messages"
}
Read Avro Data without Code Generation
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

Schema schema = new Schema.Parser().parse(new File("article.avsc"));
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
// try-with-resources closes the underlying file handle
try (DataFileReader<GenericRecord> dataFileReader =
        new DataFileReader<>(new File("/tmp/articles.avro"), datumReader)) {
    GenericRecord article = null;
    while (dataFileReader.hasNext()) {
        // Reuse the article object by passing it to next():
        // avoids allocating and garbage-collecting many objects
        article = dataFileReader.next(article);
        // do stuff ...
    }
}