## What is the purpose of the pull request
This PR adds a new marker file strategy that reduces marker-related latency on file systems with non-trivial file I/O latency, such as Amazon S3. The existing marker mechanism creates one marker file per data file written. When the number of marker files is huge, S3 throttles the I/O operations for listing and deleting them, and listing alone can take 10 minutes; when write concurrency is high, marker creation can also be throttled by S3. Such behavior degrades write performance. The new timeline-server-based marker strategy delegates marker creation and other marker-related operations from the individual executors to the timeline server. The timeline server maintains the markers in memory and keeps them consistent with storage by periodically flushing them to a limited number of underlying files in the file system. This way, the number of actual file operations and the latency related to markers are reduced, improving write performance.
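To make the delegation concrete, below is a minimal Java sketch of an executor-side client that asks the timeline server to create a marker over HTTP instead of touching the file system itself. The endpoint path, parameter names, and class name are hypothetical illustrations, not the actual Hudi API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical executor-side marker client; endpoint and parameters are
// assumptions for illustration, not the actual Hudi API.
public class MarkerClientSketch {
  private final String host;
  private final int port;

  public MarkerClientSketch(String host, int port) {
    this.host = host;
    this.port = port;
  }

  /**
   * Asks the timeline server to create a marker: one small HTTP round trip
   * replaces one file creation on S3.
   */
  public boolean createMarker(String markerDirPath, String markerName) throws IOException {
    String url = String.format(
        "http://%s:%d/v1/hoodie/marker/create?markerdirpath=%s&markername=%s",
        host, port,
        URLEncoder.encode(markerDirPath, StandardCharsets.UTF_8.name()),
        URLEncoder.encode(markerName, StandardCharsets.UTF_8.name()));
    // The server replies only after the marker has been flushed to storage.
    return Boolean.parseBoolean(httpGet(url));
  }

  private String httpGet(String url) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
    try (InputStream in = conn.getInputStream()) {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    } finally {
      conn.disconnect();
    }
  }
}
```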
To process marker creation requests efficiently, we design batch processing in the marker request handler at the timeline server. Each marker creation request is handled asynchronously in the Javalin timeline server and queued before processing. At every batch interval, e.g., 20ms, a dispatching thread pulls the pending requests from the queue and sends them to a worker thread for processing. Each worker thread processes its batch of marker creation requests, sets the responses, and flushes the new markers by overwriting the underlying file that stores the markers in the file system. Multiple worker threads can run concurrently, since overwriting a file can take longer than the batch interval; to guarantee consistency and correctness, each worker thread writes to an exclusive file not touched by other threads. Both the batch interval and the number of worker threads can be configured through the write options.
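The following simplified Java sketch shows the shape of this batching scheme: requests are enqueued as futures, a single dispatching thread drains the queue at the configured interval, and each drained batch goes to one worker thread that flushes the markers before completing the futures. The names are illustrative, not the actual Hudi classes, and the file flush is stubbed out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified batching pipeline; names are illustrative, not the actual
// Hudi classes described in the change log below.
public class BatchingSketch {
  // Pending marker creation requests, each carrying a future completed later.
  private final ConcurrentLinkedQueue<CompletableFuture<Boolean>> pending =
      new ConcurrentLinkedQueue<>();
  private final ScheduledExecutorService dispatcher =
      Executors.newSingleThreadScheduledExecutor();
  private final ExecutorService workers;

  public BatchingSketch(int numWorkerThreads, long batchIntervalMs) {
    this.workers = Executors.newFixedThreadPool(numWorkerThreads);
    // Every batch interval (e.g., 20 ms), drain the queue and hand the whole
    // batch to one worker thread, which flushes all new markers in one write.
    dispatcher.scheduleAtFixedRate(() -> {
      List<CompletableFuture<Boolean>> batch = new ArrayList<>();
      CompletableFuture<Boolean> request;
      while ((request = pending.poll()) != null) {
        batch.add(request);
      }
      if (!batch.isEmpty()) {
        workers.submit(() -> {
          // One file overwrite per batch instead of one file per marker;
          // each worker writes to its own exclusive file.
          flushMarkersToFile(batch);
          // Complete the futures only after the flush, so that created
          // markers survive a timeline server failure.
          batch.forEach(f -> f.complete(true));
        });
      }
    }, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
  }

  /** Enqueues one request; the response is sent when the future completes. */
  public CompletableFuture<Boolean> requestMarkerCreation() {
    CompletableFuture<Boolean> future = new CompletableFuture<>();
    pending.add(future);
    return future;
  }

  private void flushMarkersToFile(List<CompletableFuture<Boolean>> batch) {
    // Stub: overwrite the underlying file with all known markers.
  }
}
```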
Note that the worker thread always checks whether a marker has already been created by comparing the marker name from the request with the in-memory copy of all markers maintained in the marker directory state class. The underlying files storing the markers are read only upon the first marker request (lazy loading). Responses are sent back only after the new markers are flushed to the files, so that after a timeline server failure, a restarted timeline server can recover the already-created markers. Together, these ensure consistency between the file system and the in-memory copy, and improve the performance of processing marker requests.
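Here is an illustrative Java sketch of that bookkeeping, assuming a class shaped roughly like the marker directory state described above; the real `MarkerDirState` carries more responsibilities (marker-file assignment, recovery, etc.).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative bookkeeping only; the real MarkerDirState also assigns
// markers to files, serves them to workers, and handles recovery.
public class MarkerDirStateSketch {
  // In-memory copy of all markers in this marker directory.
  private final Set<String> allMarkers = ConcurrentHashMap.newKeySet();
  private boolean loaded = false;

  /** Returns true if the marker is new; false if it was already created. */
  public synchronized boolean addMarkerIfAbsent(String markerName) {
    lazyLoad();
    return allMarkers.add(markerName);
  }

  private void lazyLoad() {
    // Read the underlying marker files only upon the first request, so a
    // restarted timeline server recovers markers that were already flushed.
    if (!loaded) {
      allMarkers.addAll(readMarkersFromFileSystem());
      loaded = true;
    }
  }

  private Set<String> readMarkersFromFileSystem() {
    // Stub: list and read the marker files under the marker directory.
    return Set.of();
  }
}
```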
Since the new timeline-server-based marker mechanism is intended for cloud storage, it is not supported on HDFS at this point (an exception is thrown if the timeline-server-based marker mechanism is configured for writes on HDFS).
## Brief change log
- New marker-related write options in `HoodieWriteConfig`
  - `hoodie.write.markers.type`: marker type to use for the writes. Two modes are supported: `direct` and `timeline_server_based`.
  - `hoodie.markers.timeline_server_based.batch.num_threads`: number of threads to use for batch processing marker creation requests at the timeline server, for the timeline-server-based marker strategy
  - `hoodie.markers.timeline_server_based.batch.interval_ms`: the batch interval in milliseconds for marker creation batch processing, for the timeline-server-based marker strategy
- Abstraction of the marker mechanism (`WriteMarkers` class)
  - Provides an interface for marker file strategies, containing the different operations (create marker, delete marker directory, get all markers, etc.); see the sketch after this list
  - Creates an enum, `MarkerType`, to indicate the marker type used for the current write
  - Creates a new class, `DirectWriteMarkers`, for the existing mechanism, which directly creates a marker file per data file in the file system from each executor
  - Creates a factory class, `WriteMarkersFactory`, to generate a `WriteMarkers` instance based on the marker type; throws `HoodieException` if `timeline_server_based` is used on HDFS, since the support for HDFS is not ready
  - Uses `WriteMarkersFactory` at all places where markers are needed (write handles; Java, Spark, and Flink engines)
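To show the shape of this abstraction, here is a condensed, illustrative Java sketch; the real Hudi classes take table and engine-context arguments and throw `HoodieException` rather than the stand-in exceptions used here, and the names are suffixed with `Sketch` to make clear they are not the actual classes.

```java
import java.util.Set;

// Illustrative shapes only; the real Hudi classes differ in signatures.
interface WriteMarkersSketch {
  boolean createMarker(String partitionPath, String dataFileName);
  void deleteMarkerDir();
  Set<String> allMarkerFilePaths();
}

// One marker file per data file, created directly by each executor.
class DirectWriteMarkersSketch implements WriteMarkersSketch {
  public boolean createMarker(String partitionPath, String dataFileName) { return true; }
  public void deleteMarkerDir() { }
  public Set<String> allMarkerFilePaths() { return Set.of(); }
}

// All marker operations become requests to the timeline server.
class TimelineServerBasedWriteMarkersSketch implements WriteMarkersSketch {
  public boolean createMarker(String partitionPath, String dataFileName) { return true; }
  public void deleteMarkerDir() { }
  public Set<String> allMarkerFilePaths() { return Set.of(); }
}

enum MarkerTypeSketch { DIRECT, TIMELINE_SERVER_BASED }

final class WriteMarkersFactorySketch {
  static WriteMarkersSketch get(MarkerTypeSketch type, String basePath) {
    switch (type) {
      case DIRECT:
        return new DirectWriteMarkersSketch();
      case TIMELINE_SERVER_BASED:
        // Mirrors the guard described above: HDFS support is not ready yet.
        if (basePath.startsWith("hdfs://")) {
          throw new IllegalStateException(
              "Timeline-server-based markers are not supported for HDFS: base path " + basePath);
        }
        return new TimelineServerBasedWriteMarkersSketch();
      default:
        throw new IllegalArgumentException("Unknown marker type: " + type);
    }
  }
}
```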
- New timeline-server-based marker file implementation
  - Creates a new class, `TimelineServerBasedWriteMarkers`, for the timeline-server-based marker file strategy, which sends requests to the timeline server for all marker-related operations
- Handling marker requests at the timeline server
  - Adds a class, `MarkerOperation`, to store request URLs and parameters
  - Creates the `MarkerDirState` class to store the state of a marker directory and operate on the markers inside the directory
  - Adds a future class, `MarkerCreationFuture`, so that marker creation requests can be processed asynchronously
  - Creates a request handler, `MarkerHandler`, for marker-related requests, which schedules the dispatching thread at a fixed rate based on the batching interval upon the first marker creation request (see the sketch after this list)
  - Creates a `MarkerCreationDispatchingRunnable` for scheduling periodic, batched processing of marker creation requests
  - Creates a `BatchedMarkerCreationRunnable` for batch processing marker creation requests
  - Adds a context class, `BatchedMarkerCreationContext`, for passing the pending requests from the dispatching thread to the worker threads
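As an illustration of the `MarkerHandler` behavior described in this list, the sketch below lazily starts the periodic dispatching thread on the first request and returns a future per request. The names and the inline completion are simplifications: in the real design each batch is handed to a worker thread that flushes the markers before completing the futures, as sketched earlier.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative handler skeleton; not the actual MarkerHandler class.
public class LazyDispatchSketch {
  private final ConcurrentLinkedQueue<CompletableFuture<Boolean>> pending =
      new ConcurrentLinkedQueue<>();
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final long batchIntervalMs;
  private volatile boolean dispatchingStarted = false;

  public LazyDispatchSketch(long batchIntervalMs) {
    this.batchIntervalMs = batchIntervalMs;
  }

  /** Called per request; the response is written when the future completes. */
  public CompletableFuture<Boolean> handleCreateMarkerRequest(String markerName) {
    if (!dispatchingStarted) {
      synchronized (this) {
        if (!dispatchingStarted) {
          // Schedule the dispatcher at a fixed rate only once, upon the
          // first marker creation request, as described above.
          scheduler.scheduleAtFixedRate(this::dispatchOneBatch,
              batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
          dispatchingStarted = true;
        }
      }
    }
    CompletableFuture<Boolean> future = new CompletableFuture<>();
    pending.add(future);
    return future;
  }

  private void dispatchOneBatch() {
    // Simplification: the real design hands the drained batch to a worker
    // thread that flushes the markers before completing the futures.
    CompletableFuture<Boolean> request;
    while ((request = pending.poll()) != null) {
      request.complete(true);
    }
  }
}
```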
- New unit tests around the timeline-server-based marker file strategy
- Code cleanup
  - Refactors the configuration of `EmbeddedTimelineService`, `TimelineService`, and `RequestHandler`
Follow-up TODOs are summarized in this ticket: https://issues.apache.org/jira/projects/HUDI/issues/HUDI-2271
## Verify this pull request
The direct marker file strategy is covered by existing tests in `TestDirectWriteMarkers`. This change adds unit tests in `TestTimelineServerBasedWriteMarkers` to verify the new timeline-server-based marker file strategy.
We have also manually verified the change by running multiple Spark jobs on large datasets on clusters with different file system settings:
- Hadoop/Yarn cluster with HDFS: testing succeeds with both marker file strategies for 30GB of input data (1000 parquet files, 400M records) producing 43k data files. (Note that this is done by explicitly commenting out the exception throwing in `WriteMarkersFactory` so that the new timeline-server-based markers can be used on HDFS. By default, the job throws an exception in this setup, which is also tested below. We plan to enhance the support of timeline-server-based markers for HDFS in follow-ups.)
  - `direct`: 19.6 mins (1175943 ms)
  - `timeline-server-based`: 19.7 mins (1179346 ms)
- Amazon EMR with S3: testing succeeds with both marker file strategies for 100GB of input data producing 165k data files, thanks to @nsivabalan's help
  - `direct`: 55 mins
  - `timeline-server-based`: 38 mins
- Testing of retried stages in Spark succeeded, with the expected cleanup of invalid data files and the correct number of records in the table
  - Enables Spark speculative execution in the spark shell with the following config, so that Spark automatically kills tasks running beyond a certain duration and retries them with new attempts, intentionally emulating failed stages:
    ```
    spark.speculation=true
    spark.speculation.multiplier=1.0
    spark.speculation.quantile=0.5
    ```
  - Runs the same job with the timeline-server-based marker file strategy
  - Driver logs show that quite a few tasks are killed and retried:
    ```
    WARN scheduler.TaskSetManager: Lost task 1537.1 in stage 22.0 (TID 17375, executor 290): TaskKilled (another attempt succeeded)
    ```
  - Executor logs show that invalid data files are indeed deleted at the end
    - Stage: `Delete invalid files generated during the write operation`, collect at HoodieSparkEngineContext.java:73
    - Log:
      ```
      21/08/06 19:39:01 INFO table.HoodieTable: Deleting invalid data files=[(/tmp/temp_marker_test_table/asia/india/chennai,/tmp/temp_marker_test_table/asia/india/chennai/5dcae584-1563-4f13-9ce4-955864a2b616-38_162-12-2694_20210806192244.parquet),...
      ```
  - Verifies that the number of records written to the table is the same as the input:
    ```scala
    val df2 = spark.read.parquet("/tmp/temp_marker_test_table/*/*/*/*.parquet")
    df2.select("_hoodie_record_key").distinct.count
    ```
    This outputs the same count (400M) as the input `df`.
- Testing that configuring timeline-server-based markers on HDFS throws `HoodieException`, as expected:
  ```
  Caused by: org.apache.hudi.exception.HoodieException: Timeline-server-based markers are not supported for HDFS: base path /tmp/temp_marker_test_table
  ```
More details regarding the Hadoop/Yarn cluster with HDFS testing:

```scala
val df = spark.read.parquet("/tmp/testdata/*/*.parquet")
```

- Spark shell command to run the direct marker file strategy:
  ```scala
  spark.time(df.write.format("hudi").
  | option("hoodie.bulkinsert.shuffle.parallelism", "200").
  | option("hoodie.datasource.write.recordkey.field", "uuid").
  | option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  | option("hoodie.datasource.write.operation", "bulk_insert").
  | option("hoodie.write.markers.type", "direct").
  | option("hoodie.parquet.max.file.size", "1048576").
  | option("hoodie.parquet.block.size", "1048576").
  | option("hoodie.table.name", "temp_marker_test_table").
  | mode(Overwrite).
  | save("/tmp/temp_marker_test_table/")
  )
  ```
- Spark shell command to run the timeline-server-based marker file strategy:
  ```scala
  spark.time(df.write.format("hudi").
  | option("hoodie.bulkinsert.shuffle.parallelism", "200").
  | option("hoodie.datasource.write.recordkey.field", "uuid").
  | option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  | option("hoodie.datasource.write.operation", "bulk_insert").
  | option("hoodie.write.markers.type", "timeline_server_based").
  | option("hoodie.markers.timeline_server_based.batch.num_threads", "20").
  | option("hoodie.markers.timeline_server_based.batch.interval_ms", "20").
  | option("hoodie.parquet.max.file.size", "1048576").
  | option("hoodie.parquet.block.size", "1048576").
  | option("hoodie.table.name", "temp_marker_test_table").
  | mode(Overwrite).
  | save("/tmp/temp_marker_test_table/")
  )
  ```
- Snapshot of the files storing markers at the end of the job with the timeline-server-based marker file strategy:
  ```
  hadoop fs -ls /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940
  Found 20 items
  251846 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS0
  253029 2021-08-06 16:38 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS1
  259197 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS10
  252838 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS11
  254775 2021-08-06 16:38 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS12
  249139 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS13
  278320 2021-08-06 16:38 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS14
  266073 2021-08-06 16:38 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS15
  252428 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS16
  243916 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS17
  260791 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS18
  249626 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS19
  252976 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS2
  247380 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS3
  253442 2021-08-06 16:38 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS4
  248841 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS5
  249906 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS6
  246458 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS7
  241654 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS8
  243208 2021-08-06 16:39 /tmp/temp_marker_test_table/.hoodie/.temp/20210806161940/MARKERS9
  ```
## Committer checklist
- [ ] Has a corresponding JIRA in PR title & commit
- [ ] Commit message is descriptive of the change
- [ ] CI is green
- [ ] Necessary doc changes done or have another open PR
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.