Unleashing the Power of Nested JSON: A Guide to Transforming JSON Data into CSV

In this article, I share my recent experience of wrangling a complex nested JSON structure into a neat and tidy CSV format. Join me on a journey of ETL transformation and discover practical tips for handling unwieldy data structures like a pro!

Aleksandar Brkljac
4 min read · Feb 24, 2023

As a data platform developer using PySpark, I recently encountered a challenge when I received a complex nested JSON file for an ETL project. I knew that transforming this data into a clean and organized CSV file would require some serious data wrangling, but I was up for the task! In this article, I’ll walk you through my approach to tackling this problem using PySpark, and share some tips and tricks I learned along the way.

So let’s dive in and see how I transformed that complex nested JSON structure into a usable CSV format!

Input Data

{
  "initialReleaseDate": "2021-08-26",
  "nachbauNo": "nachbauNo",
  "orderNo": "orderNo1",
  "panels": [
    {
      "feldName": "feldName1",
      "ortsKz": "",
      "panelInformation": {
        "children": [
          {
            "children": [
              {
                "children": [
                  {
                    "children": [],
                    "description": "",
                    "deviceName": "",
                    "kmat": "000000000011021650",
                    "kmatExt": "00",
                    "kmatName": "000000000011021650",
                    "position": 3,
                    "positionExt": "0010",
                    "qty": 100,
                    "unit": "ST"
                  }
                ],
                "description": "",
                "deviceName": "",
                "kmat": "000000000011063653",
                "kmatExt": "03",
                "kmatName": "000000000011021650",
                "position": 2,
                "positionExt": "0010",
                "qty": 100,
                "unit": "ST"
              },
              {
                "children": [
                  {
                    "children": [],
                    "description": "",
                    "deviceName": "",
                    "kmat": "000000004000000197",
                    "kmatExt": "00",
                    "kmatName": "000000000011021650",
                    "position": 3,
                    "positionExt": "5000",
                    "qty": 100,
                    "unit": "ST"
                  },
                  {
                    "children": [
                      {
                        "children": [],
                        "description": "",
                        "deviceName": "",
                        "kmat": "000000001000000000",
                        "kmatExt": "00",
                        "kmatName": "000000000011021650",
                        "position": 4,
                        "positionExt": "0010",
                        "qty": 100,
                        "unit": "ST"
                      }
                    ],
                    "description": "",
                    "deviceName": "",
                    "kmat": "000000004107310262",
                    "kmatExt": "30",
                    "kmatName": "000000000011021650",
                    "position": 3,
                    "positionExt": "5010",
                    "qty": 100,
                    "unit": "ST"
                  }
                ],
                "description": "",
                "deviceName": "",
                "kmat": "000000000076173303",
                "kmatExt": "03",
                "kmatName": "000000000076173303",
                "position": 2,
                "positionExt": "0030",
                "qty": 100,
                "unit": "ST"
              }
            ],
            "description": "",
            "deviceName": "8BX2042-2TC",
            "kmat": "000000000011069993",
            "kmatExt": "03",
            "kmatName": "name000000000011069993",
            "position": 1,
            "positionExt": "0001",
            "qty": 1,
            "unit": "ST"
          }
        ],
        "description": "",
        "deviceName": "8BK5004-8AA00-Z",
        "kmat": "000000000030032011",
        "kmatExt": "00",
        "kmatName": "Name000000000030032011",
        "position": 0,
        "positionExt": "0000",
        "qty": 1,
        "unit": "ST"
      },
      "position": 20,
      "typical": "2000"
    }
  ],
  "projectName": "projectName"
}

The input data is a complex nested JSON structure: each panel carries a panelInformation object whose children array can itself contain further children arrays, several levels deep. This recursive hierarchy makes the data awkward to work with directly, because extracting a flat view requires navigating and exploding every level of nesting. A quick way to see the hierarchy for yourself is to let Spark infer the schema, as in the sketch below.
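A minimal sketch for inspecting the nesting (the file path and app name here are illustrative, not from the original project):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("inspect-nested-json").getOrCreate()

# multiLine=True because the sample payload spans multiple lines;
# the path is hypothetical, point it at wherever the JSON lives.
nested_df = spark.read.json("/tmp/panels.json", multiLine=True)

# Each level of "children" shows up as a nested struct inside an array,
# so the printed tree mirrors the JSON hierarchy.
nested_df.printSchema()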

Output Data

The resulting CSV schema:

nachbau_no,typical,kmat,description,device_name,kmat_ext,position,position_ext,qty,unit,kmat_name,parent_id

In this transformation, my task was to take a DataFrame with a nested structure representing panels and their parent-child relationships, and to flatten it so that each row represents a single panel together with its associated information, including a reference to its parent panel if it has one. A few hand-derived rows below illustrate the target shape.
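For the sample payload above, the flattened output looks roughly like this (derived by hand, trimmed to a few key columns for readability; parent_id holds the parent's kmat, and the root panel has none):

nachbau_no,typical,kmat,position,parent_id
nachbauNo,2000,000000000030032011,0,
nachbauNo,2000,000000000011069993,1,000000000030032011
nachbauNo,2000,000000000011063653,2,000000000011069993
nachbauNo,2000,000000000076173303,2,000000000011069993
nachbauNo,2000,000000000011021650,3,000000000011063653
nachbauNo,2000,000000004000000197,3,000000000076173303
nachbauNo,2000,000000004107310262,3,000000000076173303
nachbauNo,2000,000000001000000000,4,000000004107310262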

Transformation

The JSON value was originally stored as a string in a single column. I therefore needed to build a schema for that column and use it to parse the JSON data with from_json. After parsing, I exploded the JSON array and selected the relevant fields, which describe each panel and its parent panel. The sketch below shows the parse-then-explode pattern in miniature; the full method follows.
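As a warm-up, here is a minimal, self-contained sketch of the parse-then-explode pattern (the toy payload and column names are illustrative, not the production ones):

import pyspark.sql.functions as func
import pyspark.sql.types as tp
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parse-explode-demo").getOrCreate()

# A toy table with a JSON array stored as a plain string column.
raw_df = spark.createDataFrame(
    [('[{"kmat": "A", "qty": 1}, {"kmat": "B", "qty": 2}]',)],
    ["json_value"],
)

# Schema of one array element; in the real job this is inferred, not hand-written.
element_schema = tp.StructType(
    [tp.StructField("kmat", tp.StringType()), tp.StructField("qty", tp.LongType())]
)

parsed_df = (
    raw_df.withColumn("json_array", func.from_json("json_value", tp.ArrayType(element_schema)))
    .select(func.explode("json_array").alias("json_values"))
    .select("json_values.*")  # one row per array element, struct fields as columns
)
parsed_df.show()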

import pyspark.sql.functions as func
import pyspark.sql.types as tp
from dateutil import parser
from pyspark.sql import DataFrame

# Method on the ETL job class; self.spark is the SparkSession and
# self.configs carries the job configuration.
def _transform_source_data(self, source_df: DataFrame) -> DataFrame:
    # Infer the schema of the JSON string column by round-tripping the raw
    # values through spark.read.json, then wrap it in an ArrayType because
    # each value holds a JSON array of objects.
    json_data = source_df.select("json_value").collect()
    json_rdd = self.spark.sparkContext.parallelize(json_data)
    json_df = self.spark.read.json(json_rdd.map(lambda row: row[0]))
    array_json_schema = tp.ArrayType(json_df.schema)
    execution_timestamp = parser.parse(self.configs.extra_options["timeframe_end_date"])

    transformed_df = (
        source_df.withColumn("json_array", func.from_json(func.col("json_value"), array_json_schema))
        # One row per top-level JSON object, then one row per panel.
        .select(func.explode(func.col("json_array")).alias("json_values"), "loaded_on")
        .select("json_values.*", "loaded_on")
        .withColumn("panels", func.explode("panels"))
        .withColumn("panelInformation", func.col("panels.panelInformation"))
        # Promote the root panel's attributes to snake_case columns.
        .withColumn("nachbau_no", func.col("nachbauNo"))
        .withColumn("typical", func.col("panels.typical"))
        .withColumn("description", func.col("panelInformation.description"))
        .withColumn("device_name", func.col("panelInformation.deviceName"))
        .withColumn("kmat", func.col("panelInformation.kmat"))
        .withColumn("kmat_ext", func.col("panelInformation.kmatExt"))
        .withColumn("kmat_name", func.col("panelInformation.kmatName"))
        .withColumn("position", func.col("panelInformation.position"))
        .withColumn("position_ext", func.col("panelInformation.positionExt"))
        .withColumn("qty", func.col("panelInformation.qty"))
        .withColumn("unit", func.col("panelInformation.unit"))
        .withColumn("parent_id", func.lit(None))  # the root panel has no parent
        .drop("json_values")
    )
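One design note on the schema inference above: collect() pulls every JSON string to the driver just to derive a schema. If the payloads are large but structurally alike, inferring from a single sample row is a cheaper variant (a sketch of my own, not the original code, and it assumes one row exhibits the full nesting depth):

# Infer the array schema from one sample row instead of collecting everything.
# Caveat: fields or nesting levels that appear only in other rows are missed.
# "spark" is the SparkSession (self.spark inside the method).
sample_json = source_df.select("json_value").first()[0]
sample_rdd = spark.sparkContext.parallelize([sample_json])
array_json_schema = tp.ArrayType(spark.read.json(sample_rdd).schema)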

Next, I selected the fields I wanted to flatten and renamed them to snake_case as needed. I then used a while loop to peel off one level of children per iteration, unioning each level into the flattened result. The loop has no explicit depth counter: once the inferred schema bottoms out, there is no deeper children column left to explode, Spark raises an AnalysisException, and the except block breaks out of the loop.

    # Fields carried through each flattening step; "children.*" pulls the
    # next level's struct fields up to the top level.
    select_flatten_fields = [
        "parent_id",
        "loaded_on",
        "nachbau_no",
        "typical",
        func.col("kmat").alias("parent_kmat"),  # remember the parent's kmat
        "panelInformation",
        "children.*",
    ]

    # Common column set used when unioning each level into the result.
    select_union_columns = [
        "parent_id",
        "nachbau_no",
        "typical",
        "position",
        "kmat",
        "description",
        "qty",
        "unit",
        "position_ext",
        "device_name",
        "loaded_on",
        "kmat_ext",
        "kmat_name",
    ]

    # First level of children, renamed to snake_case.
    childrens_df = (
        transformed_df.withColumn("children", func.explode(func.col("panelInformation.children")))
        .select(*select_flatten_fields)
        .withColumnRenamed("deviceName", "device_name")
        .withColumnRenamed("positionExt", "position_ext")
        .withColumnRenamed("kmatExt", "kmat_ext")
        .withColumnRenamed("kmatName", "kmat_name")
    )
    while True:
        try:
            # Union the current level into the result; parent_id falls back
            # to the parent's kmat for rows that do not have one yet.
            transformed_df = (
                transformed_df.select(*select_union_columns)
                .unionByName(childrens_df.select("parent_kmat", *select_union_columns), allowMissingColumns=True)
                .withColumn(
                    # TdNachbauPanelsInformation is the target table model;
                    # .parent_id.name resolves to the column name "parent_id".
                    TdNachbauPanelsInformation.parent_id.name,
                    func.coalesce(func.col("parent_id"), func.col("parent_kmat")),
                )
                .drop("parent_kmat")
            )
            # Descend one more level; when the schema bottoms out, the
            # missing "children" column raises an AnalysisException.
            childrens_df = (
                childrens_df.withColumn("children", func.explode("children"))
                .select(*select_flatten_fields)
                .withColumnRenamed("deviceName", "device_name")
                .withColumnRenamed("positionExt", "position_ext")
                .withColumnRenamed("kmatExt", "kmat_ext")
                .withColumnRenamed("kmatName", "kmat_name")
            )
        except Exception as ex:  # LOGGER: module-level logger, e.g. logging.getLogger(__name__)
            LOGGER.info(ex)
            break
    return transformed_df.withColumn("active_flag", func.lit(True)).withColumn(
        "published_from", func.to_date(func.lit(execution_timestamp))
    )

Finally, I added two columns to the resulting DataFrame: an active flag and the date from which the record is published. The output of this transformation was a flattened DataFrame with one row per panel and all the relevant information about each panel, including a reference to its parent panel where applicable. From there, writing the CSV is one more step, as the sketch below shows.
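Since the goal is a CSV file, here is the final write step as a minimal sketch (flattened_df stands for the DataFrame returned by _transform_source_data; the output path and the single-file coalesce are my choices, not part of the original code):

# coalesce(1) produces a single part file, which is convenient for small
# outputs; drop it for large data. The path is illustrative.
flattened_df.coalesce(1).write.csv(
    "/tmp/nachbau_panels_csv", header=True, mode="overwrite"
)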
