An example explaining the basics of streaming via Apache Kafka with a simple producer and a simple consumer.
Apache Kafka is an open-source distributed event store and stream-processing platform (source: https://kafka.apache.org/intro).
Lambda vs Kappa Architecture: https://learn.microsoft.com/en-us/azure/architecture/databases/guide/big-data-architectures
Prerequisites
- OS: Linux (RHEL 7.9)
- Hadoop: Cloudera (CDP 7.1.7 SP1)
- Authentication via Kerberos
- OpenJDK 64-Bit 1.8.0_292
Command Line Tools
# create jaas.conf file
cat << EOF > jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true;
};
EOF
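# optional: keytab-based jaas.conf for non-interactive jobs instead of the ticket cache
# (a hedged sketch; the keytab path and <REALM> principal are placeholders for your environment)
cat << EOF > jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/security/keytabs/<hdfs_user>.keytab"
  principal="<hdfs_user>@<REALM>";
};
EOF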
# create client.properties file
cat << EOF > client.properties
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
EOF
# do kinit
kinit -kt /etc/security/keytabs/<hdfs_user>.keytab <hdfs_user>
# export the JAAS config for the CLI tools, then list the topics
export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
kafka-topics --list --bootstrap-server <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --command-config client.properties
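# create the example topic used by the producer/consumer below
# (a hedged sketch: partition count and replication factor are illustrative, adjust to the cluster)
kafka-topics --create --topic mp_temperature --partitions 1 --replication-factor 3 --bootstrap-server <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --command-config client.properties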
# produce messages interactively (kafka-console-producer)
kafka-console-producer --topic mp_temperature --broker-list <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --producer.config client.properties
# consume messages from the beginning (kafka-console-consumer)
kafka-console-consumer --topic mp_temperature --from-beginning --bootstrap-server <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --consumer.config client.properties
# note: the CLI tools are located in /opt/cloudera/parcels/<path>/bin
Kafka-Basics
Configuration.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)
package eu.placko.examples.kafka.basics;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Configuration {
    private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers.config";
    private static final String SECURITY_PROTOCOL = "security.protocol";
    private static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";

    public static String bootstrap_servers_config;
    public static String security_protocol;
    public static String sasl_kerberos_service_name;

    // reads client.properties from the working directory and caches the values;
    // try-with-resources closes the stream automatically
    public static void loadConfiguration() throws IOException {
        try (InputStream input = new FileInputStream("client.properties")) {
            Properties prop = new Properties();
            prop.load(input);
            bootstrap_servers_config = prop.getProperty(BOOTSTRAP_SERVERS_CONFIG);
            security_protocol = prop.getProperty(SECURITY_PROTOCOL);
            sasl_kerberos_service_name = prop.getProperty(SASL_KERBEROS_SERVICE_NAME);
        }
    }
}
SimpleListOfAllTopics.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)
package eu.placko.examples.kafka.basics;

import java.io.IOException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;

public class SimpleListOfAllTopics {
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        System.out.println("SimpleListOfAllTopics: started");
        Configuration.loadConfiguration();

        // Set up client Java properties
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config", "jaas.conf");
        System.out.println("SimpleListOfAllTopics: set properties done");

        try (AdminClient adminClient = AdminClient.create(props)) {
            ListTopicsResult topics = adminClient.listTopics();
            Set<String> topicNames = topics.names().get();
            System.out.println("Topics in the Kafka cluster:");
            topicNames.forEach(System.out::println);
        } catch (Exception e) {
            System.out.println("SimpleListOfAllTopics: error");
            e.printStackTrace();
        }
    }
}
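The producer below writes to the mp_temperature topic, which must exist before the first send (unless topic auto-creation is enabled on the cluster). Topics can also be created programmatically via the same AdminClient; a minimal sketch, as a hypothetical SimpleCreateTopic class that is not part of the repository (partition count and replication factor are illustrative assumptions):

package eu.placko.examples.kafka.basics;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class SimpleCreateTopic {
    public static void main(String[] args) throws Exception {
        Configuration.loadConfiguration();

        // same client/security setup as SimpleListOfAllTopics
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config", "jaas.conf");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // 1 partition, replication factor 3 -- illustrative values, adjust to the cluster
            NewTopic topic = new NewTopic("mp_temperature", 1, (short) 3);
            adminClient.createTopics(Collections.singletonList(topic)).all().get();
            System.out.println("SimpleCreateTopic: topic created");
        }
    }
}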
SimpleProducer.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)
package eu.placko.examples.kafka.basics;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) throws IOException {
        System.out.println("SimpleProducer: started");
        Configuration.loadConfiguration();

        // Set up client Java properties
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=1: the partition leader acknowledges the write without waiting for replicas
        props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config", "jaas.conf");
        System.out.println("SimpleProducer: set properties done");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (long i = 0; i < 10; i++) {
                String key = Long.toString(i + 1);
                String datetime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
                // pseudo-random "temperature" between 0.00 and 1.00, rounded to two decimals
                Double temperature = (double) Math.round(Math.random() * 100.0) / 100.0;
                String msg = datetime + ";" + temperature;
                try {
                    ProducerRecord<String, String> data = new ProducerRecord<String, String>("mp_temperature", key, msg);
                    producer.send(data);
                    System.out.println("DEBUG INFO topic: mp_temperature key: " + key + " value: " + msg);
                    System.out.println("SimpleProducer: sent data done");
                    // pause 5 seconds between messages
                    Thread.sleep(5000);
                } catch (Exception e) {
                    System.out.println("SimpleProducer: error");
                    e.printStackTrace();
                }
            }
        }
    }
}
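Note that producer.send(data) above is asynchronous and fire-and-forget: a broker-side failure would go unnoticed by the loop. The returned Future can be awaited, or a Callback passed, to confirm delivery. A minimal sketch of both styles, in a hypothetical helper class that is not part of the repository:

package eu.placko.examples.kafka.basics;

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendWithConfirmation {
    // synchronous variant: block until the broker acknowledges the write
    static RecordMetadata sendSync(KafkaProducer<String, String> producer,
                                   ProducerRecord<String, String> data)
            throws ExecutionException, InterruptedException {
        return producer.send(data).get();
    }

    // asynchronous variant: react to success/failure without blocking the send loop
    static void sendAsync(KafkaProducer<String, String> producer,
                          ProducerRecord<String, String> data) {
        producer.send(data, (metadata, exception) -> {
            if (exception != null) {
                System.out.println("SendWithConfirmation: delivery failed");
                exception.printStackTrace();
            } else {
                System.out.printf("DEBUG INFO: delivered to partition %d at offset %d%n",
                        metadata.partition(), metadata.offset());
            }
        });
    }
}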
SimpleConsumer.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)
package eu.placko.examples.kafka.basics;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String[] args) throws IOException {
        System.out.println("SimpleConsumer: started");
        Configuration.loadConfiguration();

        // Set up client Java properties
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mp");
        // offsets are committed automatically every second
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config", "jaas.conf");
        System.out.println("SimpleConsumer: set properties done");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("mp_temperature"));
            System.out.println("SimpleConsumer: subscribe done");
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    System.out.println("DEBUG INFO: waiting for data...");
                    if (records.count() > 0) {
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.printf("DEBUG INFO: Offset = %d\n", record.offset());
                            System.out.printf("DEBUG INFO: Key = %s\n", record.key());
                            System.out.printf("DEBUG INFO: Value = %s\n", record.value());
                            writeToCsv(record);
                        }
                        System.out.println("SimpleConsumer: received data done");
                    }
                } catch (Exception e) {
                    System.out.println("SimpleConsumer: error");
                    e.printStackTrace();
                }
            }
        }
    }

    // appends the message value as one CSV line; try-with-resources closes the writer
    public static void writeToCsv(ConsumerRecord<String, String> record) throws IOException {
        try (FileWriter pw = new FileWriter("data.csv", true)) {
            pw.append(record.value());
            pw.append("\n");
        } catch (Exception e) {
            System.out.println("writeToCsv: error");
            e.printStackTrace();
        }
    }
}
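With enable.auto.commit=true, an offset can be committed before the polled record has actually been written to data.csv, so a crash in between loses messages. A minimal sketch of an at-least-once variant with manual commits, as a hypothetical class that is not part of the repository (security settings as above):

package eu.placko.examples.kafka.basics;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumerManualCommit {
    public static void main(String[] args) throws Exception {
        Configuration.loadConfiguration();

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mp");
        // disable auto-commit; offsets are committed only after processing below
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config", "jaas.conf");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("mp_temperature"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    SimpleConsumer.writeToCsv(record); // process first...
                }
                if (records.count() > 0) {
                    consumer.commitSync(); // ...then commit, for at-least-once delivery
                }
            }
        }
    }
}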
pom.xml (path: /kafka-basics)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>eu.placko.examples.kafka.basics</groupId>
  <artifactId>kafka-basics</artifactId>
  <version>1.0</version>
  <name>kafka-basics</name>
  <description>An example for explaining basics of streaming via Apache Kafka with simple producer and consumer</description>
  <packaging>jar</packaging>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.1-cdh6.0.0</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>

  <properties>
    <revision>Local-SNAPSHOT</revision>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <jar.main.class>eu.placko.examples.kafka.basics.SimpleListOfAllTopics</jar.main.class>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.7.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>eu.placko.examples.kafka.basics.SimpleListOfAllTopics</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <!-- bind assembly:single to the package phase so that mvn package builds the fat jar used by the run scripts -->
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <targetPath>${project.build.directory}</targetPath>
        <includes>
          <include>jaas.conf</include>
          <include>client.properties</include>
          <include>kafka_run_topics.sh</include>
          <include>kafka_run_producer.sh</include>
          <include>kafka_run_consumer.sh</include>
        </includes>
      </resource>
    </resources>
  </build>
</project>
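Note: with assembly:single bound to the package phase as above, mvn clean package produces kafka-basics-1.0-jar-with-dependencies.jar under target/, which the run scripts below expect alongside jaas.conf and client.properties.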
client.properties (path: /kafka-basics/src/main/resources); note that bootstrap.servers.config is a custom key read by Configuration.java, not the standard Kafka bootstrap.servers property
bootstrap.servers.config=<bootstrap-server_a>:9093,<bootstrap-server_x>:9093
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
jaas.conf (path: /kafka-basics/src/main/resources)
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true;
};
kafka_run_producer.sh (path: /kafka-basics/src/main/resources)
#!/bin/bash
# ------------------------------------------------------------------------------
# --- registering trap for errors during script execution
on_error() {
printf "\n\nAn error occurred!\n"
exit 1
}
trap on_error ERR
java -cp kafka-basics-1.0-jar-with-dependencies.jar eu.placko.examples.kafka.basics.SimpleProducer
exit 0
Result
kafka_run_consumer.sh (path: /kafka-basics/src/main/resources)
#!/bin/bash
# ------------------------------------------------------------------------------
# --- registering trap for errors during script execution
on_error() {
printf "\n\nAn error occurred!\n"
exit 1
}
trap on_error ERR
java -cp kafka-basics-1.0-jar-with-dependencies.jar eu.placko.examples.kafka.basics.SimpleConsumer
exit 0
Result
kafka_run_topics.sh (path: /kafka-basics/src/main/resources)
#!/bin/bash
# ------------------------------------------------------------------------------
# --- registering trap for errors during script execution
on_error() {
printf "\n\nAn error occurred!\n"
exit 1
}
trap on_error ERR
java -jar kafka-basics-1.0-jar-with-dependencies.jar
exit 0
Result
SimpleListOfAllTopics: started
SimpleListOfAllTopics: set properties done
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Topics in the Kafka cluster:
mp_temperature
Next step (store and visualize data)
note: e.g. based on https://learn.microsoft.com/en-us/power-bi/connect-data/service-real-time-streaming
Source Code
https://github.com/mplacko/kafka-basics
Additional Info
- Kafka
- https://docs.cloudera.com/cdp-private-cloud-base/latest/kafka-managing/topics/kafka-manage-cli-overview.html
- https://docs.cloudera.com/cdp-private-cloud-base/7.1.3/kafka-developing-applications/topics/kafka-develop-example.html
- https://docs.cloudera.com/runtime/latest/kafka-developing-applications/kafka-developing-applications.pdf
- https://docs.cloudera.com/documentation/enterprise/latest/PDF/cloudera-kafka.pdf
- https://docs.cloudera.com/cdp-private-cloud-base/latest/developing-spark-applications/topics/spark-using-spark-streaming.html
- https://github.com/cloudera/kafka-examples
- Lambda vs Kappa
- https://www.waitingforcode.com/data-engineering-cloud/data-architectures-cloud/read#zeta
- https://www.databricks.com/blog/2020/11/20/delta-vs-lambda-why-simplicity-trumps-complexity-for-data-pipelines.html
- https://nexocode.com/blog/posts/lambda-vs-kappa-architecture/
- https://www.kai-waehner.de/blog/2021/09/23/real-time-kappa-architecture-mainstream-replacing-batch-lambda/
- https://www.sqlservercentral.com/articles/advantages-of-kappa-architecture-in-the-modern-data-stack