Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ void ExportPartitionManifestUpdatingTask::poll()
/// A replica exported the last part but the commit never landed. Try to fix it.
try
{
ExportPartitionUtils::commit(work.metadata, work.destination_storage, zk, log_ptr, work.entry_path, work.context, storage);
ExportPartitionUtils::commit(work.metadata, work.destination_storage, zk, log_ptr, work.entry_path, work.context, storage, storage.getReplicaName());
}
catch (const Exception & e)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess(
try
{
auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage.getContext(), manifest);
ExportPartitionUtils::commit(manifest, destination_storage, zk, storage.log.load(), export_path, context, storage);
ExportPartitionUtils::commit(manifest, destination_storage, zk, storage.log.load(), export_path, context, storage, storage.replica_name);
}
catch (const Exception & e)
{
Expand Down
16 changes: 15 additions & 1 deletion src/Storages/MergeTree/ExportPartitionUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ namespace ExportPartitionUtils
const LoggerPtr & log,
const std::string & entry_path,
const ContextPtr & context_in,
MergeTreeData & source_storage)
MergeTreeData & source_storage,
const String & replica_name)
{
auto context = Context::createCopy(context_in);
context->setSetting("write_full_path_in_iceberg_metadata", manifest.write_full_path_in_iceberg_metadata);
Expand All @@ -171,6 +172,19 @@ namespace ExportPartitionUtils
"Failpoint: export_partition_commit_always_throw");
});

/// Per-task ephemeral lock that serializes the commit phase across replicas.
/// Without it, `handlePartExportSuccess` (post-last-part path) and `tryCleanup`
/// (poll/recovery path) can drive `commitExportPartitionTransaction` concurrently
/// for the same task.
const auto commit_lock_path = fs::path(entry_path) / "commit_lock";
auto commit_lock = zkutil::EphemeralNodeHolder::tryCreate(commit_lock_path, *zk, replica_name);
if (!commit_lock)
{
LOG_INFO(log, "ExportPartition: commit_lock for {} is held by another replica, skipping commit on this replica", entry_path);
return;
}
LOG_INFO(log, "ExportPartition: commit_lock for {} acquired by replica {}", entry_path, replica_name);

const auto exported_paths = ExportPartitionUtils::getExportedPaths(log, zk, entry_path);

if (exported_paths.empty())
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/ExportPartitionUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ namespace ExportPartitionUtils
const LoggerPtr & log,
const std::string & entry_path,
const ContextPtr & context,
MergeTreeData & source_storage
MergeTreeData & source_storage,
const String & replica_name
);

/// Handles a commit-phase failure for a replicated partition export:
Expand Down
Loading