Parquet – Schema Evolution (Schema Merging) in Spark

Posted: March 24, 2022 in Hadoop
An example illustrating schema evolution with the Parquet (column-based) binary data format: starting with a simple schema, adding a new column, deleting an existing column, and ending up with multiple Parquet files with different but mutually compatible schemas.

Prerequisites

  • OS: Linux (RHEL 7.9)
  • Hadoop: Cloudera (CDH 6.1.1)
  • Scala: 2.11.12
  • Spark: 2.4.0
  • Authentication via Kerberos
  • OpenJDK 64-Bit 1.8.0_292
  • IDE (for Windows 10 Enterprise): Eclipse IDE for Java Developers – 2020-09

In Spark, the Parquet data source can automatically detect schema changes and merge the schemas of all the Parquet files involved, which is internally referred to as schema merging. Without automatic schema merging, the typical way of handling schema evolution is a historical data reload, which requires considerable effort.
Schema merging is a relatively expensive operation, so the feature is turned off by default.
It can be enabled by:

  1. setting the data source option mergeSchema to true when reading Parquet files
    or
  2. setting the global SQL option spark.sql.parquet.mergeSchema to true
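As a minimal sketch of both options (the class name and path are illustrative placeholders, and the fragment assumes a running Spark 2.4 environment):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MergeSchemaExample {
    public static void main(String[] args) {
        // Option 2: session-wide setting, applied when the session is built.
        SparkSession spark = SparkSession.builder()
                .appName("MergeSchemaExample")
                .config("spark.sql.parquet.mergeSchema", "true")
                .getOrCreate();

        // Option 1: per-read data source option (overrides the session default).
        Dataset<Row> merged = spark.read()
                .option("mergeSchema", "true")
                .parquet("/user/<REPLACE>/ParquetSchemaMerging/data");
        merged.printSchema();
    }
}
```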

ParquetSchemaMerging

Schema v1: SchemaVersion1.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/datafactory/)

package eu.placko.examples.spark.datafactory;

import java.io.Serializable;

public class SchemaVersion1 implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String col_a;

    public Integer getId() {return id;}
    public String getCol_A() {return col_a;}

    public void setId(Integer id) {this.id = id;}
    public void setCol_A(String col_a) {this.col_a = col_a;}
}

Schema v2: SchemaVersion2.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/datafactory/)

package eu.placko.examples.spark.datafactory;

import java.io.Serializable;

public class SchemaVersion2 implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String col_b;
    private String col_a;

    public Integer getId() {return id;}
    public String getCol_B() {return col_b;}
    public String getCol_A() {return col_a;}

    public void setId(Integer id) {this.id = id;}
    public void setCol_B(String col_b) {this.col_b = col_b;}
    public void setCol_A(String col_a) {this.col_a = col_a;}
}

Schema v3: SchemaVersion3.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/datafactory/)

package eu.placko.examples.spark.datafactory;

import java.io.Serializable;

public class SchemaVersion3 implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String col_b;

    public Integer getId() {return id;}
    public String getCol_B() {return col_b;}

    public void setId(Integer id) {this.id = id;}
    public void setCol_B(String col_b) {this.col_b = col_b;}
}

ParquetSchemaMerging.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/app/)

package eu.placko.examples.spark.app;

import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import eu.placko.examples.spark.config.SparkConfig;
import eu.placko.examples.spark.config.SparkSQLConfig;
import eu.placko.examples.spark.sql.HiveBuilder;
import eu.placko.examples.spark.sql.QueryBuilder;
import eu.placko.examples.spark.datafactory.*;

import java.util.ArrayList;
import java.util.List;
import java.io.IOException;


public class ParquetSchemaMerging {
	private final static Logger LOGGER = Logger.getLogger(ParquetSchemaMerging.class.getName());
	
	public static void main(String[] args) throws IOException {
		if (args.length != 1) {
		    LOGGER.info("wrong input! /user/<REPLACE>/ParquetSchemaMerging/");
		    return;
		}
		String path = args[0];
		
		SparkSession sparkSession = SparkConfig.createSparkSession();
		LOGGER.info("Started Spark Session");	
		
		
		SchemaVersion1 row1V1 = new SchemaVersion1();
		row1V1.setId(1);
		row1V1.setCol_A("A1");
		SchemaVersion1 row2V1 = new SchemaVersion1();
		row2V1.setId(2);
		row2V1.setCol_A("A2");
		
		List<SchemaVersion1> dataV1 = new ArrayList<SchemaVersion1>();
		dataV1.add(row1V1);
		dataV1.add(row2V1);
		
		Dataset<Row> dsV1 = sparkSession.createDataFrame(dataV1, SchemaVersion1.class);
		dsV1.show();
		dsV1.printSchema();
		dsV1
			.write()
			.format("parquet")
			.mode(SaveMode.Overwrite)
			.save(path + "data/partition-date=2022-01-01");
		LOGGER.info("Created dsV1");
		
		
		SchemaVersion2 row1V2 = new SchemaVersion2();
		row1V2.setId(1);
		row1V2.setCol_B("B1");
		row1V2.setCol_A("A1");
		SchemaVersion2 row2V2 = new SchemaVersion2();
		row2V2.setId(2);
		row2V2.setCol_B("B2");
		row2V2.setCol_A("A2");
		
		List<SchemaVersion2> dataV2 = new ArrayList<SchemaVersion2>();
		dataV2.add(row1V2);
		dataV2.add(row2V2);
		
		Dataset<Row> dsV2 = sparkSession.createDataFrame(dataV2, SchemaVersion2.class);
		dsV2.show();
		dsV2.printSchema();
		dsV2
			.write()
			.format("parquet")
			.mode(SaveMode.Overwrite)
			.save(path + "data/partition-date=2022-01-02");
		LOGGER.info("Created dsV2");
		
		
		SchemaVersion3 row1V3 = new SchemaVersion3();
		row1V3.setId(1);
		row1V3.setCol_B("B1");
		SchemaVersion3 row2V3 = new SchemaVersion3();
		row2V3.setId(2);
		row2V3.setCol_B("B2");
		
		List<SchemaVersion3> dataV3 = new ArrayList<SchemaVersion3>();
		dataV3.add(row1V3);
		dataV3.add(row2V3);
		
		Dataset<Row> dsV3 = sparkSession.createDataFrame(dataV3, SchemaVersion3.class);
		dsV3.show();
		dsV3.printSchema();
		dsV3
			.write()
			.format("parquet")
			.mode(SaveMode.Overwrite)
			.save(path + "data/partition-date=2022-01-03");
		LOGGER.info("Created dsV3");
		
		
		Dataset<Row> dsMergeSchema = sparkSession.read()
				.option("mergeSchema","true")
				.parquet(path + "data");
		dsMergeSchema.show();
		dsMergeSchema.printSchema();
		LOGGER.info("Read dsMergeSchema");
		
		Dataset<Row> dsWithoutMergeSchema = sparkSession.read()
				//.option("mergeSchema","false")
				.parquet(path + "data");
		dsWithoutMergeSchema.show();
		dsWithoutMergeSchema.printSchema();
		LOGGER.info("Read dsWithoutMergeSchema");
		
		Dataset<Row> dsMergeSchemaSparkSQL = eu.placko.examples.spark.config.SparkSQLConfig.createSparkSQLSession().read()
				.parquet(path + "data");
		dsMergeSchemaSparkSQL.show();
		dsMergeSchemaSparkSQL.printSchema();
		LOGGER.info("Read dsMergeSchemaSparkSQL");
		
		
		String hive = new HiveBuilder().buildDB();
		sparkSession.sql(hive);
		LOGGER.info("Created Hive DB if not exists");
		
		hive = new HiveBuilder().buildTB();
		sparkSession.sql(hive);
		LOGGER.info("Dropped Hive table if exists");
		
		hive = new HiveBuilder().buildTB(path);
		sparkSession.sql(hive);
		LOGGER.info("Created Hive table over the parquet");
		
		
		hive = new QueryBuilder().build();
		Dataset<Row> dsReadFromHiveTableBeforeRepair = sparkSession.sql(hive);
		dsReadFromHiveTableBeforeRepair.show();
		dsReadFromHiveTableBeforeRepair.printSchema();
		LOGGER.info("Read dsReadFromHiveTableBeforeRepair");
		
		hive = new HiveBuilder().repairTB();
		sparkSession.sql(hive);
		LOGGER.info("MSCK REPAIR TABLE");
		
		hive = new QueryBuilder().build();
		Dataset<Row> dsReadFromHiveTableAfterRepair = sparkSession.sql(hive);
		dsReadFromHiveTableAfterRepair.show();
		dsReadFromHiveTableAfterRepair.printSchema();
		LOGGER.info("Read dsReadFromHiveTableAfterRepair");
	}
}

SparkConfig.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/config/)

package eu.placko.examples.spark.config;

import org.apache.spark.sql.SparkSession;
//import java.io.File;

public class SparkConfig {
	public static SparkSession createSparkSession() {
		//String hiveDBLocation = new File("parquet_schema_evolution").getAbsolutePath();
		SparkSession spark = SparkSession
				.builder()
				.appName("ParquetSchemaMerging")
				//.master("local[1]")
				//.master("yarn")
				.config("spark.sql.broadcastTimeout","36000")
				//.config("spark.sql.warehouse.dir", hiveDBLocation)
				.enableHiveSupport()
				.getOrCreate();	
		return spark;
	}
}

SparkSQLConfig.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/config/)

package eu.placko.examples.spark.config;

import org.apache.spark.sql.SparkSession;
//import java.io.File;

public class SparkSQLConfig {
	public static SparkSession createSparkSQLSession() {
		//String hiveDBLocation = new File("parquet_schema_evolution").getAbsolutePath();
		SparkSession spark = SparkSession
				.builder()
				.appName("ParquetSchemaMerging")
				//.master("local[1]")
				//.master("yarn")
				.config("spark.sql.broadcastTimeout","36000")
				.config("spark.sql.parquet.mergeSchema", "true") // Use Spark SQL
				//.config("spark.sql.warehouse.dir", hiveDBLocation)
				.enableHiveSupport()
				.getOrCreate();
		return spark;
	}
}

HiveBuilder.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class HiveBuilder {
	public String buildDB() {
		String sql = "CREATE DATABASE IF NOT EXISTS parquet_schema_evolution LOCATION '/user/hive/databases/parquet_schema_evolution.db'";
		return sql;
	}
	
	public String buildTB() {
		String sql = "DROP TABLE IF EXISTS parquet_schema_evolution.parquet_merge";
		return sql;
	}
	
	public String buildTB(String path) {
		String sql = "CREATE EXTERNAL TABLE parquet_schema_evolution.parquet_merge (id INT, col_a STRING) PARTITIONED BY (`partition-date` STRING) STORED AS PARQUET LOCATION 'hdfs://nameservice1" + path + "data'";
		return sql;
	}
	
	public String repairTB() {
		String sql = "MSCK REPAIR TABLE parquet_schema_evolution.parquet_merge";
		return sql;
	}
}

QueryBuilder.java (path: /ParquetSchemaMerging/src/main/java/eu/placko/examples/spark/sql/)

package eu.placko.examples.spark.sql;

public class QueryBuilder {
	private Long limit = 0L;

	public QueryBuilder setLimit(Long limit) {
		this.limit = limit;
		return this;
	}

	public String build() {
		String sql = "SELECT * FROM parquet_schema_evolution.parquet_merge";
		if (limit != null && limit.longValue() > 0) {
			sql = sql + " LIMIT " + limit;
		}
		return sql;
	}
}
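For completeness, a hypothetical standalone demo of the fluent QueryBuilder usage (the builder is repeated here without any Spark dependency so the sketch runs on its own; the main application only ever calls build() without a limit):

```java
// Copy of the article's QueryBuilder, kept Spark-free for a runnable demo.
class QueryBuilder {
    private Long limit = 0L;

    public QueryBuilder setLimit(Long limit) {
        this.limit = limit;
        return this;
    }

    public String build() {
        String sql = "SELECT * FROM parquet_schema_evolution.parquet_merge";
        if (limit != null && limit.longValue() > 0) {
            sql = sql + " LIMIT " + limit;
        }
        return sql;
    }
}

public class QueryBuilderDemo {
    public static void main(String[] args) {
        // No limit set: a plain SELECT is produced.
        System.out.println(new QueryBuilder().build());
        // Fluent call: a LIMIT clause is appended.
        System.out.println(new QueryBuilder().setLimit(10L).build());
    }
}
```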

ParquetSchemaMerging.sh (path: /ParquetSchemaMerging/scripts/)

#!/bin/bash


on_error() {
  printf "\n\nAn error occurred!\n";
  exit 1;
}
trap on_error ERR


keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab
appClass=eu.placko.examples.spark.app.ParquetSchemaMerging

appVersion="1.0.0-SNAPSHOT"
appArtifact="/home/<REPLACE>/spark/java/ParquetSchemaMerging-$appVersion.jar /user/<REPLACE>/ParquetSchemaMerging/"
log4j_setting="-Dlog4j.configuration=file:///home/<REPLACE>/spark/java/log4j.xml"

echo "Start kinit"
kinit -kt $keytab $keytabUser
echo "Kinit done"

# for testing/debugging purposes only, replace --deploy-mode cluster with: --deploy-mode client
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class $appClass \
    --conf spark.executor.memory=12G \
    --conf spark.driver.memory=4G \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=5 \
    --conf spark.executor.cores=5 \
    --conf "spark.driver.extraJavaOptions=${log4j_setting}" \
    --conf "spark.executor.extraJavaOptions=${log4j_setting}" \
    $appArtifact

exit 0;

Results

2022-03-22 08:34:34 INFO  ParquetSchemaMerging:31 - Started Spark Session
+-----+---+
|col_A| id|
+-----+---+
|   A1|  1|
|   A2|  2|
+-----+---+

root
 |-- col_A: string (nullable = true)
 |-- id: integer (nullable = true)

2022-03-22 08:34:41 INFO  ParquetSchemaMerging:53 - Created dsV1
+-----+-----+---+
|col_A|col_B| id|
+-----+-----+---+
|   A1|   B1|  1|
|   A2|   B2|  2|
+-----+-----+---+

root
 |-- col_A: string (nullable = true)
 |-- col_B: string (nullable = true)
 |-- id: integer (nullable = true)

2022-03-22 08:34:42 INFO  ParquetSchemaMerging:77 - Created dsV2
+-----+---+
|col_B| id|
+-----+---+
|   B1|  1|
|   B2|  2|
+-----+---+

root
 |-- col_B: string (nullable = true)
 |-- id: integer (nullable = true)

2022-03-22 08:34:42 INFO  ParquetSchemaMerging:99 - Created dsV3
+-----+---+-----+--------------+
|col_A| id|col_B|partition-date|
+-----+---+-----+--------------+
|   A1|  1|   B1|    2022-01-02|
|   A2|  2|   B2|    2022-01-02|
|   A1|  1| null|    2022-01-01|
|   A2|  2| null|    2022-01-01|
| null|  1|   B1|    2022-01-03|
| null|  2|   B2|    2022-01-03|
+-----+---+-----+--------------+

root
 |-- col_A: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- col_B: string (nullable = true)
 |-- partition-date: date (nullable = true)

2022-03-22 08:34:44 INFO  ParquetSchemaMerging:107 - Read dsMergeSchema
+-----+---+--------------+
|col_A| id|partition-date|
+-----+---+--------------+
|   A1|  1|    2022-01-02|
|   A2|  2|    2022-01-02|
|   A1|  1|    2022-01-01|
|   A2|  2|    2022-01-01|
| null|  1|    2022-01-03|
| null|  2|    2022-01-03|
+-----+---+--------------+

root
 |-- col_A: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- partition-date: date (nullable = true)

2022-03-22 08:34:44 INFO  ParquetSchemaMerging:114 - Read dsWithoutMergeSchema *
* Without schema merging, the resulting schema is taken from a single (effectively arbitrary) part file, so columns present only in other files may be missing.

2022-03-22 08:34:44 WARN  SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
+-----+---+-----+--------------+
|col_A| id|col_B|partition-date|
+-----+---+-----+--------------+
|   A1|  1|   B1|    2022-01-02|
|   A2|  2|   B2|    2022-01-02|
|   A1|  1| null|    2022-01-01|
|   A2|  2| null|    2022-01-01|
| null|  1|   B1|    2022-01-03|
| null|  2|   B2|    2022-01-03|
+-----+---+-----+--------------+

root
 |-- col_A: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- col_B: string (nullable = true)
 |-- partition-date: date (nullable = true)

2022-03-22 08:34:44 INFO  ParquetSchemaMerging:120 - Read dsMergeSchemaSparkSQL *
* The result is the same as with the mergeSchema read option. The advantage of the global option is that it applies to the whole Spark session instead of having to be specified in every read call.

2022-03-22 08:34:45 INFO  ParquetSchemaMerging:125 - Created Hive DB if not exists
2022-03-22 08:34:45 INFO  ParquetSchemaMerging:129 - Dropped Hive table if exists
2022-03-22 08:34:46 INFO  ParquetSchemaMerging:133 - Created Hive table over the parquet
+---+-----+--------------+
| id|col_a|partition-date|
+---+-----+--------------+
+---+-----+--------------+ *
* Partitions not in metastore:
parquet_merge:partition-date=2022-01-01
parquet_merge:partition-date=2022-01-02
parquet_merge:partition-date=2022-01-03
Repair: Added partition to metastore parquet_merge: partition-date=2022-01-01
Repair: Added partition to metastore parquet_merge: partition-date=2022-01-02
Repair: Added partition to metastore parquet_merge: partition-date=2022-01-03

root
 |-- id: integer (nullable = true)
 |-- col_a: string (nullable = true)
 |-- partition-date: string (nullable = true)

2022-03-22 08:34:46 INFO  ParquetSchemaMerging:140 - Read dsReadFromHiveTableBeforeRepair
2022-03-22 08:34:46 INFO  ParquetSchemaMerging:144 - MSCK REPAIR TABLE
+---+-----+--------------+
| id|col_a|partition-date|
+---+-----+--------------+
|  1|   A1|    2022-01-02|
|  2|   A2|    2022-01-02|
|  1|   A1|    2022-01-01|
|  2|   A2|    2022-01-01|
|  1| null|    2022-01-03|
|  2| null|    2022-01-03|
+---+-----+--------------+

root
 |-- id: integer (nullable = true)
 |-- col_a: string (nullable = true)
 |-- partition-date: string (nullable = true)

2022-03-22 08:34:47 INFO  ParquetSchemaMerging:150 - Read dsReadFromHiveTableAfterRepair

README.md (path: /ParquetSchemaMerging/)

HOW TO CONFIGURE THE PROJECT
 
ParquetSchemaMerging.sh
keytabUser=<REPLACE>
keytab=/etc/security/keytabs/<REPLACE>.keytab

appArtifact="/home/<REPLACE>/spark/java/ParquetSchemaMerging-$appVersion.jar /user/<REPLACE>/ParquetSchemaMerging/"
log4j_setting="-Dlog4j.configuration=file:///home/<REPLACE>/spark/java/log4j.xml"

log4j.xml
<param name="file" value="/home/<REPLACE>/spark/java/log.out" />

Building and Running
 
Build
To build the application, the following must be installed:
Java 9
Maven 3.x
Then run:
mvn clean install
 
Submit to Spark cluster
To run the Spark application, see the shell script inside the /scripts dir.

Source Code

https://github.com/mplacko/ParquetSchemaMerging
