Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
00cba16
S3 tables iceberg support
subkanthi May 16, 2026
a975f19
Merge branch 'antalya-26.3' into antalya_26_3_s3_tables
subkanthi May 16, 2026
284666c
Add allow_experimental_database_s3_tables to SettingsChangeHistory
subkanthi May 17, 2026
6c4e4ac
Merge branch 'antalya_26_3_s3_tables' of https://github.com/Altinity/…
subkanthi May 17, 2026
1291c0a
Merge branch 'antalya-26.3' into antalya_26_3_s3_tables
subkanthi May 18, 2026
373e604
Merge branch 'antalya-26.3' into antalya_26_3_s3_tables
subkanthi May 18, 2026
0e9f56d
Added logic to handle empty s3 credentials and removed extra setting …
subkanthi May 20, 2026
8f3b88d
Merge branch 'antalya_26_3_s3_tables' of https://github.com/Altinity/…
subkanthi May 20, 2026
4f0e6ad
Merge branch 'antalya-26.3' into antalya_26_3_s3_tables
subkanthi May 21, 2026
26d820f
Merge branch 'antalya-26.3' into antalya_26_3_s3_tables
subkanthi May 23, 2026
25ce146
trigger ci
alsugiliazova May 26, 2026
8ef0cbc
Retrigger CI
alsugiliazova May 26, 2026
9c2bda6
Merge branch 'antalya-26.3' into antalya_26_3_s3_tables
subkanthi May 27, 2026
55635ea
Fixed defects
subkanthi May 28, 2026
9912d51
Merge branch 'antalya_26_3_s3_tables' of https://github.com/Altinity/…
subkanthi May 28, 2026
202c052
Fix audit defects
subkanthi May 28, 2026
3a87976
Fix compiler error
subkanthi May 28, 2026
832a30d
Merge branch 'antalya-26.3' into antalya_26_3_s3_tables
subkanthi May 28, 2026
ab91d6a
Merge branch 'antalya-26.3' into antalya_26_3_s3_tables
subkanthi May 29, 2026
1e8c14e
Merge branch 'antalya-26.3' into antalya_26_3_s3_tables
subkanthi May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7893,6 +7893,9 @@ Multiple algorithms can be specified, e.g. 'dpsize,greedy'.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_database_paimon_rest_catalog, false, R"(
Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_rest'
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_database_s3_tables, false, R"(
Allow experimental database engine DataLakeCatalog with catalog_type = 's3tables' (Amazon S3 Tables Iceberg REST with SigV4)
)", EXPERIMENTAL) \
DECLARE(UInt64, webassembly_udf_max_fuel, 100'000, R"(
Fuel limit per WebAssembly UDF instance execution. Each WebAssembly instruction consumes some amount of fuel.
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"iceberg_expire_default_max_ref_age_ms", 9223372036854775807, 9223372036854775807, "New setting."},
{"max_skip_unavailable_shards_num", 0, 0, "New setting to limit the number of shards that can be silently skipped when skip_unavailable_shards is enabled."},
{"max_skip_unavailable_shards_ratio", 0, 0, "New setting to limit the ratio of shards that can be silently skipped when skip_unavailable_shards is enabled."},
{"allow_experimental_database_s3_tables", false, false, "New setting to enable experimental database S3 tables (AWS Iceberg REST catalog)."},
});
addSettingsChanges(settings_changes_history, "26.2",
{
Expand Down
3 changes: 2 additions & 1 deletion src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ IMPLEMENT_SETTING_ENUM(
{"hive", DatabaseDataLakeCatalogType::ICEBERG_HIVE},
{"onelake", DatabaseDataLakeCatalogType::ICEBERG_ONELAKE},
{"biglake", DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE},
{"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}})
{"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST},
{"s3tables", DatabaseDataLakeCatalogType::S3_TABLES}})

IMPLEMENT_SETTING_ENUM(
FileCachePolicy,
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ enum class DatabaseDataLakeCatalogType : uint8_t
ICEBERG_ONELAKE,
ICEBERG_BIGLAKE,
PAIMON_REST,
S3_TABLES,
};

DECLARE_SETTING_ENUM(DatabaseDataLakeCatalogType)
Expand Down
110 changes: 110 additions & 0 deletions src/Databases/DataLake/AWSV4Signer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "config.h"

#if USE_AVRO && USE_SSL && USE_AWS_S3

#include <Databases/DataLake/AWSV4Signer.h>

#include <Common/Exception.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/String.h>

#include <aws/core/auth/signer/AWSAuthV4Signer.h>
#include <aws/core/http/standard/StandardHttpRequest.h>
#include <aws/core/http/URI.h>
#include <aws/core/utils/memory/AWSMemory.h>

#include <sstream>
#include <utility>

namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int S3_ERROR;
}
}

namespace DataLake
{
namespace
{

/// Translate a Poco HTTP method name into the matching AWS SDK HttpMethod value.
/// Only GET, POST, PUT, DELETE, HEAD and PATCH are recognised; the comparison is an
/// exact string match against the Poco::Net::HTTPRequest method constants.
/// Throws BAD_ARGUMENTS for any other method string.
Aws::Http::HttpMethod mapPocoMethodToAws(const String & method)
{
    using AwsMethod = Aws::Http::HttpMethod;
    using PocoRequest = Poco::Net::HTTPRequest;

    if (method == PocoRequest::HTTP_GET)
        return AwsMethod::HTTP_GET;
    if (method == PocoRequest::HTTP_POST)
        return AwsMethod::HTTP_POST;
    if (method == PocoRequest::HTTP_PUT)
        return AwsMethod::HTTP_PUT;
    if (method == PocoRequest::HTTP_DELETE)
        return AwsMethod::HTTP_DELETE;
    if (method == PocoRequest::HTTP_HEAD)
        return AwsMethod::HTTP_HEAD;
    if (method == PocoRequest::HTTP_PATCH)
        return AwsMethod::HTTP_PATCH;

    /// Nothing matched: refuse to sign rather than guess a method.
    throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported HTTP method for AWS SigV4 signing: {}", method);
}

}

/// Sign an HTTP request with AWS SigV4 and return the headers that must be sent with it.
///
/// A temporary AWS SDK request is built from `method`, `uri`, `extra_headers` and `payload`,
/// signed with `signer` for the given `region` / `service`, and its resulting headers are
/// copied into `out_headers` (which is cleared first).
///
/// Throws BAD_ARGUMENTS if the method is unsupported or if the signed request carries no
/// non-empty Authorization header; throws S3_ERROR if the signer reports a failure.
void signRequestWithAWSV4(
    const String & method,
    const Poco::URI & uri,
    const DB::HTTPHeaderEntries & extra_headers,
    const String & payload,
    Aws::Client::AWSAuthV4Signer & signer,
    const String & region,
    const String & service,
    DB::HTTPHeaderEntries & out_headers)
{
    const String uri_text = uri.toString();
    const Aws::Http::URI target(uri_text.c_str());
    const Aws::Http::HttpMethod aws_method = mapPocoMethodToAws(method);
    Aws::Http::Standard::StandardHttpRequest aws_request(target, aws_method);

    /// Forward caller headers into the request being signed, except a caller-supplied
    /// Authorization header, which is never copied.
    for (const auto & entry : extra_headers)
    {
        const bool is_authorization = Poco::icompare(entry.name, "authorization") == 0;
        if (!is_authorization)
            aws_request.SetHeaderValue(
                Aws::String(entry.name.data(), entry.name.size()),
                Aws::String(entry.value.data(), entry.value.size()));
    }

    /// Attach the body only when there is one; the stream is rewound so it is read from the start.
    if (!payload.empty())
    {
        auto content = Aws::MakeShared<std::stringstream>("AWSV4Signer");
        content->write(payload.data(), static_cast<std::streamsize>(payload.size()));
        content->seekg(0);
        aws_request.AddContentBody(content);
    }

    const bool signed_ok = signer.SignRequest(aws_request, region.c_str(), service.c_str(), /* signBody = */ true);
    if (!signed_ok)
        throw DB::Exception(DB::ErrorCodes::S3_ERROR, "AWS SigV4 signing failed");

    const auto signed_headers = aws_request.GetHeaders();

    /// A successful return alone is not trusted: the request must actually carry a
    /// non-empty Authorization header, otherwise it is reported as a credentials problem.
    bool authorized = false;
    for (auto it = signed_headers.begin(); it != signed_headers.end() && !authorized; ++it)
        authorized = Poco::icompare(it->first, "authorization") == 0 && !it->second.empty();

    if (!authorized)
        throw DB::Exception(
            DB::ErrorCodes::BAD_ARGUMENTS,
            "AWS credentials are missing or incomplete; cannot sign S3 Tables REST request");

    /// Hand back every header of the signed request except Host, which the HTTP layer
    /// sets from the URI itself.
    out_headers.clear();
    for (const auto & header : signed_headers)
    {
        if (Poco::icompare(header.first, "host") != 0)
            out_headers.emplace_back(
                String(header.first.c_str(), header.first.size()),
                String(header.second.c_str(), header.second.size()));
    }
}

}

#endif
34 changes: 34 additions & 0 deletions src/Databases/DataLake/AWSV4Signer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "config.h"

#if USE_AVRO && USE_SSL && USE_AWS_S3

#include <Core/Types.h>
#include <IO/HTTPHeaderEntries.h>
#include <Poco/URI.h>

namespace Aws::Client
{
class AWSAuthV4Signer;
}

namespace DataLake
{

/// Sign a Poco-style HTTP request using the AWS SDK's AWSAuthV4Signer.
/// Builds a temporary Aws::Http::StandardHttpRequest, signs it (including the body), then extracts
/// the resulting headers into out_headers (excluding Host; ReadWriteBufferFromHTTP sets it from the URI).
///
/// @param method         HTTP method name; must be one of GET, POST, PUT, DELETE, HEAD, PATCH.
/// @param uri            Full request URI to sign.
/// @param extra_headers  Headers to include in the signed request. An `Authorization` entry
///                       (compared case-insensitively) is ignored.
/// @param payload        Request body; when empty no body is attached to the request.
/// @param signer         AWS SDK signer used to produce the signature.
/// @param region         AWS region passed to the signer.
/// @param service        AWS service name passed to the signer.
/// @param out_headers    Output: cleared and then filled with all headers of the signed request
///                       except `Host`. Left untouched if an exception is thrown.
///
/// Throws BAD_ARGUMENTS if the method is unsupported or if the signed request does not contain
/// a non-empty `Authorization` header (reported as missing or incomplete AWS credentials).
/// Throws S3_ERROR if the signer reports failure.
void signRequestWithAWSV4(
const String & method,
const Poco::URI & uri,
const DB::HTTPHeaderEntries & extra_headers,
const String & payload,
Aws::Client::AWSAuthV4Signer & signer,
const String & region,
const String & service,
DB::HTTPHeaderEntries & out_headers);

}

#endif
54 changes: 51 additions & 3 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include <Databases/DataLake/RestCatalog.h>
#include <Databases/DataLake/GlueCatalog.h>
#include <Databases/DataLake/PaimonRestCatalog.h>
#if USE_AWS_S3 && USE_SSL
#include <Databases/DataLake/S3TablesCatalog.h>
#endif
#include <DataTypes/DataTypeString.h>

#include <Storages/ObjectStorage/S3/Configuration.h>
Expand Down Expand Up @@ -91,6 +94,7 @@ namespace Setting
extern const SettingsBool allow_experimental_database_glue_catalog;
extern const SettingsBool allow_experimental_database_hms_catalog;
extern const SettingsBool allow_experimental_database_paimon_rest_catalog;
extern const SettingsBool allow_experimental_database_s3_tables;
extern const SettingsBool use_hive_partitioning;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString cluster_for_parallel_replicas;
Expand Down Expand Up @@ -143,8 +147,20 @@ void DatabaseDataLake::validateSettings()
{
if (settings[DatabaseDataLakeSetting::region].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue Catalog. "
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for Glue catalog. "
"Please specify 'SETTINGS region=<region_name>' in the CREATE DATABASE query");
}
else if (settings[DatabaseDataLakeSetting::catalog_type].value == DB::DatabaseDataLakeCatalogType::S3_TABLES)
{
if (settings[DatabaseDataLakeSetting::region].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`region` setting cannot be empty for S3 Tables catalog. "
"Please specify 'SETTINGS region=<region_name>' in the CREATE DATABASE query");

if (settings[DatabaseDataLakeSetting::warehouse].value.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty for S3 Tables catalog. "
"Please specify 'SETTINGS warehouse=<table_bucket_arn>' in the CREATE DATABASE query");
}
else if (settings[DatabaseDataLakeSetting::warehouse].value.empty())
{
Expand Down Expand Up @@ -299,6 +315,23 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
}
break;
}
case DB::DatabaseDataLakeCatalogType::S3_TABLES:
{
#if USE_AWS_S3 && USE_SSL
catalog_impl = std::make_shared<DataLake::S3TablesCatalog>(
settings[DatabaseDataLakeSetting::warehouse].value,
url,
settings[DatabaseDataLakeSetting::region].value,
catalog_parameters,
settings[DatabaseDataLakeSetting::namespaces].value,
Context::getGlobalContextInstance());
#else
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Amazon S3 Tables catalog requires ClickHouse built with USE_AWS_S3 and USE_SSL");
#endif
break;
}
}

return catalog_impl;
Expand Down Expand Up @@ -332,6 +365,7 @@ StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
case DatabaseDataLakeCatalogType::ICEBERG_HIVE:
case DatabaseDataLakeCatalogType::ICEBERG_REST:
case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE:
case DatabaseDataLakeCatalogType::S3_TABLES:
{
switch (type)
{
Expand Down Expand Up @@ -955,9 +989,10 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
}

if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST
&& catalog_type != DatabaseDataLakeCatalogType::S3_TABLES)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must use `rest` or `s3tables` catalog type only");
}

for (auto & engine_arg : engine_args)
Expand Down Expand Up @@ -1043,6 +1078,19 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
engine_func->name = "Paimon";
break;
}
case DatabaseDataLakeCatalogType::S3_TABLES:
{
if (!args.create_query.attach
&& !args.context->getSettingsRef()[Setting::allow_experimental_database_s3_tables])
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"DatabaseDataLake with S3 Tables catalog is experimental. "
"To allow its usage, enable setting allow_experimental_database_s3_tables");
}

engine_func->name = "Iceberg";
break;
}
case DatabaseDataLakeCatalogType::NONE:
break;
}
Expand Down
1 change: 0 additions & 1 deletion src/Databases/DataLake/ICatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ class TableMetadata
bool hasLocation() const;
bool hasSchema() const;
bool hasStorageCredentials() const;
bool hasDataLakeSpecificProperties() const;

void setLocation(const std::string & location_);
std::string getLocation() const;
Expand Down
35 changes: 25 additions & 10 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,12 @@ void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Pt
result.default_base_location = object->get("default-base-location").extract<String>();
}

DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(bool update_token) const
DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(
bool update_token,
const String & /*method*/,
const Poco::URI & /*url*/,
const DB::HTTPHeaderEntries & /*extra_headers*/,
const String & /*body*/) const
{
/// Option 1: user specified auth header manually.
/// Header has format: 'Authorization: <scheme> <token>'.
Expand Down Expand Up @@ -387,7 +392,12 @@ BigLakeCatalog::BigLakeCatalog(
config = loadConfig();
}

DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders(bool update_token) const
DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders(
bool update_token,
const String & /*method*/,
const Poco::URI & /*url*/,
const DB::HTTPHeaderEntries & /*extra_headers*/,
const String & /*body*/) const
{
/// Google Cloud OAuth2 for BigLake.
/// Uses GCP metadata service or Application Default Credentials to get access token.
Expand Down Expand Up @@ -542,7 +552,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(

auto create_buffer = [&](bool update_token)
{
auto result_headers = getAuthHeaders(update_token);
auto result_headers = getAuthHeaders(update_token, Poco::Net::HTTPRequest::HTTP_GET, url, headers, {});
std::move(headers.begin(), headers.end(), std::back_inserter(result_headers));

return DB::BuilderRWBufferFromHTTP(url)
Expand Down Expand Up @@ -981,9 +991,6 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r
request_body->stringify(oss);
const std::string body_str = DB::removeEscapedSlashes(oss.str());

DB::HTTPHeaderEntries headers = getAuthHeaders(/* update_token = */ true);
headers.emplace_back("Content-Type", "application/json");

const auto & context = getContext();

DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback;
Expand All @@ -997,6 +1004,12 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r

/// enable_url_encoding=false to allow using tables with encoded sequences in names like 'foo%2Fbar'
Poco::URI url(endpoint, /* enable_url_encoding */ false);

DB::HTTPHeaderEntries extra_headers;
extra_headers.emplace_back("Content-Type", "application/json");

DB::HTTPHeaderEntries headers = getAuthHeaders(/* update_token = */ true, method, url, extra_headers, body_str);
headers.emplace_back("Content-Type", "application/json");
auto wb = DB::BuilderRWBufferFromHTTP(url)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
.withMethod(method)
Expand All @@ -1017,7 +1030,7 @@ void RestCatalog::sendRequest(const String & endpoint, Poco::JSON::Object::Ptr r

void RestCatalog::createNamespaceIfNotExists(const String & namespace_name, const String & location) const
{
const std::string endpoint = fmt::format("{}/namespaces", base_url);
const std::string endpoint = base_url / config.prefix / "namespaces";

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Expand Down Expand Up @@ -1049,7 +1062,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl

createNamespaceIfNotExists(namespace_name, metadata_content->getValue<String>("location"));

const std::string endpoint = fmt::format("{}/namespaces/{}/tables", base_url, namespace_name);
const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables";

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
request_body->set("name", table_name);
Expand Down Expand Up @@ -1086,7 +1099,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl

bool RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr new_snapshot) const
{
const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name);
const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name;

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Expand Down Expand Up @@ -1156,7 +1169,9 @@ void RestCatalog::dropTable(const String & namespace_name, const String & table_
"Failed to drop table {}, namespace {} is filtered by `namespaces` database parameter",
table_name, namespace_name);

const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}?purgeRequested=False", base_url, namespace_name, table_name);
const std::string endpoint
= (base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name).string()
+ "?purgeRequested=False";

Poco::JSON::Object::Ptr request_body = nullptr;
try
Expand Down
Loading
Loading