When writing tests for applications that interact with Kafka brokers, we often need to set up a suitable environment that includes an instance of Kafka and ZooKeeper.
Although Kafka ships with some classes for testing, setting up a simple test environment is considerably easier with the kafka-unit library, which offers JUnit test rule support as well as a quick programmatic setup.
In the following short example, I’d like to show how to create a simple setup using Maven, Surefire and kafka-unit.
Maven Setup
Using Maven, we need to add the following two dependencies to our project’s pom.xml:
- kafka-unit: kafka-unit for initializing Kafka and ZooKeeper
- junit: our testing framework of choice here
<dependencies>
    <dependency>
        <groupId>info.batey.kafka</groupId>
        <artifactId>kafka-unit</artifactId>
        <version>1.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>
That’s all we need to begin writing tests.
Programmatic Test Setup
In the first example, we’re setting up our Kafka environment programmatically with the help of the KafkaUnit utility class. What we’re doing now is:
- we’re starting a Kafka broker running on a random port (we might also have specified a port)
- we’re creating a new Kafka topic named “MyTestTopic”
- we’re sending a message to the topic
- we’re fetching all messages from the topic
- we’re verifying that only one message exists and that it matches our published message
- we’re stopping the broker
package it;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import info.batey.kafka.unit.KafkaUnit;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class KafkaTest {
    KafkaUnit kafkaUnitServer;

    @Before
    public void setup() throws Exception {
        kafkaUnitServer = new KafkaUnit();
        kafkaUnitServer.startup();
    }

    @After
    public void tearDown() {
        kafkaUnitServer.shutdown();
    }

    @Test
    public void readMessagesFromTopic() throws Exception {
        final String topicName = "MyTestTopic";
        kafkaUnitServer.createTopic(topicName);
        ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(topicName, "greeting",
                "Hello world from hascode.com :)");
        kafkaUnitServer.sendMessages(keyedMessage);
        List<String> allMessages = kafkaUnitServer.readAllMessages(topicName);
        assertThat("topic should contain only one message", allMessages.size(), equalTo(1));
        assertThat("the message should match the published message", allMessages.get(0), equalTo("Hello world from hascode.com :)"));
    }
}
We can now run our test, for example with Maven on the command line:
$ mvn test -Dtest=KafkaTest
[..]
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running it.KafkaTest
Created topic "MyTestTopic".
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.731 sec - in it.KafkaTest
Results :
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
Running the test in an IDE such as IntelliJ IDEA or Eclipse should produce a similar result.
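The first example lets kafka-unit choose random ports for us. If our own test helpers ever need a free port as well, a common JDK-only technique is to bind a ServerSocket to port 0 and ask which port the operating system assigned. The following is a minimal sketch of that idea; the class name FreePort is made up for this illustration, and it is not necessarily how kafka-unit selects its ports internally.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

/** Hypothetical helper, not part of kafka-unit. */
final class FreePort {

    /** Asks the OS for a currently unused TCP port by binding to port 0. */
    static int find() {
        try (ServerSocket socket = new ServerSocket(0)) {
            // The socket is closed again when we leave this block,
            // so the port is free for the caller to use afterwards.
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that the port is released before it is returned, so in theory another process could grab it in the meantime. If a test needs predictable ports instead, the kafka-unit README shows a two-argument constructor, new KafkaUnit(5000, 5001), that takes the ZooKeeper port and the broker port.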
Setup using Test Rules
In the second example we’re choosing the more comfortable approach using JUnit rules. The KafkaUnitRule used as a @Rule handles starting and shutting down the broker for us. Again we’re following these steps:
- we’re creating a new Kafka topic named “MyTestTopic”
- we’re sending a message to the topic
- we’re fetching all messages from the topic
- we’re verifying that only one message exists and that it matches our published message
package it;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import info.batey.kafka.unit.KafkaUnit;
import info.batey.kafka.unit.KafkaUnitRule;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Rule;
import org.junit.Test;

public class KafkaWithRulesTest {
    @Rule
    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();

    @Test
    public void readMessagesFromTopic() throws Exception {
        final String topicName = "MyTestTopic";
        KafkaUnit kafkaUnitServer = kafkaUnitRule.getKafkaUnit();
        kafkaUnitServer.createTopic(topicName);
        ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(topicName, "greeting",
                "Hello world from hascode.com :)");
        kafkaUnitServer.sendMessages(keyedMessage);
        List<String> allMessages = kafkaUnitServer.readAllMessages(topicName);
        assertThat("topic should contain only one message", allMessages.size(), equalTo(1));
        assertThat("the message should match the published message", allMessages.get(0), equalTo("Hello world from hascode.com :)"));
    }
}
Running the test on the command line should produce output similar to this:
$ mvn test -Dtest=KafkaWithRulesTest
[..]
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running it.KafkaWithRulesTest
Created topic "MyTestTopic".
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.783 sec - in it.KafkaWithRulesTest
Results :
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
Running the test in an IDE such as IntelliJ IDEA or Eclipse should produce a similar result.
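To make it clearer what the rule saves us from writing, here is a simplified, JDK-only model of the start/run/stop lifecycle that a JUnit 4 rule wraps around each test. The names Resource and AroundTest are hypothetical stand-ins invented for this sketch; they are neither JUnit nor kafka-unit API, and the real rule implementation may differ in detail.

```java
/** Hypothetical stand-in for something like an embedded broker. */
interface Resource {
    void start();
    void stop();
}

/** Simplified model of what a rule does around a single test. */
final class AroundTest {

    /** Starts the resource, runs the test body, and always stops the resource. */
    static void run(Resource resource, Runnable testBody) {
        resource.start();
        try {
            testBody.run();
        } finally {
            // Runs even if the test body throws, like our @After method
            // in the first example.
            resource.stop();
        }
    }
}
```

This roughly corresponds to the @Before and @After methods of the first example, with the difference that the lifecycle code lives in one reusable place instead of being repeated in every test class.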
Tutorial Sources
Please feel free to download the tutorial sources from my GitHub repository, fork it there or clone it using Git:
git clone https://github.com/hascode/kafka-testing-tutorial.git
Appendix A: Kafka and Docker
The Docker image spotify/kafka lets us set up Kafka in no time.
Running Kafka and ZooKeeper
The following command starts both Kafka and ZooKeeper:
docker run -p 2181:2181 -p 9092:9092 --rm --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka
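When tests run against a broker started in a container, it can help to wait until the published port accepts connections before the first test connects. Below is a minimal JDK-only sketch of such a check. The class name PortCheck and its methods are made up for this illustration, and the port 9092 in the usage note is taken from the command above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper for waiting on a TCP port. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is listening yet.
            return false;
        }
    }

    /** Polls the port until it is open or the attempts are used up. */
    static boolean waitFor(String host, int port, int attempts, long pauseMillis) {
        for (int i = 0; i < attempts; i++) {
            if (isOpen(host, port, 500)) {
                return true;
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

A call such as PortCheck.waitFor("localhost", 9092, 30, 1000) would poll for up to roughly half a minute. An open port only tells us that something is listening there; it does not guarantee that the broker has finished starting.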
Creating a Topic
The following commands open a shell in the running container (replace ID with the container’s ID or name) and create a new topic named “stream-plaintext-input”:
docker exec -ti ID bash
$(find -name 'kafka-topics.sh') --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stream-plaintext-input
Article Updates
- 2018-05-02: Snippet for starting Kafka with Docker added.
- 2018-03-28: Unused imports removed.