Posts Tagged ‘Big data’

An example of reading (querying) and writing a Parquet file and creating a Hive table.

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Scala: 2.11.12
  • Spark: 2.4.0
  • OpenJDK 64-Bit 1.8.0_292
  • IDE (for Windows 10 Enterprise): Eclipse IDE for Java Developers – 2020-09

SparkSql2Parquet

SparkConfig.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/config/)

package eu.placko.examples.spark.config;

import org.apache.spark.sql.SparkSession;
import java.io.File;

public class SparkConfig {
	public static SparkSession createSparkSession() {
		String episodesLocation = new File("episodes").getAbsolutePath();
		SparkSession spark = SparkSession
				.builder()
				.appName("sparksql2parquet")
				//.master("local[1]")
				//.master("yarn")
				.config("spark.sql.broadcastTimeout","36000")
				.config("spark.sql.warehouse.dir", episodesLocation)
				.enableHiveSupport()
				.getOrCreate();	
		return spark;
	}
}

QueryBuilder.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class QueryBuilder {
	private Long limit = 0L;

	public QueryBuilder setLimit(Long limit) {
		this.limit = limit;
		return this;
	}

	public String build() {
		String sql = "SELECT title FROM tmp_episodes";
		if (limit != null && limit.longValue() > 0) {
			sql = sql + " limit " + limit;
		}
		return sql;
	}
}

HiveBuilder.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class HiveBuilder {
	public String buildDB() {
		String sql = "CREATE DATABASE IF NOT EXISTS episodes LOCATION '/user/hive/databases/episodes.db'";
		return sql;
	}
	
	public String buildTB() {
		String sql = "DROP TABLE IF EXISTS episodes.episodes";
		return sql;
	}
	
	public String buildTB(String path) {
		String sql = "CREATE EXTERNAL TABLE episodes.episodes(title string) STORED AS PARQUET LOCATION 'hdfs://nameservice1" + path + "episodes_titles_only.parquet'";
		return sql;
	}
}

NOTES:

  • nameservice1 -> HA (High Availability)
  • PARTITIONED BY (title string) -> the table could also be created as a partitioned table; see the sketch after these notes
  • SHOW CREATE TABLE episodes.episodes
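A possible partitioned variant is not part of this example; the following is only a minimal sketch. In Hive, the partition column must not be repeated in the regular column list, so a separate, hypothetical column (air_year) is used here, and partitions would still have to be added or discovered (e.g. via MSCK REPAIR TABLE) once the matching directories exist:

	public String buildPartitionedTB(String path) {
		// Hypothetical extension of HiveBuilder (sketch only): partitioned external table.
		// Each partition maps to a subdirectory like .../air_year=2014/ under the location.
		String sql = "CREATE EXTERNAL TABLE IF NOT EXISTS episodes.episodes_partitioned (title string) "
				+ "PARTITIONED BY (air_year int) "
				+ "STORED AS PARQUET "
				+ "LOCATION 'hdfs://nameservice1" + path + "episodes_titles_only.parquet'";
		return sql;
	}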

SparkSql2Parquet.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/app/)

package eu.placko.examples.spark.app;

import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SaveMode;

import eu.placko.examples.spark.config.SparkConfig;
import eu.placko.examples.spark.sql.QueryBuilder;
import eu.placko.examples.spark.sql.HiveBuilder;

public class SparkSql2Parquet {
	private final static Logger LOGGER = Logger.getLogger(SparkSql2Parquet.class.getName());
	
	public static void main(String[] args) {
		if (args.length != 1) {
		    LOGGER.info("wrong input! usage: /user/<REPLACE>/sparksql2parquet/");
		    return;
		}
		String path = args[0];
		
		SparkSession sparkSession = SparkConfig.createSparkSession();
		LOGGER.info("Started Spark Session");
		
		Dataset<Row> parquetFileDF = sparkSession.read().parquet(path + "episodes.parquet");
		parquetFileDF.createOrReplaceTempView("tmp_episodes");
		LOGGER.info("Created temp view");
		
		String query = new QueryBuilder().setLimit(null).build();
		Dataset<Row> episodesDF = sparkSession.sql(query);
		episodesDF
			//.select("title")
			.write()
			.mode(SaveMode.Overwrite)
			.parquet(path + "episodes_titles_only.parquet");
		LOGGER.info("Written to the new parquet");
		
		String hive = new HiveBuilder().buildDB();
		sparkSession.sql(hive);
		LOGGER.info("Created Hive DB if not exists");
		
		hive = new HiveBuilder().buildTB();
		sparkSession.sql(hive);
		LOGGER.info("Dropped Hive table if exists");
		
		hive = new HiveBuilder().buildTB(path);
		sparkSession.sql(hive);
		LOGGER.info("Created Hive table over the parquet");
	}
}

NOTE: “episodes.parquet” does not need to be a Parquet file directly; it can also be a directory containing the actual Parquet part files

e.g.:

/user/<REPLACE>/sparksql2parquet/episodes.parquet/
part-00000-d26aa3a3-ef94-422f-933e-d230b345cad3-c000.snappy.parquet

sparksql2parquet.sh (path: /sparksql2parquet/scripts/)

#!/bin/sh


on_error() {
  printf "\n\nAn error occurred!\n";
  exit 1;
}
trap on_error ERR


keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.app.SparkSql2Parquet

appVersion="1.0.0-SNAPSHOT"
appArtifact="/<REPLACE>/sparksql2parquet-$appVersion.jar /user/<REPLACE>/sparksql2parquet/"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# only for "testing/debugging" purposes  --deploy-mode client \
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class $appClass \
    --conf spark.executor.memory=12G \
    --conf spark.driver.memory=4G \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=5 \
    --conf spark.executor.cores=5 \
    $appArtifact

exit 0;

README.md (path: /sparksql2parquet/)

HOW TO CONFIGURE THE PROJECT

sparksql2parquet.sh
keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appArtifact="/<REPLACE>/sparksql2parquet-$appVersion.jar /user/<REPLACE>/sparksql2parquet/"

INPUT
HDFS: /user/<REPLACE>/sparksql2parquet/episodes.parquet
parquet location: \src\main\resources
avro source: downloaded from https://github.com/apache/hive/blob/master/data/files/episodes.avro

OUTPUT
HDFS: /user/<REPLACE>/sparksql2parquet/episodes_titles_only.parquet


Building and Running

Build
To build the application, the following must be installed:
Java 9
Maven 3.x
Then just run:
mvn clean install

Submit to Spark cluster
To run the Spark application, see the shell script inside the /scripts dir.

Source Code

https://github.com/mplacko/sparksql2parquet

Additional Info

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Scala: 2.11.12
  • Spark: 2.4.0
  • OpenJDK 64-Bit 1.8.0_292

Avro2Parquet

avro2parquet.scala (path: /avro2parquet/src/main/scala/example/)

package eu.placko.examples.spark

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hadoop.fs.{Path}

object Avro2Parquet {
def main(args: Array[String]) {
  if (args.length != 2) {
    println("wrong input! usage: /user/<REPLACE>/avro2parquet/avro /user/<REPLACE>/avro2parquet/parquet")
    return
  }

  val avroPath = new Path(args(0))
  val parquetPath = new Path(args(1))

  val spark: SparkSession = SparkSession.builder()
    //.master("local[1]")
    //.master("yarn")
    .appName("Avro2Parquet")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  try {
    //read avro file
    val df = spark.read.format("avro")
      .load(avroPath + "/" + "episodes.avro")
    df.show()
    df.printSchema()

    //convert to parquet
    df.write.mode(SaveMode.Overwrite)
      .parquet(parquetPath + "/" + "episodes.parquet")
  } catch {
    case e: Exception => println("Exception: " + e);
  }
}
}

notes:

Dependencies.scala (path: /avro2parquet/project/)

import sbt._

object Dependencies {
  val sparkVersion = "2.4.0"
  lazy val sparkAvro = Seq(
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "org.apache.spark" %%  "spark-avro" % sparkVersion
)}

build.sbt (path: /avro2parquet/)

import Dependencies._

ThisBuild / scalaVersion    := "2.11.12"
ThisBuild / version    := "0.1.0"
ThisBuild / organization    := "eu.placko"
ThisBuild / organizationName    := "examples.spark"

lazy val root = (project in file("."))
  .settings(
    name := "avro2parquet",
    version := "0.1.0",
    libraryDependencies ++= sparkAvro
)

// Uncomment the following for publishing to Sonatype.
// See https://www.scala-sbt.org/1.x/docs/Using-Sonatype.html for more detail.

// ThisBuild / description := "Some description about your project."
// ThisBuild / licenses    := List("Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt"))
// ThisBuild / homepage    := Some(url("https://github.com/example/project"))
// ThisBuild / scmInfo := Some(
//   ScmInfo(
//     url("https://github.com/your-account/your-project"),
//     "scm:git@github.com:your-account/your-project.git"
//   )
// )
// ThisBuild / developers := List(
//   Developer(
//     id    = "Your identifier",
//     name  = "Your Name",
//     email = "your@email",
//     url   = url("http://your.url")
//   )
// )
// ThisBuild / pomIncludeRepository := { _ => false }
// ThisBuild / publishTo := {
//   val nexus = "https://oss.sonatype.org/"
//   if (isSnapshot.value) Some("snapshots" at nexus + "content/repositories/snapshots")
//   else Some("releases" at nexus + "service/local/staging/deploy/maven2")
// }
// ThisBuild / publishMavenStyle := true

BUILD (path: /avro2parquet/sbt)

./sbt/bin/sbt compile
#./sbt/bin/sbt run
./sbt/bin/sbt package

notes:

EXECUTE avro2parquet.sh (path: /avro2parquet)

#!/bin/sh


on_error() {
  printf "\n\nAn error occurred!\n";
  exit 1;
}
trap on_error ERR


keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.Avro2Parquet

appVersion="0.1.0"
appArtifact="/<REPLACE>/avro2parquet/target/scala-2.11/avro2parquet_2.11-$appVersion.jar /user/<REPLACE>/avro2parquet/avro /user/<REPLACE>/avro2parquet/parquet"
log4j_setting="-Dlog4j.configuration=file:///<REPLACE>/avro2parquet/conf/log4j.xml"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# only for "testing/debugging" purposes  --deploy-mode client \
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class $appClass \
    --conf spark.executor.memory=12G \
    --conf spark.driver.memory=4G \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=5 \
    --conf spark.executor.cores=5 \
    --conf "spark.driver.extraJavaOptions=${log4j_setting}" \
    --conf "spark.executor.extraJavaOptions=${log4j_setting}" \
    $appArtifact

exit 0;

notes (source: https://sparkbyexamples.com/spark/sparksession-explained-with-examples/):

  • master() – if you are running on a cluster, pass your master name as the argument to master(); usually this is either yarn or mesos, depending on your cluster setup.
  • Use local[x] when running in local (standalone) mode. x should be an integer greater than 0 and represents how many partitions are created by default when using RDDs, DataFrames and Datasets; ideally x should match the number of CPU cores available. (A minimal sketch follows these notes.)
  • See also: https://sparkbyexamples.com/spark/spark-deploy-modes-client-vs-cluster/
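A minimal Java sketch of the same idea (an illustration, not taken from the projects above): hard-code a local master only for testing/debugging and otherwise leave it to spark-submit, as the shell scripts in this post do:

import org.apache.spark.sql.SparkSession;

public class SparkMasterExample {
    public static void main(String[] args) {
        // "local" as the first argument -> run inside this JVM (testing/debugging);
        // otherwise leave the master unset and let spark-submit supply
        // --master yarn --deploy-mode cluster|client, as in the scripts above.
        boolean localRun = args.length > 0 && "local".equals(args[0]);

        SparkSession.Builder builder = SparkSession.builder().appName("spark-master-example");
        if (localRun) {
            // local[4]: driver plus 4 worker threads in one JVM;
            // ideally the number matches the available CPU cores.
            builder = builder.master("local[4]");
        }

        SparkSession spark = builder.getOrCreate();
        System.out.println("Spark master: " + spark.sparkContext().master());
        spark.stop();
    }
}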

Source Code

https://github.com/mplacko/avro2parquet

Additional Info

Impala JDBC Tester – Cloudera driver

Posted: December 21, 2021 in Hadoop

Prerequisites

impala-jdbc-tester

ClouderaImpalaJdbcTester.java

import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class ClouderaImpalaJdbcTester {

    private static final String CONNECTION_URL_PROPERTY = "connection.url";
    private static final String JDBC_DRIVER_NAME_PROPERTY = "jdbc.driver.class.name";

    private static String connectionUrl;
    private static String jdbcDriverName;

    private static void loadConfiguration() throws IOException {
        InputStream input = null;
        try {
            String filename = ClouderaImpalaJdbcTester.class.getSimpleName() + ".conf";
            input = ClouderaImpalaJdbcTester.class.getClassLoader().getResourceAsStream(filename);
            Properties prop = new Properties();
            prop.load(input);

            connectionUrl = prop.getProperty(CONNECTION_URL_PROPERTY);
            jdbcDriverName = prop.getProperty(JDBC_DRIVER_NAME_PROPERTY);
        } finally {
            try {
                if (input != null)
                    input.close();
            } catch (IOException e) {
                // ToDo
            }
        }
    }

    public static void main(String[] args) throws IOException {
        //String sqlStatement = args[0];
        String sqlStatement = "show databases";
        //sqlStatement = "select * from <table>;";

        loadConfiguration();

        System.out.println("\n=============================================");
        System.out.println("Cloudera Impala JDBC Tester");
        System.out.println("Using Connection URL: " + connectionUrl);
        System.out.println("Running Query: " + sqlStatement);

        Connection con = null;
        try {

            Class.forName(jdbcDriverName);
            con = DriverManager.getConnection(connectionUrl);
            Statement stmt = con.createStatement();
            ResultSet rs = stmt.executeQuery(sqlStatement);
            System.out.println("\n== Begin Query Results ======================");
            // print the results to the console
            while (rs.next()) {
                // the tester query returns one String column
                System.out.println(rs.getString(1));
            }
            System.out.println("== End Query Results =======================\n\n");
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                con.close();
            } catch (Exception e) {
                // ToDo
            }
        }
    }
}

ClouderaImpalaJdbcTester.conf

#For secure cluster with Kerberos authentication and TLS enabled; Here we allow the SSL certificate used by the server to be self-signed, by sending the AllowSelfSignedCerts property=1
connection.url = jdbc:impala://<impala-load-balancer>:21050;AuthMech=3;UID=<username>;PWD=<password>;SSL=1;AllowSelfSignedCerts=1
jdbc.driver.class.name = com.cloudera.impala.jdbc.Driver
#com.cloudera.hive.jdbc.HS2Driver

Source Code

https://github.com/mplacko/impala-jdbc-tester

Additional Info

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
doNotPrompt=true
useTicketCache=true
principal="impala@<…>"
keyTab="<…>/impala.keytab";
};

Prerequisites

References

Hive/Impala SQLLine Tester

prepare directory

mkdir sqllinetester && cd sqllinetester

download SQLLine with-dependencies

wget https://repo1.maven.org/maven2/sqlline/sqlline/1.11.0/sqlline-1.11.0-jar-with-dependencies.jar
chmod +x sqlline-1.11.0-jar-with-dependencies.jar

download Hive JDBC driver

wget https://repository.cloudera.com/artifactory/cdh-build/Hive/HiveJDBC42/2.6.15.1018/HiveJDBC42-2.6.15.1018.jar
chmod +x HiveJDBC42-2.6.15.1018.jar

download Impala JDBC driver

wget https://repository.cloudera.com/artifactory/cdh-build/Impala/ImpalaJDBC42/2.6.15.1017/ImpalaJDBC42-2.6.15.1017.jar
chmod +x ImpalaJDBC42-2.6.15.1017.jar

obtain TGT in ticket cache

# kinit -kt <path-to-keytab> <client-principal>
kinit -kt /etc/security/keytabs/hiveimpalatester.keytab hiveimpalatester

execute SQLLine with the jars in the classpath

java -cp "./*" sqlline.SqlLine

check if JDBC drivers are loaded

sqlline> !scan
scan complete in 84ms
3 driver classes found
Compliant Version Driver Class
no        2.6     com.cloudera.hive.jdbc.HS1Driver
no        2.6     com.cloudera.hive.jdbc.HS2Driver
no        2.6     com.cloudera.impala.jdbc.Driver

# class names:
# com.cloudera.impala.jdbc.Driver
# com.cloudera.hive.jdbc.HS2Driver

connect to Hive via Hive JDBC

sqlline> !connect jdbc:hive2://<hs2_node>:10000/default;AuthMech=1;ssl=1;sslTrustStore=/<path>/certs.jks;KrbRealm=<krb_realm>;KrbHostFQDN=_HOST;KrbServiceName=hive;KrbAuthType=2;

# When authenticating via Kerberos, sqlline will still prompt for a username and password; just press Enter at both prompts.

0: jdbc:hive2://<hs2_node> show tables;

connect to Impala via Impala JDBC

sqlline> !connect jdbc:impala://<impala-load-balancer>:21050/default;AuthMech=1;ssl=1;sslTrustStore=/<path>/certs.jks;KrbRealm=<krb_realm>;KrbHostFQDN=_HOST;KrbServiceName=impala;KrbAuthType=2;

# When authenticating via Kerberos, sqlline will still prompt for a username and password; just press Enter at both prompts.

0: jdbc:impala://<impala-load-balancer> show tables;

exit SQLLine

sqlline> !quit

Additional Info

AuthMech

  • 0: no authentication
  • 1: Kerberos
  • 2: user name
  • 3: user name and password
  • 4: user name and password with SSL

KrbAuthType

  • 0: the driver automatically detects which method to use for obtaining the Subject (this is also the behaviour when the KrbAuthType property is not set).
  • 1: the driver creates a LoginContext from a JAAS configuration and then uses the Subject associated with it (see the sketch after this list).
  • 2: the driver creates a LoginContext from the Kerberos ticket cache and then uses the Subject associated with it.
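As an illustration of KrbAuthType=1 (a hedged sketch, not taken from the driver documentation): the JVM is pointed at a JAAS configuration file, such as the Client block shown in the Impala JDBC Tester post above, via the standard java.security.auth.login.config system property before the connection is opened. Host names, realms and paths below are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;

public class KrbAuthTypeJaasExample {
    public static void main(String[] args) throws Exception {
        // Standard JVM properties: Kerberos configuration and the JAAS login configuration
        // (e.g. a file containing the "Client { ... }" block shown earlier).
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/<path>/jaas.conf");

        // KrbAuthType=1 -> the driver builds its Kerberos Subject from the JAAS login,
        // instead of the ticket cache (2) or automatic detection (0 / unset).
        String url = "jdbc:impala://<impala-load-balancer>:21050/default;"
                + "AuthMech=1;KrbRealm=<krb_realm>;KrbHostFQDN=_HOST;"
                + "KrbServiceName=impala;KrbAuthType=1;"
                + "ssl=1;sslTrustStore=/<path>/certs.jks";

        Class.forName("com.cloudera.impala.jdbc.Driver");
        try (Connection con = DriverManager.getConnection(url)) {
            System.out.println("Connected: " + !con.isClosed());
        }
    }
}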

Prerequisites

Hive/Impala Tester

HiveServer2 Beeline tests

beeline -u 'jdbc:hive2://<hs2_node>:10000/default;ssl=true;principal=hive/_HOST@<kerberos_realm>'
beeline -u 'jdbc:hive2://<impala-load-balancer>:21050/default;ssl=true;principal=impala/_HOST@<kerberos_realm>'

note: Beeline Command Options

Obtain TGT in ticket cache

# Kerberos ticket renewal: schedule e.g. via a cron job
su - hiveimpalatester
# kinit -kt <path-to-keytab> <client-principal>
kinit -kt /etc/security/keytabs/hiveimpalatester.keytab hiveimpalatester
#klist

hiveimpala_jdbc_tester.java

//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Properties;
import java.io.StringReader;
import java.sql.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
 
public class hiveimpala_jdbc_tester {
  //private static final Logger LOGGER =
  //    LoggerFactory.getLogger(hiveimpala_jdbc_tester.class);
  private static String HIVESERVE2DRIVER = "org.apache.hive.jdbc.HiveDriver";
 
  public static void main(String[] args) throws SQLException, IOException {
    String principal = args[0];
    String keytab    = args[1];
    String url       = args[2];
    String fetchSize = args[3];
    String sqlString = args[4];
    int fSize = Integer.parseInt(fetchSize.trim());
 
    // attaching Kerberos conf information: realm and kdc
    System.setProperty("java.security.krb5.conf", "/<path>/krb5.conf");
    //System.setProperty("sun.security.krb5.debug", "true");
 
    // extract serverPrincipal
    Properties p = new Properties();
    p.load(new StringReader(url.replaceAll(";", "\n")));
    String serverPrincipal=p.getProperty("principal");
    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "kerberos");
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(principal, keytab);
 
    // load the driver
    try {
      Class.forName(HIVESERVE2DRIVER);
    } catch (Exception e) {
      //LOGGER.error("Driver not found");
      System.out.println("Driver not found");
    }
 
    // get connected
    //LOGGER.info("");
    //LOGGER.info("Connecting...");
    System.out.println("");
    System.out.println("Connecting...");
    Connection conn = DriverManager.getConnection(url);
    Statement stmt = conn.createStatement();
    // HIVE only:
    //stmt.execute("SET hive.query.results.cache.enabled=false");
    // set fetch task if needed
    //stmt.execute("SET hive.fetch.task.conversion=more");
    //stmt.execute("SET hive.fetch.task.conversion.threshold=-1");
    stmt.setFetchSize(fSize);
    //LOGGER.info("command: " + sqlString);
    System.out.println("command: " + sqlString);
    //LOGGER.info("Executing...");
    System.out.println("Executing...");
    ResultSet res = stmt.executeQuery(sqlString);
    //LOGGER.info("Fetching...");
    System.out.println("Fetching...");
    ResultSetMetaData resMD = res.getMetaData();
    int cols = resMD.getColumnCount();
    while (res.next()) {
      for (int i = 1; i <= cols; i++) {
        if (i > 1) System.out.print(",  ");
        String value = res.getString(i);
        System.out.print(value);
      }
      System.out.println("");
    }
    //LOGGER.info("Exiting...");
    System.out.println("Exiting...");
    conn.close();
  }
}

build_java.sh

javac -d . hiveimpala_jdbc_tester.java -cp "/<path>/jars/*:."

set permissions & execute build_java.sh

chmod +x ./build_java.sh
./build_java.sh

run_tester.sh

# DEBUG: Prints all Hadoop and Hive configuration variables
#set -v
 
# Number of rows that should be fetched from the database
FETCHSIZE="10000"
 
# DEV/TEST: For secure cluster with Kerberos authentication and TLS enabled; here we allow the SSL certificate used by the server to be self-signed, by sending the AllowSelfSignedCerts property=1
#HS2="jdbc:hive2://<hs2_node>:10000/;ssl=true;AllowSelfSignedCerts=1;principal=hive/_HOST@<kerberos_realm>;"
#HS2="jdbc:hive2://<impala-load-balancer>:21050/;ssl=true;AllowSelfSignedCerts=1;principal=impala/_HOST@<kerberos_realm>;"
 
# PROD: For secure cluster with Kerberos authentication and TLS enabled; here we allow the SSL certificate used by the server to be signed by a Certificate Authority* (CA)
# To import the certificate to a truststore on a client in two steps as follows:
# Copy /opt/cloudera/CMCA/trust-store/cm-auto-global_cacerts.pem from CM server to localhost
# Run the following command to import the certificate to the truststore:
# keytool -import -keystore /tmp/certs.jks -alias autotls -file /<path>/certs/cm-auto-global_cacerts.pem -keypass changeit -storepass changeit -noprompt
# Certificate was added to keystore
#HS2="jdbc:hive2://<hs2_node>:10000/;ssl=true;sslTrustStore=/<path>/certs.jks;trustStorePassword=changeit;principal=impala/_HOST@<kerberos_realm>;"
HS2="jdbc:hive2://<impala-load-balancer>:21050/;ssl=true;sslTrustStore=/<path>/certs.jks;trustStorePassword=changeit;principal=impala/_HOST@<kerberos_realm>;"
 
KRB_PRINC="hiveimpalatester@<kerberos_realm>"
 
KRB_KEYTB="/etc/security/keytabs/hiveimpalatester.keytab"
 
time java -cp "/<path>/jars/*:." \
hiveimpala_jdbc_tester "${KRB_PRINC}" "${KRB_KEYTB}" "${HS2};fetchSize=${FETCHSIZE}" "${FETCHSIZE}" \
"select * from <db>.<table> limit 1" 2>&1

set permissions & execute run_tester.sh

chmod +x ./run_tester.sh
./run_tester.sh

Task: to present a near real-time (or batch) enterprise search platform built on the Apache Lucene project

Cloudera Search offers the following methods for indexing data at scale:

  • NRT indexing (Lily HBase NRT indexing or Flume NRT indexing)
  • batch indexing (Spark or MapReduce indexing: MapReduceIndexerTool or Lily HBase batch indexing)


Architecture: NRT Cloudera Search

Environment:

  • Hadoop (big data) cluster: Cloudera (either an existing Cloudera infrastructure or Cloudera Quickstart VM)
  • Cloudera Search services: HBase with ZooKeeper, Key-Value Store Indexer (Lily NRT HBase indexer) and Solr; supporting services: Cloudera Manager, Hue, HDFS and YARN (with MapReduce included)
  • data ingestion: e.g. Talend Open Studio / Solr UI update (optional)
  • testing: Solr UI select / SOAP UI (optional)

Setup:
a) creating/enabling HBase table ‘Solr_Test’ with column family ‘cities1000’ and enabling replication for Lily HBase NRT indexing

hbase shell
create 'Solr_Test', 'cities1000'
disable 'Solr_Test'
alter 'Solr_Test', {NAME => 'cities1000', REPLICATION_SCOPE => 1}
enable 'Solr_Test'


Listing: HBase shell

b) creating Solr collection ‘solr_test-collection’ and schema ‘schema.xml’

solrctl instancedir --generate $HOME/solr_test-collection

download: schema.xml

solrctl instancedir --create solr_test-collection $HOME/solr_test-collection
solrctl collection --create solr_test-collection


Listing: Solr shell

c) creating Lily HBase configuration files: ‘morphlines.conf’ and ‘morphline-hbase-mapper.xml’ and adding indexer
download: morphlines.conf
download: morphline-hbase-mapper.xml

hbase-indexer add-indexer \
--name SolrTestIndexer \
--indexer-conf $HOME/solr_test-collection/conf/morphline-hbase-mapper.xml \
--connection-param solr.collection=solr_test-collection \
--zookeeper quickstart.cloudera:2181


Listing: HBase indexer

hbase-indexer list-indexers


Listing: HBase indexers

d) additional settings
URL: http://quickstart.cloudera:7180

Key-Value Store Indexer -> “logging”
log4j.logger.org.kitesdk.morphline=TRACE
log4j.logger.com.ngdata=TRACE


Lily: logging setting

Cloudera Manager -> Clusters -> Key-Value Store Indexer -> Configuration
Java Heap Size of Lily HBase Indexer in Bytes -> 50 MB -> e.g. 1 GB (based on the input)


Lily: heap size setting

Cloudera Manager -> Clusters -> HBase -> Configuration
Java Heap Size of HBase Master in Bytes -> 50 MB -> e.g. 1 GB (based on the input)


HBase: heap size master setting

Java Heap Size of HBase RegionServer in Bytes -> 50 MB -> e.g. 1 GB (based on the input)


HBase: heap size region setting

e) verifying that the indexer works
URL: http://quickstart.cloudera:8983


Solr: indexer (empty)

Note: HBase indexer log file: /var/log/hbase-solr/lily-hbase-indexer*.log.out

f) data ingest


Talend: data ingestion


Hue -> HBase: ingested data
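In this setup the rows are loaded with Talend. Purely as an illustrative alternative (a sketch assuming the HBase 1.x Java client, with a made-up row key and values), the same kind of row could be written with the plain HBase API, after which the Lily indexer picks it up and the document appears in Solr:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class SolrTestIngest {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "quickstart.cloudera");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("Solr_Test"))) {
            // Row key and column values are illustrative only; the column family
            // must match the one created above ('cities1000').
            Put put = new Put(Bytes.toBytes("3181668"));
            put.addColumn(Bytes.toBytes("cities1000"), Bytes.toBytes("name"), Bytes.toBytes("Bottanuco"));
            put.addColumn(Bytes.toBytes("cities1000"), Bytes.toBytes("countrycode"), Bytes.toBytes("IT"));
            table.put(put);
        }
    }
}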

Optional Solr data ingest via an HTTP request of the following form (note: not related to the HBase part!):
http://quickstart.cloudera:8983/solr/solr_test-collection_shard1_replica1/update/csv?commit=true&separator=%09&fieldnames=id,name,,alternative_names,latitude,longitude,,,countrycode,,,,,,population,elevation,,timezone,lastupdate&stream.file=/home/cloudera/solr_test-collection/cities1000.txt&overwrite=true&stream.contentType=text/plain;charset=utf-8

g) testing
URL: http://quickstart.cloudera:8983

At this point, if you run the data ingestion (e.g. via a Talend job), the new data shows up as query results in Solr within a few seconds, i.e. near real-time.


Solr: indexed data (documents)


Solr: query result

The q field (query) accepts the field:value format and supports wildcard symbols.

{
  "responseHeader": {
    "status": 0,
    "QTime": 1,
    "params": {
      "indent": "true",
      "q": "name:Botta*",
      "_": "1544275090709",
      "wt": "json"
    }
  },
  "response": {
    "numFound": 2,
    "start": 0,
    "docs": [
      {
        "countrycode": "IT",
        "alternative_names": "Bottanuco",
        "elevation": "222",
        "id": "3181668",
        "lastupdate": "2014-04-13",
        "timezone": "Europe/Rome",
        "name": "Bottanuco",
        "longitude": "9.50903",
        "latitude": "45.63931",
        "population": "5121",
        "_version_": 1619289130669179000
      },
      {
        "countrycode": "IT",
        "alternative_names": "Botta",
        "elevation": "",
        "id": "9036161",
        "lastupdate": "2014-05-20",
        "timezone": "Europe/Rome",
        "name": "Botta",
        "longitude": "9.53257",
        "latitude": "45.83222",
        "population": "751",
        "_version_": 1619289135325905000
      }
    ]
  }
}
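The same query can also be issued programmatically; a minimal SolrJ sketch (assuming a SolrJ 6.x/7.x client dependency on the classpath and the core name used above):

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class SolrQueryExample {
    public static void main(String[] args) throws Exception {
        String url = "http://quickstart.cloudera:8983/solr/solr_test-collection_shard1_replica1";
        try (HttpSolrClient solr = new HttpSolrClient.Builder(url).build()) {
            // Same field:value syntax as in the Solr UI, wildcards included.
            SolrQuery query = new SolrQuery("name:Botta*");
            query.setRows(10);
            QueryResponse response = solr.query(query);
            for (SolrDocument doc : response.getResults()) {
                System.out.println(doc.getFieldValue("name") + " / " + doc.getFieldValue("countrycode"));
            }
        }
    }
}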

Optional SOAP UI REST: http://quickstart.cloudera:8983/solr/solr_test-collection_shard1_replica1/select?q=name%3ABotta~&sort=score+desc%2C+name+asc&rows=6&fl=name%2C+score&wt=xml&indent=true


SOAP UI: REST query result

Abbreviations

  • EDH: enterprise data hub
  • DL: data lake
  • NRT: near real-time

Sources

References

EDH/DL vs EDW – Architecture Use Cases

Posted: January 13, 2018 in Hadoop

Task: to compare EDH/DL vs. EDW and to present architecture use cases based on the main Apache Hadoop distributions (IMHO the best known at the time of writing): Cloudera (CDH) / Hortonworks (HDP)

EDH (source: Wikipedia)

A data hub is a collection of data from multiple sources organized for distribution, sharing, and often subsetting and sharing. Generally this data distribution is in the form of a hub and spoke architecture.

A data hub differs from a data warehouse in that it is generally unintegrated and often at different grains. It differs from an operational data store because a data hub does not need to be limited to operational data.

A data hub differs from a data lake by homogenizing data and possibly serving data in multiple desired formats, rather than simply storing it in one place, and by adding other value to the data such as de-duplication, quality, security, and a standardized set of query services. A Data Lake tends to store data in one place for availability, and allow/require the consumer to process or add value to the data.

DL (source: Wikipedia)

A data lake is a system or repository of data stored in its natural format, usually object blobs or files. A data lake is usually a single store of all enterprise data including raw copies of source system data and transformed data used for tasks such as reporting, visualization, analytics and machine learning. A data lake can include structured data from relational databases (rows and columns), semi-structured data (CSV, logs, XML, JSON), unstructured data (emails, documents, PDFs) and binary data (images, audio, video).

EDW (source: Wikipedia)

In computing, a data warehouse (DW or DWH), also known as an enterprise data warehouse (EDW), is a system used for reporting and data analysis, and is considered a core component of business intelligence. DWs are central repositories of integrated data from one or more disparate sources. They store current and historical data in one single place that are used for creating analytical reports for workers throughout the enterprise.

Lambda and Kappa Architectures

Lambda (source: Wikipedia)

Is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream-processing methods.

Kappa (source: Milinda Pathigage)

Is a software architecture pattern. Rather than using a relational DB like SQL or a key-value store like Cassandra, the canonical data store in a Kappa Architecture system is an append-only immutable log. From the log, data is streamed through a computational system and fed into auxiliary stores for serving.

Kappa Architecture is a simplification of Lambda Architecture. A Kappa Architecture system is like a Lambda Architecture system with the batch processing system removed. To replace batch processing, data is simply fed through the streaming system quickly.

Other sources:

Data Storage Formats / Data Storage Engines

  • Text (Data Storage Format)
  • Sequence File (Data Storage Format)
  • Apache Avro (Data Storage Format)
  • Apache Parquet (Data Storage Format)
  • Apache Optimized Row Columnar – ORC (Data Storage Format)
  • Apache HBase (Data Storage Engine)
  • Apache Kudu (Data Storage Engine)

Text:

  • More specifically text = csv, tsv, json records…
  • Convenient format to use to exchange with other applications or scripts that produce or read delimited files
  • Human readable and parsable
  • Stored data is bulky and not as efficient to query
  • Schema completely integrated with data
  • Does not support block compression

Sequence File:

  • Provides a persistent data structure for binary key-value pairs
  • Row based
  • Commonly used to transfer data between MapReduce jobs
  • Can be used as an archive to pack small files in Hadoop
  • Supports splitting even when the data is compressed

Apache Avro:

  • Widely used as a serialization platform
  • Row-based (row major format), offers a compact and fast binary format
  • Schema is encoded on the file, so the data can be untagged
  • Files support block compression and are splittable
  • Supports schema evolution
  • Supports nested data
  • No internal indexes (HDFS directory-based partitioning technique can be applied for fast random data access)

Apache Parquet:

  • Column-oriented binary file format (column major format suitable for efficient data analytics)
  • Uses the record shredding and assembly algorithm described in Google’s Dremel paper
  • Each data file contains the values for a set of rows
  • Efficient in terms of disk I/O when specific columns need to be queried
  • Integrated compression (provides very good compaction ratios) and indexes
  • The HDFS directory-based partitioning technique can be applied for fast random data access (see the sketch after this list)
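As a hedged illustration of that last point (directory-based partitioning; the column and path names below are made up), a Spark job can lay out the Parquet files by a partition column so that readers filtering on it only touch the matching directories:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PartitionedParquetWrite {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("partitioned-write").getOrCreate();

        Dataset<Row> cities = spark.read().parquet("/user/<REPLACE>/cities.parquet");

        // One HDFS subdirectory per countrycode value, e.g. .../countrycode=IT/...
        // Readers that filter on countrycode then only scan the matching directories.
        cities.write()
              .mode(SaveMode.Overwrite)
              .partitionBy("countrycode")
              .parquet("/user/<REPLACE>/cities_by_country.parquet");

        spark.stop();
    }
}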

Apache ORC – Optimized Row Columnar:

  • Considered the evolution of the RCFile (originally part of Hive)
  • Stores collections of rows and within the collection the row data is stored in columnar format
  • Introduces a lightweight indexing that enables skipping of irrelevant blocks of rows
  • Splittable: allows parallel processing of row collections
  • It comes with basic statistics on columns (min, max, sum, and count)
  • Integrated compression

Apache HBase:

  • Scalable and distributed NoSQL database on HDFS for storing key-value pairs (note: based on Google’s Bigtable) for hosting of very large tables: billions of rows X millions of columns
  • Keys are indexed which typically provides very quick access to the records
  • Suitable for: random, realtime read/write access to Big Data
  • Schemaless
  • Supports security labels

Apache Kudu:

  • Scalable and distributed table-based storage
  • Provides indexing and columnar data organization to achieve a good compromise between ingestion speed and analytics performance
  • As in the HBase case, the Kudu APIs allow modifying data already stored in the system

Data Storage Formats / Data Storage Engines Benchmarks:

Text (e.g. JSON): do not use it for processing!

Sequence File: relevant for MapReduce jobs; not suitable as a main data storage format!

Apache Avro: a fast, universal encoder for structured data. Due to very efficient serialization and deserialization, this format guarantees very good performance whenever access to all the attributes of a record is required at the same time – data transportation, staging areas, etc.

Apache Parquet / Apache Kudu: columnar stores deliver very good flexibility between fast data ingestion, fast random data lookup and scalable data analytics, ensuring at the same time system simplicity – only one technology for storing the data. Kudu excels at faster random lookups, while Parquet excels at faster data scans and ingestion.

Apache ORC: minor differences in comparison to Apache Parquet (note: at the time of writing Impala does not support the ORC file format!)

Apache HBase: delivers very good random data access performance and the biggest flexibility in how data representations can be stored (schema-less tables). The performance of batch processing of HBase data heavily depends on the chosen data model and typically cannot compete with the other technologies in this field. Therefore, analytics on HBase data should be performed rather rarely.

As an alternative to a single storage technology, a hybrid system could be considered, composed of raw storage for batch processing (like Parquet) and an indexing layer (like HBase) for random access. Notably, such an approach comes at the price of data duplication, a more complex overall system architecture and higher maintenance costs. So, if system simplicity is one of the important factors, Apache Kudu appears to be a good compromise.

Advantages / Disadvantages of “Row” and “Column” oriented Storages / Data Access Patterns:

  • In “row oriented” storage, the full contents of a record in a database are stored as a sequence of adjacent bytes. Reading a full record in row format is thus an efficient operation. However, reading the third column of each record in a file is not particularly efficient; disks read data in minimum amounts of one block (typically 4KB), which means that even if the exact location of the 3rd column of each record is known, lots of irrelevant data will be read and then discarded.
  • In the simplest form of “column oriented” storage, there is a separate file for each column in the table; for a single record each of its columns is written into a different file. Reading a full record in this format therefore requires reading a small amount of data from each file – not efficient. However, reading the third column of each record in a file is very efficient. There are ways of encoding data in “column-oriented” format which do not require file-per-column, but they all (in various ways) store column values from multiple records adjacent to each other.
  • Data access patterns which are oriented around reading whole records are best with “row oriented” formats. A typical example is a “call center” which retrieves a customer record and displays all fields of that record on the screen at once. Such applications often fall into the category “OLTP” (online transaction processing).
  • Queries which search large numbers of records for a small set of “matches” work well with “column oriented” formats. A typical example is “select count(*) from large_data_set where col3>10”. In this case, only col3 from the dataset is ever needed, and the “column oriented” layout minimises the total amount of disk reads needed. Operations which calculate sum/min/max and similar aggregate values over a set of records also work efficiently with column-oriented formats. Such applications often fall into the category “OLAP” (online analytics processing). (A Spark sketch of such a column-pruned query follows this list.)
  • “Column oriented” storage also allows data to be compressed better than row-oriented formats, because all values in a column are adjacent and they all share the same data type. A type-specific compression algorithm can then be used (e.g. one specialized for compressing integers, dates or strings).
  • “Column oriented” storage does have a number of disadvantages. As noted earlier, reading a whole record is less efficient. Inserting records is also less efficient, as is deleting records. Supporting atomic and transactional behaviour is also more complex.
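A hedged Spark sketch of the OLAP-style query mentioned in the list above (table, column and path names are made up); with a columnar source such as Parquet, column pruning and predicate pushdown mean that essentially only col3 is read from disk:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ColumnPrunedCount {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("column-pruned-count").getOrCreate();

        Dataset<Row> largeDataSet = spark.read().parquet("/user/<REPLACE>/large_data_set.parquet");

        // Equivalent of: select count(*) from large_data_set where col3 > 10
        // Only the col3 column needs to be scanned from the columnar files.
        long matches = largeDataSet.filter("col3 > 10").count();
        System.out.println("matches: " + matches);

        spark.stop();
    }
}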

Infrastructure Overview (source: Cloudera)

Master Node (source: Cloudera)

Runs the Hadoop master daemons: NameNode, Standby NameNode, YARN Resource Manager and History Server, the HBase Master daemon, Sentry server, and the Impala StateStore Server and Catalog Server. Master nodes are also the location where ZooKeeper and JournalNodes are installed. The daemons can often share a single pool of servers. Depending on the cluster size, the roles can instead each be run on a dedicated server. Kudu Master Servers should also be deployed on master nodes.

Worker Node (source: Cloudera)

Runs the HDFS DataNode, YARN NodeManager, HBase RegionServer, Impala impalad, Search worker daemons and Kudu Tablet Servers.

Edge Node (source: Cloudera)

Contains all client-facing configurations and services, including gateway configurations for HDFS, YARN, Impala, Hive, and HBase. The edge node is also a good place for Hue, Oozie, HiveServer2, and Impala HAProxy. HiveServer2 and Impala HAProxy serve as a gateway to external applications such as Business Intelligence (BI) tools.

Utility Node (source: Cloudera)

Runs Cloudera Manager and the Cloudera Management Services. It can also host a MySQL (or another supported) database instance, which is used by Cloudera Manager, Hive, Sentry and other Hadoop-related projects.

Figure: Hortonworks Data Platform (source: Hortonworks)

Reference Vendor’s Infrastructure Architectures:

Security Overview (source: Cloudera)

Apache Atlas: Data Governance and Metadata framework for Hadoop: NOT supported by CDH platform; use CDH Navigator instead

Apache Knox: REST API and Application Gateway for the Apache Hadoop Ecosystem: NOT supported by CDH platform; a standard firewall gives you more or less the same functionality with respect to network security. More advanced security (authorization, authentication, encryption) is provided by other components in the stack (Kerberos, Sentry, HDFS encryption, etc.)

Apache Metron: Real-time big data security (cyber-crime): NOT supported by CDH platform; use http://spot.incubator.apache.org/ instead

Apache Ranger: Framework to enable, monitor and manage comprehensive data security across the Hadoop platform: NOT supported by CDH platform; use Apache Sentry instead

Apache Sentry: Is a system for enforcing fine-grained, role-based authorization to data and metadata stored on a Hadoop cluster: SUPPORTED by CDH platform; whether you use Sentry or Ranger depends on which Hadoop distribution you are using, Cloudera or Hortonworks (Apache Sentry is owned by Cloudera and Apache Ranger by Hortonworks; Ranger does not support Impala)


Figure: Security overview (source: Cloudera)


Figure: Security architecture (source: Cloudera)


Figure: Securing and governing a multi-tenant data lake (source: Dataworks Summit)

References:

Big data introduction II

Posted: February 22, 2017 in Hadoop

Big data introduction

Posted: November 29, 2016 in Hadoop