
An example of reading (querying) and writing a Parquet file and creating a Hive table on top of it.

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Scala: 2.11.12
  • Spark: 2.4.0
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292
  • IDE (for Windows 10 Enterprise): Eclipse IDE for Java Developers – 2020-09

SparkSql2Parquet

SparkConfig.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/config/)

package eu.placko.examples.spark.config;

import org.apache.spark.sql.SparkSession;
import java.io.File;

public class SparkConfig {
	public static SparkSession createSparkSession() {
		String episodesLocation = new File("episodes").getAbsolutePath();
		SparkSession spark = SparkSession
				.builder()
				.appName("sparksql2parquet")
				// the master is supplied by spark-submit (--master yarn); uncomment for local runs only
				//.master("local[1]")
				//.master("yarn")
				// raise the broadcast join timeout (in seconds) for large/slow broadcasts
				.config("spark.sql.broadcastTimeout","36000")
				.config("spark.sql.warehouse.dir", episodesLocation)
				.enableHiveSupport()
				.getOrCreate();
		return spark;
	}
}
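
For quick local testing/debugging (what the commented-out .master lines above are for), a minimal sketch of the same idea; the class name, the local[1] master, and the smoke-test query are illustrative assumptions, not part of the project:

import org.apache.spark.sql.SparkSession;

public class SparkConfigLocalDemo {
	public static void main(String[] args) {
		// local[1]: run Spark in-process with a single worker thread
		SparkSession spark = SparkSession.builder()
				.appName("sparksql2parquet-local")
				.master("local[1]")
				.getOrCreate();
		spark.sql("SELECT 1 AS ok").show(); // smoke test: prints a one-row, one-column table
		spark.stop();
	}
}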

QueryBuilder.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class QueryBuilder {
	private Long limit = 0L;

	public QueryBuilder setLimit(Long limit) {
		this.limit = limit;
		return this;
	}

	public String build() {
		String sql = "SELECT title FROM tmp_episodes";
		if (limit != null && limit.longValue() > 0) {
			sql = sql + " limit " + limit;
		}
		return sql;
	}
}
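
A quick usage sketch of the builder (the demo class is hypothetical; the calls mirror how SparkSql2Parquet.java uses it below): null or 0 leaves the query unlimited, a positive value appends a LIMIT clause.

import eu.placko.examples.spark.sql.QueryBuilder;

public class QueryBuilderDemo {
	public static void main(String[] args) {
		// null (or 0) leaves the query unlimited
		System.out.println(new QueryBuilder().setLimit(null).build());
		// a positive limit appends a LIMIT clause
		System.out.println(new QueryBuilder().setLimit(10L).build());
		// prints:
		// SELECT title FROM tmp_episodes
		// SELECT title FROM tmp_episodes limit 10
	}
}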

HiveBuilder.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class HiveBuilder {
	public String buildDB() {
		String sql = "CREATE DATABASE IF NOT EXISTS episodes LOCATION '/user/hive/databases/episodes.db'";
		return sql;
	}
	
	public String buildTB() {
		String sql = "DROP TABLE IF EXISTS episodes.episodes";
		return sql;
	}
	
	public String buildTB(String path) {
		String sql = "CREATE EXTERNAL TABLE episodes.episodes(title string) STORED AS PARQUET LOCATION 'hdfs://nameservice1" + path + "episodes_titles_only.parquet'";
		return sql;
	}
}

NOTES:

  • nameservice1 -> the HDFS HA (High Availability) nameservice; replace it with your cluster's nameservice or NameNode host
  • the table could also be created as a partitioned table, e.g. PARTITIONED BY (title string) – see the sketch below
  • SHOW CREATE TABLE episodes.episodes shows the DDL Hive generated for the table
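
A hedged sketch of such a partitioned variant (the buildPartitionedTB method, the episodes_partitioned table name, and the air_date column are illustrative assumptions; a partition column must not be repeated in the regular column list, and the data has to be laid out in title=<value> subdirectories):

	public String buildPartitionedTB(String path) {
		// the partition column "title" lives in the directory structure
		// (.../title=<value>/), not in the Parquet files themselves
		return "CREATE EXTERNAL TABLE episodes.episodes_partitioned(air_date string) "
				+ "PARTITIONED BY (title string) STORED AS PARQUET "
				+ "LOCATION 'hdfs://nameservice1" + path + "episodes_partitioned.parquet'";
	}

After creating a partitioned external table, the partitions still have to be registered, e.g. with MSCK REPAIR TABLE episodes.episodes_partitioned.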

SparkSql2Parquet.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/app/)

package eu.placko.examples.spark.app;

import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SaveMode;

import eu.placko.examples.spark.config.SparkConfig;
import eu.placko.examples.spark.sql.QueryBuilder;
import eu.placko.examples.spark.sql.HiveBuilder;

public class SparkSql2Parquet {
	private final static Logger LOGGER = Logger.getLogger(SparkSql2Parquet.class.getName());
	
	public static void main(String[] args) {
		if (args.length != 1) {
		    LOGGER.info("wrong input! usage: /user/<REPLACE>/sparksql2parquet/");
		    return;
		}
		String path = args[0];
		
		SparkSession sparkSession = SparkConfig.createSparkSession();
		LOGGER.info("Started Spark Session");
		
		Dataset<Row> parquetFileDF = sparkSession.read().parquet(path + "episodes.parquet");
		parquetFileDF.createOrReplaceTempView("tmp_episodes");
		LOGGER.info("Created temp view");
		
		String query = new QueryBuilder().setLimit(null).build();
		Dataset<Row> episodesDF = sparkSession.sql(query);
		episodesDF
			//.select("title")
			.write()
			.mode(SaveMode.Overwrite)
			.parquet(path + "episodes_titles_only.parquet");
		LOGGER.info("Written to the new parquet");
		
		String hive = new HiveBuilder().buildDB();
		sparkSession.sql(hive);
		LOGGER.info("Created Hive DB if not exists");
		
		hive = new HiveBuilder().buildTB();
		sparkSession.sql(hive);
		LOGGER.info("Dropped Hive table if exists");
		
		hive = new HiveBuilder().buildTB(path);
		sparkSession.sql(hive);
		LOGGER.info("Created Hive table over the parquet");
	}
}
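
To verify the result from the same session, a couple of lines that could be appended at the end of main() (a sketch; the COUNT query is an assumption, SHOW CREATE TABLE comes from the notes above):

		// sanity check: the external table should now be queryable
		sparkSession.sql("SELECT COUNT(*) AS cnt FROM episodes.episodes").show();
		// inspect the DDL Hive generated for the table
		sparkSession.sql("SHOW CREATE TABLE episodes.episodes").show(false);
		LOGGER.info("Verified Hive table");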

NOTE: “episodes.parquet” does not have to be a single Parquet file; it can also be a directory, in which case the actual Parquet part files live inside it, e.g.:

/user/<REPLACE>/sparksql2parquet/episodes.parquet/
part-00000-d26aa3a3-ef94-422f-933e-d230b345cad3-c000.snappy.parquet

sparksql2parquet.sh (path: /sparksql2parquet/scripts/)

#!/bin/sh


on_error() {
  printf "\n\nAn error occurred!\n";
  exit 1;
}
trap on_error ERR


keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.app.SparkSql2Parquet

appVersion="1.0.0-SNAPSHOT"
appArtifact="/<REPLACE>/sparksql2parquet-$appVersion.jar /user/<REPLACE>/sparksql2parquet/"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# only for "testing/debugging" purposes  --deploy-mode client \
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class $appClass \
    --conf spark.executor.memory=12G \
    --conf spark.driver.memory=4G \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=5 \
    --conf spark.executor.cores=5 \
    $appArtifact

exit 0;

README.md (path: /sparksql2parquet/)

HOW TO CONFIGURE THE PROJECT

sparksql2parquet.sh
keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appArtifact="/<REPLACE>/sparksql2parquet-$appVersion.jar /user/<REPLACE>/sparksql2parquet/"

INPUT
HDFS: /user/<REPLACE>/sparksql2parquet/episodes.parquet
parquet location (in the repo): \src\main\resources
avro source: episodes.avro, downloaded from https://github.com/apache/hive/blob/master/data/files/episodes.avro

OUTPUT
HDFS: /user/<REPLACE>/sparksql2parquet/episodes_titles_only.parquet


Building and Running

Build
To build the application, the following must be installed:
Java 9
Maven 3.x
Then just run:
mvn clean install

Submit to Spark cluster
To run the Spark application, see the shell script inside the /scripts dir.

Source Code

https://github.com/mplacko/sparksql2parquet


Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Scala: 2.11.12
  • Spark: 2.4.0
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292

Avro2Parquet

avro2parquet.scala (path: /avro2parquet/src/main/scala/example/)

package eu.placko.examples.spark

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hadoop.fs.Path

object Avro2Parquet {
  def main(args: Array[String]) {
    if (args.length != 2) {
      println("wrong input! usage: /user/<REPLACE>/avro2parquet/avro /user/<REPLACE>/avro2parquet/parquet")
      return
    }

    val avroPath = new Path(args(0))
    val parquetPath = new Path(args(1))

    val spark: SparkSession = SparkSession.builder()
      //.master("local[1]")
      //.master("yarn")
      .appName("Avro2Parquet")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    try {
      // read the avro file
      val df = spark.read.format("avro")
        .load(avroPath + "/" + "episodes.avro")
      df.show()
      df.printSchema()

      // convert to parquet
      df.write.mode(SaveMode.Overwrite)
        .parquet(parquetPath + "/" + "episodes.parquet")
    } catch {
      case e: Exception => println("Exception: " + e)
    }
  }
}


Dependencies.scala (path: /avro2parquet/project/)

import sbt._

object Dependencies {
  val sparkVersion = "2.4.0"
  lazy val sparkAvro = Seq(
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "org.apache.spark" %% "spark-avro" % sparkVersion
  )
}

build.sbt (path: /avro2parquet/)

import Dependencies._

ThisBuild / scalaVersion    := "2.11.12"
ThisBuild / version    := "0.1.0"
ThisBuild / organization    := "eu.placko"
ThisBuild / organizationName    := "examples.spark"

lazy val root = (project in file("."))
  .settings(
    name := "avro2parquet",
    version := "0.1.0",
    libraryDependencies ++= sparkAvro
)

// Uncomment the following for publishing to Sonatype.
// See https://www.scala-sbt.org/1.x/docs/Using-Sonatype.html for more detail.

// ThisBuild / description := "Some description about your project."
// ThisBuild / licenses    := List("Apache 2" -> new URL("http://www.apache.org/licenses/LICENSE-2.0.txt"))
// ThisBuild / homepage    := Some(url("https://github.com/example/project"))
// ThisBuild / scmInfo := Some(
//   ScmInfo(
//     url("https://github.com/your-account/your-project"),
//     "scm:git@github.com:your-account/your-project.git"
//   )
// )
// ThisBuild / developers := List(
//   Developer(
//     id    = "Your identifier",
//     name  = "Your Name",
//     email = "your@email",
//     url   = url("http://your.url")
//   )
// )
// ThisBuild / pomIncludeRepository := { _ => false }
// ThisBuild / publishTo := {
//   val nexus = "https://oss.sonatype.org/"
//   if (isSnapshot.value) Some("snapshots" at nexus + "content/repositories/snapshots")
//   else Some("releases" at nexus + "service/local/staging/deploy/maven2")
// }
// ThisBuild / publishMavenStyle := true

BUILD (path: /avro2parquet/sbt)

./sbt/bin/sbt compile
#./sbt/bin/sbt run
./sbt/bin/sbt package


EXECUTE avro2parquet.sh (path: /avro2parquet)

#!/bin/sh


on_error() {
  printf "\n\nAn error occurred!\n";
  exit 1;
}
trap on_error ERR


keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.Avro2Parquet

appVersion="0.1.0"
appArtifact="/<REPLACE>/avro2parquet/target/scala-2.11/avro2parquet_2.11-$appVersion.jar /user/<REPLACE>/avro2parquet/avro /user/<REPLACE>/avro2parquet/parquet"
log4j_setting="-Dlog4j.configuration=file:///<REPLACE>/avro2parquet/conf/log4j.xml"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# only for "testing/debugging" purposes  --deploy-mode client \
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class $appClass \
    --conf spark.executor.memory=12G \
    --conf spark.driver.memory=4G \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=5 \
    --conf spark.executor.cores=5 \
    --conf "spark.driver.extraJavaOptions=${log4j_setting}" \
    --conf "spark.executor.extraJavaOptions=${log4j_setting}" \
    $appArtifact

exit 0;

notes (source: https://sparkbyexamples.com/spark/sparksession-explained-with-examples/):

  • master() – if you are running on a cluster, you need to pass your master name as an argument to master(); usually it is either yarn or mesos, depending on your cluster setup (see the sketch below).
  • Use local[x] when running in Standalone mode. x should be an integer value greater than 0; it determines how many partitions are created when using RDD, DataFrame, and Dataset. Ideally, x should equal the number of CPU cores available.
  • See also: https://sparkbyexamples.com/spark/spark-deploy-modes-client-vs-cluster/
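
In code the difference looks like this (a minimal Java sketch; the class name and local[4] are illustrative assumptions – on a cluster the master is normally not hard-coded but passed via spark-submit, as in the script above):

import org.apache.spark.sql.SparkSession;

public class MasterExample {
	public static void main(String[] args) {
		// standalone/local run: 4 in-process worker threads
		SparkSession local = SparkSession.builder()
				.appName("local-example")
				.master("local[4]")
				.getOrCreate();
		local.stop();

		// on a cluster, omit master() in code and pass it on the command line:
		//   spark-submit --master yarn --deploy-mode cluster ...
	}
}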

Source Code

https://github.com/mplacko/avro2parquet

Additional Info

Spark-Shell Example

# Kerberos
# su - <user>
# kinit -kt /etc/security/keytabs/<user>.keytab <user>

Start Spark-shell

spark-shell

Load dataframe from Parquet

var df = spark.read.format("parquet").load("/user/<user>/avro2parquet/parquet/episodes.parquet")

Save dataframe as Parquet

import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet("/user/<user>/avro2parquet/parquet/checksum")

Print dataframe schema

df.printSchema

scala> df.printSchema
root
 |-- title: string (nullable = true)
 |-- air_date: string (nullable = true)
 |-- doctor: integer (nullable = true)

Add column to dataframe

// add a checksum column by performing an MD5 hash over the concatenation of all columns
import org.apache.spark.sql.functions.{md5, concat_ws, col}
df = df.withColumn("checksum", md5(concat_ws(",", col("title"), col("air_date"), col("doctor"))))

Print dataframe schema

df.printSchema

scala> df.printSchema
root
 |-- title: string (nullable = true)
 |-- air_date: string (nullable = true)
 |-- doctor: integer (nullable = true)
 |-- checksum: string (nullable = false)

Select from dataframe

df.select("checksum").show(false)

scala> df.select("checksum").show(false)
+--------------------------------+
|checksum                        |
+--------------------------------+
|60723b412082c7ecca173420db2dd620|
|a6e1f1847fff5d7795bb4ffadf7c337d|
|25ff41e3d2507c37fbcc24ab571e55d6|
|2deea188b3e6b7d654db4f98baf7cdcf|
|ab0e985ea1588b0a3e34b80dae197391|
|4901b9d1fc3f47f6b3283c7bc6e3cf8b|
|6a43f326e1a823108ee84f97cf2f59ba|
|984d29d372f8f10f3edbb0fa26b096cb|
+--------------------------------+
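
Expressed with the Java API used in the SparkSql2Parquet example above, a hedged sketch of what such a checksum column enables – detecting changed rows between two snapshots of the data (dfOld and dfNew are hypothetical dataframes that both carry the checksum column; this fragment is not from the original post):

import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// join the two snapshots on the key column; rows whose checksum differs
// between the snapshots are the modified records
Dataset<Row> changed = dfNew.alias("n")
		.join(dfOld.alias("o"), "title")
		.filter(col("n.checksum").notEqual(col("o.checksum")))
		.select("title");
changed.show(false);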