Read and Parse Configuration File in a PySpark Job

One of the best practices in programming and code design is to minimize hard-coded values and variables, storing them instead in a configuration file and reading them from that single source during code execution. The reason for this approach is clear: no one wants to refactor code every time a value changes. Besides being tedious, doing so can also introduce bugs.

In a PySpark job, loading and parsing a configuration file can be tricky depending on the deploy mode specified when the code is submitted for execution with spark-submit. The deploy mode, either client or cluster, determines whether the driver runs on a worker node inside the cluster (cluster mode) or locally as an external client (client mode).

When a Spark application is submitted from a machine that is physically co-located with the worker machines in the cluster, client mode is typically used: the driver process is launched directly inside the spark-submit process, which acts as a client to the cluster. When the application is submitted from your laptop or from a machine far away from the worker nodes, cluster mode is used instead, in order to minimize latency between the driver and the executors.
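
For illustration, the mode is selected with spark-submit's --deploy-mode flag. The YARN master and the script name below are placeholders, not values from this post:

# Client mode: the driver runs inside the spark-submit process on the submitting machine
spark-submit --master yarn --deploy-mode client my_pyspark_job.py

# Cluster mode: the driver is launched on one of the worker nodes
spark-submit --master yarn --deploy-mode cluster my_pyspark_job.py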

Use Case Scenario

You have a configparser-style configuration file placed in an S3 bucket. When a PySpark job is submitted via spark-submit, the file must be available to both the Spark driver and the executor nodes so it can be read and parsed for the values the application requires.

Available Approaches and Potential Challenges

In a PySpark spark-submit statement, a config file can be provided using the --files argument. However, the file will then only be available on the driver, not on the executor nodes, so the application may fail when executed.
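
For reference, a submission along these lines is what the --files approach looks like; the master, deploy mode, and script name are placeholder assumptions, and the S3 path mirrors the use case above:

spark-submit --master yarn --deploy-mode cluster \
    --files s3://my-bucket/testConfig.ini \
    my_pyspark_job.py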

Another way is to use the SparkContext addFile method, which does make the config file available on both the driver and the executor nodes. The downside is that, according to the Spark documentation, addFile only loads files from local paths, HDFS (or other Hadoop-supported filesystems), or HTTP/HTTPS/FTP URIs. So, to make a config file stored in an S3 bucket available to both the driver and the executor nodes of a PySpark application, another way has to be found.
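
For comparison, this is roughly what addFile looks like with a source it does support; the local path below is an assumption for illustration only:

from pyspark.sql import SparkSession
from pyspark import SparkFiles

spark = SparkSession.builder.getOrCreate()

# addFile accepts local paths, HDFS (or other Hadoop-supported) paths, and HTTP/HTTPS/FTP URIs
spark.sparkContext.addFile("/path/on/driver/testConfig.ini")

# every node (driver and executors) can then resolve its own local copy by file name
localPath = SparkFiles.get("testConfig.ini")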

In this post, we will design a reusable function that a PySpark job can use to read and parse a configuration file stored in an S3 bucket when the application is run with spark-submit.

Solution Steps

  • Define a configuration file in a style supported by the configparser Python library.
  • Save the file in an S3 bucket and note the file URI.
  • Assuming a SparkSession has already been defined, copy the config file from the S3 bucket to the local /tmp directory on the machine running the driver.
  • Split the config file's S3 path and grab the file name from the URI.
  • Get the sparkContext from the existing SparkSession.
  • Use the sparkContext's addFile method to share the config file with the Spark driver and executors.
  • Use the SparkFiles get method to grab the absolute path to the config file.
  • Create a configparser instance and parse the config file. Pass the optional argument interpolation=ExtendedInterpolation() if you want ${section:key} references in config values to be resolved before get() returns them.
  • Return the configparser object, from which config values can be extracted with get() calls.

Code Design


from pyspark.sql import SparkSession
from pyspark import SparkFiles
from configparser import ConfigParser, ExtendedInterpolation
import subprocess


def getConfigFromS3(configFileS3Path: str):
    """A function to read and parse a config file in a PySpark job.

    Args:
        configFileS3Path (str): Path of the configuration file placed in an S3 bucket.

    Returns:
        ConfigParser object from which config values can be extracted using get() calls.
    """
    try:
        # copy the config file from the S3 bucket to the local /tmp directory
        cmdx = f"""aws s3 cp {configFileS3Path} /tmp"""
        cmd_result = subprocess.run(
            ["/bin/bash"], input=cmdx, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")

        # if the copy was successful
        if cmd_result.returncode == 0:

            # get the config file name from the S3 path
            objkey = configFileS3Path.split("/")[-1]

            # add the config file to the Spark driver and executors
            spkContext = SparkSession.getActiveSession().sparkContext
            spkContext.addFile(f"/tmp/{objkey}")

            # get the absolute path to the config file
            configFilePath = SparkFiles.get(objkey)

            # parse the config file
            config = ConfigParser(interpolation=ExtendedInterpolation())
            config.read(configFilePath)

            # return the configparser object
            return config

        else:
            print(f"Copying the config file from the S3 bucket to the /tmp directory failed. Error message: {cmd_result.stderr}")

    except Exception as err:
        print(err)


Function Test and Result

According to Python's configparser documentation page, "A configuration file consists of sections, each led by a section header, followed by key/value entries separated by a specific string (= or : by default). By default, section names are case sensitive but keys are not".

A sample config file (testConfig.ini) based on the above definition and stored in an S3 bucket is shown below:


[Default]
aws_dev_env = dev
aws_qa_env = qa
aws_prod_env = prod

[Source]
dev_data_src = s3://myBucket-${Default:aws_dev_env}/myFile.csv
prod_data_src = s3://myBucket-${Default:aws_prod_env}/myFile.csv

[Target]
dev_data_tgt = s3://mybucket-${Default:aws_dev_env}/targetdir/
prod_data_tgt = s3://mybucket-${Default:aws_prod_env}/targetdir/
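
With ExtendedInterpolation enabled, the ${Default:aws_dev_env} and ${Default:aws_prod_env} references above are resolved against the [Default] section when values are retrieved. A quick local sketch, independent of Spark and assuming testConfig.ini sits on the local filesystem:

from configparser import ConfigParser, ExtendedInterpolation

config = ConfigParser(interpolation=ExtendedInterpolation())
config.read("testConfig.ini")

config.get("Source", "dev_data_src", raw=True)
# 's3://myBucket-${Default:aws_dev_env}/myFile.csv'

config.get("Source", "dev_data_src")
# 's3://myBucket-dev/myFile.csv'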

We define a SparkSession as indicated below:


spark = SparkSession.builder.master("local[*]").appName("justMe").getOrCreate()

We then call the getConfigFromS3() function as shown below:


s3Config = getConfigFromS3("s3://my-bucket/testConfig.ini")


s3Config.get("Source", "dev_data_src")
# 's3://myBucket-dev/myFile.csv'


s3Config.get("Source", "prod_data_src")
# 's3://myBucket-prod/myFile.csv'
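
The returned values can then be used directly in the job, for example to read the source data; this is just a sketch and assumes a CSV file actually exists at that path:

srcPath = s3Config.get("Source", "dev_data_src")
df = spark.read.csv(srcPath, header=True)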

See the code and other artifacts in my GitHub repository.

Thanks for reading.