-7.9 C
New York
Monday, February 9, 2026

RocksDB 101: Optimizing stateful streaming in Apache Spark with Amazon EMR and AWS Glue


Actual-time streaming knowledge processing is a strategic crucial that instantly impacts enterprise competitiveness. Organizations face mounting stress to course of large knowledge streams instantaneously—from detecting fraudulent transactions and delivering customized buyer experiences to optimizing complicated provide chains and responding to market dynamics milliseconds forward of rivals.

Apache Spark Structured Streaming addresses these essential enterprise challenges by its stateful processing capabilities, enabling purposes to keep up and replace intermediate outcomes throughout a number of knowledge streams or time home windows. RocksDB was launched in Apache Spark 3.2, providing a extra environment friendly different to the default HDFS-based in-memory retailer. RocksDB excels in stateful streaming in situations that require dealing with massive portions of state knowledge. It delivers optimum efficiency advantages, notably in decreasing Java digital machine (JVM) reminiscence stress and rubbish assortment (GC) overhead.

This submit explores RocksDB’s key options and demonstrates its implementation utilizing Spark on Amazon EMR and AWS Glue, offering you with the information it’s worthwhile to scale your real-time knowledge processing capabilities.

RocksDB state retailer overview

Spark Structured Streaming processes fall into two classes:

  • Stateful: Requires monitoring intermediate outcomes throughout micro-batches (for instance, when working aggregations and de-duplication).
  • Stateless: Processes every batch independently.

A state retailer is required by stateful purposes that observe intermediate question outcomes. That is important for computations that rely on steady occasions and alter outcomes based mostly on every batch of enter, or on combination knowledge over time, together with late arriving knowledge. By default, Spark provides a state retailer that retains states in JVM reminiscence, which is performant and adequate for many normal streaming instances. Nonetheless, you probably have a lot of stateful operations in a streaming software—resembling, streaming aggregation, streaming dropDuplicates, stream-stream joins, and so forth—the default in-memory state retailer would possibly face out-of-memory (OOM) points due to a big JVM reminiscence footprint or frequent GC pauses, leading to degraded efficiency.

Benefits of RocksDB over in-memory state retailer

RocksDB addresses the challenges of an in-memory state retailer by off-heap reminiscence administration and environment friendly checkpointing.

  • Off-heap reminiscence administration: RocksDB shops state knowledge in OS-managed off-heap reminiscence, decreasing GC stress. Whereas off-heap reminiscence nonetheless consumes machine reminiscence, it doesn’t occupy area within the JVM. As an alternative, its core reminiscence constructions, resembling block cache or memTables, allocate instantly from the working system, bypassing the JVM heap. This strategy makes RocksDB an optimum alternative for memory-intensive purposes.
  • Environment friendly checkpointing: RocksDB mechanically saves state adjustments to checkpoint areas, resembling Amazon Easy Storage Service (Amazon S3) paths or native directories, serving to to make sure full fault tolerance. When interacting with S3, RocksDB is designed to enhance checkpointing effectivity; it does this by incremental updates and compaction to cut back the quantity of information transferred to S3 throughout checkpoints, and by persisting fewer massive state information in comparison with the various small information of the default state retailer, decreasing S3 API calls and latency.

Implementation concerns

RocksDB operates as a local C++ library embedded inside the Spark executor, utilizing off-heap reminiscence. Whereas it doesn’t fall beneath JVM GC management, it nonetheless impacts general executor reminiscence utilization from the YARN or OS perspective. RocksDB’s off-heap reminiscence utilization would possibly exceed YARN container limits with out triggering container termination, doubtlessly resulting in OOM points. It’s best to take into account the next approaches to handle Spark’s reminiscence:

Alter the Spark executor reminiscence measurement

Enhance spark.executor.memoryOverheadorspark.executor.memoryOverheadFactor to go away extra room for off-heap utilization. The next instance units half (4 GB) of spark.executor.reminiscence (8 GB) because the reminiscence overhead measurement.

# Complete executor reminiscence = 8GB (heap) + 4GB (overhead) = 12GB
spark-submit 
. . . . . . . .
--conf spark.executor.reminiscence=8g          # JVM Heap
--conf spark.executor.memoryOverhead=4g  # Off-heap allocation (RocksDB + different native)
. . . . . . . .

For Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2), enabling YARN reminiscence management with the next strict container reminiscence enforcement by polling methodology preempts containers to keep away from node-wide OOM failures:

yarn.nodemanager.useful resource.reminiscence.enforced = false
yarn.nodemanager.elastic-memory-control.enabled = false
yarn.nodemanager.pmem-check-enabled = true 
or 
yarn.nodemanager.vmem-check-enabled = true

Off-heap reminiscence management

Use RocksDB-specific settings to configure reminiscence utilization. Extra particulars might be discovered within the Finest practices and concerns part.

Get began with RocksDB on Amazon EMR and AWS Glue

To activate the state retailer RocksDB in Spark, configure your software with the next setting:

spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

Within the following sections, we discover making a pattern Spark Structured Streaming job with RocksDB enabled working on Amazon EMR and AWS Glue respectively.

RocksDB on Amazon EMR

Amazon EMR variations 6.6.0 and later help RocksDB, together with Amazon EMR on EC2, Amazon EMR serverless and Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS). On this case, we use Amazon EMR on EC2 for example.

Use the next steps to run a pattern streaming job with RocksDB enabled.

  1. Add the next pattern script to s3:///script/sample_script.py
from pyspark.sql import SparkSession
from pyspark.sql.capabilities import explode, cut up, col, expr
import random

# Record of phrases
phrases = ["apple", "banana", "orange", "grape", "melon", 
         "peach", "berry", "mango", "kiwi", "lemon"]

# Create random strings from phrases
def generate_random_string():
    return " ".be a part of(random.selections(phrases, ok=5)) 
    
    
# Create Spark Session
spark = SparkSession 
    .builder 
    .appName("StreamingWordCount") 
    .config("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") 
    .getOrCreate()


# Register UDF
spark.udf.register("random_string", generate_random_string)

# Create streaming knowledge
raw_stream = spark.readStream 
    .format("fee") 
    .possibility("rowsPerSecond", 1) 
    .load() 
    .withColumn("phrases", expr("random_string()"))

# Execute phrase counts
wordCounts = raw_stream.choose(explode(cut up(raw_stream.phrases, " ")).alias("phrase")).groupby("phrase").depend()

# Output the outcomes
question = wordCounts 
    .writeStream 
    .outputMode("full") 
    .format("console") 
    .begin()

question.awaitTermination()

  1. On the AWS Administration Console for Amazon EMR, select Create Cluster
  2. For Title and purposes – required, choose the newest Amazon EMR launch.
  3. For Steps, select Add. For Sort, choose Spark software.
  4. For Title, enter GettingStartedWithRocksDB and s3:///script/sample_script.py because the Utility location.
  5. Select Save step.
  6. For different settings, select the suitable settings based mostly in your use case.
  7. Select Create cluster to begin the streaming software by way of Amazon EMR step.

RocksDB on AWS Glue

AWS Glue 4.0 and later variations help RocksDB. Use the next steps to run the pattern job with RocksDB enabled on AWS Glue.

  1. On the AWS Glue console, within the navigation pane, select ETL jobs.
  2. Select Script editor and Create script.
  3. For the job title, enter GettingStartedWithRocksDB.
  4. Copy the script from the earlier instance and paste it on the Script tab.
  5. On Job particulars tab, for Sort, choose Spark Streaming.
  6. Select Save, after which select Run to begin the streaming job on AWS Glue.

Walkthrough particulars

Let’s dive deep into the script to know the right way to run a easy stateful Spark software with RocksDB utilizing the next instance pySpark code.

  1. First, arrange RocksDB as your state retailer by configuring the supplier class:
spark = SparkSession 
    .builder 
    .appName("StreamingWordCount") 
    .config("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") 
    .getOrCreate()

  1. To simulate streaming knowledge, create an information stream utilizing the fee supply sort. It generates one document per second, containing 5 random fruit names from a pre-defined record.
# Record of phrases
phrases = ["apple", "banana", "orange", "grape", "melon", 
         "peach", "berry", "mango", "kiwi", "lemon"]

# Create random strings from phrases
def generate_random_string():
    return " ".be a part of(random.selections(phrases, ok=5))
# Register UDF
spark.udf.register("random_string", generate_random_string)

# Create streaming knowledge
raw_stream = spark.readStream 
    .format("fee") 
    .possibility("rowsPerSecond", 1) 
    .load() 
    .withColumn("phrases", expr("random_string()"))

  1. Create a phrase counting operation on the incoming stream. It is a stateful operation as a result of it maintains working counts between processing intervals, that’s, earlier counts have to be saved to calculate the following new totals.
# Cut up raw_stream into phrases and counts them
wordCounts = raw_stream.choose(explode(cut up(raw_stream.phrases, " ")).alias("phrase")).groupby("phrase").depend()

  1. Lastly, output the phrase depend totals to the console:
# Output the outcomes
question = wordCounts 
    .writeStream 
    .outputMode("full") 
    .format("console") 
    .begin()

Enter knowledge

In the identical pattern code, check knowledge (raw_stream) is generated at a fee of one-row-per-second, as proven within the following instance:

+-----------------------+-----+--------------------------------+
|timestamp              |worth|phrases                           |
+-----------------------+-----+--------------------------------+
|2025-04-18 07:05:57.204|125  |berry peach orange banana banana|
+-----------------------+-----+--------------------------------+

Output outcome

The streaming job produces the next ends in the output logs. It demonstrates how Spark Structured Streaming maintains and updates the state throughout a number of micro-batches:

  • Batch 0: Begins with an empty state
  • Batch 1: Processes a number of enter information, leading to preliminary counts for each one of many 10 fruits (for instance, banana seems 8 instances)
  • Batch 2: Operating totals based mostly on new occurrences from the following set of information are added to the counts (for instance,  banana will increase from 8 to fifteen, indicating 7 new occurrences).

-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|phrase|depend|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  phrase|depend|
+------+-----+
|banana|    8|
|orange|    4|
| apple|    3|
| berry|    5|
| lemon|    7|
|  kiwi|    6|
| melon|    8|
| peach|    8|
| mango|    7|
| grape|    9|
+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|  phrase|depend|
+------+-----+
|banana|   15|
|orange|    8|
| apple|    7|
| berry|   11|
| lemon|   12|
|  kiwi|   11|
| melon|   16|
| peach|   15|
| mango|   12|
| grape|   13|
+------+-----+

State retailer logs

RocksDB generates detailed logs in the course of the job run, like the next:

INFO    2025-04-18T07:52:28,378 83933   org.apache.spark.sql.execution.streaming.MicroBatchExecution    [stream execution thread for [id = xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx, runId = xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx]] 60  Streaming question made progress: {
  "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  "runId": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
  "title": null,
  "timestamp": "2025-04-18T07:52:27.642Z",
  "batchId": 39,
  "numInputRows": 1,
  "inputRowsPerSecond": 100.0,
  "processedRowsPerSecond": 1.3623978201634879,
  "durationMs": {
    "addBatch": 648,
    "commitOffsets": 39,
    "getBatch": 0,
    "latestOffset": 0,
    "queryPlanning": 10,
    "triggerExecution": 734,
    "walCommit": 35
  },
  "stateOperators": [
    {
      "operatorName": "stateStoreSave",
      "numRowsTotal": 10,
      "numRowsUpdated": 4,
      "allUpdatesTimeMs": 18,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 3629,
      "memoryUsedBytes": 174179,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 36,
      "numStateStoreInstances": 36,
      "customMetrics": {
        "rocksdbBytesCopied": 5009,
        "rocksdbCommitCheckpointLatency": 533,
        "rocksdbCommitCompactLatency": 0,
        "rocksdbCommitFileSyncLatencyMs": 2991,
        "rocksdbCommitFlushLatency": 44,
        "rocksdbCommitPauseLatency": 0,
        "rocksdbCommitWriteBatchLatency": 0,
        "rocksdbFilesCopied": 4,
        "rocksdbFilesReused": 24,
        "rocksdbGetCount": 8,
        "rocksdbGetLatency": 0,
        "rocksdbPinnedBlocksMemoryUsage": 3168,
        "rocksdbPutCount": 4,
        "rocksdbPutLatency": 0,
        "rocksdbReadBlockCacheHitCount": 8,
        "rocksdbReadBlockCacheMissCount": 0,
        "rocksdbSstFileSize": 35035,
        "rocksdbTotalBytesRead": 136,
        "rocksdbTotalBytesReadByCompaction": 0,
        "rocksdbTotalBytesReadThroughIterator": 0,
        "rocksdbTotalBytesWritten": 228,
        "rocksdbTotalBytesWrittenByCompaction": 0,
        "rocksdbTotalBytesWrittenByFlush": 5653,
        "rocksdbTotalCompactionLatencyMs": 0,
        "rocksdbWriterStallLatencyMs": 0,
        "rocksdbZipFileBytesUncompressed": 266452
      }
    }
  ],
  "sources": [
    {
      "description": "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
      "startOffset": 63,
      "endOffset": 64,
      "latestOffset": 64,
      "numInputRows": 1,
      "inputRowsPerSecond": 100.0,
      "processedRowsPerSecond": 1.3623978201634879
    }
  ],
  "sink": {
    "description": "org.apache.spark.sql.execution.streaming.ConsoleTable$@2cf39784",
    "numOutputRows": 10
  }
}

In Amazon EMR on EC2, these logs can be found on the node the place the YARN ApplicationMaster container is working. They are often discovered at/var/log/hadoop-yarn/containers///stderr.

As for AWS Glue, you could find the RocksDB metrics in Amazon CloudWatch, beneath the log group /aws-glue/jobs/error.

RocksDB metrics

The metrics from the previous logs present insights on RocksDB standing. The followings are some instance metrics you would possibly discover helpful when investigating streaming job points:

  • rocksdbCommitCheckpointLatency: Time spent writing checkpoints to native storage
  • rocksdbCommitCompactLatency: Length of checkpoint compaction operations throughout checkpoint commits
  • rocksdbSstFileSize: Present measurement of SST information in RocksDB.

Deep dive into RocksDB key ideas

To raised perceive the state metrics proven within the logs, we deep dive into RocksDB’s key ideas: MemTable, sorted string desk (SST) file, and checkpoints. Moreover, we offer some ideas for finest practices and fine-tuning.

Excessive stage structure

RocksDB is a neighborhood, non-distributed persistent key-value retailer embedded in Spark executors. It permits scalable state administration for streaming workloads, backed by Spark’s checkpointing for fault tolerance. As proven within the previous determine, RocksDB shops knowledge in reminiscence and likewise on disk. RocksDB’s capacity to spill knowledge over to disk is what permits Spark Structured Streaming to deal with state knowledge that exceeds the obtainable reminiscence.

Reminiscence:

  • Write buffers (MemTables): Designated reminiscence to buffer writes earlier than flushing onto disk
  • Block cache (learn buffer): Reduces question time by caching outcomes from disk

Disk:

  • SST information: Sorted String Desk saved as SST file format for quick entry

MemTable: Saved off-heap

MemTable, proven within the previous determine, is an in-memory retailer the place knowledge is first written off-heap, earlier than being flushed to disk as an SST file. RocksDB caches the newest two batches of information (scorching knowledge) in MemTable to cut back streaming course of latency. By default, RocksDB solely has two MemTables—one is lively and the opposite is read-only. When you have adequate reminiscence, the configuration spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber might be elevated to have greater than two MemTables. Amongst these MemTables, there may be at all times one lively desk, and the remaining are read-only MemTables used as write buffers.

SST information: Saved on Spark executor’s native disk

SST information are block-based tables saved on the Spark executor’s native disk. When the in-memory state knowledge can not match right into a MemTable (outlined by a Spark configuration writeBufferSizeMB), the lively desk is marked as immutable, saving it because the SST file format, which switches it to a read-only MemTable whereas asynchronously flushing it to native disks. Whereas flushing, the immutable MemTable can nonetheless be learn, in order that the newest state knowledge is out there with minimal learn latency.

Studying from RocksDB follows the sequence demonstrated by the previous diagram:

  1. Learn from the lively MemTable.
  2. If not discovered, iterate by read-only MemTables within the order of latest to oldest.
  3. If not discovered, learn from BlockCache (learn buffer).
  4. If misses, load index (one index per SST) from disk into BlockCache. Search for key from index and if hits, load knowledge block onto BlockCache and return outcome.

SST information are saved on executors’ native directories beneath the trail of spark.native.dir (default: /tmp) or yarn.nodemanager.local-dirs:

  • Amazon EMR on EC2 – ${yarn.nodemanager.local-dirs}/usercache/hadoop/appcache///
  • Amazon EMR Serverless, Amazon EMR on EKS, AWS Glue – ${spark.native.dir}//

Moreover, by utilizing software logs, you’ll be able to observe the MemTable flush and SST file add standing beneath the file path:

  • Amazon EMR on EC2 – /var/log/hadoop-yarn/containers///stderr
  • Amazon EMR on EKS –/var/log/spark/person/-/stderr

The next is an instance command to verify the SST file standing in an executor log from Amazon EMR on EKS:

cat /var/log/spark/person/-/stderr/present | grep previous

or

kubectl logs --namespace emr -c spark-kubernetes-executor | grep previous

The next screenshot is an instance of the output of both command.

You should utilize the next examples to verify if the MemTable information have been deleted and flushed out to SST:

cat /var/log/spark/person/-/stderr/present | grep deletes

or

kubectl logs --namespace emr -c spark-kubernetes-executor | grep deletes

The next screenshot is an instance of the output of both command.

Checkpoints: Saved on the executor’s native disk or in an S3 bucket

To deal with fault tolerance and fail over from the final dedicated level, RocksDB helps checkpoints. The checkpoint information are often saved on the executor’s disk or in an S3 bucket, together with snapshot and delta or changelog knowledge information.

Beginning with Amazon EMR 7.0 and AWS Glue5.0, RocksDB state retailer supplies a brand new function referred to as changelog checkpointing to improve checkpoint efficiency. when the changelog is enabled (disabled by default) utilizing the setting spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled, RocksDB writes smaller change logs to the storage location (the native disk by default) as an alternative of steadily persisting massive snapshot knowledge. Notice that snapshots are nonetheless created however much less steadily, as proven within the following screenshot.

Right here’s an instance of a checkpoint location path when overridden to an S3 bucket: s3:////state/0/spark_parition_ID/state_version_ID.zip

Finest practices and concerns

This part outlines key methods for fine-tuning RocksDB efficiency and avoiding widespread pitfalls.

1. Reminiscence administration for RocksDB

To stop OOM errors on Spark executors, you’ll be able to configure RocksDB’s reminiscence utilization at both the node stage or occasion stage:

  • Node stage (advisable): Implement a worldwide off-heap reminiscence restrict per executor. On this context, every executor is handled as a RocksDB node. If an executor processes N partitions of a stateful operator, it’s going to have N variety of RocksDB situations on a single executor.
  • Occasion-level: Wonderful-tune particular person RocksDB situations.

Node-level reminiscence management per executor

Beginning with Amazon EMR 7.0 and AWS Glue 5.0 (Spark 3.5), a essential Spark configuration, boundedMemoryUsage, was launched (by SPARK-43311) to implement a worldwide reminiscence cap at a single executor stage that’s shared by a number of RocksDB situations. This prevents RocksDB from consuming unbounded off-heap reminiscence, which might result in OOM errors or executor termination by useful resource managers resembling YARN or Kubernetes.

The next instance exhibits the node-level configuration:

 # Sure complete reminiscence utilization per executor 
 "spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage": "true"
 # Set a static complete reminiscence measurement per executor
 "spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB": "500"
 # For read-heavy workloads, cut up reminiscence allocation between write buffers (30%) and block cache (70%) 
 "spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio": "0.3"

A single RocksDB occasion stage management

For granular reminiscence administration, you’ll be able to configure particular person RocksDB situations utilizing the next settings:

# Management MemTable (write buffer) measurement and depend
"spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB": "64"
"spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber": "4"

  • writeBufferSizeMB (default: 64, advised: 64 – 128): Controls the most measurement of a single MemTable in RocksDB, affecting reminiscence utilization and write throughput. This setting is out there in Spark3.5 – [SPARK-42819] and later. It determines the scale of the reminiscence buffer earlier than state knowledge is flushed to disk. Bigger buffer sizes can enhance write efficiency by decreasing SST flush frequency however will improve the executor’s reminiscence utilization. Adjusting this parameter is essential for optimizing reminiscence utilization and write throughput.
  • maxWriteBufferNumber (default: 2, advised: 3 – 4): Units the whole variety of lively and immutable MemTables.

For read-heavy workloads, prioritize the next block cache tuning over write buffers to cut back disk I/O. You’ll be able to configure SST block measurement and caching as follows:

"spark.sql.streaming.stateStore.rocksdb.blockSizeKB": "64"
"spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB": "128"

  •  blockSizeKB (default: 4, advised: 64–128): When an lively MemTable is full, it turns into a read-only memTable. From there, new writes proceed to build up in a brand new desk. The read-only MemTable is flushed into SST information on the disk. The info in SST information is roughly chunked into fixed-sized blocks (default is 4 KB). Every block, in flip, retains a number of knowledge entries. When writing knowledge to SST information, you’ll be able to compress or encode knowledge effectively inside a block, which frequently ends in a smaller knowledge measurement in contrast with its uncooked format.

For workloads with a small state measurement (resembling lower than 10 GB), the default block measurement is often adequate. For a big state (resembling greater than 50 GB), rising the block measurement can enhance compression effectivity and sequential learn efficiency however improve CPU overhead.

  • blockCacheSizeMB (default: 8, advised: 64–512, massive state: greater than 1024): When retrieving knowledge from SST information, RocksDB supplies a cache layer (block cache) to enhance the learn efficiency. It first locates the information block the place the goal document would possibly reside, then caches the block to reminiscence, and eventually searches that document inside the cached block. To keep away from frequent reads of the identical block, the block cache can be utilized to maintain the loaded blocks in reminiscence.

2. Clear up state knowledge at checkpoint

To assist make sure that your state file sizes and storage prices stay beneath management when checkpoint efficiency turns into a priority, use the next Spark configurations to regulate cleanup frequency, retention limits, and checkpoint file sorts:

# clear up RocksDB state each 30 seconds
"spark.sql.streaming.stateStore.maintenanceInterval":"30s"
# retain solely the final 50 state variations  
"spark.sql.streaming.minBatchesToRetain":"50"
# use changelog as an alternative of snapshots
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled":"true"

  • maintenanceInterval (default: 60 seconds): Retaining a state for an extended time frame will help cut back upkeep value and background IO. Nonetheless, longer intervals improve file itemizing time, as a result of state shops usually scan each retained file.
  • minBatchesToRetain (default: 100, advised: 10–50): Limits the variety of state variations retained at checkpoint areas. Decreasing this quantity ends in fewer information being persevered and reduces storage utilization.
  • changelogCheckpointing (default: false, advised: true): Historically, RocksDB snapshots and uploads incremental SST information to checkpoint. To keep away from this value, changelog checkpointing was launched in Amazon EMR7.0+ and AWS Glue 5.0, which write solely state adjustments for the reason that final checkpoint.

To trace an SST file’s retention standing, you’ll be able to search RocksDBFileManager entries within the executor logs. Think about the next logs in Amazon EMR on EKS for example. The output (proven within the screenshot) exhibits that 4 SST information beneath model 102 have been uploaded to an S3 checkpoint location, and that an previous changelog state file with model 97 was cleaned up.

cat /var/log/spark/person/-/stderr/ present | grep RocksDBFileManager

or

kubectl logs  -n emr -c spark-kubernetes-executor | grep RocksDBFileManager

3. Optimize native disk utilization

RocksDB consumes native disk area when producing SST information at every Spark executor. Whereas disk utilization doesn’t scale linearly, RocksDB can accumulate storage over time based mostly on state knowledge measurement. When working streaming jobs, if native obtainable disk area will get inadequate, No area left on system errors can happen.

To optimize disk utilization by RocksDB, regulate the next Spark configurations:

# compact state information throughout commit (default:false)
"spark.sql.streaming.stateStore.rocksdb.compactOnCommit": "true"
# variety of delta SST information earlier than turns into a consolidated snapshot file(default:10)
"spark.sql.streaming.stateStore.minDeltasForSnapshot": "5" 

Infrastructure changes can additional mitigate the disk difficulty:

For Amazon EMR:

For AWS Glue:

  • Use AWS Glue G.2X or bigger employee sorts to keep away from the restricted disk capability of G.1X employees.
  • Schedule common upkeep home windows at optimum timing to release disk area based mostly on workload wants.

Conclusion

On this submit, we explored RocksDB because the new state retailer implementation in Apache Spark Structured Streaming, obtainable on Amazon EMR and AWS Glue. RocksDB provides benefits over the default HDFS-backed in-memory state retailer, notably for purposes coping with large-scale stateful operations. RocksDB helps forestall JVM reminiscence stress and rubbish assortment points widespread with the default state retailer.

The implementation is simple, requiring minimal configuration adjustments, although it is best to pay cautious consideration to reminiscence and disk area administration for optimum efficiency. Whereas RocksDB isn’t assured to cut back job latency, it supplies a sturdy answer for dealing with large-scale stateful operations in Spark Structured Streaming purposes.

We encourage you to guage RocksDB in your use instances, notably for those who’re experiencing reminiscence stress points with the default state retailer or have to deal with massive quantities of state knowledge in your streaming purposes.


In regards to the authors

Melody Yang is a Senior Massive Information Resolution Architect for Amazon EMR at AWS. She is an skilled analytics chief working with AWS clients to supply finest apply steering and technical recommendation with a view to help their success in knowledge transformation. Her areas of pursuits are open-source frameworks and automation, knowledge engineering and DataOps.

Dai Ozaki is a Cloud Assist Engineer on the AWS Massive Information Assist crew. He’s captivated with serving to clients construct knowledge lakes utilizing ETL workloads. In his spare time, he enjoys taking part in desk tennis.

Noritaka Sekiyama is a Principal Massive Information Architect with Amazon Net Companies (AWS) Analytics companies. He’s liable for constructing software program artifacts to assist clients. In his spare time, he enjoys biking on his street bike.

Amir Shenavandeh is a Sr Analytics Specialist Options Architect and Amazon EMR material knowledgeable at Amazon Net Companies. He helps clients with architectural steering and optimisation. He leverages his expertise to assist individuals convey their concepts to life, specializing in distributed processing and massive knowledge architectures.

Xi Yang is a Senior Hadoop System Engineer and Amazon EMR material knowledgeable at Amazon Net Companies. He’s captivated with serving to clients resolve difficult points within the Massive Information space.

Related Articles

Latest Articles