Spark/Java – SparkSql2Parquet – Tutorial

Posted: January 5, 2022 in Hadoop
Tags:

An example for reading (querying) and writing a parquet file and creating hive table.

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Scala: 2.11.12
  • Spark: 2.4.0
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292
  • IDE (for Windows 10 Enterprise): Eclipse IDE for Java Developers – 2020-09

SparkSql2Parquet

SparkConfig.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/config/)

package eu.placko.examples.spark.config;

import org.apache.spark.sql.SparkSession;
import java.io.File;

public class SparkConfig {
	public static SparkSession createSparkSession() {
		String episodesLocation = new File("episodes").getAbsolutePath();
		SparkSession spark = SparkSession
				.builder()
				.appName("sparksql2parquet")
				//.master("local[1]")
				//.master("yarn")
				.config("spark.sql.broadcastTimeout","36000")
				.config("spark.sql.warehouse.dir", episodesLocation)
				.enableHiveSupport()
				.getOrCreate();	
		return spark;
	}
}

QueryBuilder.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class QueryBuilder {
	private Long limit = 0L;

	public QueryBuilder setLimit(Long limit) {
		this.limit = limit;
		return this;
	}

	public String build() {
		String sql = "SELECT title FROM tmp_episodes";
		if (limit != null && limit.longValue() > 0) {
			sql = sql + " limit " + limit;
		}
		return sql;
	}
}

HiveBuilder.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class HiveBuilder {
	public String buildDB() {
		String sql = "CREATE DATABASE IF NOT EXISTS episodes LOCATION '/user/hive/databases/episodes.db'";
		return sql;
	}
	
	public String buildTB() {
		String sql = "DROP TABLE IF EXISTS episodes.episodes";
		return sql;
	}
	
	public String buildTB(String path) {
		String sql = "CREATE EXTERNAL TABLE episodes.episodes(title string) STORED AS PARQUET LOCATION 'hdfs://nameservice1" + path + "episodes_titles_only.parquet'";
		return sql;
	}
}

NOTES:

  • nameservice1 -> HA (High Availability)
  • PARTITIONED BY (title string)
  • SHOW CREATE TABLE episodes.episodes

SparkSql2Parquet.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/app/)

package eu.placko.examples.spark.app;

import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SaveMode;

import eu.placko.examples.spark.config.SparkConfig;
import eu.placko.examples.spark.sql.QueryBuilder;
import eu.placko.examples.spark.sql.HiveBuilder;

public class SparkSql2Parquet {
	private final static Logger LOGGER = Logger.getLogger(SparkSql2Parquet.class.getName());
	
	public static void main(String[] args) {
		if (args.length != 1) {
		    LOGGER.info("wrong input! usage: /user/<REPLACE>/sparksql2parquet/");
		    return;
		}
		String path = args[0];
		
		SparkSession sparkSession = SparkConfig.createSparkSession();
		LOGGER.info("Started Spark Session");
		
		Dataset<Row> parquetFileDF = sparkSession.read().parquet(path + "episodes.parquet");
		parquetFileDF.createOrReplaceTempView("tmp_episodes");
		LOGGER.info("Created temp view");
		
		String query = new QueryBuilder().setLimit(null).build();
		Dataset<Row> episodesDF = sparkSession.sql(query);
		episodesDF
			//.select("title")
			.write()
			.mode(SaveMode.Overwrite)
			.parquet(path + "episodes_titles_only.parquet");
		LOGGER.info("Written to the new parquet");
		
		String hive = new HiveBuilder().buildDB();
		sparkSession.sql(hive);
		LOGGER.info("Created Hive DB if not exists");
		
		hive = new HiveBuilder().buildTB();
		sparkSession.sql(hive);
		LOGGER.info("Dropped Hive table if exists");
		
		hive = new HiveBuilder().buildTB(path);
		sparkSession.sql(hive);
		LOGGER.info("Created Hive table over the parquet");
	}
}

NOTE: “episodes.parquet” does not need to be a parquet file directly; it can be a directory and inside will be responsible parquet files

e.g.:

/user/<REPLACE>/sparksql2parquet/episodes.parquet/
part-00000-d26aa3a3-ef94-422f-933e-d230b345cad3-c000.snappy.parquet

sparksql2parquet.sh (path: /sparksql2parquet/scripts/)

#!/bin/sh


on_error() {
  printf "\n\nAn error occurred!\n";
  exit 1;
}
trap on_error ERR


keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.app.SparkSql2Parquet

appVersion="1.0.0-SNAPSHOT"
appArtifact="/<REPLACE>/sparksql2parquet-$appVersion.jar /user/<REPLACE>/sparksql2parquet/"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# only for "testing/debugging" purposes  --deploy-mode client \
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class $appClass \
    --conf spark.executor.memory=12G \
    --conf spark.driver.memory=4G \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=5 \
    --conf spark.executor.cores=5 \
    $appArtifact

exit 0;

README.md (path: /sparksql2parquet/)

HOW TO CONFIGURE THE PROJECT

sparksql2parquet.sh
keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appArtifact="/<REPLACE>/sparksql2parquet-$appVersion.jar /user/<REPLACE>/sparksql2parquet/"

INPUT
HDFS: /user/<REPLACE>/sparksql2parquet/episodes.parquet
parquet location: \src\main\resources
avro source: downloaded from https://github.com/apache/hive/blob/master/data/files/episodes.avro

OUTPUT
HDFS: /user/<REPLACE>/sparksql2parquet/episodes_titles_only.parquet


Building and Running

Build
To build the application it is required to have this installed:
Java 9
Maven 3.x
Then just run this:
mvn clean install

Submit to Spark cluster
For running spark application see shell script inside /scripts dir.

Source Code

https://github.com/mplacko/sparksql2parquet

Additional Info

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s