How To Union Multiple Dataframes in PySpark and Spark Scala

One of the recurring tasks in data engineering and machine learning workloads is unioning, or combining, two or more tables, queries or dataframes into a single result containing all the rows from the inputs.

Traditionally, union functionality was provided as the UNION set operator in ANSI SQL for combining query results. It has since been included in all major analytics tools and platforms.

In the Spark DataFrame API, the union operation is provided in three forms: union, unionAll and unionByName.

In this post, we will take a look at how these union functions can be used to transform data using both Python and Scala. We will also cover a specific use case that involves combining multiple dataframes into one.

Create sample dataframes


# PySpark - Sample Dataframes

pDf1 = spark.createDataFrame([("France", "Paris", "French"), ("China", "Beijing", "Mandarin"), ("Germany", "Berlin", "German")], ["Country", "Capital", "Language"],)

pDf2 = spark.createDataFrame([("Brazil", "Brasilia", "Portuguese"), ("Egypt", "Cairo", "Arabic"), ("Guinea", "Conakry", "French")], ["Country", "Capital", "Language"],)

pDf3 = spark.createDataFrame([("Mexico", "Mexico City", "Spanish"), ("Gambia", "Banjul", "English"), ("Angola", "Luanda", "Portuguese")], ["Country", "Capital", "Language"],)

pDf4 = spark.createDataFrame([("USA", "North America", "Imperial"), ("UK", "Europe", "Metric")], ["Country", "Continent", "Measurement_System"],)

pDf3.show(truncate=False)

+-------+-----------+----------+
|Country|Capital    |Language  |
+-------+-----------+----------+
|Mexico |Mexico City|Spanish   |
|Gambia |Banjul     |English   |
|Angola |Luanda     |Portuguese|
+-------+-----------+----------+


// Spark Scala - Sample Dataframes

val sDf1 = spark.createDataFrame(Seq(("France", "Paris", "French"), ("China", "Beijing", "Mandarin"), ("Germany", "Berlin", "German"))).toDF("Country", "Capital", "Language")
val sDf2 = spark.createDataFrame(Seq(("Brazil", "Brasilia", "Portuguese"), ("Egypt", "Cairo", "Arabic"), ("Guinea", "Conakry", "French"))).toDF("Country", "Capital", "Language")
val sDf3 = spark.createDataFrame(Seq(("Mexico", "Mexico City", "Spanish"), ("Gambia", "Banjul", "English"), ("Angola", "Luanda", "Portuguese"))).toDF("Country", "Capital", "Language")
val sDf4 = spark.createDataFrame(Seq(("USA", "North America", "Imperial"), ("UK", "Europe", "Metric"))).toDF("Country", "Continent", "Measurement_System")

sDf1.show()

+-------+-------+--------+
|Country|Capital|Language|
+-------+-------+--------+
| France|  Paris|  French|
|  China|Beijing|Mandarin|
|Germany| Berlin|  German|
+-------+-------+--------+

Union and UnionAll

These two functions behave identically and use the same syntax in both PySpark and Spark Scala; since Spark 2.0, unionAll is in fact a deprecated alias for union. They combine two or more dataframes by column position, so every dataframe in the union must have the same number of columns with compatible data types. Otherwise, the operation raises an error.

Union in the Spark DataFrame API is equivalent to UNION ALL in ANSI SQL, so the result may contain duplicate records. To deduplicate, append distinct() at the end of the chain.
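For instance, unioning one of the sample dataframes with itself produces each row twice; appending distinct() drops the duplicates (a minimal sketch using pDf1 defined above):

```python
# PySpark - union keeps duplicates, so unioning pDf1 with itself
# yields six rows; distinct() reduces the result back to three
pDf1.union(pDf1).distinct().show(truncate=False)
```

Note that distinct() triggers a shuffle to compare rows across partitions, so it is best applied once at the end rather than after each union.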


# PySpark - Union Function

pDf1.union(pDf2).union(pDf3).union(pDf4).show(truncate=False)

+-------+-------------+----------+
|Country|Capital      |Language  |
+-------+-------------+----------+
|France |Paris        |French    |
|China  |Beijing      |Mandarin  |
|Germany|Berlin       |German    |
|Brazil |Brasilia     |Portuguese|
|Egypt  |Cairo        |Arabic    |
|Guinea |Conakry      |French    |
|Mexico |Mexico City  |Spanish   |
|Gambia |Banjul       |English   |
|Angola |Luanda       |Portuguese|
|USA    |North America|Imperial  |
|UK     |Europe       |Metric    |
+-------+-------------+----------+

 

// Spark Scala - Union Function

sDf1.union(sDf2).union(sDf3).show()

+-------+-----------+----------+
|Country|    Capital|  Language|
+-------+-----------+----------+
| France|      Paris|    French|
|  China|    Beijing|  Mandarin|
|Germany|     Berlin|    German|
| Brazil|   Brasilia|Portuguese|
|  Egypt|      Cairo|    Arabic|
| Guinea|    Conakry|    French|
| Mexico|Mexico City|   Spanish|
| Gambia|     Banjul|   English|
| Angola|     Luanda|Portuguese|
+-------+-----------+----------+

UnionByName

UnionByName differs from both union and unionAll in that it resolves columns by name rather than by position. Additionally, since Spark 3.1, unionByName accepts an optional parameter allowMissingColumns which, when set to True, allows combining dataframes whose sets of columns differ, as long as same-named columns have compatible data types.

This makes it possible to union disparate dataframes with unequal columns: any column missing from one side is appended to the schema of the union result, and its values are filled with nulls.

Note that in the example below, the Continent and Measurement_System fields, which are missing from the first dataframe, have been added to the union result because allowMissingColumns is set to True. Setting the parameter to False (the default) would raise an error.


pDf4.show(truncate=False)

+-------+-------------+------------------+
|Country|Continent    |Measurement_System|
+-------+-------------+------------------+
|USA    |North America|Imperial          |
|UK     |Europe       |Metric            |
+-------+-------------+------------------+


pDf1.unionByName(pDf4, allowMissingColumns=True).show()

+-------+-------+--------+-------------+------------------+
|Country|Capital|Language|    Continent|Measurement_System|
+-------+-------+--------+-------------+------------------+
| France|  Paris|  French|         null|              null|
|  China|Beijing|Mandarin|         null|              null|
|Germany| Berlin|  German|         null|              null|
|    USA|   null|    null|North America|          Imperial|
|     UK|   null|    null|       Europe|            Metric|
+-------+-------+--------+-------------+------------------+

Handling Multiple Dataframes Union

In real-world use cases that require combining many dynamic dataframes, chaining them with a long series of union calls quickly becomes untidy and impractical.

In such situations, it is better to design a reusable function that can efficiently handle unions of an arbitrary number of dataframes, including scenarios where one or more columns are missing or where column names differ.

An example of such a function is presented below, using the reduce function from Python's functools library for PySpark dataframes and Scala's reduceLeft function for Spark dataframes in Scala.
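To see what reduce does before applying it to dataframes, here is a minimal plain-Python sketch, using list concatenation in place of a dataframe union. The function is applied cumulatively, left to right, folding the whole list into a single result:

```python
from functools import reduce

# reduce folds the list pairwise: f(f(f(g1, g2), g3), ...)
# here f is list concatenation, standing in for a dataframe union
row_groups = [["France"], ["Brazil", "Egypt"], ["Mexico"]]
all_rows = reduce(lambda a, b: a + b, row_groups)
print(all_rows)  # ['France', 'Brazil', 'Egypt', 'Mexico']
```

The dataframe version below works the same way, except the combining function is unionByName instead of list concatenation.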


# PySpark - Union Multiple Dataframes Function

from functools import reduce
from pyspark.sql import DataFrame
from typing import List


def unionMultipleDf(DfList: List[DataFrame]) -> DataFrame:
  """
  Combine the rows of multiple dataframes into a single dataframe.
  Parameter: DfList - a list of all dataframes to be unioned
  """
  # anonymous function that unions two dataframes by column name,
  # filling any missing columns with nulls
  unionDfWithMissingColumns = lambda dfa, dfb: dfa.unionByName(dfb, allowMissingColumns=True)
  
  # use reduce to fold the list of dataframes into one
  finalDf = reduce(unionDfWithMissingColumns, DfList)
  
  return finalDf
  

# PySpark - Using unionMultipleDf function

unionMultipleDf([pDf1, pDf2, pDf3, pDf4]).show()

+-------+-----------+----------+-------------+------------------+
|Country|    Capital|  Language|    Continent|Measurement_System|
+-------+-----------+----------+-------------+------------------+
| France|      Paris|    French|         null|              null|
|  China|    Beijing|  Mandarin|         null|              null|
|Germany|     Berlin|    German|         null|              null|
| Brazil|   Brasilia|Portuguese|         null|              null|
|  Egypt|      Cairo|    Arabic|         null|              null|
| Guinea|    Conakry|    French|         null|              null|
| Mexico|Mexico City|   Spanish|         null|              null|
| Gambia|     Banjul|   English|         null|              null|
| Angola|     Luanda|Portuguese|         null|              null|
|    USA|       null|      null|North America|          Imperial|
|     UK|       null|      null|       Europe|            Metric|
+-------+-----------+----------+-------------+------------------+


// Spark Scala - Union Multiple Dataframes Function

import org.apache.spark.sql.DataFrame

def unionMultipleDf(DfList: Seq[DataFrame]): DataFrame = {

  val unionDfWithMissingColumns = (dfa: DataFrame, dfb: DataFrame) => dfa.unionByName(dfb, allowMissingColumns=true)
  
  val finalDf = DfList.reduceLeft(unionDfWithMissingColumns)
  
  finalDf
}


// Spark Scala - Using unionMultipleDf function

val dfList = Vector(sDf1, sDf2, sDf3, sDf4)

unionMultipleDf(dfList).show()

+-------+-----------+----------+-------------+------------------+
|Country|    Capital|  Language|    Continent|Measurement_System|
+-------+-----------+----------+-------------+------------------+
| France|      Paris|    French|         null|              null|
|  China|    Beijing|  Mandarin|         null|              null|
|Germany|     Berlin|    German|         null|              null|
| Brazil|   Brasilia|Portuguese|         null|              null|
|  Egypt|      Cairo|    Arabic|         null|              null|
| Guinea|    Conakry|    French|         null|              null|
| Mexico|Mexico City|   Spanish|         null|              null|
| Gambia|     Banjul|   English|         null|              null|
| Angola|     Luanda|Portuguese|         null|              null|
|    USA|       null|      null|North America|          Imperial|
|     UK|       null|      null|       Europe|            Metric|
+-------+-----------+----------+-------------+------------------+

I hope this post is helpful in clarifying the union functions in the Spark API.

Thanks for reading.