Continuously ingest and load CSV files into Delta using Spark Structure Streaming
Leverage Spark Structure Streaming to efficiently ingest CSV files and load as Delta. Spark structure streaming provides the advantages of fast, near real-time, scalable, fault-tolerant and exactly-on
👋 Hi folks, thanks for reading my newsletter! My name is Yunna Wei, and I write about modern data stack, MLOps implementation patterns and data engineering best practices. The purpose of this newsletter is to help organizations select and build an efficient Data+AI stack with real-world experiences and insights and assist individuals to build their data and AI career journey with deep-dive tutorials and guidance; Please consider subscribing if you haven’t already, reach out on LinkedIn if you ever want to connect!
Background
It is very common that raw data is loaded into a file system or a cloud storage location (S3, Azure Blob Storage or GCS) in the format of CSV. When data scientists or data engineers need to use the raw data, they will first need to ingest the data, and load it into a format that is more reliable and scalable, such as Delta.
The traditional approach to accomplish this is to build a batch ingestion pipeline and schedule the pipeline to run with a particular time interval. However with this approach, data engineers / data scientists will have to expend additional effort to:
Record which files have been ingested for each batch;
Declare and manage input parameters to make sure each batch only ingests newly arrived files since the last batch;
Manage roll-back and re-tries if there is a pipeline failure for a particular batch, to avoid either data duplication or data loss;
In today’s article, I would like to propose a different approach, which is to use Spark Structure Streaming to ingest and load these CSV files. Lots of people might have the impression that structured streaming is only needed when you want to build near-real time data pipelines. In fact, it can be used to make your batch pipelines more efficient and easier to manage. I will demonstrate this in today’s article.
First of all, Spark Structure Streaming is a stream processing engine built on the Spark SQL engine, which provides capabilities such as checkpointing and write-ahead logs that add fault-tolerance guarantees and end-to-end exactly-once ingestion, in your data ingestion. It provides a much more efficient and simpler approach to ingest CSV files, as you do not need to manage all the aforementioned overheads that come with the batch approach. Additionally, because it is built on top of the Spark SQL engine you can use the same Spark Dataframe/Dataset API. So, if you are already using Spark to ingest CSV files in batches, there should be not much code to change at all.
First let’s examine what the pipeline design looks like.
Then I will explain the step-by-step code implementation.
Pipeline Design
There are 3 key components in the above pipeline:
Input Data Source — Raw data is appended and stored in an S3 bucket in the format of CSV. You can find the raw data here. The schema of these CSV files is as follows:
csvSchema = StructType() \
.add("ride_id", "string") \
.add("rideable_type", "string") \
.add("started_at", "string") \
.add("ended_at", "string") \
.add("start_station_name", "string") \
.add("start_station_id", "string") \
.add("end_station_name", "string") \
.add("end_station_id", "string") \
.add("start_lat", "string") \
.add("start_lng", "string") \
.add("end_lat", "string") \
.add("end_lng", "string") \
.add("member_casual", "string")
Streaming Ingestion Pipeline — Build a data ingestion pipeline leveraging Spark Structured Streaming. Within the ingestion pipeline, you just need to specify the details of the source, such as data format (in today’s case the output data format is CSV), schema, and other necessary options to regulate the data ingestion behaviours.
Sink Data Source — Save the raw data with the output mode of “append” into another S3 bucket in the format of Delta. With the sink data source, you can specify the details of the output sink and the output mode. The details of the output sink include data format (in today’s case, the output data format is Delta) and the location where the data is saved to (in today’s case, the data location is an S3 bucket).
Implementation Using Spark Structure Streaming
Step 1 — Schema Management. When you ingest file-based data sources such as text, CSV, JSON, ORC, Parquet, you can either specify the schema or ask Spark to infer the schema for you. The recommended way is to always specify the schema when you can. If you ask Spark to automatically infer the schema for you, Spark will have to go through the input once to determine the input schema, which will increase your ingestion latency. Additionally, there could be data quality issues caused by incorrect schema inference. Therefore, it is always recommended to define the schema yourself when you can. Below is how you can specify the schema:
csvSchema = StructType() \
.add("ride_id", "string") \
.add("rideable_type", "string") \
.add("started_at", "string") \
.add("ended_at", "string") \
.add("start_station_name", "string") \
.add("start_station_id", "string") \
.add("end_station_name", "string") \
.add("end_station_id", "string") \
.add("start_lat", "string") \
.add("start_lng", "string") \
.add("end_lat", "string") \
.add("end_lng", "string") \
.add("member_casual", "string")
When there are situations when you need to rely on Spark to automatically infer the schema for you, you need to enable the following Spark setting:
spark.conf.set("spark.sql.streaming.schemaInference", "true")
Step 2 — Define the options that regulate the ingestion behaviours specific to your data source. The popular options for CSV based file sources include:
"sep" - "Sets a separator for each field and value. The default value is comma."
"header" - "For reading, uses the first line as names of columns. For writing, writes the names of columns as the first line. The default value is false."
"enforceSchema" - "If it is set to true, the specified or inferred schema will be forcibly applied to datasource files, and headers in CSV files will be ignored. The default value is true."
"multiLine" - "Parse one record, which may span multiple lines, per file. CSV built-in functions ignore this option. The default value is false."
"samplingRatio" - "Defines fraction of rows used for schema inferring.The default value is 1.0."
"lineSep" - "Defines the line separator that should be used for parsing/writing. For reading, the default value is \r, \r\n and \n , for writing, the default value is \n."
"compression" - "Compression code to use when saving to files. The supported compression algorithms include none, bzip2, gzip, lz4, snappy and deflate. The default value is none."
csvDF = (spark
.readStream
.option("sep",",")
.option("header", "True")
.option("enforceSchema", "True")
.option("multiLine", "True")
.schema(csvSchema)
.csv(<YOUR-CSV-DATA-LOCATION>))
Step 3 — Save the data into the data sink location in the format of Delta. There are several options that regulate how the data is saved and stored in the data sink locations:
1. "output sinks" - "Spark Structured Streaming has a few types of built-in output sinks including
file sink where the data is saved to a directory, kafka sink where the data is stored as
one or more topics in Kafka and for each sink where there is arbitrary computation on the records in the output.
In today's case, the output sink will be an S3 bucket"
2. "output modes" - "Spark Structured Streaming supports the 3 output modes.
The first is append mode, which is also the default mode. With append mode, only the new rows added to the result table since the last trigger will be outputted to the sink.
The second is complete mode. With complete mode, the whole result table will be outputted to the sink after every trigger. This is supported for aggregation queries.
The third is update mode. With update mode, only the rows in the result table that were updated since the last trigger will be outputted to the sink."
3. "Trigger interval" - "The trigger settings of a streaming query define the timing and latency of streaming data processing. Spark Structured Streaming supports 5 different kinds of triggers.
The first is unspecified (default). If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.
The second is fixed interval micro-batches. The query will be executed with micro-batches mode, where micro-batches will be kicked off at the user-specified intervals.
The third is one-time micro-batch. The query will execute only one micro-batch to process all the available data and then stop on its own. This is useful in scenarios you want to periodically spin up a cluster, process everything that is available since the last period, and then shutdown the cluster. In some case, this may lead to significant cost savings.
The fourth is available-now micro-batch. Similar to queries such as one-time micro-batch trigger, the query will process all the available data and then stop on its own. The difference is that, it will process the data in (possibly) multiple micro-batches based on the source options (e.g. maxFilesPerTrigger for file source), which will result in better query scalability.
The fifth and last is continuous with fixed checkpoint interval. Continuous processing is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees."
4. "Checkpoint location" - "specify the location where the system will write all the checkpoint information to provide end-to-end fault-tolerance guarantee for some output sinks."
5. "Partition Key" - "Specify the partitioning columns when the data is stored in the output location."
6. "Query Name" - "Optionally, specify a unique name of the query for identification."
(csvDF.writeStream
.format("delta")
.outputMode("append") # .outputMode("complete"), .outputMode("update")
.option("checkpointLocation",checkpointLocation)
.option("path", parquetLocation)
.trigger(availableNow=True) # .trigger(processingTime='2 seconds'), .trigger(once=True), .trigger(continuous='1 second')
.partitionBy('rideable_type')
.queryName("streaming csv files")
.start())
The Complete Solution
from pyspark.sql.types import StructType
csvSchema = StructType() \
.add("ride_id", "string") \
.add("rideable_type", "string") \
.add("started_at", "string") \
.add("ended_at", "string") \
.add("start_station_name", "string") \
.add("start_station_id", "string") \
.add("end_station_name", "string") \
.add("end_station_id", "string") \
.add("start_lat", "string") \
.add("start_lng", "string") \
.add("end_lat", "string") \
.add("end_lng", "string") \
.add("member_casual", "string")
csvLocation = <YOUR-CSV-DATA-LOCATION>
parquetLocation = <YOUR-OUTPUT-DATA-LOCATION>
checkpointLocation = <YOUR-STREAMING-CHECKPOINT-LOCATION>
csvDF = (spark
.readStream
.option("sep",",")
.option("header", "True")
.option("enforceSchema", "True")
.option("multiLine", "True")
.schema(csvSchema)
.csv(csvLocation))
(csvDF.writeStream
.format("delta")
.outputMode("append") # .outputMode("complete"), .outputMode("update")
.option("checkpointLocation",checkpointLocation)
.option("path", parquetLocation)
.trigger(availableNow=True) # .trigger(processingTime='2 seconds'), .trigger(once=True), .trigger(continuous='1 second')
.partitionBy('rideable_type')
.queryName("streaming csv files")
.start())
Conclusion
As demonstrated through today’s article, you can easily build a streaming data ingestion pipeline with Spark Structure Streaming, which provides the benefits of low latency of data processing, ease of management and a fault-tolerance guarantee.
Streaming data pipelines can help organizations unlock the value of real-time analytics, real-time Machine Learning (ML) and real-time applications. In the last a few years, we have seen the pattern of unifying batch and streaming data pipelines become more widely adopted in the industry. Organizations start realizing the value of building streaming data pipelines and moving away from the slow and batch oriented data pipelines.
Therefore I will have a series of articles produced, specifically focusing more on sharing knowledge, information and tutorials on building streaming data pipelines. Please feel free to subscribe if you want to be notified when these articles are published. I generally publish 1 or 2 articles on data and AI every week.