Start a Container with kcat / kafkacat for Debugging

kubectl -n NAMESPACE run "$(whoami)-debug" -it --rm \
    --image=confluentinc/cp-kafkacat:6.1.9 \
    --restart=Never \
    -- bash
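
Inside the container, a quick way to verify connectivity is to list the cluster metadata (BROKER:9092 is a placeholder for a reachable bootstrap server):

kafkacat -b BROKER:9092 -L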

Dockerfile for Kafka Analysis Container with different Tools

With jq, the Kafka console tools, the Schema Registry tools and kafkacat installed.

Dockerfile
FROM confluentinc/cp-kafka:6.2.1 AS cp-kafka
FROM confluentinc/cp-schema-registry:6.2.1 AS cp-schema-registry

FROM debian:10-slim
ARG DEBIAN_FRONTEND=noninteractive

# Install necessary tools; yq is not packaged in Debian 10, so it is installed
# via pip, and the Kafka/Schema Registry CLI scripts need a Java runtime
RUN apt-get update && apt-get install -y \
    curl \
    jq \
    kafkacat \
    python3-pip \
    default-jre-headless \
    && pip3 install yq \
    && rm -rf /var/lib/apt/lists/*

# Copy Kafka and Schema Registry CLI scripts plus their jars
# (assuming the Confluent images keep the jars under /usr/share/java)
COPY --from=cp-kafka /usr/bin/kafka-* /usr/bin/
COPY --from=cp-kafka /usr/share/java /usr/share/java
COPY --from=cp-schema-registry /usr/bin/schema-registry* /usr/bin/
COPY --from=cp-schema-registry /usr/share/java /usr/share/java

# Copy entrypoint script
COPY entrypoint.sh /usr/bin/entrypoint.sh
RUN chmod +x /usr/bin/entrypoint.sh

ENTRYPOINT ["/usr/bin/entrypoint.sh"]

It also uses this entrypoint.sh:

entrypoint.sh
#!/bin/bash

# Preset connection defaults for the Kafka and Schema Registry tools
export KAFKA_BROKER_LIST=localhost:9092
export SCHEMA_REGISTRY_URL=http://localhost:8081

# Replace this script with an interactive shell
exec /bin/bash "$@"
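
With the Dockerfile and entrypoint.sh in the same directory, the image can be built and run locally (kafka-debug is an arbitrary tag):

docker build -t kafka-debug .
docker run --rm -it kafka-debug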

Useful kcat / kafkacat Commands

Consume 10 Messages from a Topic

kafkacat -b localhost:9092 -t TOPICNAME -C -c 10
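
kafkacat can also produce; for a quick test, pipe a single key/value message into the topic (-K sets the key delimiter):

echo "mykey:myvalue" | kafkacat -P -b localhost:9092 -t TOPICNAME -K :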

Query for Topic Offset by Timestamp

For Partition 1:

kafkacat -b localhost:9092 -Q -t TOPICNAME:1:2023-01-23T20:00:00+0100
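
-Q prints the first offset whose timestamp is at or after the given time. The returned offset (OFFSET below is a placeholder) can then be fed back into a consume call:

kafkacat -C -b localhost:9092 -t TOPICNAME -p 1 -o OFFSET -c 10 -e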

Consume Messages between two timestamps

  • -o s@<TIMESTAMP> Start timestamp (milliseconds since the Unix epoch)

  • -o e@<TIMESTAMP> End timestamp (milliseconds since the Unix epoch)

kafkacat -C -b localhost:9092 -t TOPICNAME -o s@1234567 -o e@12345678
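
With GNU date (BSD/macOS date lacks %N), a millisecond epoch timestamp can be generated like this:

date -d '2023-01-23T20:00:00' +%s%3N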

Read Avro Messages from a Topic

kafkacat -C -b localhost:9092 -t TOPICNAME -r http://SCHEMAREGISTRYHOST:8081 -p 0 -o -1 -s value=avro -e
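
The output format can be customized with -f; for example, to print partition, offset, key and value per message:

kafkacat -C -b localhost:9092 -t TOPICNAME -f 'Partition: %p, Offset: %o, Key: %k, Value: %s\n' -e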

Confluent Schema Registry

List existing Subjects

curl -XGET http://localhost:8081/subjects

List Schema for specific Subject

curl -XGET http://localhost:8081/subjects/<SUBJECTNAME>/versions/latest
Replace latest with an existing version number to inspect other versions.

Get Schema Versions for ID

curl -XGET http://localhost:8081/schemas/ids/<id>/versions
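
The schema itself can be fetched by ID, too:

curl -XGET http://localhost:8081/schemas/ids/<id>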

Check compatibility of a new Schema

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" http://localhost:8081/compatibility/subjects/mysubject_v1-value/versions/666 --data 'THE-NEW-SCHEMA'
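
Here 666 stands in for an existing version number; latest works as well. The response is a small JSON document, e.g. {"is_compatible": true}.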

A quick way to read an Avro schema from an avsc file, convert it to the required JSON payload, and copy it to the clipboard for the curl command above:

jq '. | {schema: tojson}' src/main/avro/myschema.avsc | pbcopy

Or simply generate the full curl command and copy it to the clipboard:

SCHEMA_JSON=$(jq -c '. | {schema: tojson}' src/main/avro/myschema.avsc) &&
echo "curl -X POST -H \"Content-Type: application/vnd.schemaregistry.v1+json\" http://localhost:8081/compatibility/subjects/mysubject_v1-value/versions/666 --data '${SCHEMA_JSON}'" | pbcopy

Often used CLI Commands

List Topics

kafka-topics --bootstrap-server SERVER:PORT --list

Create Topic

kafka-topics --create --bootstrap-server localhost:9092 --topic sample-topic --partitions 3 --replication-factor 3

Describe Topic

kafka-topics --bootstrap-server localhost:9092 --describe --topic sample-topic
Topic: sample-topic     TopicId: dNC5PSxtSfeKhOZoYtUJHw PartitionCount: 3       ReplicationFactor: 1    Configs:
        Topic: sample-topic     Partition: 0    Leader: 1       Replicas: 1     Isr: 1  Offline:
        Topic: sample-topic     Partition: 1    Leader: 1       Replicas: 1     Isr: 1  Offline:
        Topic: sample-topic     Partition: 2    Leader: 1       Replicas: 1     Isr: 1  Offline:

Sending a Message to a Topic

kafka-console-producer --bootstrap-server localhost:9092 --topic sample-topic

Listening to a Topic

kafka-console-consumer --bootstrap-server localhost:9092 --topic sample-topic --from-beginning
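
To also see the message keys, the console consumer can print them via consumer properties:

kafka-console-consumer --bootstrap-server localhost:9092 --topic sample-topic --from-beginning \
  --property print.key=true \
  --property key.separator=" : "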

Get the first 10 Avro Serialized Records from a Topic

kafka-avro-console-consumer --bootstrap-server kafka:9092 \
  --topic my_topic \
  --from-beginning \
  --max-messages 10 \
  --property schema.registry.url=http://schemaregistry:8081

Inspect Consumer Groups and Consumer Lag

kafka-consumer-groups --bootstrap-server SERVER:PORT --all-groups --describe
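
Offsets can also be reset per group; --dry-run previews the change, --execute applies it (GROUPNAME and TOPICNAME are placeholders):

kafka-consumer-groups --bootstrap-server SERVER:PORT --group GROUPNAME --topic TOPICNAME \
  --reset-offsets --to-earliest --dry-run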

Inspect Avro Messages with Confluent Schema Registry Tools

  1. Start a temporary debug container

    kubectl -n NAMESPACE run "$(whoami)-analyse" -it --rm \
        --image=myregistry.domain/confluentinc/cp-schema-registry:6.2.7 \
        --restart=Never \
        -- bash
  2. Read 10 messages with kafka-avro-console-consumer

    kafka-avro-console-consumer --bootstrap-server server:9092 \
      --topic TOPICNAME \
      --from-beginning \
      --max-messages 10 \
      --property schema.registry.url=http://schemaregistry.domain:8081

Avro

avro-tools

Download from the Maven repository: https://repo1.maven.org/maven2/org/apache/avro/avro-tools/

$ java -jar avro-tools-1.11.1.jar
Version 1.11.1 of Apache Avro
Copyright 2010-2015 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (https://www.apache.org/).
----------------
Available tools:
    canonical  Converts an Avro Schema to its canonical form
          cat  Extracts samples from files
      compile  Generates Java code for the given schema.
       concat  Concatenates avro files without re-compressing.
        count  Counts the records in avro files or folders
  fingerprint  Returns the fingerprint for the schemas.
   fragtojson  Renders a binary-encoded Avro datum as JSON.
     fromjson  Reads JSON records and writes an Avro data file.
     fromtext  Imports a text file into an avro data file.
      getmeta  Prints out the metadata of an Avro data file.
    getschema  Prints out schema of an Avro data file.
          idl  Generates a JSON schema from an Avro IDL file
 idl2schemata  Extract JSON schemata of the types from an Avro IDL file
       induce  Induce schema/protocol from Java class/interface via reflection.
   jsontofrag  Renders a JSON-encoded Avro datum as binary.
       random  Creates a file with randomly generated instances of a schema.
      recodec  Alters the codec of a data file.
       repair  Recovers data from a corrupt Avro Data file
  rpcprotocol  Output the protocol of a RPC service
   rpcreceive  Opens an RPC Server and listens for one message.
      rpcsend  Sends a single RPC message.
       tether  Run a tethered mapreduce job.
       tojson  Dumps an Avro data file as JSON, record per line or pretty.
       totext  Converts an Avro data file to a text file.
     totrevni  Converts an Avro data file to a Trevni file.
  trevni_meta  Dumps a Trevni file's metadata as JSON.
trevni_random  Create a Trevni file filled with random instances of a schema.
trevni_tojson  Dumps a Trevni file as JSON.

Resources:

  • Serialization Formats by Davide Icardi: https://gist.github.com/davideicardi/e8c5a69b98e2a0f18867b637069d03a9

Troubleshooting

I have used sample data from Michael Noll's GitHub repository here: https://github.com/miguno/avro-cli-examples

Check File Header with hexdump

hexdump -C -n 200 twitter.avro
00000000  4f 62 6a 01 04 16 61 76  72 6f 2e 73 63 68 65 6d  |Obj...avro.schem|
00000010  61 e8 05 7b 22 74 79 70  65 22 3a 22 72 65 63 6f  |a..{"type":"reco|
00000020  72 64 22 2c 22 6e 61 6d  65 22 3a 22 74 77 69 74  |rd","name":"twit|
00000030  74 65 72 5f 73 63 68 65  6d 61 22 2c 22 6e 61 6d  |ter_schema","nam|
00000040  65 73 70 61 63 65 22 3a  22 63 6f 6d 2e 6d 69 67  |espace":"com.mig|
00000050  75 6e 6f 2e 61 76 72 6f  22 2c 22 66 69 65 6c 64  |uno.avro","field|
00000060  73 22 3a 5b 7b 22 6e 61  6d 65 22 3a 22 75 73 65  |s":[{"name":"use|
00000070  72 6e 61 6d 65 22 2c 22  74 79 70 65 22 3a 22 73  |rname","type":"s|
00000080  74 72 69 6e 67 22 2c 22  64 6f 63 22 3a 22 4e 61  |tring","doc":"Na|
00000090  6d 65 20 6f 66 20 74 68  65 20 75 73 65 72 20 61  |me of the user a|
000000a0  63 63 6f 75 6e 74 20 6f  6e 20 54 77 69 74 74 65  |ccount on Twitte|
000000b0  72 2e 63 6f 6d 22 7d 2c  7b 22 6e 61 6d 65 22 3a  |r.com"},{"name":|
000000c0  22 74 77 65 65 74 22 2c                           |"tweet",|

We can see that a schema is present; so far, so good.

Inspect Metadata

java -jar avro-tools-1.11.0.jar getmeta twitter.avro
avro.schema	{"type":"record","name":"twitter_schema","namespace":"com.miguno.avro","fields":[{"name":"username","type":"string","doc":"Name of the user account on Twitter.com"},{"name":"tweet","type":"string","doc":"The content of the user's Twitter message"},{"name":"timestamp","type":"long","doc":"Unix epoch time in seconds"}],"doc:":"A basic schema for storing Twitter messages"}

Get the Schema

java -jar avro-tools-1.11.0.jar getschema twitter.avro > twitter.avsc

cat twitter.avsc
{
  "type" : "record",
  "name" : "twitter_schema",
  "namespace" : "com.miguno.avro",
  "fields" : [ {
    "name" : "username",
    "type" : "string",
    "doc" : "Name of the user account on Twitter.com"
  }, {
    "name" : "tweet",
    "type" : "string",
    "doc" : "The content of the user's Twitter message"
  }, {
    "name" : "timestamp",
    "type" : "long",
    "doc" : "Unix epoch time in seconds"
  } ],
  "doc:" : "A basic schema for storing Twitter messages"
}
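
To dump the actual records as JSON (the tool list above shows that tojson supports pretty printing):

java -jar avro-tools-1.11.0.jar tojson --pretty twitter.avro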

Read Avro Data without Code Generation

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

Schema schema = new Schema.Parser().parse(new File("article.avsc"));
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File("/tmp/articles.avro"), datumReader);
GenericRecord article = null;
while (dataFileReader.hasNext()) {
    // Reuse the article object by passing it to next().
    // This avoids allocating and garbage collecting many objects.
    article = dataFileReader.next(article);

    // do stuff ...
}
dataFileReader.close();