Prerequisites
- OS: Linux (RHEL 7.9)
- Hadoop: Cloudera CDH 6.1.1
- Scala: 2.11.12
- Spark: 2.4.0
- Java: OpenJDK 64-Bit 1.8.0_292
- Authentication: Kerberos
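A quick way to confirm these versions on the cluster is from spark-shell (illustrative only; the exact CDH build string may differ):
spark-shell
scala> spark.version                        // e.g. 2.4.0-cdh6.1.1
scala> scala.util.Properties.versionString  // e.g. version 2.11.12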
Avro2Parquet
avro2parquet.scala (path: /avro2parquet/src/main/scala/example/)
package eu.placko.examples.spark

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hadoop.fs.Path

object Avro2Parquet {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("wrong input! usage: /user/<REPLACE>/avro2parquet/avro /user/<REPLACE>/avro2parquet/parquet")
      return
    }

    val avroPath = new Path(args(0))
    val parquetPath = new Path(args(1))

    val spark: SparkSession = SparkSession.builder()
      //.master("local[1]")
      //.master("yarn")
      .appName("Avro2Parquet")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    try {
      // read the Avro file into a DataFrame
      val df = spark.read.format("avro")
        .load(s"$avroPath/episodes.avro")
      df.show()
      df.printSchema()

      // convert to Parquet
      df.write.mode(SaveMode.Overwrite)
        .parquet(s"$parquetPath/episodes.parquet")
    } catch {
      case e: Exception => println("Exception: " + e)
    }
  }
}
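Note that load() also accepts a directory, so if the input folder held several Avro files with the same schema they could all be converted in one pass. A minimal variation of the read/write above (the single-file layout is what this example actually uses):
// read every Avro file under the input directory into one DataFrame
val df = spark.read.format("avro").load(avroPath.toString)
df.write.mode(SaveMode.Overwrite).parquet(s"$parquetPath/episodes.parquet")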
Dependencies.scala (path: /avro2parquet/project/)
import sbt._

object Dependencies {
  val sparkVersion = "2.4.0"

  lazy val sparkAvro = Seq(
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "org.apache.spark" %% "spark-avro" % sparkVersion
  )
}
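If a fat jar is built later (e.g. with sbt-assembly), the Spark modules the cluster already provides can be marked "provided" so they are not bundled. This is only a sketch of such a variant; whether spark-avro is already on the CDH classpath is an assumption to verify:
  // hypothetical variant of Dependencies.scala for fat-jar builds
  lazy val sparkAvro = Seq(
    "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",  // supplied by the cluster at runtime
    "org.apache.spark" %% "spark-avro" % sparkVersion                // bundle unless the cluster ships it
  )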
build.sbt (path: /avro2parquet/)
import Dependencies._
ThisBuild / scalaVersion := "2.11.12"
ThisBuild / version := "0.1.0"
ThisBuild / organization := "eu.placko"
ThisBuild / organizationName := "examples.spark"
lazy val root = (project in file("."))
  .settings(
    name := "avro2parquet",
    version := "0.1.0",
    libraryDependencies ++= sparkAvro
  )
BUILD (path: /avro2parquet/sbt)
./sbt/bin/sbt compile
#./sbt/bin/sbt run
./sbt/bin/sbt package
notes:
- sbt: https://www.scala-sbt.org/download.html -> All platforms -> sbt-1.6.1.zip (unzip into /avro2parquet/sbt so that ./sbt/bin/sbt is available)
EXECUTE avro2parquet.sh (path: /avro2parquet)
#!/bin/bash
on_error() {
  printf "\n\nAn error occurred!\n"
  exit 1
}
trap on_error ERR

keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.Avro2Parquet
appVersion="0.1.0"
appArtifact="/<REPLACE>/avro2parquet/target/scala-2.11/avro2parquet_2.11-$appVersion.jar /user/<REPLACE>/avro2parquet/avro /user/<REPLACE>/avro2parquet/parquet"
log4j_setting="-Dlog4j.configuration=file:///<REPLACE>/avro2parquet/conf/log4j.xml"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# use --deploy-mode client only for testing/debugging purposes
spark-submit \
--master yarn \
--deploy-mode cluster \
--class $appClass \
--conf spark.executor.memory=12G \
--conf spark.driver.memory=4G \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=5 \
--conf spark.executor.cores=5 \
--conf "spark.driver.extraJavaOptions=${log4j_setting}" \
--conf "spark.executor.extraJavaOptions=${log4j_setting}" \
$appArtifact

exit 0
notes (source: https://sparkbyexamples.com/spark/sparksession-explained-with-examples/):
- master() – when running on a cluster, pass the cluster manager as the argument to master(); usually yarn or mesos, depending on the cluster setup.
- Use local[x] when running in local/standalone mode. x should be an integer greater than 0 and determines how many partitions are created by default for RDDs, DataFrames, and Datasets; ideally, x should equal the number of CPU cores available (see the sketch below).
- On client vs. cluster deploy modes, see https://sparkbyexamples.com/spark/spark-deploy-modes-client-vs-cluster/
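As a concrete illustration of the local[x] note, a local test session could be built like this (a sketch only; local[4] and the app name are arbitrary, and on the cluster master is left to spark-submit):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[4]")               // x = 4, roughly the number of local CPU cores
  .appName("Avro2ParquetLocalTest")
  .getOrCreate()

// local[x] also drives the default parallelism used for RDDs/DataFrames/Datasets
println(spark.sparkContext.defaultParallelism)  // prints 4 in this example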
Source Code
https://github.com/mplacko/avro2parquet
Additional Info
- https://spark.apache.org/
- https://sparkbyexamples.com/
- https://docs.databricks.com/data/data-sources/read-avro.html
- https://docs.databricks.com/data/data-sources/read-parquet.html
- https://repo1.maven.org/maven2/org/apache/spark/
- https://www.scala-sbt.org/
- https://github.com/scala-ide/scala-ide/
- https://github.com/mukunku/ParquetViewer/
- https://zymeworks.github.io/avro-viewer/
- https://github.com/Eugene-Mark/bigdata-file-viewer/
Spark-Shell Example
# Kerberos
# su - <user>
# kinit -kt /etc/security/keytabs/<user>.keytab <user>
Start Spark-shell
spark-shell
Load dataframe from Parquet
var df = spark.read.format("parquet").load("/user/<user>/avro2parquet/parquet/episodes.parquet")  // Parquet stores its schema, so no inferSchema option is needed
Save dataframe as Parquet
import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet("/user/<user>/avro2parquet/parquet/checksum")
Print dataframe schema
df.printSchema
scala> df.printSchema
root
|-- title: string (nullable = true)
|-- air_date: string (nullable = true)
|-- doctor: integer (nullable = true)
Add column to dataframe
# add checksum column by performing MD5 hash on concatenation of all columns
df = df.withColumn("checksum", md5(concat_ws(",", col("title"), col("air_date"), col("doctor"))))
Print dataframe schema
df.printSchema
scala> df.printSchema
root
|-- title: string (nullable = true)
|-- air_date: string (nullable = true)
|-- doctor: integer (nullable = true)
|-- checksum: string (nullable = false)
Select from dataframe
df.select("checksum").show(false)
scala> df.select("checksum").show(false)
+--------------------------------+
|checksum |
+--------------------------------+
|60723b412082c7ecca173420db2dd620|
|a6e1f1847fff5d7795bb4ffadf7c337d|
|25ff41e3d2507c37fbcc24ab571e55d6|
|2deea188b3e6b7d654db4f98baf7cdcf|
|ab0e985ea1588b0a3e34b80dae197391|
|4901b9d1fc3f47f6b3283c7bc6e3cf8b|
|6a43f326e1a823108ee84f97cf2f59ba|
|984d29d372f8f10f3edbb0fa26b096cb|
+--------------------------------+
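One possible use of the checksum column is to verify the conversion against the original Avro input (a hedged sketch; it assumes spark-avro is available to the shell and reuses the df built above):
// recompute the same MD5 checksum over the original Avro input
var dfAvro = spark.read.format("avro").load("/user/<user>/avro2parquet/avro/episodes.avro")
dfAvro = dfAvro.withColumn("checksum", md5(concat_ws(",", col("title"), col("air_date"), col("doctor"))))

// an empty result means every Parquet-side checksum has a matching Avro-side checksum
df.select("checksum").except(dfAvro.select("checksum")).show(false)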