diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 3ce171e5fa3e..b45f3667be87 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -530,7 +530,7 @@ void ExportPartitionManifestUpdatingTask::poll() /// A replica exported the last part but the commit never landed. Try to fix it. try { - ExportPartitionUtils::commit(work.metadata, work.destination_storage, zk, log_ptr, work.entry_path, work.context, storage); + ExportPartitionUtils::commit(work.metadata, work.destination_storage, zk, log_ptr, work.entry_path, work.context, storage, storage.getReplicaName()); } catch (const Exception & e) { diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index ef5ddbdcdbff..d722fb77b2c0 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -297,7 +297,7 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess( try { auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage.getContext(), manifest); - ExportPartitionUtils::commit(manifest, destination_storage, zk, storage.log.load(), export_path, context, storage); + ExportPartitionUtils::commit(manifest, destination_storage, zk, storage.log.load(), export_path, context, storage, storage.replica_name); } catch (const Exception & e) { diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 3badc49d65dd..4a447c8de3ab 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -158,7 +158,8 @@ namespace ExportPartitionUtils const LoggerPtr & log, const std::string & entry_path, const ContextPtr & context_in, - MergeTreeData & source_storage) + MergeTreeData & source_storage, + const String & replica_name) { auto context = Context::createCopy(context_in); context->setSetting("write_full_path_in_iceberg_metadata", manifest.write_full_path_in_iceberg_metadata); @@ -171,6 +172,19 @@ namespace ExportPartitionUtils "Failpoint: export_partition_commit_always_throw"); }); + /// Per-task ephemeral lock that serializes the commit phase across replicas. + /// Without it, `handlePartExportSuccess` (post-last-part path) and `tryCleanup` + /// (poll/recovery path) can drive `commitExportPartitionTransaction` concurrently + /// for the same task. + const auto commit_lock_path = fs::path(entry_path) / "commit_lock"; + auto commit_lock = zkutil::EphemeralNodeHolder::tryCreate(commit_lock_path, *zk, replica_name); + if (!commit_lock) + { + LOG_INFO(log, "ExportPartition: commit_lock for {} is held by another replica, skipping commit on this replica", entry_path); + return; + } + LOG_INFO(log, "ExportPartition: commit_lock for {} acquired by replica {}", entry_path, replica_name); + const auto exported_paths = ExportPartitionUtils::getExportedPaths(log, zk, entry_path); if (exported_paths.empty()) diff --git a/src/Storages/MergeTree/ExportPartitionUtils.h b/src/Storages/MergeTree/ExportPartitionUtils.h index d7bb83224755..eb67d288d71e 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.h +++ b/src/Storages/MergeTree/ExportPartitionUtils.h @@ -43,7 +43,8 @@ namespace ExportPartitionUtils const LoggerPtr & log, const std::string & entry_path, const ContextPtr & context, - MergeTreeData & source_storage + MergeTreeData & source_storage, + const String & replica_name ); /// Handles a commit-phase failure for a replicated partition export: