Structured Streaming was first introduced in Apache Spark 2.0 and has established itself as the best platform for building distributed stream processing applications. The unification of the SQL / Dataset / DataFrame APIs and Spark's built-in functions makes it much easier for developers to implement complex requirements such as streaming aggregation, stream-stream joins, and windowing. Since the release of Structured Streaming, developers have frequently asked for a better way to manage their streaming jobs, just like we had in Spark Streaming (i.e. DStream). In Apache Spark 3.0, we released a new UI for Structured Streaming.
The new Structured Streaming UI provides a simple way to monitor all streaming jobs with useful information and statistics, making it easier to troubleshoot issues during development and improving production observability with real-time metrics. The UI presents two sets of statistics: 1) aggregated information about streaming query jobs and 2) detailed statistics about individual streaming queries, including Input Rate, Process Rate, Input Rows, Batch Duration, Operation Duration, etc.
Aggregated information about streaming query jobs
When a developer submits a streaming SQL query, it appears in the Structured Streaming tab, which includes both active and completed streaming queries. The results table provides basic information about each streaming query, including the query name, status, ID, run ID, start time, query duration, and latest batch ID, as well as aggregated metrics such as the average input rate and average process rate. There are three streaming query statuses: RUNNING, FINISHED, and FAILED. All FINISHED and FAILED queries are listed in the completed streaming queries table, and the Error column shows the exception details of a failed query.
We can view detailed statistics of a streaming query by clicking on its Run ID link.
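The identifiers and status shown in this table also exist on the StreamingQuery handles themselves, so, as a rough sketch (not part of the UI walkthrough), you can print the same information from a notebook or spark-shell:

// Sketch: spark.streams.active lists the RUNNING queries shown in the UI table.
spark.streams.active.foreach { q =>
  println(s"name=${q.name}, id=${q.id}, runId=${q.runId}")
  println(q.status)   // message, isDataAvailable, isTriggerActive
}
// For a handle of a query that stopped with an error, q.exception returns
// the same failure details as the Error column.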
Detailed statistical information
The Statistics page displays metrics including input/process rate, latency, and detailed operation durations, which provide insight into the status of your streaming queries and make it easy to debug anomalies in query processing.
It contains the following metrics:
- Input Rate: the aggregated (across all sources) rate of arriving data.
- Process Rate: the aggregated (across all sources) rate at which Spark processes data.
- Batch Duration: the duration of each batch.
- Operation Duration: the time taken to perform various operations, in milliseconds.
The monitored operations are listed below:
addBatch
: time spent reading the micro-batch's input data from the sources, processing it, and writing the batch's output to the sink. This usually takes up most of the micro-batch's time.
getBatch
: time taken to prepare the logical query that reads the input of the current micro-batch from the sources.
getOffset
: time taken to ask the sources whether they have new input data.
walCommit
: time taken to write the offsets to the metadata log.
queryPlanning
: time taken to generate the execution plan.
Note that not all of the listed operations are displayed in the UI. Different types of data sources involve different operations, so only a subset of the listed operations may be executed for a given streaming query.
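These charts are fed by the same progress events that the query exposes programmatically, so the metrics can also be printed from code. A minimal sketch, assuming query is a started StreamingQuery handle (such as the one created in the demo code below); the exact keys in durationMs vary by Spark version and source type:

// Sketch: reading the UI's metrics from the progress API.
import scala.collection.JavaConverters._

val p = query.lastProgress           // most recent micro-batch; may be null before the first batch
if (p != null) {
  println(s"Input Rate:   ${p.inputRowsPerSecond} rows/s")
  println(s"Process Rate: ${p.processedRowsPerSecond} rows/s")
  println(s"Input Rows:   ${p.numInputRows}")
  // Operation Duration breakdown; "triggerExecution" covers the whole batch, while keys
  // such as addBatch, getBatch, walCommit, queryPlanning appear only when that phase ran.
  p.durationMs.asScala.foreach { case (op, ms) => println(s"$op: $ms ms") }
}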
Troubleshooting Streaming Performance Using the UI
In this section, we'll look at a few cases where the new Structured Streaming UI indicates that something out of the ordinary is happening. The high-level demo query looks like this, and in each case we will assume some preconditions:
import java.util.UUID
import spark.implicits._   // needed for .as[String]

val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString

val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topics)
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", checkpointLocation)
  .start()
Increased latency due to insufficient processing power
In the first case, we run a query to process Apache Kafka data as soon as possible. In each batch, the streaming job processes all the data available in Kafka. If the processing power is insufficient to handle the batch data, the latency grows rapidly. The most intuitive sign is that Input Rows and Batch Duration grow almost linearly. The Process Rate shows that the streaming job can process at most about 8,000 records per second, while the current Input Rate is about 20,000 records per second. We can either give the streaming job more execution resources or add enough Kafka partitions so that all the consumers needed can keep up with the producers.
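The same comparison can be expressed in code. Here is a rough sketch (assuming query is the running StreamingQuery from the demo above) that flags a job whose recent Process Rate stays below its Input Rate:

// Sketch: detect a query that cannot keep up by comparing the recent rates,
// the same signal the Input Rate / Process Rate charts show.
val recent = query.recentProgress.filter(_.numInputRows > 0)
if (recent.nonEmpty) {
  val avgInput   = recent.map(_.inputRowsPerSecond).sum / recent.length
  val avgProcess = recent.map(_.processedRowsPerSecond).sum / recent.length
  if (avgProcess < avgInput)
    println(f"Falling behind: processing $avgProcess%.0f rows/s vs. input of $avgInput%.0f rows/s")
}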
Stable but high latency
How is this case different from the previous one? The latency does not increase, but remains stable, as shown in the following screenshot:
We found that the Process Rate remains stable at the same level as the Input Rate. This means the processing power of the job is sufficient for the input data. However, the processing time of each batch, i.e. the latency, is still as high as 20 seconds. The main reason for the high latency is too much data in each batch. We can usually reduce the latency by increasing the parallelism of the job. After adding 10 more Kafka partitions and 10 cores for the Spark tasks, we found the latency to be around 5 seconds - much better than 20 seconds.
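For illustration only, here is a sketch of two parallelism knobs that are often tuned in this situation. The fix described above added Kafka partitions and cores rather than changing these settings, so treat the option names and values below as assumptions to adapt:

// Assumed alternatives, not the exact change made above: raise the shuffle
// parallelism used by the groupBy/count aggregation ...
spark.conf.set("spark.sql.shuffle.partitions", "40")

// ... and/or let Spark split Kafka partitions into more input tasks via the
// Kafka source's minPartitions option (same setup as the demo code above).
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topics)
  .option("minPartitions", "40")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]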
Use Operation Duration Chart for Troubleshooting
The Operation Duration chart shows the amount of time spent performing various operations, in milliseconds. This is useful for understanding the time distribution of each batch and makes troubleshooting easier. Let's take the performance improvement "SPARK-30915: Avoid reading the metadata log file when looking for the latest batch ID" from the Apache Spark community as an example.
Prior to this improvement, when the compacted metadata log becomes huge, each batch immediately following a compaction takes more time than the other batches.
After examining the code, the unnecessary reading of the compacted log file was found and fixed. The following Operation Duration chart confirms the expected effect:
Plans for the future
As shown above, the new Structured Streaming UI helps developers better monitor their streaming jobs with much more useful information about streaming queries. As an early-release version, the new UI is still under development and will be improved in future releases. A few features that may be implemented in the not-too-distant future include, but are not limited to, the following:
- More details about streaming query execution: late data, watermarks, state data metrics, and more.
- Structured Streaming UI support on Spark History Server.
- More noticeable hints about abnormal behavior: latency, etc.
Try the new UI
Try this new Structured Streaming UI in Apache Spark 3.0 in the new Databricks Runtime 7.1. If you are using Databricks notebooks, it also gives you an easy way to see the status of any streaming query in the notebook and manage your queries. You can sign up for a free Databricks account and get started in minutes, no credit card needed.