An example of reading (querying) and writing a Parquet file with Spark SQL and creating a Hive table over the result.
Prerequisites
- OS: Linux (RHEL 7.9)
- Hadoop: Cloudera (CDH 6.1.1)
- Scala: 2.11.12
- Spark: 2.4.0
- Authentication: Kerberos
- Java: OpenJDK 64-Bit 1.8.0_292
- IDE (for Windows 10 Enterprise): Eclipse IDE for Java Developers – 2020-09
SparkSql2Parquet
SparkConfig.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/config/)
package eu.placko.examples.spark.config;

import org.apache.spark.sql.SparkSession;

import java.io.File;

public class SparkConfig {
    public static SparkSession createSparkSession() {
        String episodesLocation = new File("episodes").getAbsolutePath();
        SparkSession spark = SparkSession
            .builder()
            .appName("sparksql2parquet")
            // the master is supplied by spark-submit (--master yarn); uncomment only for local debugging
            //.master("local[1]")
            //.master("yarn")
            .config("spark.sql.broadcastTimeout", "36000")
            .config("spark.sql.warehouse.dir", episodesLocation)
            // Hive support is required for the CREATE DATABASE/TABLE statements issued later
            .enableHiveSupport()
            .getOrCreate();
        return spark;
    }
}
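A minimal usage sketch (an illustration, not part of the original sources), assuming the job is submitted via spark-submit, which supplies the master; that is why the .master(...) lines above stay commented out:

SparkSession spark = SparkConfig.createSparkSession();
// confirm Hive support is active before issuing any DDL
spark.sql("SHOW DATABASES").show();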
QueryBuilder.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/sql/)
package eu.placko.examples.spark.sql;

public class QueryBuilder {
    private Long limit = 0L;

    public QueryBuilder setLimit(Long limit) {
        this.limit = limit;
        return this;
    }

    public String build() {
        String sql = "SELECT title FROM tmp_episodes";
        // a null or non-positive limit means "no LIMIT clause"
        if (limit != null && limit.longValue() > 0) {
            sql = sql + " LIMIT " + limit;
        }
        return sql;
    }
}
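A quick usage sketch of the builder (the limit values are arbitrary examples):

String all    = new QueryBuilder().setLimit(null).build(); // "SELECT title FROM tmp_episodes"
String first5 = new QueryBuilder().setLimit(5L).build();   // "SELECT title FROM tmp_episodes LIMIT 5"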
HiveBuilder.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/sql/)
package eu.placko.examples.spark.sql;

public class HiveBuilder {
    // creates the database once; the LOCATION points to HDFS
    public String buildDB() {
        return "CREATE DATABASE IF NOT EXISTS episodes LOCATION '/user/hive/databases/episodes.db'";
    }

    // drops the table so it can be recreated over the freshly written parquet
    public String buildTB() {
        return "DROP TABLE IF EXISTS episodes.episodes";
    }

    // external table over the parquet output; nameservice1 is the HDFS HA nameservice
    public String buildTB(String path) {
        return "CREATE EXTERNAL TABLE episodes.episodes(title string) STORED AS PARQUET LOCATION 'hdfs://nameservice1" + path + "episodes_titles_only.parquet'";
    }
}
NOTES:
- nameservice1 -> the HDFS HA (High Availability) nameservice, so the LOCATION keeps working after a NameNode failover
- the table could additionally be PARTITIONED BY a column (see the sketch below); in Hive, a partition column is declared outside the regular column list
- verify the generated DDL with SHOW CREATE TABLE episodes.episodes
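A hedged sketch of the PARTITIONED BY note (the year partition column is an assumption; the sample data only has title). After creating a partitioned external table, MSCK REPAIR TABLE registers partition directories that already exist on HDFS:

// hypothetical partitioned variant of buildTB; "year" is an assumed column
public String buildPartitionedTB(String path) {
    return "CREATE EXTERNAL TABLE episodes.episodes(title string) "
            + "PARTITIONED BY (year int) STORED AS PARQUET "
            + "LOCATION 'hdfs://nameservice1" + path + "episodes_titles_only.parquet'";
}
// afterwards: sparkSession.sql("MSCK REPAIR TABLE episodes.episodes");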
SparkSql2Parquet.java (path: /sparksql2parquet/src/main/java/eu/placko/examples/spark/app/)
package eu.placko.examples.spark.app;

import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SaveMode;

import eu.placko.examples.spark.config.SparkConfig;
import eu.placko.examples.spark.sql.QueryBuilder;
import eu.placko.examples.spark.sql.HiveBuilder;

public class SparkSql2Parquet {
    private final static Logger LOGGER = Logger.getLogger(SparkSql2Parquet.class.getName());

    public static void main(String[] args) {
        if (args.length != 1) {
            LOGGER.error("wrong input! usage: /user/<REPLACE>/sparksql2parquet/");
            return;
        }
        String path = args[0];

        SparkSession sparkSession = SparkConfig.createSparkSession();
        LOGGER.info("Started Spark session");

        // read the source parquet and expose it to Spark SQL as a temporary view
        Dataset<Row> parquetFileDF = sparkSession.read().parquet(path + "episodes.parquet");
        parquetFileDF.createOrReplaceTempView("tmp_episodes");
        LOGGER.info("Created temp view");

        // a null limit means the query is built without a LIMIT clause
        String query = new QueryBuilder().setLimit(null).build();
        Dataset<Row> episodesDF = sparkSession.sql(query);
        episodesDF
            //.select("title")
            .write()
            .mode(SaveMode.Overwrite)
            .parquet(path + "episodes_titles_only.parquet");
        LOGGER.info("Written to the new parquet");

        // create the Hive database and (re)create the external table over the new parquet
        String hive = new HiveBuilder().buildDB();
        sparkSession.sql(hive);
        LOGGER.info("Created Hive DB if not exists");

        hive = new HiveBuilder().buildTB();
        sparkSession.sql(hive);
        LOGGER.info("Dropped Hive table if exists");

        hive = new HiveBuilder().buildTB(path);
        sparkSession.sql(hive);
        LOGGER.info("Created Hive table over the parquet");
    }
}
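As a hedged sanity check (not part of the original class), the result can be queried straight from the new Hive table, for example from spark-shell or at the end of main:

sparkSession.sql("SELECT title FROM episodes.episodes LIMIT 10").show(false);
sparkSession.sql("SHOW CREATE TABLE episodes.episodes").show(false);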
NOTE: "episodes.parquet" does not need to be a single parquet file; it can also be a directory containing the actual parquet part files, e.g.:
/user/<REPLACE>/sparksql2parquet/episodes.parquet/
part-00000-d26aa3a3-ef94-422f-933e-d230b345cad3-c000.snappy.parquet
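The number of part files mirrors the number of partitions at write time. As a sketch (not in the original job), coalescing to one partition before the write would yield a single part file:

episodesDF
    .coalesce(1)                  // one partition -> one part-*.snappy.parquet file
    .write()
    .mode(SaveMode.Overwrite)
    .parquet(path + "episodes_titles_only.parquet");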
sparksql2parquet.sh (path: /sparksql2parquet/scripts/)
#!/bin/bash
# note: the ERR trap below is a bash feature, not plain sh

on_error() {
    printf "\n\nAn error occurred!\n"
    exit 1
}
trap on_error ERR

keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.app.SparkSql2Parquet
appVersion="1.0.0-SNAPSHOT"
# the jar and its single argument (the HDFS working directory) are passed together;
# $appArtifact is deliberately left unquoted below so it word-splits into both
appArtifact="/<REPLACE>/sparksql2parquet-$appVersion.jar /user/<REPLACE>/sparksql2parquet/"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# use --deploy-mode client only for testing/debugging purposes
spark-submit \
--master yarn \
--deploy-mode cluster \
--class $appClass \
--conf spark.executor.memory=12G \
--conf spark.driver.memory=4G \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=5 \
--conf spark.executor.cores=5 \
$appArtifact
exit 0
README.md (path: /sparksql2parquet/)
HOW TO CONFIGURE THE PROJECT
sparksql2parquet.sh
keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appArtifact="/<REPLACE>/sparksql2parquet-$appVersion.jar /user/<REPLACE>/sparksql2parquet/"
INPUT
HDFS: /user/<REPLACE>/sparksql2parquet/episodes.parquet
parquet location: /src/main/resources
avro source: downloaded from https://github.com/apache/hive/blob/master/data/files/episodes.avro
OUTPUT
HDFS: /user/<REPLACE>/sparksql2parquet/episodes_titles_only.parquet
Building and Running
Build
To build the application, the following must be installed:
Java 8
Maven 3.x
Then run:
mvn clean install
Submit to Spark cluster
To run the Spark application, see the shell script inside the /scripts directory.
Source Code
https://github.com/mplacko/sparksql2parquet
Additional Info
- Performance/Partitioning (see the sketch below)
- Hive Tables
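On Performance/Partitioning, a hedged sketch (the year column is assumed; the sample data has only title): a partitioned write lays the output out as one subdirectory per value, which a PARTITIONED BY Hive table can then map directly:

episodesDF
    .write()
    .mode(SaveMode.Overwrite)
    .partitionBy("year")          // produces .../year=<value>/part-*.parquet subdirectories
    .parquet(path + "episodes_titles_only.parquet");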