Spark/Scala – Avro2Parquet HDFS Converter

Posted: January 4, 2022 in Hadoop

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Scala: 2.11.12
  • Spark: 2.4.0
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292

Avro2Parquet

avro2parquet.scala (path: /avro2parquet/src/main/scala/example/)

package eu.placko.examples.spark

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.hadoop.fs.Path

object Avro2Parquet {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Wrong input! Usage: <avro input dir> <parquet output dir>, e.g. /user/<REPLACE>/avro2parquet/avro /user/<REPLACE>/avro2parquet/parquet")
      return
    }

    val avroPath = new Path(args(0))
    val parquetPath = new Path(args(1))

    val spark: SparkSession = SparkSession.builder()
      //.master("local[1]")
      //.master("yarn")
      .appName("Avro2Parquet")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    try {
      // read the Avro file
      val df = spark.read.format("avro")
        .load(avroPath + "/" + "episodes.avro")
      df.show()
      df.printSchema()

      // convert to Parquet
      df.write.mode(SaveMode.Overwrite)
        .parquet(parquetPath + "/" + "episodes.parquet")
    } catch {
      case e: Exception => println("Exception: " + e)
    }
  }
}
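
The job above converts one hard-coded file (episodes.avro). If every Avro file in the input directory should be converted, a minimal sketch could look like the following (Avro2ParquetDir is a hypothetical variant, not part of the repository); each input file is written to its own <name>.parquet directory under the output path:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

object Avro2ParquetDir {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Avro2ParquetDir").getOrCreate()
    val avroDir = new Path(args(0))
    val parquetDir = new Path(args(1))

    // list the input directory on HDFS and convert each *.avro file separately
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    fs.listStatus(avroDir)
      .map(_.getPath)
      .filter(_.getName.endsWith(".avro"))
      .foreach { file =>
        val name = file.getName.stripSuffix(".avro")
        spark.read.format("avro").load(file.toString)
          .write.mode(SaveMode.Overwrite)
          .parquet(new Path(parquetDir, name + ".parquet").toString)
      }

    spark.stop()
  }
}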

Dependencies.scala (path: /avro2parquet/project/)

import sbt._

object Dependencies {
  val sparkVersion = "2.4.0"

  lazy val sparkAvro = Seq(
    "org.apache.spark" %% "spark-sql"  % sparkVersion,
    "org.apache.spark" %% "spark-avro" % sparkVersion
  )
}
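
Since sbt package produces a thin jar, the Spark libraries themselves come from the cluster at runtime. A possible variant (an assumption on my side, not what the repository ships) marks them as provided inside the same Dependencies object, so they are on the compile classpath but never bundled:

  // "provided" variant: compile against Spark, but rely on the CDH cluster to
  // supply spark-sql/spark-avro at runtime; if spark-avro is missing from the
  // cluster classpath, add it at submit time via --packages or --jars
  lazy val sparkAvroProvided = Seq(
    "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",
    "org.apache.spark" %% "spark-avro" % sparkVersion % "provided"
  )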

build.sbt (path: /avro2parquet/)

import Dependencies._

ThisBuild / scalaVersion     := "2.11.12"
ThisBuild / version          := "0.1.0"
ThisBuild / organization     := "eu.placko"
ThisBuild / organizationName := "examples.spark"

lazy val root = (project in file("."))
  .settings(
    name := "avro2parquet",
    version := "0.1.0",
    libraryDependencies ++= sparkAvro
  )


BUILD (run from /avro2parquet, using the sbt launcher bundled under /avro2parquet/sbt)

./sbt/bin/sbt compile
#./sbt/bin/sbt run
./sbt/bin/sbt package

EXECUTE avro2parquet.sh (path: /avro2parquet)

#!/bin/bash
# bash (rather than plain sh) is needed for the ERR trap below

on_error() {
  printf "\n\nAn error occurred!\n"
  exit 1
}
trap on_error ERR

keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.Avro2Parquet

appVersion="0.1.0"
appArtifact="/<REPLACE>/avro2parquet/target/scala-2.11/avro2parquet_2.11-$appVersion.jar"
appArgs="/user/<REPLACE>/avro2parquet/avro /user/<REPLACE>/avro2parquet/parquet"
log4j_setting="-Dlog4j.configuration=file:///<REPLACE>/avro2parquet/conf/log4j.xml"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# for testing/debugging purposes you can switch to --deploy-mode client
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class $appClass \
    --conf spark.executor.memory=12G \
    --conf spark.driver.memory=4G \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=5 \
    --conf spark.executor.cores=5 \
    --conf "spark.driver.extraJavaOptions=${log4j_setting}" \
    --conf "spark.executor.extraJavaOptions=${log4j_setting}" \
    $appArtifact $appArgs

exit 0;

notes (source: https://sparkbyexamples.com/spark/sparksession-explained-with-examples/):

  • master() – if you run the job on a cluster, pass your cluster manager as the argument to master(); usually this is either yarn or mesos, depending on your cluster setup.
  • Use local[x] when running in Standalone mode. x should be an integer greater than 0; it sets the number of worker threads and thus the default parallelism (i.e. how many partitions are created for RDDs, DataFrames, and Datasets). Ideally, x should match the number of CPU cores available. A minimal local-mode session is sketched below.
  • See also: https://sparkbyexamples.com/spark/spark-deploy-modes-client-vs-cluster/
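
For quick tests on a workstation the master can be set explicitly in the builder; this is only a sketch (the app name is illustrative), since on the cluster the master is supplied by spark-submit --master yarn:

import org.apache.spark.sql.SparkSession

// local-mode session for testing on a workstation; on the cluster the master
// ("yarn") comes from spark-submit, so it is not hard-coded in the job itself
val spark = SparkSession.builder()
  .master("local[*]")               // one worker thread per available CPU core
  .appName("Avro2ParquetLocalTest")
  .getOrCreate()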

Source Code

https://github.com/mplacko/avro2parquet

Additional Info

Spark-Shell Example

# Kerberos
# su - <user>
# kinit -kt /etc/security/keytabs/<user>.keytab <user>

Start Spark-shell

spark-shell

Load dataframe from Parquet

var df = spark.read.format("parquet").option("inferSchema", "true").load("/user/<user>/avro2parquet/parquet/episodes.parquet")

Save dataframe as Parquet

import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet("/user/<user>/avro2parquet/parquet/checksum")

Print dataframe schema

df.printSchema

scala> df.printSchema
root
 |-- title: string (nullable = true)
 |-- air_date: string (nullable = true)
 |-- doctor: integer (nullable = true)

Add column to dataframe

// add a checksum column by MD5-hashing the concatenation of all columns
import org.apache.spark.sql.functions.{col, concat_ws, md5}  // spark-shell imports functions._ automatically; explicit here for clarity
df = df.withColumn("checksum", md5(concat_ws(",", col("title"), col("air_date"), col("doctor"))))

Print dataframe schema

df.printSchema

scala> df.printSchema
root
 |-- title: string (nullable = true)
 |-- air_date: string (nullable = true)
 |-- doctor: integer (nullable = true)
 |-- checksum: string (nullable = false)

Select from dataframe

df.select("checksum").show(false)

scala> df.select("checksum").show(false)
+--------------------------------+
|checksum                        |
+--------------------------------+
|60723b412082c7ecca173420db2dd620|
|a6e1f1847fff5d7795bb4ffadf7c337d|
|25ff41e3d2507c37fbcc24ab571e55d6|
|2deea188b3e6b7d654db4f98baf7cdcf|
|ab0e985ea1588b0a3e34b80dae197391|
|4901b9d1fc3f47f6b3283c7bc6e3cf8b|
|6a43f326e1a823108ee84f97cf2f59ba|
|984d29d372f8f10f3edbb0fa26b096cb|
+--------------------------------+
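
A possible use of the checksum column (my own example, not covered by the repository) is comparing two snapshots of the same dataset; the checksum_rerun path below is purely illustrative:

// compare two Parquet snapshots via their row-level checksums
// (the checksum_rerun path is hypothetical and only serves as an example)
val before = spark.read.parquet("/user/<user>/avro2parquet/parquet/checksum")
val after  = spark.read.parquet("/user/<user>/avro2parquet/parquet/checksum_rerun")
val diff   = before.select("checksum").except(after.select("checksum"))
println(s"rows missing from the re-run: ${diff.count()}")   // 0 => identical content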