Posts Tagged ‘Big data’

An example explaining the basics of streaming via Apache Kafka with a simple producer and consumer.

Apache Kafka is an open-source distributed event store and stream-processing platform (source: https://kafka.apache.org/intro).

Lambda vs Kappa Architecture: https://learn.microsoft.com/en-us/azure/architecture/databases/guide/big-data-architectures

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDP 7.1.7 SP1)
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292

Command Line Tools

# create jaas.conf file
cat << EOF > jaas.conf
KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
EOF

# create client.properties file
cat << EOF > client.properties
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
EOF

# do kinit
kinit -kt /etc/security/keytabs/<hdfs_user>.keytab <hdfs_user>

# set KAFKA_OPTS first, then execute the kafka-topics command
export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
kafka-topics --list --bootstrap-server <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --command-config client.properties


# kafka-console-producer
# kafka-console-consumer
# location: /opt/cloudera/parcels/<path>/bin
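
For completeness, the console tools can be exercised against the same topic and client.properties. Flag names vary slightly between Kafka releases shipped with the parcel, so treat the following as a sketch and verify with --help:

# hedged example: topic mp_temperature, same JAAS/Kerberos setup as above
export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
kafka-console-producer --broker-list <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --topic mp_temperature --producer.config client.properties
kafka-console-consumer --bootstrap-server <bootstrap-server_a>:9093,<bootstrap-server_x>:9093 --topic mp_temperature --from-beginning --consumer.config client.properties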

Kafka-Basics

Configuration.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)

package eu.placko.examples.kafka.basics;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Configuration {
	private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers.config";
	private static final String SECURITY_PROTOCOL = "security.protocol";
	private static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
	
	public static String bootstrap_servers_config;
	public static String security_protocol;
	public static String sasl_kerberos_service_name;
	
	public static void loadConfiguration() throws IOException {
        InputStream input = null;
        try {
            input = new FileInputStream("client.properties");
            Properties prop = new Properties();
            prop.load(input);
            bootstrap_servers_config = prop.getProperty(BOOTSTRAP_SERVERS_CONFIG);
            security_protocol = prop.getProperty(SECURITY_PROTOCOL);
            sasl_kerberos_service_name = prop.getProperty(SASL_KERBEROS_SERVICE_NAME);
        } finally {
            try {
                if (input != null)
                    input.close();
            } catch (IOException e) {
            	System.out.println("Configuration: error");
            	e.printStackTrace();
            }
        }
    }
}

SimpleListOfAllTopics.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)

package eu.placko.examples.kafka.basics;

import java.io.IOException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;

public class SimpleListOfAllTopics {
	public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
		System.out.println("SimpleListOfAllTopics: started");
		Configuration.loadConfiguration();
		
		// Set up client Java properties
		Properties props = new Properties();
		props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
		props.put("security.protocol", Configuration.security_protocol);
		props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config","jaas.conf");
        System.out.println("SimpleListOfAllTopics: set properties done");
        
        try (AdminClient adminClient = AdminClient.create(props)) {
            ListTopicsResult topics = adminClient.listTopics();
            Set<String> topicNames = topics.names().get();
            System.out.println("Topics in the Kafka cluster:");
            topicNames.forEach(System.out::println);
        } catch (Exception e) {
            System.out.println("SimpleListOfAllTopics: error");
            e.printStackTrace();
        }
	}
}

SimpleProducer.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)

package eu.placko.examples.kafka.basics;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) throws IOException {
    	System.out.println("SimpleProducer: started");
    	Configuration.loadConfiguration();
    	
    	// Set up client Java properties
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config","jaas.conf");
        System.out.println("SimpleProducer: set properties done");
        
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (long i = 0; i < 10; i++) {
                String key = Long.toString(i + 1);
                String datetime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
                Double temperature = (double) Math.round(Math.random() * 100.0) / 100.0;
                String msg = datetime + ";" + temperature;
                try {
                    ProducerRecord<String, String> data = new ProducerRecord<String, String>("mp_temperature", key, msg);
                    producer.send(data);
                    System.out.println("DEBUG INFO topic: mp_temperature key: " + key + " value: " + msg);
                    System.out.println("SimpleProducer: sent data done");
                    long wait = 5000;
                    Thread.sleep(wait);
                } catch (Exception e) {
                    System.out.println("SimpleProducer: error");
                    e.printStackTrace();
                }
            }
        }
    }
}

SimpleConsumer.java (path: /kafka-basics/src/main/java/eu/placko/examples/kafka/basics)

package eu.placko.examples.kafka.basics;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {	
    public static void main(String[] args) throws IOException {
    	System.out.println("SimpleConsumer: started");
    	Configuration.loadConfiguration();
    	
        // Set up client Java properties
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.bootstrap_servers_config);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mp");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put("security.protocol", Configuration.security_protocol);
        props.put("sasl.kerberos.service.name", Configuration.sasl_kerberos_service_name);
        System.setProperty("java.security.auth.login.config","jaas.conf");
        System.out.println("SimpleConsumer: set properties done");
        
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("mp_temperature"));
            System.out.println("SimpleConsumer: subscribe done");
            while (true) {
                try {
                	ConsumerRecords<String, String> records = consumer.poll(100);
                	System.out.println("DEBUG INFO: waiting for data...");
                	if (records.count() > 0) {
                		for (ConsumerRecord<String, String> record : records) {
                			System.out.printf("DEBUG INFO: Offset = %d\n", record.offset());
                			System.out.printf("DEBUG INFO: Key    = %s\n", record.key());
                			System.out.printf("DEBUG INFO: Value  = %s\n", record.value());
                			writeToCsv(record);
                		}
                		System.out.println("SimpleConsumer: received data done");
                	}
                } catch (Exception e) {
                	System.out.println("SimpleConsumer: error");
                    e.printStackTrace();
                }
            }
        }
    }
    
    public static void writeToCsv(ConsumerRecord<String, String> record) throws IOException{
    	FileWriter pw = null;
    	try {
	    	pw = new FileWriter("data.csv",true); 
	    	pw.append(record.value());
	    	pw.append("\n");
	    } catch (Exception e) {
	    	System.out.println("writeToCsv: error");
	        e.printStackTrace();
	    } finally {
	    	if (pw != null) {
	    		pw.flush();
	    		pw.close();
	    	}
	    }
    }
}

pom.xml (path: /kafka-basics/src/resources)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>eu.placko.examples.kafka.basics</groupId>
  <artifactId>kafka-basics</artifactId>
  <version>1.0</version>
  <name>kafka-basics</name>
  <description>An example for explaining basics of streaming via Apache Kafka with simple producer and consumer</description>
  <packaging>jar</packaging>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.1-cdh6.0.0</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <properties>
    <revision>Local-SNAPSHOT</revision>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>	
    <jar.main.class>eu.placko.examples.kafka.basics.SimpleListOfAllTopics</jar.main.class>
  </properties>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.7.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
    	<artifactId>maven-assembly-plugin</artifactId>
    	<configuration>
          <archive>
            <manifest>
              <mainClass>eu.placko.examples.kafka.basics.SimpleListOfAllTopics</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
    	</configuration>
	  </plugin>
    </plugins>
    <resources>
   	  <resource>
        <directory>src/main/resources</directory>
        <targetPath>${project.build.directory}</targetPath>
       	<includes>
          <include>jaas.conf</include>
          <include>client.properties</include>
          <include>kafka_run_topics.sh</include>
          <include>kafka_run_producer.sh</include>
          <include>kafka_run_consumer.sh</include>
        </includes>
      </resource>
    </resources>
  </build>
</project>

client.properties (path: /kafka-basics/src/resources)

bootstrap.servers.config=<bootstrap-server_a>:9093,<bootstrap-server_x>:9093

security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka

jaas.conf (path: /kafka-basics/src/resources)

KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};

kafka_run_producer.sh (path: /kafka-basics/src/resources)

#!/bin/sh
# ------------------------------------------------------------------------------
# --- registering trap for errors during script execution
on_error() {
  printf "\n\nAn error occurred!\n"
  exit 1
}
trap on_error ERR
java -cp kafka-basics-1.0-jar-with-dependencies.jar eu.placko.examples.kafka.basics.SimpleProducer
exit 0

Result

kafka_run_consumer.sh (path: /kafka-basics/src/resources)

#!/bin/sh
# ------------------------------------------------------------------------------
# --- registering trap for errors during script execution
on_error() {
  printf "\n\nAn error occurred!\n"
  exit 1
}
trap on_error ERR
java -cp kafka-basics-1.0-jar-with-dependencies.jar eu.placko.examples.kafka.basics.SimpleConsumer
exit 0

Result

kafka_run_topics.sh (path: /kafka-basics/src/resources)

#!/bin/sh
# ------------------------------------------------------------------------------
# --- registering trap for errors during script execution
on_error() {
  printf "\n\nAn error occurred!\n"
  exit 1
}
trap on_error ERR
java -jar kafka-basics-1.0-jar-with-dependencies.jar
exit 0

Result

SimpleListOfAllTopics: started
SimpleListOfAllTopics: set properties done
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Topics in the Kafka cluster:
mp_temperature

Next step (store and visualize data)

note: e.g. based on https://learn.microsoft.com/en-us/power-bi/connect-data/service-real-time-streaming
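
As a rough, hedged illustration of that next step (not part of the kafka-basics repository), a minimal Java sketch that pushes one reading to a Power BI streaming (push) dataset could look as follows; the push URL is a placeholder generated when the dataset is created, and the field names are assumptions that must match the dataset definition:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PowerBiPushExample {
    public static void main(String[] args) throws Exception {
        // Placeholder: the push URL generated by Power BI for a streaming (push) dataset.
        URL url = new URL("https://api.powerbi.com/beta/<workspace>/datasets/<dataset-id>/rows?key=<key>");

        // One row in the same "datetime;temperature" spirit as SimpleProducer/SimpleConsumer,
        // serialized as the JSON array of rows expected by a push dataset (assumed field names).
        String body = "[{\"datetime\": \"20230520120000\", \"temperature\": 23.5}]";

        HttpURLConnection con = (HttpURLConnection) url.openConnection();
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type", "application/json");
        con.setDoOutput(true);
        try (OutputStream os = con.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Power BI push response code: " + con.getResponseCode());
    }
}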

https://github.com/mplacko/kafka-basics

An example explaining how to connect and run CRUD queries against Phoenix in a Kerberized environment.

Phoenix is a SQL engine on top of Apache HBase.

Phoenix supports thick and thin connection types:

  • The thick client is faster, but it must connect directly to ZooKeeper and the HBase RegionServers.
  • The thin client has fewer dependencies and connects through a Phoenix Query Server (PQS) instance.
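
For illustration, a minimal, hedged sketch of the two connection styles follows (placeholders only; the thick driver ships in the full phoenix-client jar, while the thin driver is the phoenix-queryserver-client dependency used in the pom below):

import java.sql.Connection;
import java.sql.DriverManager;

public class PhoenixConnectionStyles {
    public static void main(String[] args) throws Exception {
        // Thick client: connects directly to the ZooKeeper quorum / RegionServers.
        // URL shape: jdbc:phoenix:<zk-quorum>:<zk-port>:<znode> (the znode is typically /hbase).
        String thickUrl = "jdbc:phoenix:<zk-host1>,<zk-host2>,<zk-host3>:2181:/hbase";

        // Thin client: connects through the Phoenix Query Server (same URL shape as ClientConnect below).
        String thinUrl = "jdbc:phoenix:thin:url=https://<pqs.endpoint>:8765;serialization=PROTOBUF;"
                + "authentication=SPNEGO;principal=<user>@<realm>;keytab=/etc/security/keytabs/<user>.keytab";

        Class.forName("org.apache.phoenix.queryserver.client.Driver");
        try (Connection conn = DriverManager.getConnection(thinUrl)) {
            System.out.println("Connected via the thin client");
        }
        // thickUrl would be used the same way, with org.apache.phoenix.jdbc.PhoenixDriver on the classpath.
    }
}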

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDP 7.1.7 SP1)
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292
  • Python 2.7.5

Phoenix – sqlline (Test Phoenix JDBC SQL engine)

*** Caused by: org.apache.hadoop.hbase.TableNotFoundException: SYSTEM.CATALOG ***

Ranger Policy
- A user needs Read/Write/Create privileges on SYSTEM.CATALOG to create a table in Phoenix.
- To create a Phoenix table, you MUST have the ability to create the HBase table and you MUST have the ability to write to the SYSTEM.CATALOG table.
$ su <user>
$ kinit -kt /etc/security/keytabs/<user>.keytab <user>
$ cd /opt/cloudera/parcels/CDH/lib/phoenix/bin
$ python sqlline.py -fc FASTCONNECT
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect -p driver org.apache.phoenix.jdbc.PhoenixDriver -p user "none" -p password "none" "jdbc:phoenix:"
Connecting to jdbc:phoenix:
23/05/20 11:00:30 WARN impl.MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-phoenix.properties,hadoop-metrics2.properties
Connected to: Phoenix (version 5.1)
Driver: PhoenixEmbeddedDriver (version 5.1)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
sqlline version 1.9.0
0: jdbc:phoenix:> select * from SYSTEM.CATALOG;
+-----------+-------------+-----------------+------------------------------------+---------------+---------------+------------+---------+--------------+--------------+-----------------+-------------+----------------+----------------+---+
| TENANT_ID | TABLE_SCHEM |   TABLE_NAME    |            COLUMN_NAME             | COLUMN_FAMILY | TABLE_SEQ_NUM | TABLE_TYPE | PK_NAME | COLUMN_COUNT | SALT_BUCKETS | DATA_TABLE_NAME | INDEX_STATE | IMMUTABLE_ROWS | VIEW_STATEMENT | D |
+-----------+-------------+-----------------+------------------------------------+---------------+---------------+------------+---------+--------------+--------------+-----------------+-------------+----------------+----------------+---+
|           |             | CUSTOMER        |                                    |               | 0             | u          |         | 3            | null         |                 |             | false          |                |   |
|           |             | CUSTOMER        |                                    | 0             | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | CUSTOMER        | ADDRESS                            | 0             | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | CUSTOMER        | ID                                 |               | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | CUSTOMER        | NAME                               | 0             | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | ITEM            |                                    |               | 0             | u          |         | 3            | null         |                 |             | false          |                |   |
|           |             | ITEM            |                                    | 0             | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | ITEM            | ID                                 |               | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | ITEM            | NAME                               | 0             | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | ITEM            | QUANTITY                           | 0             | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | ORDER           |                                    |               | 0             | u          |         | 3            | null         |                 |             | false          |                |   |
|           |             | ORDER           |                                    | 0             | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | ORDER           | CREATION_TIME                      | 0             | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | ORDER           | CUSTOMER_ID                        | 0             | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | ORDER           | ID                                 |               | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | ORDER_LINE_ITEM |                                    |               | 0             | u          | PK      | 3            | null         |                 |             | false          |                |   |
|           |             | ORDER_LINE_ITEM |                                    | 0             | null          |            |         | null         | null         |                 |             |                |                |   |
|           |             | ORDER_LINE_ITEM | ITEM_ID                            |               | null          |            | PK      | null         | null         |                 |             |                |                |   |
|           |             | ORDER_LINE_ITEM | ORDER_ID                           |               | null          |            | PK      | null         | null         |                 |             |                |                |   |
|           |             | ORDER_LINE_ITEM | SALE_QUANTITY                      | 0             | null          |            | PK      | null         | null         |                 |             |                |                |   |
|           | SYSTEM      | CATALOG         |                                    |               | 0             | s          | PK      | 68           | null         |                 |             | false          |                |   |
...
0: jdbc:phoenix:> !tables
+-----------+-------------+-----------------+--------------+---------+-----------+---------------------------+----------------+------------+
| TABLE_CAT | TABLE_SCHEM |   TABLE_NAME    |  TABLE_TYPE  | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STAT |
+-----------+-------------+-----------------+--------------+---------+-----------+---------------------------+----------------+------------+
|           | SYSTEM      | CATALOG         | SYSTEM TABLE |         |           |                           |                |            |
|           | SYSTEM      | CHILD_LINK      | SYSTEM TABLE |         |           |                           |                |            |
|           | SYSTEM      | FUNCTION        | SYSTEM TABLE |         |           |                           |                |            |
|           | SYSTEM      | LOG             | SYSTEM TABLE |         |           |                           |                |            |
|           | SYSTEM      | MUTEX           | SYSTEM TABLE |         |           |                           |                |            |
|           | SYSTEM      | SEQUENCE        | SYSTEM TABLE |         |           |                           |                |            |
|           | SYSTEM      | STATS           | SYSTEM TABLE |         |           |                           |                |            |
|           | SYSTEM      | TASK            | SYSTEM TABLE |         |           |                           |                |            |
|           |             | CUSTOMER        | TABLE        |         |           |                           |                |            |
|           |             | ITEM            | TABLE        |         |           |                           |                |            |
|           |             | ORDER           | TABLE        |         |           |                           |                |            |
|           |             | ORDER_LINE_ITEM | TABLE        |         |           |                           |                |            |
+-----------+-------------+-----------------+--------------+---------+-----------+---------------------------+----------------+------------+
0: jdbc:phoenix:> !quit
Closing: org.apache.phoenix.jdbc.PhoenixConnection

Phoenix – JDBC Thin Client – CRUD

ClientConnect.java (path: /phoenix-crud/src/main/java/eu/placko/examples/hbase/phoenix)

package eu.placko.examples.hbase.phoenix;

import java.sql.Connection;
import java.sql.DriverManager;

public class ClientConnect {	
	public static void main(String[] args) throws Exception {
		if (args.length < 1) {
			throw new IllegalArgumentException("Usage: eu.placko.examples.hbase.phoenix.ClientConnect \"jdbc:phoenix:thin:url=https://<pqs.endpoint>:8765;serialization=PROTOBUF;authentication=SPNEGO;principal=<user@realm>;keytab=/etc/security/keytabs/<user>.keytab\"");
		    }
		
		connect(args[0]);
    }
	
	private static void connect(String jdbcUrl) throws Exception {
	    Class.forName("org.apache.phoenix.queryserver.client.Driver");
	    
	    try (Connection conn = DriverManager.getConnection(jdbcUrl)){
	    	System.out.println("Connection created");
	    	
	    	ClientOperations cl = new ClientOperations();
	        cl.run(conn);
	    }
    }
}

ClientOperations.java (path: /phoenix-crud/src/main/java/eu/placko/examples/hbase/phoenix)

package eu.placko.examples.hbase.phoenix;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

public class ClientOperations {
	private void createTables(Connection conn) throws SQLException {
	    dropTablesIfExists(conn);
	    
	    System.out.println("Creating tables");
	    Statement stmt = conn.createStatement();
	    stmt.execute("CREATE TABLE IF NOT EXISTS ITEM " +
	        " (id varchar not null primary key, name varchar, quantity integer)");
	    stmt.execute("CREATE TABLE IF NOT EXISTS CUSTOMER " +
	        " (id varchar not null primary key, name varchar, address varchar)");
	    stmt.execute("CREATE TABLE IF NOT EXISTS  \"ORDER\" " +
	        " (id varchar not null primary key, customer_id varchar, creation_time varchar)");
	    stmt.execute("CREATE TABLE IF NOT EXISTS ORDER_LINE_ITEM " +
	        " (order_id varchar not null, item_id varchar not null, sale_quantity integer, "
	        + " constraint pk primary key(order_id, item_id))");
	  }

	  private void dropTablesIfExists(Connection conn) throws SQLException {
	      System.out.println("Dropping tables");
	      Statement stmt = conn.createStatement();
	      stmt.execute("DROP TABLE IF EXISTS ITEM");
	      stmt.execute("DROP TABLE IF EXISTS CUSTOMER");
	      stmt.execute("DROP TABLE IF EXISTS \"ORDER\"");
	      stmt.execute("DROP TABLE IF EXISTS ORDER_LINE_ITEM");
	  }

	  private void populateData(Connection conn) throws SQLException {
	      System.out.println("Populating tables");
	      populateItemData(conn);
	      populateCustomerData(conn);
	  }

	  private void populateItemData(Connection conn) throws SQLException {
	      Statement stmt = conn.createStatement();
	      stmt.execute("UPSERT INTO ITEM VALUES('ITM001','Book', 5)");
	      stmt.execute("UPSERT INTO ITEM VALUES('ITM002','Pen', 5)");
	      stmt.execute("UPSERT INTO ITEM VALUES('ITM003','Soap', 5)");
	      stmt.execute("UPSERT INTO ITEM VALUES('ITM004','Shampoo', 5)");
	      stmt.execute("UPSERT INTO ITEM VALUES('ITM005','Phone', 5)");
	      stmt.execute("UPSERT INTO ITEM VALUES('ITM006','Charger', 5)");
	      conn.commit();
	  }

	  private void populateCustomerData(Connection conn) throws SQLException {
	      Statement stmt = conn.createStatement();
	      stmt.execute("UPSERT INTO CUSTOMER VALUES('CU001','John', 'foo')");
	      stmt.execute("UPSERT INTO CUSTOMER VALUES('CU002','Angel', 'faa')");
	      stmt.execute("UPSERT INTO CUSTOMER VALUES('CU003','David', 'soo')");
	      stmt.execute("UPSERT INTO CUSTOMER VALUES('CU004','Robert', 'laa')");
	      stmt.execute("UPSERT INTO CUSTOMER VALUES('CU005','James', 'naa')");
	      conn.commit();
	  }

	  /**
	   *
	   * @param conn: connection used for performing operation
	   * @param orderId: Order ID of the Creating Order
	   * @param customerId: Customer ID of the customer made an order
	   * @param itemVsQuantity: Items selected with quantities for order
	   * @throws SQLException
	   */
	  private void createOrder(Connection conn, String orderId, String customerId,
	      Map<String, Integer> itemVsQuantity) throws SQLException {
	      Statement stmt = conn.createStatement();
	      stmt.execute("UPSERT INTO \"ORDER\" VALUES('" + orderId + "','" + customerId + "',"
	          + " CURRENT_DATE()||' '|| CURRENT_TIME())");
	      for(Entry<String, Integer> item: itemVsQuantity.entrySet()) {
	          String itemID = item.getKey();
	          int saleQuantity = item.getValue();
	          stmt.execute("UPSERT INTO ORDER_LINE_ITEM VALUES('"+ orderId+"','" +itemID+"',1)");
	          stmt.execute("UPSERT INTO ITEM(ID, QUANTITY)"
	              + " SELECT '"+itemID+"', QUANTITY - " + saleQuantity + " FROM ITEM "
	              + " WHERE ID = '" + itemID + "'");
	      }
	  }
	
	public void run(Connection conn) throws SQLException {
		conn.setAutoCommit(false);
		
		createTables(conn);
		populateData(conn);
	    
		System.out.println("*** CREATING ORDER***");
	    Map<String, Integer> orderItems = new HashMap<>();
	    orderItems.put("ITM001", 2);
	    orderItems.put("ITM002", 3);
	    orderItems.put("ITM003", 2);
	    createOrder(conn, "OR001", "CU001", orderItems);
	    try {
	    	conn.commit();
	    } catch (SQLException e) {
	    	System.out.println(e.getMessage());
		}
		System.out.println("Found " + countRows("\"ORDER\"", conn) + " record(s) in table ORDER");
		
		System.out.println("*** RESULTS ***");
		showTables("ITEM", conn);
		showTables("CUSTOMER", conn);
		showTables("\"ORDER\"", conn);
		showTables("ORDER_LINE_ITEM", conn);

		conn.close();
	  }
	
	private void showTables(String tableName, Connection conn) throws SQLException {
		System.out.println("SELECT * FROM " + tableName);
		Statement stmt = conn.createStatement();
		ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
		while (rs.next()) {
		  System.out.println(rs.getString(1) + " | " + rs.getString(2) + " | " + rs.getString(3));
		}
	}
	
	private int countRows(String tableName, Connection conn) throws SQLException {
		Statement stmt = conn.createStatement();
		try (ResultSet results = stmt.executeQuery("SELECT COUNT(1) FROM " + tableName)) {
	      if (!results.next()) {
	        throw new RuntimeException("Query should have result!");
	      }
	      return results.getInt(1);
	    }
	}
}

pom.xml (path: /phoenix-crud/)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>eu.placko.examples.hbase.phoenix</groupId>
  <artifactId>phoenix-crud</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>phoenix-crud</name>
  <description>An example for explaining how to work with HBase/Phoenix JDBC Thin Client – CRUD</description>
  <packaging>jar</packaging>
  <properties>
		<revision>Local-SNAPSHOT</revision>
		<maven.compiler.source>1.7</maven.compiler.source>
		<maven.compiler.target>1.7</maven.compiler.target>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<jar.main.class>eu.placko.examples.hbase.phoenix.ClientConnect</jar.main.class>
	</properties>
	
	<!-- Phoenix -->
	<dependencies>
		<dependency>
    		<groupId>org.apache.phoenix</groupId>
    		<artifactId>phoenix-queryserver-client</artifactId>
    		<version>6.0.0.7.1.7.67-1</version>
		</dependency>
	</dependencies>
	
  <build>
	<plugins>
		<plugin>
    		<artifactId>maven-assembly-plugin</artifactId>
    			<configuration>
        			<archive>
            			<manifest>
                			<mainClass>eu.placko.examples.hbase.phoenix.ClientConnect</mainClass>
            			</manifest>
        			</archive>
        			<descriptorRefs>
            			<descriptorRef>jar-with-dependencies</descriptorRef>
        			</descriptorRefs>
    			</configuration>
		</plugin>
	</plugins>
	<pluginManagement />
  </build>
</project>

README.md (path: /phoenix-crud/)

HOW TO CONFIGURE THE PROJECT
  
Building and Running
   
Build
To build the application it is required to have this installed:
Java 9
Maven 3.x
Then just run this:
mvn clean install assembly:single
   
Run
$ su <user>
$ cd /home/<user>
$ chmod 770 ./hbase/phoenix-crud-0.0.1-SNAPSHOT-jar-with-dependencies.jar
$ chown <user>:<user> ./hbase/phoenix-crud-0.0.1-SNAPSHOT-jar-with-dependencies.jar
$ kinit -kt /etc/security/keytabs/<user>.keytab <user>
$ java -jar ./hbase/phoenix-crud-0.0.1-SNAPSHOT-jar-with-dependencies.jar "jdbc:phoenix:thin:url=https://<pqs.endpoint>:8765;serialization=PROTOBUF;authentication=SPNEGO;principal=<user>@<realm>;keytab=/etc/security/keytabs/<user>.keytab"

Result

Connection created
Dropping tables
Creating tables
Populating tables
*** CREATING ORDER***
Found 1 record(s) in table ORDER
*** RESULTS ***
SELECT * FROM ITEM
ITM001 | Book | 3
ITM002 | Pen | 2
ITM003 | Soap | 3
ITM004 | Shampoo | 5
ITM005 | Phone | 5
ITM006 | Charger | 5
SELECT * FROM CUSTOMER
CU001 | John | foo
CU002 | Angel | faa
CU003 | David | soo
CU004 | Robert | laa
CU005 | James | naa
SELECT * FROM "ORDER"
OR001 | CU001 | 2023-05-20 14:45:44
SELECT * FROM ORDER_LINE_ITEM
OR001 | ITM001 | 1
OR001 | ITM002 | 1
OR001 | ITM003 | 1

Source Code

https://github.com/mplacko/phoenix-crud

Additional Info

An example explaining how to work with the HBase Java API – CRUD.

What is HBase? See: HBase – Operational Database on Hadoop – Part: Basics and Shell

How many column families? HBase currently does not do well with anything above two or three column families, so keep the number of column families in your schema low. See: https://hbase.apache.org/book.html#number.of.cfs

Structure of HBase Key-Value object
Key: row_key | col_family | col_qualifier | timestamp
Value: cell_value
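
To make that structure concrete, here is a minimal sketch (using the same hbase-client API as the code below) that prints every cell of a Result as row_key | col_family | col_qualifier | timestamp and its value; the class and method names are illustrative only:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyValueDump {
    // Prints row_key | col_family | col_qualifier | timestamp -> cell_value for each cell of a Result.
    static void dump(Result result) {
        for (Cell cell : result.rawCells()) {
            System.out.printf("%s | %s | %s | %d -> %s%n",
                Bytes.toString(CellUtil.cloneRow(cell)),
                Bytes.toString(CellUtil.cloneFamily(cell)),
                Bytes.toString(CellUtil.cloneQualifier(cell)),
                cell.getTimestamp(),
                Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
}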

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDP 7.1.7 SP1)
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292

HBase Java API – CRUD

HBaseClientConnect.java (path: /hbase-crud/src/main/java/eu/placko/examples/hbase/)

package eu.placko.examples.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.*;

public class HBaseClientConnect {
	public static void main(String[] args) throws IOException {
        new HBaseClientConnect().connect();
    }
	
	private void connect() throws IOException {
        Configuration config = HBaseConfiguration.create();

        try {
            HBaseAdmin.available(config);
            System.out.println("\n*** HBase is running. ***");
        } catch (MasterNotRunningException ex) {
            System.out.println("\n*** HBase is not running. ***" + ex.getMessage());
            return;
        }

        HBaseClientOperations hbaseClientOperations = new HBaseClientOperations();
        hbaseClientOperations.run(config);
    }
}

HBaseClientOperations.java (path: /hbase-crud/src/main/java/eu/placko/examples/hbase/)

package eu.placko.examples.hbase;

import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClientOperations {
	private static final TableName tb = TableName.valueOf("shop");
    private static final byte[] cf = Bytes.toBytes("shop");
    private static final byte[] rk1 = Bytes.toBytes("1");
    private static final byte[] rk2 = Bytes.toBytes("2");
    private static final byte[] cq1 = Bytes.toBytes("category");
    private static final byte[] cq2 = Bytes.toBytes("product");
    private static final byte[] cq3 = Bytes.toBytes("size_eu");
    private static final byte[] cq4 = Bytes.toBytes("color");
    private static final byte[] cq5 = Bytes.toBytes("sex");
    private static final byte[] cq6 = Bytes.toBytes("price_eu");
	
	public void run(final Configuration config) throws IOException {
        try (final Connection connection = ConnectionFactory.createConnection(config)) {
            final Admin admin = connection.getAdmin();
            deleteTable(admin);
            createTable(admin);
            
            final Table table = connection.getTable(tb);
            put(table);
            get(table);
            update(table);
            delete(admin);
            
            connection.close();
        }
    }
	
	public static void deleteTable(final Admin admin) throws IOException {
        if (admin.tableExists(tb)) {
            admin.disableTable(tb);
            admin.deleteTable(tb);
        }
    }
	
	public static void createTable(final Admin admin) throws IOException {
        if(!admin.tableExists(tb)) {
            TableDescriptor desc = TableDescriptorBuilder.newBuilder(tb)
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf))
                    .build();
            admin.createTable(desc);
        }
    }
	
	public static void put(final Table table) throws IOException {
		System.out.println("\n*** Create/Insert - BEGIN ***");
		
		table.put(new Put(rk1).addColumn(cf, cq1, Bytes.toBytes("shoes")));
		table.put(new Put(rk1).addColumn(cf, cq2, Bytes.toBytes("productA")));
		table.put(new Put(rk1).addColumn(cf, cq3, Bytes.toBytes("42")));
		table.put(new Put(rk1).addColumn(cf, cq4, Bytes.toBytes("black")));
		table.put(new Put(rk1).addColumn(cf, cq5, Bytes.toBytes("m")));
		table.put(new Put(rk1).addColumn(cf, cq6, Bytes.toBytes("44.50")));
		
		table.put(new Put(rk2).addColumn(cf, cq1, Bytes.toBytes("shoes")));
		table.put(new Put(rk2).addColumn(cf, cq2, Bytes.toBytes("productA")));
		table.put(new Put(rk2).addColumn(cf, cq3, Bytes.toBytes("42")));
		table.put(new Put(rk2).addColumn(cf, cq4, Bytes.toBytes("white")));
		table.put(new Put(rk2).addColumn(cf, cq5, Bytes.toBytes("m")));
		table.put(new Put(rk2).addColumn(cf, cq6, Bytes.toBytes("40.50")));
		
		System.out.println("OK");
		
		System.out.println("*** Create/Insert - END ***");
    }
	
	public static void get(final Table table) throws IOException {
        System.out.println("\n*** Read/Select - BEGIN ***");

        //System.out.println(table.get(new Get(Bytes.toBytes("1"))));
        //System.out.println(table.get(new Get(Bytes.toBytes("2"))));
        
        for (int i = 1; i < 3; i++) {
        	Get get = new Get(Bytes.toBytes(Integer.toString(i)));
        	Result result = table.get(get);
        	String row = Bytes.toString(result.getRow());
        	//String specificValue = Bytes.toString(result.getValue(Bytes.toBytes(Bytes.toString(cf)), Bytes.toBytes(Bytes.toString(cq1))));
        	//System.out.println("latest cell value in shoes:category for row 1 is: " + specificValue);
        
        	// Traverse entire returned rows: 1 and 2
        	System.out.println(row);
        	NavigableMap<byte[], NavigableMap<byte[],NavigableMap<Long,byte[]>>> map = result.getMap();
        	for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> navigableMapEntry : map.entrySet()) {
        		String family = Bytes.toString(navigableMapEntry.getKey());
        		System.out.println("\t" + family);
        		NavigableMap<byte[], NavigableMap<Long, byte[]>> familyContents = navigableMapEntry.getValue();
        		for (Map.Entry<byte[], NavigableMap<Long, byte[]>> mapEntry : familyContents.entrySet()) {
        			String qualifier = Bytes.toString(mapEntry.getKey());
        			System.out.println("\t\t" + qualifier);
        			NavigableMap<Long, byte[]> qualifierContents = mapEntry.getValue();
        			for (Map.Entry<Long, byte[]> entry : qualifierContents.entrySet()) {
        				Long timestamp = entry.getKey();
        				String value = Bytes.toString(entry.getValue());
        				System.out.printf("\t\t\t%s, %d\n", value, timestamp);
        			}
        		}
        	}
        }
        
        System.out.println("*** Read/Select - End ***");
    }
	
	public static void update(final Table table) throws IOException {
        System.out.println("\n*** Update - BEGIN ***");

        table.put(new Put(rk1).addColumn(cf, cq1, Bytes.toBytes("shoes")));
		table.put(new Put(rk1).addColumn(cf, cq2, Bytes.toBytes("productA")));
		table.put(new Put(rk1).addColumn(cf, cq3, Bytes.toBytes("42")));
		table.put(new Put(rk1).addColumn(cf, cq4, Bytes.toBytes("black")));
		table.put(new Put(rk1).addColumn(cf, cq5, Bytes.toBytes("m")));
		table.put(new Put(rk1).addColumn(cf, cq6, Bytes.toBytes("42.50")));
		
		System.out.println("OK");
		get(table);
		
        System.out.println("*** Update - End ***");
    }
	
	public static void delete(final Admin admin) throws IOException {
        System.out.println("\n*** Delete - BEGIN ***");

        deleteTable(admin);
        System.out.println("OK");
        
        System.out.println("*** Delete - End ***");
    }
}

pom.xml (path: /hbase-crud/)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>eu.placko.examples.hbase</groupId>
  <artifactId>hbase-crud</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>hbase-crud</name>
  <description>An example for explaining how to work with HBase Java API – CRUD</description>
  <packaging>jar</packaging>
  <properties>
		<revision>Local-SNAPSHOT</revision>
		<maven.compiler.source>1.7</maven.compiler.source>
		<maven.compiler.target>1.7</maven.compiler.target>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<jar.main.class>eu.placko.examples.hbase.HBaseClientConnect</jar.main.class>
	</properties>
	
	<!-- HBase -->
	<dependencies>
		<dependency>
    		<groupId>org.apache.hbase</groupId>
    		<artifactId>hbase-client</artifactId>
    		<version>2.5.2</version>
		</dependency>
	</dependencies>
	
  <build>
	<plugins>
		<plugin>
    		<artifactId>maven-assembly-plugin</artifactId>
    			<configuration>
        			<archive>
            			<manifest>
                			<mainClass>eu.placko.examples.hbase.HBaseClientConnect</mainClass>
            			</manifest>
        			</archive>
        			<descriptorRefs>
            			<descriptorRef>jar-with-dependencies</descriptorRef>
        			</descriptorRefs>
    			</configuration>
		</plugin>
	</plugins>
	<pluginManagement />
  </build>
</project>

README.md (path: /hbase-crud/)

HOW TO CONFIGURE THE PROJECT
 
path: /hbase-crud/src/main/resources/
add core-site.xml from /etc/hbase/conf.cloudera.hbase/
add hbase-site.xml from /etc/hbase/conf.cloudera.hbase/
 
Building and Running
  
Build
To build the application it is required to have this installed:
Java 9
Maven 3.x
Then just run this:
mvn clean install assembly:single
  
Run
$ su <user>
$ cd /home/<user>
$ chmod 770 ./hbase/hbase-crud-0.0.1-SNAPSHOT-jar-with-dependencies.jar
$ chown <user>:<user> ./hbase/hbase-crud-0.0.1-SNAPSHOT-jar-with-dependencies.jar
$ kinit -kt /etc/security/keytabs/<user>.keytab <user>
$ java -jar ./hbase/hbase-crud-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Result

*** HBase is running. ***

*** Create/Insert - BEGIN ***
OK
*** Create/Insert - END ***

*** Read/Select - BEGIN ***
1
        shop
                category
                        shoes, 1674459300795
                color
                        black, 1674459300856
                price_eu
                        44.50, 1674459300872
                product
                        productA, 1674459300836
                sex
                        m, 1674459300864
                size_eu
                        42, 1674459300845
2
        shop
                category
                        shoes, 1674459300879
                color
                        white, 1674459300916
                price_eu
                        40.50, 1674459300931
                product
                        productA, 1674459300892
                sex
                        m, 1674459300924
                size_eu
                        42, 1674459300904
*** Read/Select - End ***

*** Update - BEGIN ***
OK

*** Read/Select - BEGIN ***
1
        shop
                category
                        shoes, 1674459300968
                color
                        black, 1674459301013
                price_eu
                        42.50, 1674459301029
                product
                        productA, 1674459300997
                sex
                        m, 1674459301022
                size_eu
                        42, 1674459301006
2
        shop
                category
                        shoes, 1674459300879
                color
                        white, 1674459300916
                price_eu
                        40.50, 1674459300931
                product
                        productA, 1674459300892
                sex
                        m, 1674459300924
                size_eu
                        42, 1674459300904
*** Read/Select - End ***
*** Update - End ***

*** Delete - BEGIN ***
OK
*** Delete - End ***
1st step: Create/Insert
 ________________________________
/               /t1 (version 1) /
|_______________|_______________|
|row_key        |cf:cq6         |
|_______________|_______________|
|1              |44.50          |
|_______________|_______________|
|2              |40.50          |
|_______________|_______________|

2nd step: Update
   ________________________________
  /               /t1 (version 1) /
 /_______________/_______________/
/               /t2 (version 2) /
|_______________|_______________|
|row_key        |cf:cq6         |
|_______________|_______________|
|1              |42.50          |
|_______________|_______________|
|2              |40.50          |
|_______________|_______________|

Source Code

https://github.com/mplacko/hbase-crud

Additional Info

An example explaining how to work with the HBase Shell – CRUD.

HBase is an open-source, sorted-map data store built on Hadoop, suitable for sparse datasets (i.e. where many columns have null values). It is column-oriented, NoSQL and horizontally scalable, and it is based on Google’s Bigtable. It holds a set of tables that keep data in key-value format and fits when you need random, real-time read/write access. HBase offers an alternative to Hive, which is based on HDFS and has a write-once, read-many approach.

HBase vs RDBMS
  • HBase is schema-less; it has no concept of a fixed column schema and defines only column families. An RDBMS is governed by its schema, which describes the whole structure of its tables.
  • HBase is built for wide tables and is horizontally scalable. An RDBMS is thin, built for small tables, and hard to scale.
  • No transactions exist in HBase. An RDBMS is transactional.
  • HBase holds de-normalized data tables. An RDBMS holds normalized data.
  • HBase is good for semi-structured as well as structured data. An RDBMS is good for structured data.
source: https://jcsites.juniata.edu/faculty/rhodes/smui/hbase.htm

Concept: the map is indexed by a row key, a column key and a timestamp (followed by an example based on geonames-all-cities-with-a-population-1000.csv):

ROW COLUMN+CELL
1000006 column=admin1_code:, timestamp=1672844046876, value=DC24
1000006 column=admin2_code:, timestamp=1672844046876, value=KZN245
1000006 column=admin3_code:, timestamp=1672844046876, value=
1000006 column=admin4_code:, timestamp=1672844046876, value=13912
1000006 column=alternate_names:, timestamp=1672844046876, value=P
1000006 column=ascii_name:, timestamp=1672844046876, value=Greytown
1000006 column=cou_name_en:, timestamp=1672844046876, value=
1000006 column=country_code:, timestamp=1672844046876, value=South Africa
1000006 column=country_code_2:, timestamp=1672844046876, value=02
1000006 column=dem:, timestamp=1672844046876, value=Africa/Johannesburg
1000006 column=elevation:, timestamp=1672844046876, value=1050
1000006 column=feature_class:, timestamp=1672844046876, value=PPLA3
1000006 column=feature_code:, timestamp=1672844046876, value=ZA
1000006 column=geoname_id:, timestamp=1672844046876, value=Greytown
1000006 column=label_en:, timestamp=1672844046876, value=-29.06415,30.59279
1000006 column=modification_date:, timestamp=1672844046876, value=South Africa
1000006 column=name:, timestamp=1672844046876, value=Greytown
1000006 column=population:, timestamp=1672844046876, value=
1000006 column=timezone:, timestamp=1672844046876, value=2012-07-12

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDP 7.1.7 SP1)
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292

HBase Shell – CRUD

--- Ranger Policy
HBase > read / write / create / execute > <user>
su <user>

kinit -kt /etc/security/keytabs/<user>.keytab <user>

hdfs dfs -mkdir /user/<user>/hbase
hdfs dfs -put /home/<user>/hbase/geonames-all-cities-with-a-population-1000.csv /user/<user>/hbase
hbase shell
create 'cities1000', 'geoname_id', 'name', 'ascii_name', 'alternate_names', 'feature_class', 'feature_code', 'country_code', 'cou_name_en', 'country_code_2', 'admin1_code', 'admin2_code', 'admin3_code', 'admin4_code', 'population', 'elevation', 'dem', 'timezone', 'modification_date', 'label_en', 'coordinates'
exit

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=';' -Dimporttsv.columns=HBASE_ROW_KEY,geoname_id,name,ascii_name,alternate_names,feature_class,feature_code,country_code,cou_name_en,country_code_2,admin1_code,admin2_code,admin3_code,admin4_code,population,elevation,dem,timezone,modification_date,label_en,coordinates cities1000 /user/<user>/hbase/geonames-all-cities-with-a-population-1000.csv

hbase shell
list
describe 'cities1000'
scan 'cities1000', {'LIMIT' => 10}
exit

--- OPTIONAL
hbase shell
disable 'cities1000'
drop 'cities1000'
exit
hbase shell

--- operation: CREATE (Insert)
hbase:001:0> put 'cities1000', '9999999', 'ascii_name', 'test'

--- operation: READ (Select)
hbase:002:0> get 'cities1000', '9999999', {COLUMN => 'ascii_name'}
COLUMN                         CELL
 ascii_name:                   timestamp=1672913385003, value=test

--- operation: UPDATE
hbase:003:0> put 'cities1000', '9999999', 'ascii_name', 'test_new'
hbase:004:0> get 'cities1000', '9999999', {COLUMN => 'ascii_name'}
COLUMN                         CELL
 ascii_name:                   timestamp=1672913636442, value=test_new

--- operation: DELETE
hbase:005:0> deleteall 'cities1000', '9999999'
hbase:006:0> get 'cities1000', '9999999'
COLUMN                         CELL
0 row(s)

exit

--- SOURCE: https://www.guru99.com/hbase-shell-general-commands.html

Additional Info

An example explaining batch indexing (Spark Crunch Indexer) for Cloudera Search.

Cloudera Search offers the following methods for indexing data at scale:

  • batch indexing (Spark or MapReduce indexing: MapReduceIndexerTool or Lily HBase batch indexing)
  • NRT indexing (Lily HBase NRT indexing or Flume NRT indexing)

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDP 7.1.7 SP1)
  • Scala: 2.11.12
  • Spark: 2.4.7.7.1.7.1044-1
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292

HDFS commands

hdfs dfs -mkdir /user/solrsearch/cities
hdfs dfs -put -f /home/solrsearch/cities/geonames-all-cities-with-a-population-1000.csv /user/solrsearch/cities

Hive queries

DROP TABLE IF EXISTS cities;

CREATE EXTERNAL TABLE cities
(
geoname_id		    string,
name			    string,
ascii_name		    string,
alternate_names		string,
feature_class		string,
feature_code		string,
country_code		string,
cou_name_en		    string,
country_code_2		string,
admin1_code		    string,
admin2_code		    string,
admin3_code		    string,
admin4_code		    string,
population		    integer,
elevation		    string,
dem			        integer,
timezone		    string,
modification_date	date,
label_en		    string,
coordinates		    string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES("separatorChar"=";","quoteChar"="\"")
LOCATION '/user/solrsearch/cities'
TBLPROPERTIES
(
"skip.header.line.count"="1",
'external.table.purge'='true'
);

SELECT * FROM solrsearch.cities LIMIT 100;

Solr commands

# Cloudera ZooKeeper services:
democdhm01.placko.eu
democdhm02.placko.eu
democdhd03.placko.eu
# Cloudera Solr services:
democdhd01.placko.eu
democdhd02.placko.eu
democdhd03.placko.eu

su solrsearch
kinit -kt /etc/security/keytabs/solrsearch.keytab solrsearch
# optional: klist
 
cd /home/solrsearch
 
 
solrctl instancedir --generate $HOME/cities
 
# modify the managed-schema and add morphline file (change the zkHost if needed)
solrctl config --upload cities $HOME/cities -overwrite
 
# optional: solrctl instancedir --list
# optional: solrctl instancedir --delete cities
 
# create collection based on the configuration uploaded before
solrctl collection --create cities -s 1 -r 3 -c cities
# optional: solrctl collection --list
# optional: solrctl collection --delete cities
 

curl --negotiate -u: https://democdhd01.placko.eu:8985/solr/admin?op=GETDELEGATIONTOKEN > tokenFile.txt
 
export myDriverJarDir=/opt/cloudera/parcels/CDH/lib/solr/contrib/crunch
export myDriverJar=$(find $myDriverJarDir -maxdepth 1 -name 'search-crunch-*.jar' ! -name '*-job.jar' ! -name '*-sources.jar')
export myDependencyJarDir=/opt/cloudera/parcels/CDH/lib/search/lib/search-crunch
export myJVMOptions="-DmaxConnectionsPerHost=10000 -DmaxConnections=10000 -Djava.io.tmpdir=/tmp/"
export myDependencyJarFiles=$(find $myDependencyJarDir -name '*.jar' | sort | tr '\n' ',' | head -c -1)

spark-submit \
--name "SparkToSolrIngest-cities" \
--master yarn \
--deploy-mode cluster \
--jars $myDependencyJarFiles \
--executor-memory 16G \
--driver-memory 16G \
--conf "spark.executor.extraJavaOptions=$myJVMOptions" \
--driver-java-options "$myJVMOptions" \
--class org.apache.solr.crunch.CrunchIndexerTool \
--files $HOME/tokenFile.txt,$HOME/cities/conf/morphline.conf \
$myDriverJar \
-D hadoop.tmp.dir=/tmp \
-D tokenFile=tokenFile.txt \
-D morphlineVariable.ZK_HOST=$(hostname):2181/solr \
--morphline-file morphline.conf \
--pipeline-type spark \
--chatty \
hdfs://nameservice1/user/solrsearch/cities

managed-schema

<?xml version="1.0" encoding="UTF-8" ?>

<schema name="cities" version="1.6">
	<field name="geoname_id" type="string" indexed="true" stored="true" required="true" multiValued="false"/>
	<field name="name" type="text_autocomplete" indexed="true" stored="true" omitNorms="true" omitTermFreqAndPositions="true"/>
	<field name="ascii_name" type="string" indexed="true" stored="true"/>
	<field name="alternate_names" type="string" indexed="true" stored="true"/>
	<field name="feature_class" type="string" indexed="true" stored="true"/>
	<field name="feature_code" type="string" indexed="true" stored="true"/>
	<field name="country_code" type="string" indexed="true" stored="true"/>
	<field name="cou_name_en" type="string" indexed="true" stored="true"/>
	<field name="country_code_2" type="string" indexed="true" stored="true"/>
	<field name="admin1_code" type="string" indexed="true" stored="true"/>
	<field name="admin2_code" type="string" indexed="true" stored="true"/>
	<field name="admin3_code" type="string" indexed="true" stored="true"/>
	<field name="admin4_code" type="string" indexed="true" stored="true"/>
	<field name="population" type="string" indexed="true" stored="true"/>
	<field name="elevation" type="string" indexed="true" stored="true"/>
	<field name="dem" type="string" indexed="true" stored="true"/>
	<field name="timezone" type="string" indexed="true" stored="true"/>
	<field name="modification_date" type="string" indexed="true" stored="true"/>
	<field name="label_en" type="string" indexed="true" stored="true"/>
	<field name="coordinates" type="string" indexed="true" stored="true"/>
        <field name="_version_" type="long" indexed="true" stored="true"/>
	<uniqueKey>geoname_id</uniqueKey>
	<fieldType name="text_autocomplete" class="solr.TextField" positionIncrementGap="100">
		<analyzer type="multiterm">
			<tokenizer class="solr.KeywordTokenizerFactory"/>
			<filter class="solr.GermanNormalizationFilterFactory"/>	
			<filter class="solr.LowerCaseFilterFactory"/>
			<filter class="solr.EdgeNGramFilterFactory" minGramSize="3" maxGramSize="10" />
		</analyzer>
		<analyzer type="query">
			<tokenizer class="solr.KeywordTokenizerFactory"/>
			<filter class="solr.GermanNormalizationFilterFactory"/>		
			<filter class="solr.LowerCaseFilterFactory"/>
		</analyzer>
	</fieldType>
	<fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
	<fieldType name="string" class="solr.StrField" sortMissingLast="true" docValues="true" />
	<fieldType name="strings" class="solr.StrField" sortMissingLast="true" multiValued="true" docValues="true" />
	<fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
	<fieldType name="booleans" class="solr.BoolField" sortMissingLast="true" multiValued="true"/>
	<fieldType name="pint" class="solr.IntPointField" docValues="true"/>
	<fieldType name="pfloat" class="solr.FloatPointField" docValues="true"/>
	<fieldType name="plong" class="solr.LongPointField" docValues="true"/>
	<fieldType name="pdouble" class="solr.DoublePointField" docValues="true"/>
	<fieldType name="pints" class="solr.IntPointField" docValues="true" multiValued="true"/>
	<fieldType name="pfloats" class="solr.FloatPointField" docValues="true" multiValued="true"/>
	<fieldType name="plongs" class="solr.LongPointField" docValues="true" multiValued="true"/>
	<fieldType name="pdoubles" class="solr.DoublePointField" docValues="true" multiValued="true"/>
	<fieldType name="pdate" class="solr.DatePointField" docValues="true"/>
	<fieldType name="pdates" class="solr.DatePointField" docValues="true" multiValued="true"/>
	<fieldType name="binary" class="solr.BinaryField"/>
	<fieldType name="text_general" class="solr.TextField" positionIncrementGap="100" multiValued="true">
		<analyzer type="index">
			<tokenizer class="solr.StandardTokenizerFactory"/>
			<filter class="solr.StopFilterFactory" ignoreCase="true" words="stopwords.txt" />
			<filter class="solr.LowerCaseFilterFactory"/>
		</analyzer>
		<analyzer type="query">
			<tokenizer class="solr.StandardTokenizerFactory"/>
			<filter class="solr.StopFilterFactory" ignoreCase="true" words="stopwords.txt" />
			<filter class="solr.SynonymGraphFilterFactory" synonyms="synonyms.txt" ignoreCase="true" expand="true"/>
			<filter class="solr.LowerCaseFilterFactory"/>
		</analyzer>
	</fieldType>
</schema>

morphline.conf

SOLR_LOCATOR : {
  collection : cities
  zkHost : "democdhm02.placko.eu:2181/solr"
}
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
      {
        readCSV {
          separator : ";"
		  columns : [geoname_id, name, ascii_name, alternate_names, feature_class, feature_code, country_code, cou_name_en, country_code_2, admin1_code, admin2_code, admin3_code, admin4_code, population, elevation, dem, timezone, modification_date, label_en, coordinates]
          ignoreFirstLine : true
          quoteChar : "\""
          commentPrefix : ""
          trim : true
          charset : UTF-8
        }
      }
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }
      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }
      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]

Security – Ranger

HDFS
policy: hdfs_solrsearch_rwe
path: /user/solrsearch/cities
group: solrsearch / user: solrsearch
permissions: read, write, execute

HIVE
policy: hive_solrsearch_all
database: solrsearch (tb: all, cl: all)
group: solrsearch / user: solrsearch
permissions: all

policy: hive_url_solrsearch_r
url: hdfs://nameservice1//user/solrsearch/cities
group: solrsearch / user: solrsearch
permissions: read

SOLR
policy: solr_solrsearch
Solr Collection: cities
group: solrsearch / user: solrsearch
permissions: query, update, others, solr admin

Security – Kerberos

- SPNEGO -> authentication via Kerberos (not via KNOX) - suitable for client applications with a high number of requests, e.g. Solr use cases
- Solr Admin UI via KNOX (user name and password) -> https://democdhm01.placko.eu:8443/gateway/homepage/home/

Kerberos example (source: https://web.mit.edu/kerberos/dist/#kfw-3.2)
Kerberos on Windows Client
Firefox
about:config
network.negotiate-auth.trusted-uris  .placko.eu
network.auth.use-sspi false
network.negotiate-auth.allow-proxies true
network.negotiate-auth.delegation-uris  .placko.eu
 
cd C:\Program Files\MIT\Kerberos\bin
set KRB5_CONFIG=C:\Users\<user>\Documents\krb5.ini
set KRB5_TRACE=C:\Users\<user>\Documents\kinit.log
kinit solrsearch@<FQDN>

Testing

curl --negotiate -u: https://democdhd01.placko.eu:8985/solr/admin?op=GETDELEGATIONTOKEN > tokenFile.txt

curl --negotiate -u : "https://democdhd01.placko.eu:8985/solr/cities/select?q=ascii_name:Brati*"

{
  "responseHeader":{
    "zkConnected":true,
    "status":0,
    "QTime":1,
    "params":{
      "q":"ascii_name:Brati*"}},
  "response":{"numFound":3,"start":0,"docs":[
      {
        "admin1_code":"02",
        "timezone":"Europe/Bratislava",
        "dem":"133",
        "geoname_id":"3060280",
        "ascii_name":"Bratislava - Vajnory",
        "admin2_code":"103",
        "modification_date":"2019-06-09",
        "feature_code":"PPLX",
        "cou_name_en":"Slovakia",
        "coordinates":"48.20563,17.20759",
        "label_en":"Slovakia",
        "population":"5484",
        "alternate_names":"Pozsonyszolos,Pozsonyszőlős,Pracsa,Prácsa,Vajnory,Weinern",
        "country_code":"SK",
        "feature_class":"P",
        "name":"Bratislava - Vajnory",
        "admin3_code":"529362",
        "_version_":1744658835843317767},
      {
        "admin1_code":"04",
        "timezone":"Europe/Bucharest",
        "country_code_2":"RO",
        "dem":"269",
        "geoname_id":"683802",
        "ascii_name":"Bratila",
        "admin2_code":"22898",
        "modification_date":"2013-04-21",
        "feature_code":"PPL",
        "cou_name_en":"Romania",
        "coordinates":"46.32348,26.77442",
        "label_en":"Romania",
        "population":"2092",
        "alternate_names":"Bratila,Bratila de Mijloc,Brătila,Brătila de Mijloc",
        "country_code":"RO",
        "feature_class":"P",
        "name":"Brătila",
        "_version_":1744658841818103812},
      {
        "admin1_code":"02",
        "timezone":"Europe/Bratislava",
        "dem":"157",
        "geoname_id":"3060972",
        "ascii_name":"Bratislava",
        "modification_date":"2019-09-05",
        "feature_code":"PPLC",
        "cou_name_en":"Slovakia",
        "coordinates":"48.14816,17.10674",
        "label_en":"Slovakia",
        "population":"423737",
        "alternate_names":"An Bhrataslaiv,An Bhratasláiv,BTS,Baratislawa,Bracislava,Bratislav,Bratislava,Bratislava osh,Bratislavae,Bratislavo,Bratislawa,Bratisllava,Bratisława,Bratyslawa,Bratysława,Bratîslava,Mpratislaba,Posonium,Pozsony,Presburg,Presporok,Prespurk,Pressburg,Preszburg,Preßburg,Prešporok,Prešpurk,beulatiseullaba,bra ti sla wa,bratisalava,bratislabha,bratislava,bratislavha,bratslawa,bratsylava,bratyslafa,bratyslava,bratyslaw,bratyslawa,bu la di si la fa,burachisuravua,pirattislava,Μπρατισλάβα,Братислав,Братиславæ,Братислава,Братислава ош,Братіслава,Братїслава,Браціслава,Բրատիսլավա,בראטיסלאווא,ברטיסלאבה,براتسلاوا,براتسیلاڤا,براتىسلاۋا,براتيسلافا,براتیسلاو,براتیسلاوا,براٹیسلاوا,ܒܪܛܝܣܠܐܒܐ,ब्रातिस्लाभा,ब्रातिस्लाव्हा,ব্রাতিস্লাভা,ਬ੍ਰਾਤਿਸਲਾਵਾ,பிராத்திஸ்லாவா,ಬ್ರಾಟಿಸ್ಲಾವಾ,ബ്രാട്ടിസ്‌ലാവ,บราติสลาวา,བ་ར་ཏིསི་ལ་བ།,ბრატისლავა,ብራቲስላቫ,ブラチスラヴァ,布拉迪斯拉发,布拉迪斯拉發,브라티슬라바",
        "country_code":"SK",
        "feature_class":"P",
        "name":"Bratislava",
        "_version_":1744658845288890378}]
  }}

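The same query can also be issued from Java via SolrJ. The following is only a hypothetical sketch (the class name SolrQueryExample is illustrative; the jaas.conf must contain a suitable Client login section and a valid Kerberos ticket cache, and the exact SPNEGO wiring depends on the SolrJ version shipped with your parcel):

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.Krb5HttpClientBuilder;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class SolrQueryExample {
    public static void main(String[] args) throws Exception {
        // SPNEGO/Kerberos: point SolrJ at a jaas.conf with a "Client" section and do kinit beforehand
        System.setProperty("java.security.auth.login.config", "jaas.conf");
        HttpClientUtil.setHttpClientBuilder(new Krb5HttpClientBuilder().getBuilder());

        try (HttpSolrClient solr = new HttpSolrClient.Builder("https://democdhd01.placko.eu:8985/solr/cities").build()) {
            // same query as the curl call above: q=ascii_name:Brati*
            QueryResponse rsp = solr.query(new SolrQuery("ascii_name:Brati*"));
            for (SolrDocument doc : rsp.getResults()) {
                System.out.println(doc.getFieldValue("ascii_name") + " (" + doc.getFieldValue("cou_name_en") + ")");
            }
        }
    }
}
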
Additional Info

An example for explaining schema evolution with Parquet (column based) binary data format. Starting with a simple schema, adding new column, deleting existing column and ending up with multiple Parquet files with different but compatible schemas.

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Scala: 2.11.12
  • Spark: 2.4.0
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292
  • IDE (for Windows 10 Enterprise): Eclipse IDE for Java Developers – 2020-09

In Spark, the Parquet data source can automatically detect schema changes and merge the schemas of all these Parquet files, which is internally called schema merging. Without automatic schema merging, the typical way of handling schema evolution is a historical data reload, which requires a lot of work.
Schema merging is a relatively expensive operation and is therefore turned off by default.
It can be enabled by one of the following (see the sketch after the list):

  1. setting the data source option mergeSchema to true when reading Parquet files
    or
  2. setting the global SQL option spark.sql.parquet.mergeSchema to true

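A minimal standalone sketch of both options (the class name and the HDFS path placeholder are illustrative; the full example follows in ParquetSchemaMerging.java below):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MergeSchemaOptions {
    public static void main(String[] args) {
        // option 2: enable schema merging globally for the whole Spark session
        SparkSession spark = SparkSession
                .builder()
                .appName("MergeSchemaOptions")
                .config("spark.sql.parquet.mergeSchema", "true")
                .getOrCreate();

        // option 1: enable schema merging only for this particular read
        Dataset<Row> merged = spark.read()
                .option("mergeSchema", "true")
                .parquet("/user/<REPLACE>/ParquetSchemaMerging/data");
        merged.printSchema();
    }
}
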
ParquetSchemaMerging

Schema v1: SchemaVersion1.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/datafactory/)

package eu.placko.examples.spark.datafactory;

import java.io.Serializable;

public class SchemaVersion1 implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String col_a;

    public Integer getId() {return id;}
    public String getCol_A() {return col_a;}

    public void setId(Integer id) {this.id = id;}
    public void setCol_A(String col_a) {this.col_a = col_a;}
}

Schema v2: SchemaVersion2.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/datafactory/)

package eu.placko.examples.spark.datafactory;

import java.io.Serializable;

public class SchemaVersion2 implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String col_b;
    private String col_a;

    public Integer getId() {return id;}
    public String getCol_B() {return col_b;}
    public String getCol_A() {return col_a;}

    public void setId(Integer id) {this.id = id;}
    public void setCol_B(String col_b) {this.col_b = col_b;}
    public void setCol_A(String col_a) {this.col_a = col_a;}
}

Schema v3: SchemaVersion3.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/datafactory/)

package eu.placko.examples.spark.datafactory;

import java.io.Serializable;

public class SchemaVersion3 implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String col_b;

    public Integer getId() {return id;}
    public String getCol_B() {return col_b;}

    public void setId(Integer id) {this.id = id;}
    public void setCol_B(String col_b) {this.col_b = col_b;}
}

ParquetSchemaMerging.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/app/)

package eu.placko.examples.spark.app;

import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import eu.placko.examples.spark.config.SparkConfig;
import eu.placko.examples.spark.config.SparkSQLConfig;
import eu.placko.examples.spark.sql.HiveBuilder;
import eu.placko.examples.spark.sql.QueryBuilder;
import eu.placko.examples.spark.datafactory.*;

import java.util.ArrayList;
import java.util.List;
import java.io.IOException;


public class ParquetSchemaMerging {
	private final static Logger LOGGER = Logger.getLogger(ParquetSchemaMerging.class.getName());
	
	public static void main(String[] args) throws IOException {
		if (args.length != 1) {
		    LOGGER.info("wrong input! /user/<REPLACE>/ParquetSchemaMerging/");
		    return;
		}
		String path = args[0];
		
		SparkSession sparkSession = SparkConfig.createSparkSession();
		LOGGER.info("Started Spark Session");	
		
		
		SchemaVersion1 row1V1 = new SchemaVersion1();
		row1V1.setId(1);
		row1V1.setCol_A("A1");
		SchemaVersion1 row2V1 = new SchemaVersion1();
		row2V1.setId(2);
		row2V1.setCol_A("A2");
		
		List<SchemaVersion1> dataV1 = new ArrayList<SchemaVersion1>();
		dataV1.add(row1V1);
		dataV1.add(row2V1);
		
		Dataset<Row> dsV1 = sparkSession.createDataFrame(dataV1, SchemaVersion1.class);
		dsV1.show();
		dsV1.printSchema();
		dsV1
			.write()
			.format("parquet")
			.mode(SaveMode.Overwrite)
			.save(path + "data/partition-date=2022-01-01");
		LOGGER.info("Created dsV1");
		
		
		SchemaVersion2 row1V2 = new SchemaVersion2();
		row1V2.setId(1);
		row1V2.setCol_B("B1");
		row1V2.setCol_A("A1");
		SchemaVersion2 row2V2 = new SchemaVersion2();
		row2V2.setId(2);
		row2V2.setCol_B("B2");
		row2V2.setCol_A("A2");
		
		List<SchemaVersion2> dataV2 = new ArrayList<SchemaVersion2>();
		dataV2.add(row1V2);
		dataV2.add(row2V2);
		
		Dataset<Row> dsV2 = sparkSession.createDataFrame(dataV2, SchemaVersion2.class);
		dsV2.show();
		dsV2.printSchema();
		dsV2
			.write()
			.format("parquet")
			.mode(SaveMode.Overwrite)
			.save(path + "data/partition-date=2022-01-02");
		LOGGER.info("Created dsV2");
		
		
		SchemaVersion3 row1V3 = new SchemaVersion3();
		row1V3.setId(1);
		row1V3.setCol_B("B1");
		SchemaVersion3 row2V3 = new SchemaVersion3();
		row2V3.setId(2);
		row2V3.setCol_B("B2");
		
		List<SchemaVersion3> dataV3 = new ArrayList<SchemaVersion3>();
		dataV3.add(row1V3);
		dataV3.add(row2V3);
		
		Dataset<Row> dsV3 = sparkSession.createDataFrame(dataV3, SchemaVersion3.class);
		dsV3.show();
		dsV3.printSchema();
		dsV3
			.write()
			.format("parquet")
			.mode(SaveMode.Overwrite)
			.save(path + "data/partition-date=2022-01-03");
		LOGGER.info("Created dsV3");
		
		
		Dataset<Row> dsMergeSchema = sparkSession.read()
				.option("mergeSchema","true")
				.parquet(path + "data");
		dsMergeSchema.show();
		dsMergeSchema.printSchema();
		LOGGER.info("Read dsMergeSchema");
		
		Dataset<Row> dsWithoutMergeSchema = sparkSession.read()
				//.option("mergeSchema","false")
				.parquet(path + "data");
		dsWithoutMergeSchema.show();
		dsWithoutMergeSchema.printSchema();
		LOGGER.info("Read dsWithoutMergeSchema");
		
		Dataset<Row> dsMergeSchemaSparkSQL = eu.placko.examples.spark.config.SparkSQLConfig.createSparkSQLSession().read()
				.parquet(path + "data");
		dsMergeSchemaSparkSQL.show();
		dsMergeSchemaSparkSQL.printSchema();
		LOGGER.info("Read dsMergeSchemaSparkSQL");
		
		
		String hive = new HiveBuilder().buildDB();
		sparkSession.sql(hive);
		LOGGER.info("Created Hive DB if not exists");
		
		hive = new HiveBuilder().buildTB();
		sparkSession.sql(hive);
		LOGGER.info("Dropped Hive table if exists");
		
		hive = new HiveBuilder().buildTB(path);
		sparkSession.sql(hive);
		LOGGER.info("Created Hive table over the parquet");
		
		
		hive = new QueryBuilder().build();
		Dataset<Row> dsReadFromHiveTableBeforeRepair = sparkSession.sql(hive);
		dsReadFromHiveTableBeforeRepair.show();
		dsReadFromHiveTableBeforeRepair.printSchema();
		LOGGER.info("Read dsReadFromHiveTableBeforeRepair");
		
		hive = new HiveBuilder().repairTB();
		sparkSession.sql(hive);
		LOGGER.info("MSCK REPAIR TABLE");
		
		hive = new QueryBuilder().build();
		Dataset<Row> dsReadFromHiveTableAfterRepair = sparkSession.sql(hive);
		dsReadFromHiveTableAfterRepair.show();
		dsReadFromHiveTableAfterRepair.printSchema();
		LOGGER.info("Read dsReadFromHiveTableAfterRepair");
	}
}

SparkConfig.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/config/)

package eu.placko.examples.spark.config;

import org.apache.spark.sql.SparkSession;
//import java.io.File;

public class SparkConfig {
	public static SparkSession createSparkSession() {
		//String hiveDBLocation = new File("parquet_schema_evolution").getAbsolutePath();
		SparkSession spark = SparkSession
				.builder()
				.appName("ParquetSchemaMerging")
				//.master("local[1]")
				//.master("yarn")
				.config("spark.sql.broadcastTimeout","36000")
				//.config("spark.sql.warehouse.dir", hiveDBLocation)
				.enableHiveSupport()
				.getOrCreate();	
		return spark;
	}
}

SparkSQLConfig.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/config/)

package eu.placko.examples.spark.config;

import org.apache.spark.sql.SparkSession;
//import java.io.File;

public class SparkSQLConfig {
	public static SparkSession createSparkSQLSession() {
		//String hiveDBLocation = new File("parquet_schema_evolution").getAbsolutePath();
		SparkSession spark = SparkSession
				.builder()
				.appName("ParquetSchemaMerging")
				//.master("local[1]")
				//.master("yarn")
				.config("spark.sql.broadcastTimeout","36000")
				.config("spark.sql.parquet.mergeSchema", "true") // Use Spark SQL
				//.config("spark.sql.warehouse.dir", hiveDBLocation)
				.enableHiveSupport()
				.getOrCreate();
		return spark;
	}
}

HiveBuilder.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class HiveBuilder {
	public String buildDB() {
		String sql = "CREATE DATABASE IF NOT EXISTS parquet_schema_evolution LOCATION '/user/hive/databases/parquet_schema_evolution.db'";
		return sql;
	}
	
	public String buildTB() {
		String sql = "DROP TABLE IF EXISTS parquet_schema_evolution.parquet_merge";
		return sql;
	}
	
	public String buildTB(String path) {
		String sql = "CREATE EXTERNAL TABLE parquet_schema_evolution.parquet_merge (id INT, col_a STRING) PARTITIONED BY (`partition-date` STRING) STORED AS PARQUET LOCATION 'hdfs://nameservice1" + path + "data'";
		return sql;
	}
	
	public String repairTB() {
		String sql = "MSCK REPAIR TABLE parquet_schema_evolution.parquet_merge";
		return sql;
	}
}

QueryBuilder.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class QueryBuilder {
	private Long limit = 0L;

	public QueryBuilder setLimit(Long limit) {
		this.limit = limit;
		return this;
	}

	public String build() {
		String sql = "SELECT * FROM parquet_schema_evolution.parquet_merge";
		if (limit != null && limit.longValue() > 0) {
			sql = sql + " LIMIT " + limit;
		}
		return sql;
	}
}

ParquetSchemaMerging.sh (path: /ParquetSchemaMerging/scripts/)

#!/bin/sh


on_error() {
  printf "\n\nAn error occurred!\n";
  exit 1;
}
trap on_error ERR


keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.app.ParquetSchemaMerging

appVersion="1.0.0-SNAPSHOT"
appArtifact="/home/<REPLACE>/spark/java/ParquetSchemaMerging-$appVersion.jar /user/<REPLACE>/ParquetSchemaMerging/"
log4j_setting="-Dlog4j.configuration=file:///home/<REPLACE>/spark/java/log4j.xml"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# only for "testing/debugging" purposes  --deploy-mode client \
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class $appClass \
    --conf spark.executor.memory=12G \
    --conf spark.driver.memory=4G \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=5 \
    --conf spark.executor.cores=5 \
    --conf "spark.driver.extraJavaOptions=${log4j_setting}" \
    --conf "spark.executor.extraJavaOptions=${log4j_setting}" \
    $appArtifact

exit 0;

Results

2022-03-22 08:34:34 INFO  ParquetSchemaMerging:31 - Started Spark Session
+-----+---+
|col_A| id|
+-----+---+
|   A1|  1|
|   A2|  2|
+-----+---+

root
 |-- col_A: string (nullable = true)
 |-- id: integer (nullable = true)

2022-03-22 08:34:41 INFO  ParquetSchemaMerging:53 - Created dsV1
+-----+-----+---+
|col_A|col_B| id|
+-----+-----+---+
|   A1|   B1|  1|
|   A2|   B2|  2|
+-----+-----+---+

root
 |-- col_A: string (nullable = true)
 |-- col_B: string (nullable = true)
 |-- id: integer (nullable = true)

2022-03-22 08:34:42 INFO  ParquetSchemaMerging:77 - Created dsV2
+-----+---+
|col_B| id|
+-----+---+
|   B1|  1|
|   B2|  2|
+-----+---+

root
 |-- col_B: string (nullable = true)
 |-- id: integer (nullable = true)

2022-03-22 08:34:42 INFO  ParquetSchemaMerging:99 - Created dsV3
+-----+---+-----+--------------+
|col_A| id|col_B|partition-date|
+-----+---+-----+--------------+
|   A1|  1|   B1|    2022-01-02|
|   A2|  2|   B2|    2022-01-02|
|   A1|  1| null|    2022-01-01|
|   A2|  2| null|    2022-01-01|
| null|  1|   B1|    2022-01-03|
| null|  2|   B2|    2022-01-03|
+-----+---+-----+--------------+

root
 |-- col_A: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- col_B: string (nullable = true)
 |-- partition-date: date (nullable = true)

2022-03-22 08:34:44 INFO  ParquetSchemaMerging:107 - Read dsMergeSchema
+-----+---+--------------+
|col_A| id|partition-date|
+-----+---+--------------+
|   A1|  1|    2022-01-02|
|   A2|  2|    2022-01-02|
|   A1|  1|    2022-01-01|
|   A2|  2|    2022-01-01|
| null|  1|    2022-01-03|
| null|  2|    2022-01-03|
+-----+---+--------------+

root
 |-- col_A: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- partition-date: date (nullable = true)

2022-03-22 08:34:44 INFO  ParquetSchemaMerging:114 - Read dsWithoutMergeSchema *
* Without schema merging, the resulting schema is picked from one of the partition files (effectively at random), so columns present only in other files may be missing.

2022-03-22 08:34:44 WARN  SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
+-----+---+-----+--------------+
|col_A| id|col_B|partition-date|
+-----+---+-----+--------------+
|   A1|  1|   B1|    2022-01-02|
|   A2|  2|   B2|    2022-01-02|
|   A1|  1| null|    2022-01-01|
|   A2|  2| null|    2022-01-01|
| null|  1|   B1|    2022-01-03|
| null|  2|   B2|    2022-01-03|
+-----+---+-----+--------------+

root
 |-- col_A: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- col_B: string (nullable = true)
 |-- partition-date: date (nullable = true)

2022-03-22 08:34:44 INFO  ParquetSchemaMerging:120 - Read dsMergeSchemaSparkSQL *
* The result is the same as with the mergeSchema option. The advantage of this option is that it applies to the whole Spark session instead of having to be specified in every read.

2022-03-22 08:34:45 INFO  ParquetSchemaMerging:125 - Created Hive DB if not exists
2022-03-22 08:34:45 INFO  ParquetSchemaMerging:129 - Dropped Hive table if exists
2022-03-22 08:34:46 INFO  ParquetSchemaMerging:133 - Created Hive table over the parquet
+---+-----+--------------+
| id|col_a|partition-date|
+---+-----+--------------+
+---+-----+--------------+ *
* Partitions not in metastore:
parquet_merge:partition-date=2022-01-01
parquet_merge:partition-date=2022-01-02
parquet_merge:partition-date=2022-01-03
Repair: Added partition to metastore parquet_merge: partition-date=2022-01-01
Repair: Added partition to metastore parquet_merge: partition-date=2022-01-02
Repair: Added partition to metastore parquet_merge: partition-date=2022-01-03

root
 |-- id: integer (nullable = true)
 |-- col_a: string (nullable = true)
 |-- partition-date: string (nullable = true)

2022-03-22 08:34:46 INFO  ParquetSchemaMerging:140 - Read dsReadFromHiveTableBeforeRepair
2022-03-22 08:34:46 INFO  ParquetSchemaMerging:144 - MSCK REPAIR TABLE
+---+-----+--------------+
| id|col_a|partition-date|
+---+-----+--------------+
|  1|   A1|    2022-01-02|
|  2|   A2|    2022-01-02|
|  1|   A1|    2022-01-01|
|  2|   A2|    2022-01-01|
|  1| null|    2022-01-03|
|  2| null|    2022-01-03|
+---+-----+--------------+

root
 |-- id: integer (nullable = true)
 |-- col_a: string (nullable = true)
 |-- partition-date: string (nullable = true)

2022-03-22 08:34:47 INFO  ParquetSchemaMerging:150 - Read dsReadFromHiveTableAfterRepair

README.md (path: /ParquetSchemaMerging/)

HOW TO CONFIGURE THE PROJECT
 
ParquetSchemaMerging.sh
keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab

appArtifact="/home/<REPLACE>/spark/java/ParquetSchemaMerging-$appVersion.jar /user/<REPLACE>/ParquetSchemaMerging/"
log4j_setting="-Dlog4j.configuration=file:///home/<REPLACE>/spark/java/log4j.xml"

log4j.xml
<param name="file" value="/home/<REPLACE>/spark/java/log.out" />

Building and Running
 
Build
To build the application, the following must be installed:
Java 9
Maven 3.x
Then just run this:
mvn clean install
 
Submit to Spark cluster
To run the Spark application, see the shell script inside the /scripts dir.

Source Code

https://github.com/mplacko/ParquetSchemaMerging

Additional Info

Avro – Schema Evolution

Posted: February 26, 2022 in Hadoop

An example for explaining schema evolution with Avro (row based) binary data format.

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292

Avro stores the data definition in a JSON file (.avsc)

Basic structure of an Avro JSON schema file

{
  "type" : "record",
  "name" : "file_name_excluding_extension",
  "doc" : "documentation",
  "fields" : [{
         "name" : "field_name",
         "type" : "datatype",
         "columnName" : "hive_table_column_name"
  },{}],
  "tableName" : "hive_table_name"
}
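
Before publishing such a schema file to HDFS, it can be sanity-checked with the Avro Java API; a minimal sketch (the class name and the path are placeholders; the Hive-specific attributes such as columnName and tableName are simply kept by the parser as custom properties):

import java.io.File;

import org.apache.avro.Schema;

public class AvroSchemaCheck {
    public static void main(String[] args) throws Exception {
        // Schema.Parser throws SchemaParseException if the .avsc is not a valid Avro schema
        Schema schema = new Schema.Parser().parse(new File("/home/<user>/schema/avro_schema_evolution.avsc"));
        System.out.println(schema.toString(true)); // pretty-printed JSON representation
    }
}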

Original schema: avro_schema_evolution.avsc

{
	"type":"record",
	"name":"avro_schema_evolution",
	"doc":"avro schema evolution",
	"fields":[
		{
			"name":"id",
			"type":"int",
			"columnName":"id"
		},
		{
			"name":"col_a",
			"type":"string",
			"columnName":"col_a"
		}
	],
	"tableName":"AvroSchemaEvolution"
}
# Kerberos
$ su - <user>
$ kinit -kt /etc/security/keytabs/<user>.keytab <user>
$ klist
$ hdfs dfs -put -f /home/<user>/schema/avro_schema_evolution.avsc /user/<user>
hive> CREATE DATABASE IF NOT EXISTS avro_schema_evolution LOCATION "/user/hive/databases/avro_schema_evolution.db";
hive> CREATE EXTERNAL TABLE avro_schema_evolution.avro_schema_evolution
STORED AS AVRO
LOCATION '/user/hive/databases/avro_schema_evolution.db'
TBLPROPERTIES ('avro.schema.url' = 'hdfs:///user/<user>/avro_schema_evolution.avsc');
hive> SHOW CREATE TABLE avro_schema_evolution.avro_schema_evolution;
OK
CREATE EXTERNAL TABLE `avro_schema_evolution.avro_schema_evolution`(
  `id` int COMMENT '',
  `col_a` string COMMENT '')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION
  'hdfs://nameservice1/user/hive/databases/avro_schema_evolution.db'
TBLPROPERTIES (
  'avro.schema.url'='hdfs:///user/<user>/avro_schema_evolution.avsc',
  'transient_lastDdlTime'='1645356091')
hive> INSERT OVERWRITE TABLE avro_schema_evolution.avro_schema_evolution
  VALUES (1, 'A1'), (2, 'A2');
hive> SELECT * FROM avro_schema_evolution.avro_schema_evolution;
id      col_a
1       A1
2       A2

Updated schema (added new col_b): avro_schema_evolution.avsc

{
	"type":"record",
	"name":"avro_schema_evolution",
	"doc":"avro schema evolution",
	"fields":[
		{
			"name":"id",
			"type":"int",
			"columnName":"id"
		},
		{
			"name":"col_b",
			"type":"string",
			"columnName":"col_b",
			"default":"null"
		},
		{
			"name":"col_a",
			"type":"string",
			"columnName":"col_a"
		}
	],
	"tableName":"AvroSchemaEvolution"
}

NOTE: new col_b contains a default value

$ hdfs dfs -put -f /home/<user>/schema/new/avro_schema_evolution.avsc /user/<user>
hive> INSERT INTO TABLE avro_schema_evolution.avro_schema_evolution
  VALUES (3, 'B3', 'A3'), (4, 'B4', 'A4');
hive> SELECT * FROM avro_schema_evolution.avro_schema_evolution;
id      col_b  col_a
1       null    A1
2       null    A2
3       B3      A3
4       B4      A4

Updated schema (renamed col_a -> col_aa via ALIAS): avro_schema_evolution.avsc

{
	"type":"record",
	"name":"avro_schema_evolution",
	"doc":"avro schema evolution",
	"fields":[
		{
			"name":"id",
			"type":"int",
			"columnName":"id"
		},
		{
			"name":"col_b",
			"type":"string",
			"columnName":"col_b",
			"default":"null"
		},
		{
			"name":"col_aa",
			"type":"string",
			"columnName":"col_aa",
			"aliases" : [ "col_a" ]
		}
	],
	"tableName":"AvroSchemaEvolution"
}
$ hdfs dfs -put -f /home/<user>/schema/new_alias/avro_schema_evolution.avsc /user/<user>
hive> SELECT * FROM avro_schema_evolution.avro_schema_evolution;
id      col_b  col_aa
1       null    A1
2       null    A2
3       B3      A3
4       B4      A4

WARNING: You CANNOT change a field’s datatype. You need to add a new column instead!

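The same resolution rules (field defaults and aliases) also apply when the Avro data files are read directly with the Avro Java API instead of through Hive. A minimal, hypothetical sketch (the class name and the data file name are placeholders; the two .avsc paths are the original and the aliased schema from above):

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class AvroSchemaResolutionDemo {
    public static void main(String[] args) throws Exception {
        // writer schema: the original one (id, col_a); reader schema: the updated one with the defaulted col_b and the col_aa alias
        Schema writerSchema = new Schema.Parser().parse(new File("/home/<user>/schema/avro_schema_evolution.avsc"));
        Schema readerSchema = new Schema.Parser().parse(new File("/home/<user>/schema/new_alias/avro_schema_evolution.avsc"));

        // note: DataFileReader replaces the writer schema with the one embedded in the file header
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writerSchema, readerSchema);
        try (DataFileReader<GenericRecord> fileReader =
                new DataFileReader<>(new File("<data_file_written_with_the_original_schema>.avro"), datumReader)) {
            for (GenericRecord record : fileReader) {
                // col_b falls back to its default, col_aa is resolved via the col_a alias
                System.out.println(record.get("id") + " " + record.get("col_b") + " " + record.get("col_aa"));
            }
        }
    }
}
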
Additional Info

An example for reading (querying) and writing a parquet file and creating hive table.

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Scala: 2.11.12
  • Spark: 2.4.0
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292
  • IDE (for Windows 10 Enterprise): Eclipse IDE for Java Developers – 2020-09

SparkSql2Parquet

SparkConfig.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/config/)

package eu.placko.examples.spark.config;

import org.apache.spark.sql.SparkSession;
import java.io.File;

public class SparkConfig {
	public static SparkSession createSparkSession() {
		String episodesLocation = new File("episodes").getAbsolutePath();
		SparkSession spark = SparkSession
				.builder()
				.appName("sparksql2parquet")
				//.master("local[1]")
				//.master("yarn")
				.config("spark.sql.broadcastTimeout","36000")
				.config("spark.sql.warehouse.dir", episodesLocation)
				.enableHiveSupport()
				.getOrCreate();	
		return spark;
	}
}

QueryBuilder.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class QueryBuilder {
	private Long limit = 0L;

	public QueryBuilder setLimit(Long limit) {
		this.limit = limit;
		return this;
	}

	public String build() {
		String sql = "SELECT title FROM tmp_episodes";
		if (limit != null && limit.longValue() > 0) {
			sql = sql + " limit " + limit;
		}
		return sql;
	}
}

HiveBuilder.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class HiveBuilder {
	public String buildDB() {
		String sql = "CREATE DATABASE IF NOT EXISTS episodes LOCATION '/user/hive/databases/episodes.db'";
		return sql;
	}
	
	public String buildTB() {
		String sql = "DROP TABLE IF EXISTS episodes.episodes";
		return sql;
	}
	
	public String buildTB(String path) {
		String sql = "CREATE EXTERNAL TABLE episodes.episodes(title string) STORED AS PARQUET LOCATION 'hdfs://nameservice1" + path + "episodes_titles_only.parquet'";
		return sql;
	}
}

NOTES:

  • nameservice1 -> HA (High Availability)
  • PARTITIONED BY (title string)
  • SHOW CREATE TABLE episodes.episodes

SparkSql2Parquet.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/app/)

package eu.placko.examples.spark.app;

import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SaveMode;

import eu.placko.examples.spark.config.SparkConfig;
import eu.placko.examples.spark.sql.QueryBuilder;
import eu.placko.examples.spark.sql.HiveBuilder;

public class SparkSql2Parquet {
	private final static Logger LOGGER = Logger.getLogger(SparkSql2Parquet.class.getName());
	
	public static void main(String[] args) {
		if (args.length != 1) {
		    LOGGER.info("wrong input! usage: /user/<REPLACE>/sparksql2parquet/");
		    return;
		}
		String path = args[0];
		
		SparkSession sparkSession = SparkConfig.createSparkSession();
		LOGGER.info("Started Spark Session");
		
		Dataset<Row> parquetFileDF = sparkSession.read().parquet(path + "episodes.parquet");
		parquetFileDF.createOrReplaceTempView("tmp_episodes");
		LOGGER.info("Created temp view");
		
		String query = new QueryBuilder().setLimit(null).build();
		Dataset<Row> episodesDF = sparkSession.sql(query);
		episodesDF
			//.select("title")
			.write()
			.mode(SaveMode.Overwrite)
			.parquet(path + "episodes_titles_only.parquet");
		LOGGER.info("Written to the new parquet");
		
		String hive = new HiveBuilder().buildDB();
		sparkSession.sql(hive);
		LOGGER.info("Created Hive DB if not exists");
		
		hive = new HiveBuilder().buildTB();
		sparkSession.sql(hive);
		LOGGER.info("Dropped Hive table if exists");
		
		hive = new HiveBuilder().buildTB(path);
		sparkSession.sql(hive);
		LOGGER.info("Created Hive table over the parquet");
	}
}

NOTE: "episodes.parquet" does not need to be a single parquet file; it can also be a directory containing the actual parquet part files

e.g.:

/user/<REPLACE>/sparksql2parquet/episodes.parquet/
part-00000-d26aa3a3-ef94-422f-933e-d230b345cad3-c000.snappy.parquet

sparksql2parquet.sh (path: /sparksql2parquet/scripts/)

#!/bin/sh


on_error() {
  printf "\n\nAn error occurred!\n";
  exit 1;
}
trap on_error ERR


keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.app.SparkSql2Parquet

appVersion="1.0.0-SNAPSHOT"
appArtifact="/<REPLACE>/sparksql2parquet-$appVersion.jar /user/<REPLACE>/sparksql2parquet/"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# only for "testing/debugging" purposes  --deploy-mode client \
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class $appClass \
    --conf spark.executor.memory=12G \
    --conf spark.driver.memory=4G \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=5 \
    --conf spark.executor.cores=5 \
    $appArtifact

exit 0;

README.md (path: /sparksql2parquet/)

HOW TO CONFIGURE THE PROJECT

sparksql2parquet.sh
keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appArtifact="/<REPLACE>/sparksql2parquet-$appVersion.jar /user/<REPLACE>/sparksql2parquet/"

INPUT
HDFS: /user/<REPLACE>/sparksql2parquet/episodes.parquet
parquet location: \src\main\resources
avro source: downloaded from https://github.com/apache/hive/blob/master/data/files/episodes.avro

OUTPUT
HDFS: /user/<REPLACE>/sparksql2parquet/episodes_titles_only.parquet


Building and Running

Build
To build the application, the following must be installed:
Java 9
Maven 3.x
Then just run this:
mvn clean install

Submit to Spark cluster
To run the Spark application, see the shell script inside the /scripts dir.

Source Code

https://github.com/mplacko/sparksql2parquet

Additional Info

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Scala: 2.11.12
  • Spark: 2.4.0
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292

Avro2Parquet

avro2parquet.scala (path: /avro2parquet/src/main/scala/example/)

package eu.placko.examples.spark

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hadoop.fs.{Path}

object Avro2Parquet {
def main(args: Array[String]) {
  if (args.length != 2) {
    println("wrong input! usage: /user/<REPLACE>/avro2parquet/avro /user/<REPLACE>/avro2parquet/parquet")
    return
  }

  val avroPath = new Path(args(0))
  val parquetPath = new Path(args(1))

  val spark: SparkSession = SparkSession.builder()
    //.master("local[1]")
    //.master("yarn")
    .appName("Avro2Parquet")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  try {
    //read avro file
    val df = spark.read.format("avro")
      .load(avroPath + "/" + "episodes.avro")
    df.show()
    df.printSchema()

    //convert to parquet
    df.write.mode(SaveMode.Overwrite)
      .parquet(parquetPath + "/" + "episodes.parquet")
  } catch {
    case e: Exception => println("Exception: " + e);
  }
}
}

notes:

Dependencies.scala (path: /avro2parquet/project/)

import sbt._

object Dependencies {
  val sparkVersion = "2.4.0"
  lazy val sparkAvro = Seq(
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "org.apache.spark" %%  "spark-avro" % sparkVersion
)}

build.sbt (path: /avro2parquet/)

import Dependencies._

ThisBuild / scalaVersion    := "2.11.12"
ThisBuild / version    := "0.1.0"
ThisBuild / organization    := "eu.placko"
ThisBuild / organizationName    := "examples.spark"

lazy val root = (project in file("."))
  .settings(
    name := "avro2parquet",
    version := "0.1.0",
    libraryDependencies ++= sparkAvro
)

// Uncomment the following for publishing to Sonatype.
// See https://www.scala-sbt.org/1.x/docs/Using-Sonatype.html for more detail.

// ThisBuild / description := "Some descripiton about your project."
// ThisBuild / licenses    := List("Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt"))
// ThisBuild / homepage    := Some(url("https://github.com/example/project"))
// ThisBuild / scmInfo := Some(
//   ScmInfo(
//     url("https://github.com/your-account/your-project"),
//     "scm:git@github.com:your-account/your-project.git"
//   )
// )
// ThisBuild / developers := List(
//   Developer(
//     id    = "Your identifier",
//     name  = "Your Name",
//     email = "your@email",
//     url   = url("http://your.url")
//   )
// )
// ThisBuild / pomIncludeRepository := { _ => false }
// ThisBuild / publishTo := {
//   val nexus = "https://oss.sonatype.org/"
//   if (isSnapshot.value) Some("snapshots" at nexus + "content/repositories/snapshots")
//   else Some("releases" at nexus + "service/local/staging/deploy/maven2")
// }
// ThisBuild / publishMavenStyle := true

BUILD (path: /avro2parquet/sbt)

./sbt/bin/sbt compile
#./sbt/bin/sbt run
./sbt/bin/sbt package

notes:

EXECUTE avro2parquet.sh (path: /avro2parquet)

#!/bin/sh


on_error() {
  printf "\n\nAn error occurred!\n";
  exit 1;
}
trap on_error ERR


keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.Avro2Parquet

appVersion="0.1.0"
appArtifact="/<REPLACE>/avro2parquet/target/scala-2.11/avro2parquet_2.11-$appVersion.jar /user/<REPLACE>/avro2parquet/avro /user/<REPLACE>/avro2parquet/parquet"
log4j_setting="-Dlog4j.configuration=file:///<REPLACE>/avro2parquet/conf/log4j.xml"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# only for "testing/debugging" purposes  --deploy-mode client \
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class $appClass \
    --conf spark.executor.memory=12G \
    --conf spark.driver.memory=4G \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=5 \
    --conf spark.executor.cores=5 \
    --conf "spark.driver.extraJavaOptions=${log4j_setting}" \
    --conf "spark.executor.extraJavaOptions=${log4j_setting}" \
    $appArtifact

exit 0;

notes (source: https://sparkbyexamples.com/spark/sparksession-explained-with-examples/):

  • master() – If you are running it on a cluster, you need to pass your master name as an argument to master(). Usually it would be either yarn or mesos, depending on your cluster setup.
  • Use local[x] when running in Standalone mode. x should be an integer greater than 0; it represents how many partitions should be created when using RDD, DataFrame, and Dataset. Ideally, x should be the number of CPU cores you have.
  • see also: https://sparkbyexamples.com/spark/spark-deploy-modes-client-vs-cluster/

Source Code

https://github.com/mplacko/avro2parquet

Additional Info

Spark-Shell Example

# Kerberos
# su - <user>
# kinit -kt /etc/security/keytabs/<user>.keytab <user>

Start Spark-shell

spark-shell

Load dataframe from Parquet

var df = spark.read.format("parquet").option("inferSchema", "true").load("/user/<user>/avro2parquet/parquet/episodes.parquet")

Save dataframe as Parquet

import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet("/user/<user>/avro2parquet/parquet/checksum")

Print dataframe schema

df.printSchema

scala> df.printSchema
root
 |-- title: string (nullable = true)
 |-- air_date: string (nullable = true)
 |-- doctor: integer (nullable = true)

Add column to dataframe

# add checksum column by performing MD5 hash on concatenation of all columns
import org.apache.spark.sql.functions.{col, concat_ws, md5}
df = df.withColumn("checksum", md5(concat_ws(",", col("title"), col("air_date"), col("doctor"))))

Print dataframe schema

df.printSchema

scala> df.printSchema
root
 |-- title: string (nullable = true)
 |-- air_date: string (nullable = true)
 |-- doctor: integer (nullable = true)
 |-- checksum: string (nullable = false)

Select from dataframe

df.select("checksum").show(false)

scala> df.select("checksum").show(false)
+--------------------------------+
|checksum                        |
+--------------------------------+
|60723b412082c7ecca173420db2dd620|
|a6e1f1847fff5d7795bb4ffadf7c337d|
|25ff41e3d2507c37fbcc24ab571e55d6|
|2deea188b3e6b7d654db4f98baf7cdcf|
|ab0e985ea1588b0a3e34b80dae197391|
|4901b9d1fc3f47f6b3283c7bc6e3cf8b|
|6a43f326e1a823108ee84f97cf2f59ba|
|984d29d372f8f10f3edbb0fa26b096cb|
+--------------------------------+

Impala JDBC Tester – Cloudera driver

Posted: December 21, 2021 in Hadoop

Prerequisites

impala-jdbc-tester

ClouderaImpalaJdbcTester.java

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class ClouderaImpalaJdbcTester {

    private static final String CONNECTION_URL_PROPERTY = "connection.url";
    private static final String JDBC_DRIVER_NAME_PROPERTY = "jdbc.driver.class.name";

    private static String connectionUrl;
    private static String jdbcDriverName;

    private static void loadConfiguration() throws IOException {
        InputStream input = null;
        try {
            String filename = ClouderaImpalaJdbcTester.class.getSimpleName() + ".conf";
            filename = "ClouderaImpalaJdbcTester.conf";
            input = ClouderaImpalaJdbcTester.class.getClassLoader().getResourceAsStream(filename);
            Properties prop = new Properties();
            prop.load(input);

            connectionUrl = prop.getProperty(CONNECTION_URL_PROPERTY);
            jdbcDriverName = prop.getProperty(JDBC_DRIVER_NAME_PROPERTY);
        } finally {
            try {
                if (input != null)
                    input.close();
            } catch (IOException e) {
                // ToDo
            }
        }
    }

    public static void main(String[] args) throws IOException {
        //String sqlStatement = args[0];
        String sqlStatement = "show databases";
        //sqlStatement = "select * from <table>;";

        loadConfiguration();

        System.out.println("\n=============================================");
        System.out.println("Cloudera Impala JDBC Tester");
        System.out.println("Using Connection URL: " + connectionUrl);
        System.out.println("Running Query: " + sqlStatement);

        Connection con = null;
        try {

            Class.forName(jdbcDriverName);
            con = DriverManager.getConnection(connectionUrl);
            Statement stmt = con.createStatement();
            ResultSet rs = stmt.executeQuery(sqlStatement);
            System.out.println("\n== Begin Query Results ======================");
            // print the results to the console
            while (rs.next()) {
                // the tester query returns one String column
                System.out.println(rs.getString(1));
            }
            System.out.println("== End Query Results =======================\n\n");
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                con.close();
            } catch (Exception e) {
                // ToDo
            }
        }
    }
}

ClouderaImpalaJdbcTester.conf

#For secure cluster with Kerberos authentication and TLS enabled; Here we allow the SSL certificate used by the server to be self-signed, by sending the AllowSelfSignedCerts property=1
connection.url = jdbc:impala://<impala-load-balancer>:21050;AuthMech=3;UID=<username>;PWD=<password>;SSL=1;AllowSelfSignedCerts=1
jdbc.driver.class.name = com.cloudera.impala.jdbc.Driver
#com.cloudera.hive.jdbc.HS2Driver

Source Code

https://github.com/mplacko/impala-jdbc-tester

Additional Info

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
doNotPrompt=true
useTicketCache=true
principal="impala@<…>"
keyTab="<…>/impala.keytab";
};

  • SSLTrustStore vs JAVA_HOME

jssecacerts and cacerts are part of JAVA_HOME; that is what the Hive/Impala driver is looking for.
If jssecacerts does not exist, then cacerts is used. (The default location of cacerts is jre/lib/security.)
If JAVA_HOME is not set, the driver cannot access the truststore files; hence the truststore has to be defined explicitly in the connection string: SSLTrustStore=/etc/pki/java/cacerts;SSLTrustStorePwd=changeit;
note: If the trust store requires a password, provide it using the property SSLTrustStorePwd.
where:
{
"name" : "hiveserver2_truststore_file",
"value" : "/etc/pki/java/cacerts"
}, {
"name" : "hiveserver2_truststore_password",
"value" : "changeit"
}, {..
where:
JAVA_HOME: /usr/java/default

  • Impala (see the JDBC sketch after this list)
    • INVALIDATE METADATA
      • INVALIDATE METADATA <tablename> must be used if:
        • the table is newly created or removed outside Impala (Impala is not aware of the operation, e.g. the table is created/dropped in Hive or Spark)
        • the table schema is altered outside Impala (e.g. a column is added/removed/renamed in Hive) or partitions are added/removed outside Impala (e.g. from Hive)
        • the block locations change (e.g. blocks were moved from one machine to another by an HDFS rebalance)
    • REFRESH
    • COMPUTE STATS
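
The same maintenance statements can be issued from Java over the JDBC connection used above; a minimal sketch (the class name and the table name my_db.my_table are placeholders, the connection URL is the one from ClouderaImpalaJdbcTester.conf):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ImpalaMetadataMaintenance {
    public static void main(String[] args) throws Exception {
        // same driver and URL as in ClouderaImpalaJdbcTester.conf
        Class.forName("com.cloudera.impala.jdbc.Driver");
        try (Connection con = DriverManager.getConnection(
                "jdbc:impala://<impala-load-balancer>:21050;AuthMech=3;UID=<username>;PWD=<password>;SSL=1;AllowSelfSignedCerts=1");
             Statement stmt = con.createStatement()) {
            // table created/dropped/altered outside Impala -> reload its metadata
            stmt.execute("INVALIDATE METADATA my_db.my_table");
            // only new data files were added to an existing table -> cheaper than INVALIDATE METADATA
            stmt.execute("REFRESH my_db.my_table");
            // recompute table/column statistics for the query planner
            stmt.execute("COMPUTE STATS my_db.my_table");
        }
    }
}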