Kafka Java Quickstart with Docker

Goals Setup Kafka and Zookeeper with Docker and docker-compose Create a message consumer and producer in Java Kafka Setup We’re using docker-compose to set up our message broker, zookeper and other stuff using confluent-platform. This is our docker-compose.yaml config file from Confluent’s following GitHub repository. docker-compose.yaml --- version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:7.0.1 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-kafka:7.0.1 hostname: broker container_name: broker depends_on: - zookeeper ports: - "29092:29092" - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost schema-registry: image: confluentinc/cp-schema-registry:7.0.1 hostname: schema-registry container_name: schema-registry depends_on: - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 connect: image: cnfldemos/kafka-connect-datagen:0.5.0-6.2.0 hostname: connect container_name: connect depends_on: - broker - schema-registry ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR ksqldb-server: image: confluentinc/cp-ksqldb-server:7.0.1 hostname: ksqldb-server container_name: ksqldb-server depends_on: - broker - connect ports: - "8088:8088" environment: KSQL_CONFIG_DIR: "/etc/ksql" KSQL_BOOTSTRAP_SERVERS: "broker:29092" KSQL_HOST_NAME: ksqldb-server KSQL_LISTENERS: "http://0.0.0.0:8088" KSQL_CACHE_MAX_BYTES_BUFFERING: 0 KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" KSQL_KSQL_CONNECT_URL: "http://connect:8083" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true' ksqldb-cli: image: confluentinc/cp-ksqldb-cli:7.0.1 container_name: ksqldb-cli depends_on: - broker - connect - ksqldb-server entrypoint: /bin/sh tty: true ksql-datagen: image: confluentinc/ksqldb-examples:7.0.1 hostname: ksql-datagen container_name: ksql-datagen depends_on: - ksqldb-server - broker - schema-registry - connect command: "bash -c 'echo Waiting for Kafka to be ready... && \ cub kafka-ready -b broker:29092 1 40 && \ echo Waiting for Confluent Schema Registry to be ready... && \ cub sr-ready schema-registry 8081 40 && \ echo Waiting a few seconds for topic creation to finish... && \ sleep 11 && \ tail -f /dev/null'" environment: KSQL_CONFIG_DIR: "/etc/ksql" STREAMS_BOOTSTRAP_SERVERS: broker:29092 STREAMS_SCHEMA_REGISTRY_HOST: schema-registry STREAMS_SCHEMA_REGISTRY_PORT: 8081 rest-proxy: image: confluentinc/cp-kafka-rest:7.0.1 depends_on: - broker - schema-registry ports: - 8082:8082 hostname: rest-proxy container_name: rest-proxy environment: KAFKA_REST_HOST_NAME: rest-proxy KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092' KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' ...

January 29, 2022 · 8 min · 1500 words · Micha Kops

Logback and Spring Boot - Change Log Level to custom format

Create a custom converter This class converts the well known log levels to a custom format CustomLogLevelConverter.java package com.hascode; public class CustomLogLevelConverter extends ClassicConverter { @Override public String convert(ILoggingEvent event) { switch (event.getLevel().toInt()) { case Level.ERROR_INT: return "ERROR!!!"; case Level.WARN_INT: return "WARN!!"; case Level.INFO_INT: return "INFO!"; case Level.TRACE_INT: return "DEBUG"; default: return event.getLevel().toString(); } } } Register the converter The following Logback config includes some defaults and registers our custom converter. ...

August 19, 2021 · 1 min · 165 words · Micha Kops

GitHub Release Pipeline for Java

Goals Set up Maven build pipeline for a Java 11 app Release Maven artifact on GitHub using GitHub actions Setup Maven Assuming that we have a project named sample-app released for my hascode GitHub account: We’re adding some release information to our project’s pom.xml: pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hascode</groupId> <artifactId>sample-app</artifactId> <version>1.0.0-SNAPSHOT</version>bookmark-manager <name>sample-app</name> <description>hasCode.com Bookmark Manager</description> <scm> <developerConnection>scm:git:https://github.com/hascode/sample-app.git </developerConnection> </scm> <distributionManagement> <repository> <id>github</id> <name>GitHub</name> <url>https://maven.pkg.github.com/hascode/sample-app</url> </repository> </distributionManagement> <properties> <java.version>11</java.version> <project.scm.id>github</project.scm.id> </properties> [..] </project> ...

May 14, 2021 · 2 min · 398 words · Micha Kops

JUnit5 Java Maven Snippet

Goals Use JUnit Maven BOM for version alignment Add minimal dependencies for JUnit5 Java Projekt Setup Excerpt from the Maven project’s pom.xml: <dependencyManagement> <dependencies> <dependency> <groupId>org.junit</groupId> <artifactId>junit-bom</artifactId> <version>5.7.1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <scope>test</scope> </dependency> </dependencies> JUnit5 does not work with older versions of the Maven Compiler Plugin and the Surefire Plugin used for test execution. Setting their versions in the pom.xml is done like this: ...

May 14, 2021 · 1 min · 85 words · Micha Kops

Package a Spring Boot App as RPM

Goals Package a Spring Boot Service as RPM Package Configure systemd integration Add install/uninstall hooks to create users, directories etc. Maven Setup <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.2</version> <relativePath/> </parent> <groupId>com.hascode</groupId> <artifactId>sample-app</artifactId> <version>1.0.0-SNAPSHOT</version> <name>sample-app</name> <properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> (1) <groupId>de.dentrassi.maven</groupId> <artifactId>rpm</artifactId> <version>1.5.0</version> <executions> <execution> <goals> <goal>rpm</goal> </goals> </execution> </executions> <configuration> <packageName>sample-app</packageName> <skipSigning>true</skipSigning> <group>Application/Misc</group> <requires> <require>java-11-openjdk-headless</require> (2) </requires> <entries> <entry> (3) <name>/opt/sample-app</name> <directory>true</directory> <user>root</user> <group>root</group> <mode>0755</mode> </entry> <entry> (4) <name>/opt/sample-app/log</name> <directory>true</directory> <user>sample-app</user> <group>sample-app</group> <mode>0750</mode> </entry> <entry> (5) <name>/opt/sample-app/sample-app.jar</name> <file>${project.build.directory}/${project.build.finalName}.jar</file> <user>root</user> <group>root</group> <mode>0644</mode> </entry> <entry> (6) <name>/usr/lib/systemd/system/sample-app.service</name> <file>${project.basedir}/src/main/dist/sample-app.service</file> <mode>0644</mode> </entry> </entries> <beforeInstallation> (7) <file>${project.basedir}/src/main/dist/preinstall.sh</file> </beforeInstallation> <afterInstallation> <file>${project.basedir}/src/main/dist/postinstall.sh</file> </afterInstallation> <beforeRemoval> <file>${project.basedir}/src/main/dist/preuninstall.sh</file> </beforeRemoval> <license>All rights reserved</license> </configuration> </plugin> </plugins> </build> </project> ...

May 14, 2021 · 3 min · 567 words · Micha Kops

Implementing Reactive Client-Server Communication over TCP or Websockets with RSocket and Java

Reactive design or reactive architecture has an impact on how modern software systems are implemented. RSocket is a project that aims to adapt the benefits of the patterns described in the Reactive Manifesto and resulting tools like Reactive Streams or Reactive Extensions to a formal new communication protocol. RSocket works with TCP, WebSockets and Aeron transport layers and offers additional features like session resumption. In the following tutorial I’m going to demonstrate how to implement simple client-server communication over TCP and Websockets for different interaction models like request-response, request-stream, fire-and-forget and event subscription. ...

November 25, 2018 · 8 min · 1558 words · Micha Kops

Capacity Planning using the Universal Scalability Law with Java and usl4j

Capacity planning is an important task when trying to anticipate resources and scaling factors for our applications. The usl4j library offers us an easy abstraction for Neil J. Gunther’s Universal Scalability Law and allows us to build up a predictive model based on the parameters throughput, latency and concurrent operations. With a basic input set of two of these parameters, we are able to predict how these values change if we change one input parameter so that we can build our infrastructure or systems according to our SLAs. ...

September 30, 2018 · 6 min · 1072 words · Micha Kops

Testing Java Applications for Resilience by Simulating Network Problems with Toxiproxy, JUnit and the Docker Maven Plugin

When implementing distributed systems, client-server architectures and simple applications with network related functionalities, everything is fine when we’re in the development or in the testing stage because the network is reliable and the communicating systems are not as stressed as they are in production. But to sleep well we want to validate how resilient we have implemented our systems, how they behave when the network fails, the latency rises, the bandwidth is limited, connections time out and so on. ...

July 29, 2018 · 9 min · 1836 words · Micha Kops

Using jetstreamDB as in-memory Database for Java

JetstreamDB is a in-memory database engine for Java that claims to be built for ultra-high speed and the ability of managing complex data structures by storing real Java objects instead of serializing data structures to other database specific formats. In the following short example I would like to demonstrate how to create and read items from such a database by building a small article management sample app. ...

June 30, 2018 · 4 min · 654 words · Micha Kops

Analyzing Java Problems – Tools, Snippets and Workflows

When we need to investigate the cause for a dysfunctional Java application we have a plethora of tools available that on the one hand help us in gathering information, artifacts and statistics and on the other hand help us in processing this information and identifying possible problems. The following list of tools, snippets, workflows and information about specific artifacts could provide a starting point for analyzing such problems and covers topics like heap-dumps, thread-dumps, heap-histograms, heap-regions, garbage-collection-logs, hotspot-compiler/codecache-logs, debugging native-memory, tools for heap-dump-analysis, JVM unified logging and more.. ...

April 30, 2018 · 26 min · 5526 words · Micha Kops