Kafka streaming – simple producer and consumer

Posted: January 1, 2024 in Hadoop

An example explaining the basics of streaming with Apache Kafka, using a simple producer and consumer.

Apache Kafka is an open-source distributed event store and stream-processing platform (source: https://kafka.apache.org/intro).

Lambda vs Kappa Architecture: https://learn.microsoft.com/en-us/azure/architecture/databases/guide/big-data-architectures

Environment:

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDP 7.1.7 SP1)
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292

Command Line Tools

# create jaas.conf file
cat << EOF > jaas.conf
KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
EOF

# create client.properties file
cat << EOF > client.properties
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
EOF

# do kinit
kinit -kt /etc/security/keytabs/<hdfs_user>.keytab <hdfs_user>

# point the Kafka CLI tools at the JAAS config, then list the topics
export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
kafka-topics --list --bootstrap-server <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --command-config client.properties
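
# The Java example below produces to a topic named mp_temperature. If it does not
# exist yet, it can be created with the same pattern (the partition and replication
# counts here are illustrative, not from the original setup):
kafka-topics --create --topic mp_temperature --partitions 3 --replication-factor 3 --bootstrap-server <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --command-config client.properties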


# the same pattern (KAFKA_OPTS export + client.properties) also works for
# kafka-console-producer and kafka-console-consumer; see the example below
# location of the CLI tools: /opt/cloudera/parcels/<path>/bin
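
# A possible invocation of the console tools, reusing the KAFKA_OPTS export and
# client.properties from above (broker and topic names are the placeholders used
# throughout this post; the console producer historically takes --broker-list,
# newer releases also accept --bootstrap-server):
kafka-console-producer --broker-list <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --topic mp_temperature --producer.config client.properties
kafka-console-consumer --bootstrap-server <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --topic mp_temperature --from-beginning --consumer.config client.properties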

Kafka-Basics

Configuration.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)

package eu.placko.examples.kafka.basics;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Configuration {
	private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers.config";
	private static final String SECURITY_PROTOCOL = "security.protocol";
	private static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
	
	public static String bootstrap_servers_config;
	public static String security_protocol;
	public static String sasl_kerberos_service_name;
	
	public static void loadConfiguration() throws IOException {
		// client.properties is expected in the current working directory
		try (InputStream input = new FileInputStream("client.properties")) {
			Properties prop = new Properties();
			prop.load(input);
			bootstrap_servers_config = prop.getProperty(BOOTSTRAP_SERVERS_CONFIG);
			security_protocol = prop.getProperty(SECURITY_PROTOCOL);
			sasl_kerberos_service_name = prop.getProperty(SASL_KERBEROS_SERVICE_NAME);
		}
	}
}

SimpleListOfAllTopics.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)

package eu.placko.examples.kafka.basics;

import java.io.IOException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;

public class SimpleListOfAllTopics {
	public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
		System.out.println("SimpleListOfAllTopics: started");
		Configuration.loadConfiguration();
		
		// Set up client Java properties
		Properties props = new Properties();
		props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
		props.put("security.protocol", Configuration.security_protocol);
		props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config","jaas.conf");
        System.out.println("SimpleListOfAllTopics: set properties done");
        
        try (AdminClient adminClient = AdminClient.create(props)) {
            ListTopicsResult topics = adminClient.listTopics();
            Set<String> topicNames = topics.names().get();
            System.out.println("Topics in the Kafka cluster:");
            topicNames.forEach(System.out::println);
        } catch (Exception e) {
            System.out.println("SimpleListOfAllTopics: error");
            e.printStackTrace();
        }
	}
}
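
The example topic can also be created programmatically via the same AdminClient. A minimal sketch under the configuration above (SimpleCreateTopicSketch is a hypothetical class, not part of the repository; the partition and replication counts are illustrative):

package eu.placko.examples.kafka.basics;

import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class SimpleCreateTopicSketch {
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        Configuration.loadConfiguration();

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config", "jaas.conf");

        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("mp_temperature", 3, (short) 3); // name, partitions, replication
            adminClient.createTopics(Collections.singletonList(topic)).all().get(); // wait for completion
            System.out.println("SimpleCreateTopicSketch: topic created");
        }
    }
}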

SimpleProducer.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)

package eu.placko.examples.kafka.basics;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) throws IOException {
    	System.out.println("SimpleProducer: started");
    	Configuration.loadConfiguration();
    	
    	// Set up client Java properties
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config","jaas.conf");
        System.out.println("SimpleProducer: set properties done");
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (long i = 0; i < 10; i++) {
                String key = Long.toString(i + 1);
                String datetime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
                Double temperature = (double) Math.round(Math.random() * 100.0) / 100.0;
                String msg = datetime + ";" + temperature;
                try {
                    ProducerRecord<String, String> data = new ProducerRecord<String, String>("mp_temperature", key, msg);
                    producer.send(data);
                    System.out.println("DEBUG INFO topic: mp_temperature key: " + key + " value: " + msg);
                    System.out.println("SimpleProducer: sent data done");
                    long wait = 5000;
                    Thread.sleep(wait);
                } catch (Exception e) {
                    System.out.println("SimpleProducer: error");
                    e.printStackTrace();
                }
            }
        }
    }
}
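
SimpleProducer sends fire-and-forget: send() returns immediately, and a broker-side failure would surface only in the client logs, if at all. When delivery confirmation matters, a callback can be passed to send(). A minimal sketch of such a variant (CallbackProducerSketch is a hypothetical class, not part of the repository; the properties mirror SimpleProducer, with acks=all for stronger durability):

package eu.placko.examples.kafka.basics;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CallbackProducerSketch {
    public static void main(String[] args) throws IOException {
        Configuration.loadConfiguration();

        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config", "jaas.conf");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> data =
                    new ProducerRecord<>("mp_temperature", "1", "20240101000000;0.42");
            // the callback runs on the producer I/O thread once the broker responds
            producer.send(data, (metadata, exception) -> {
                if (exception != null) {
                    System.out.println("CallbackProducerSketch: error");
                    exception.printStackTrace();
                } else {
                    System.out.println("acked: partition=" + metadata.partition() + " offset=" + metadata.offset());
                }
            });
            producer.flush(); // block until outstanding sends complete
        }
    }
}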

SimpleConsumer.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)

package eu.placko.examples.kafka.basics;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {	
    public static void main(String[] args) throws IOException {
    	System.out.println("SimpleConsumer: started");
    	Configuration.loadConfiguration();
    	
        // Set up client Java properties
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mp");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config","jaas.conf");
        System.out.println("SimpleConsumer: set properties done");
        
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("mp_temperature"));
            System.out.println("SimpleConsumer: subscribe done");
            while (true) {
                try {
                	ConsumerRecords<String, String> records = consumer.poll(100);
                	System.out.println("DEBUG INFO: waiting for data...");
                	if (records.count() > 0) {
                		for (ConsumerRecord<String, String> record : records) {
                			System.out.printf("DEBUG INFO: Offset = %d\n", record.offset());
                			System.out.printf("DEBUG INFO: Key    = %s\n", record.key());
                			System.out.printf("DEBUG INFO: Value  = %s\n", record.value());
                			writeToCsv(record);
                		}
                		System.out.println("SimpleConsumer: received data done");
                	}
                } catch (Exception e) {
                	System.out.println("SimpleConsumer: error");
                    e.printStackTrace();
                }
            }
        }
    }
    
    public static void writeToCsv(ConsumerRecord<String, String> record) throws IOException {
        // append each received value as one line to data.csv
        try (FileWriter pw = new FileWriter("data.csv", true)) {
            pw.append(record.value());
            pw.append("\n");
        } catch (IOException e) {
            System.out.println("writeToCsv: error");
            e.printStackTrace();
        }
    }
}
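
SimpleConsumer relies on auto-commit, so an offset can be committed before the corresponding record has been written to data.csv; if the process dies in between, that record is lost. For at-least-once behavior, a common alternative is to disable auto-commit and commit manually after processing. A minimal sketch of such a variant (ManualCommitConsumerSketch is a hypothetical class, not part of the repository):

package eu.placko.examples.kafka.basics;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumerSketch {
    public static void main(String[] args) throws IOException {
        Configuration.loadConfiguration();

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mp");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually instead
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config", "jaas.conf");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("mp_temperature"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    SimpleConsumer.writeToCsv(record); // process first ...
                }
                if (!records.isEmpty()) {
                    consumer.commitSync(); // ... then mark the batch as consumed
                }
            }
        }
    }
}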

pom.xml (path: /kafka-basics)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>eu.placko.examples.kafka.basics</groupId>
  <artifactId>kafka-basics</artifactId>
  <version>1.0</version>
  <name>kafka-basics</name>
  <description>An example for explaining basics of streaming via Apache Kafka with simple producer and consumer</description>
  <packaging>jar</packaging>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.1-cdh6.0.0</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <properties>
    <revision>Local-SNAPSHOT</revision>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>	
    <jar.main.class>eu.placko.examples.kafka.basics.SimpleListOfAllTopics</jar.main.class>
  </properties>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.7.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
    	<artifactId>maven-assembly-plugin</artifactId>
    	<configuration>
          <archive>
            <manifest>
              <mainClass>eu.placko.examples.kafka.basics.SimpleListOfAllTopics</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
    	</configuration>
	  </plugin>
    </plugins>
    <resources>
   	  <resource>
        <directory>src/main/resources</directory>
        <targetPath>${project.build.directory}</targetPath>
       	<includes>
          <include>jaas.conf</include>
          <include>client.properties</include>
          <include>kafka_run_topics.sh</include>
          <include>kafka_run_producer.sh</include>
          <include>kafka_run_consumer.sh</include>
        </includes>
      </resource>
    </resources>
  </build>
</project>
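
The assembly plugin above has no execution bound to a lifecycle phase, so the jar-with-dependencies is built by invoking the goal directly, for example:

mvn clean package assembly:single

The resources section copies jaas.conf, client.properties, and the run scripts into the build directory next to the jar, so everything can be started from there.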

client.properties (path: /kafka-basics/src/main/resources)

bootstrap.servers.config=<bootstrap-server_a>:9093,<bootstrap-server_x>:9093

security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka

Note: bootstrap.servers.config is a custom key read by Configuration.java above; the standard Kafka client property name is bootstrap.servers.

jaas.conf (path: /kafka-basics/src/main/resources)

KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
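
useTicketCache=true relies on a ticket obtained beforehand via kinit. For unattended runs, a keytab-based variant of jaas.conf is a common alternative; a sketch with placeholder keytab path, principal, and realm:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/<hdfs_user>.keytab"
principal="<hdfs_user>@<REALM>";
};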

kafka_run_producer.sh (path: /kafka-basics/src/main/resources)

#!/bin/bash
# ------------------------------------------------------------------------------
# --- registering trap for errors during script execution
on_error() {
  printf "\n\nAn error occurred!\n"
  exit 1
}
trap on_error ERR
java -cp kafka-basics-1.0-jar-with-dependencies.jar eu.placko.examples.kafka.basics.SimpleProducer
exit 0


kafka_run_consumer.sh (path: /kafka-basics/src/main/resources)

#!/bin/bash
# ------------------------------------------------------------------------------
# --- registering trap for errors during script execution
on_error() {
  printf "\n\nAn error occurred!\n"
  exit 1
}
trap on_error ERR
java -cp kafka-basics-1.0-jar-with-dependencies.jar eu.placko.examples.kafka.basics.SimpleConsumer
exit 0


kafka_run_topics.sh (path: /kafka-basics/src/main/resources)

#!/bin/bash
# ------------------------------------------------------------------------------
# --- registering trap for errors during script execution
on_error() {
  printf "\n\nAn error occurred!\n"
  exit 1
}
trap on_error ERR
java -jar kafka-basics-1.0-jar-with-dependencies.jar
exit 0

Result

SimpleListOfAllTopics: started
SimpleListOfAllTopics: set properties done
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Topics in the Kafka cluster:
mp_temperature

Next step (store and visualize data)

note: the consumer already appends incoming values to data.csv; from there the data can be stored and visualized, e.g. via Power BI real-time streaming: https://learn.microsoft.com/en-us/power-bi/connect-data/service-real-time-streaming

Full source code: https://github.com/mplacko/kafka-basics
