From d65b0d22e5162f123e6c1ff0f8862cf5035c3848 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 27 May 2026 15:03:48 -0300 Subject: [PATCH 1/2] introduce commit lock --- .../ExportPartitionManifestUpdatingTask.cpp | 2 +- .../ExportPartitionTaskScheduler.cpp | 2 +- .../MergeTree/ExportPartitionUtils.cpp | 19 ++++++++++++++++++- src/Storages/MergeTree/ExportPartitionUtils.h | 3 ++- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 106dff071188..7e10882ea049 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -247,7 +247,7 @@ namespace /// it sounds like a replica exported the last part, but was not able to commit the export. Try to fix it try { - ExportPartitionUtils::commit(metadata, destination_storage, zk, log, entry_path, context, storage); + ExportPartitionUtils::commit(metadata, destination_storage, zk, log, entry_path, context, storage, storage.getReplicaName()); } catch (const Exception & e) { diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index ef5ddbdcdbff..d722fb77b2c0 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -297,7 +297,7 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess( try { auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage.getContext(), manifest); - ExportPartitionUtils::commit(manifest, destination_storage, zk, storage.log.load(), export_path, context, storage); + ExportPartitionUtils::commit(manifest, destination_storage, zk, storage.log.load(), export_path, context, storage, storage.replica_name); } catch (const Exception & e) { diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 81df09c86523..00173cde589b 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -152,7 +152,8 @@ namespace ExportPartitionUtils const LoggerPtr & log, const std::string & entry_path, const ContextPtr & context_in, - MergeTreeData & source_storage) + MergeTreeData & source_storage, + const String & replica_name) { auto context = Context::createCopy(context_in); context->setSetting("write_full_path_in_iceberg_metadata", manifest.write_full_path_in_iceberg_metadata); @@ -165,6 +166,22 @@ namespace ExportPartitionUtils "Failpoint: export_partition_commit_always_throw"); }); + /// Per-task ephemeral lock that serializes the commit phase across replicas. + /// Without it, `handlePartExportSuccess` (post-last-part path) and `tryCleanup` + /// (poll/recovery path) can drive `commitExportPartitionTransaction` concurrently + /// for the same task. The Iceberg metadata-file CAS (`If-None-Match: *`) plus the + /// `clickhouse.export-partition-transaction-id` summary check remain in place as + /// defense in depth for the residual case where the Keeper session expires while + /// the holder is still inside the Iceberg write. + const auto commit_lock_path = fs::path(entry_path) / "commit_lock"; + auto commit_lock = zkutil::EphemeralNodeHolder::tryCreate(commit_lock_path, *zk, replica_name); + if (!commit_lock) + { + LOG_INFO(log, "ExportPartition: commit_lock for {} is held by another replica, skipping commit on this replica", entry_path); + return; + } + LOG_INFO(log, "ExportPartition: commit_lock for {} acquired by replica {}", entry_path, replica_name); + const auto exported_paths = ExportPartitionUtils::getExportedPaths(log, zk, entry_path); if (exported_paths.empty()) diff --git a/src/Storages/MergeTree/ExportPartitionUtils.h b/src/Storages/MergeTree/ExportPartitionUtils.h index d7bb83224755..eb67d288d71e 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.h +++ b/src/Storages/MergeTree/ExportPartitionUtils.h @@ -43,7 +43,8 @@ namespace ExportPartitionUtils const LoggerPtr & log, const std::string & entry_path, const ContextPtr & context, - MergeTreeData & source_storage + MergeTreeData & source_storage, + const String & replica_name ); /// Handles a commit-phase failure for a replicated partition export: From e4e5580701c81b6e9afd0856cf5a6b3da9413ad3 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 27 May 2026 15:05:25 -0300 Subject: [PATCH 2/2] rmv unnecessary comment --- src/Storages/MergeTree/ExportPartitionUtils.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 00173cde589b..7cd58642e7a4 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -169,10 +169,7 @@ namespace ExportPartitionUtils /// Per-task ephemeral lock that serializes the commit phase across replicas. /// Without it, `handlePartExportSuccess` (post-last-part path) and `tryCleanup` /// (poll/recovery path) can drive `commitExportPartitionTransaction` concurrently - /// for the same task. The Iceberg metadata-file CAS (`If-None-Match: *`) plus the - /// `clickhouse.export-partition-transaction-id` summary check remain in place as - /// defense in depth for the residual case where the Keeper session expires while - /// the holder is still inside the Iceberg write. + /// for the same task. const auto commit_lock_path = fs::path(entry_path) / "commit_lock"; auto commit_lock = zkutil::EphemeralNodeHolder::tryCreate(commit_lock_path, *zk, replica_name); if (!commit_lock)