diff --git a/.github/workflows/Linux.yml b/.github/workflows/Linux.yml index fadfab375..984ad7b4d 100644 --- a/.github/workflows/Linux.yml +++ b/.github/workflows/Linux.yml @@ -80,7 +80,7 @@ jobs: - name: Setup vcpkg uses: lukka/run-vcpkg@v11.1 with: - vcpkgGitCommitId: a42af01b72c28a8e1d7b48107b33e4f286a55ef6 + vcpkgGitCommitId: 84bab45d415d22042bd0b9081aea57f362da3f35 - name: Build extension env: @@ -92,8 +92,9 @@ jobs: - name: Test extension env: POSTGRES_TEST_DATABASE_AVAILABLE: 1 + LOCAL_EXTENSION_REPO: 'build/reldebug/repository' PGSERVICE: postgres run: | psql -c "SELECT 43" source ./create-postgres-tables.sh - make test_reldebug \ No newline at end of file + ./build/reldebug/test/unittest --autoloading available diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml index b1ea791eb..9924fdf5c 100644 --- a/.github/workflows/MainDistributionPipeline.yml +++ b/.github/workflows/MainDistributionPipeline.yml @@ -14,21 +14,21 @@ concurrency: jobs: duckdb-stable-build: name: Build extension binaries - uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.4.2 + uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main with: - duckdb_version: v1.4.2 - ci_tools_version: v1.4.2 + duckdb_version: 2323327f4 + ci_tools_version: main extension_name: postgres_scanner exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_mingw' duckdb-stable-deploy: name: Deploy extension binaries needs: duckdb-stable-build - uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@v1.4.2 + uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@main secrets: inherit with: - duckdb_version: v1.4.2 - ci_tools_version: v1.4.2 + duckdb_version: 2323327f4 + ci_tools_version: main extension_name: postgres_scanner exclude_archs: 'wasm_mvp;wasm_eh;wasm_threads;windows_amd64_mingw' deploy_latest: ${{ startsWith(github.ref, 'refs/heads/v') || github.ref == 'refs/heads/main' }} diff --git a/duckdb b/duckdb index 68d7555f6..2323327f4 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit 68d7555f68bd25c1a251ccca2e6338949c33986a +Subproject commit 2323327f46a9e0116fc9830f0fa92c13e2d3ce5b diff --git a/extension-ci-tools b/extension-ci-tools index aac964061..e9f3106bf 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit aac9640615e51d6e7e8b72d4bf023703cfd8e479 +Subproject commit e9f3106bf8fb56c232255bf58995f06f47ba0fd3 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 20ada727a..5d47e2718 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -17,7 +17,8 @@ add_library( postgres_scanner.cpp postgres_storage.cpp postgres_text_reader.cpp - postgres_utils.cpp) + postgres_utils.cpp + postgres_logging.cpp) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} $ PARENT_SCOPE) diff --git a/src/include/postgres_binary_reader.hpp b/src/include/postgres_binary_reader.hpp index f4bc33654..5b5cca9ab 100644 --- a/src/include/postgres_binary_reader.hpp +++ b/src/include/postgres_binary_reader.hpp @@ -19,7 +19,7 @@ struct PostgresBinaryReader : public PostgresResultReader { ~PostgresBinaryReader() override; public: - void BeginCopy(const string &sql) override; + void BeginCopy(ClientContext &context, const string &sql) override; PostgresReadResult Read(DataChunk &result) override; protected: diff --git a/src/include/postgres_connection.hpp b/src/include/postgres_connection.hpp index 6a85b2276..69787217c 100644 --- a/src/include/postgres_connection.hpp +++ b/src/include/postgres_connection.hpp @@ -45,14 +45,14 @@ class PostgresConnection { public: static PostgresConnection Open(const string &dsn, const string &attach_path); - void Execute(const string &query); - unique_ptr TryQuery(const string &query, optional_ptr error_message = nullptr); - unique_ptr Query(const string &query); + void Execute(optional_ptr context, const string &query); + unique_ptr TryQuery(optional_ptr context, const string &query, optional_ptr error_message = nullptr); + unique_ptr Query(optional_ptr context, const string &query); //! Submits a set of queries to be executed in the connection. - vector> ExecuteQueries(const string &queries); + vector> ExecuteQueries(ClientContext &context, const string &queries); - PostgresVersion GetPostgresVersion(); + PostgresVersion GetPostgresVersion(ClientContext &context); vector GetIndexInfo(const string &table_name); @@ -64,7 +64,7 @@ class PostgresConnection { void CopyChunk(ClientContext &context, PostgresCopyState &state, DataChunk &chunk, DataChunk &varchar_chunk); void FinishCopyTo(PostgresCopyState &state); - void BeginCopyFrom(const string &query, ExecStatusType expected_result); + void BeginCopyFrom(ClientContext &context, const string &query, ExecStatusType expected_result); bool IsOpen(); void Close(); @@ -87,7 +87,7 @@ class PostgresConnection { static bool DebugPrintQueries(); private: - PGresult *PQExecute(const string &query); + PGresult *PQExecute(optional_ptr context, const string &query); shared_ptr connection; string dsn; diff --git a/src/include/postgres_logging.hpp b/src/include/postgres_logging.hpp new file mode 100644 index 000000000..5c038efca --- /dev/null +++ b/src/include/postgres_logging.hpp @@ -0,0 +1,20 @@ + +#pragma once + +#include "duckdb/logging/logging.hpp" +#include "duckdb/common/string_util.hpp" + +namespace duckdb { + +class PostgresQueryLogType : public LogType { +public: + static constexpr const char *NAME = "PostgresQueryLog"; + static constexpr LogLevel LEVEL = LogLevel::LOG_DEBUG; + + PostgresQueryLogType() : LogType(NAME, LEVEL, GetLogType()) {}; + + static string ConstructLogMessage(const string &str, int64_t duration); + static LogicalType GetLogType(); +}; + +} // namespace diff --git a/src/include/postgres_result_reader.hpp b/src/include/postgres_result_reader.hpp index 2b452ec48..1915c3b53 100644 --- a/src/include/postgres_result_reader.hpp +++ b/src/include/postgres_result_reader.hpp @@ -31,7 +31,7 @@ struct PostgresResultReader { } public: - virtual void BeginCopy(const string &sql) = 0; + virtual void BeginCopy(ClientContext &context, const string &sql) = 0; virtual PostgresReadResult Read(DataChunk &result) = 0; protected: diff --git a/src/include/postgres_text_reader.hpp b/src/include/postgres_text_reader.hpp index 46aabb22c..cbd9491af 100644 --- a/src/include/postgres_text_reader.hpp +++ b/src/include/postgres_text_reader.hpp @@ -20,7 +20,7 @@ struct PostgresTextReader : public PostgresResultReader { ~PostgresTextReader() override; public: - void BeginCopy(const string &sql) override; + void BeginCopy(ClientContext &context, const string &sql) override; PostgresReadResult Read(DataChunk &result) override; private: diff --git a/src/include/storage/postgres_catalog.hpp b/src/include/storage/postgres_catalog.hpp index d6f273a9a..957be9bd5 100644 --- a/src/include/storage/postgres_catalog.hpp +++ b/src/include/storage/postgres_catalog.hpp @@ -21,7 +21,7 @@ class PostgresSchemaEntry; class PostgresCatalog : public Catalog { public: explicit PostgresCatalog(AttachedDatabase &db_p, string connection_string, string attach_path, - AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level); + AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level, ClientContext &context); ~PostgresCatalog(); string connection_string; diff --git a/src/include/storage/postgres_catalog_set.hpp b/src/include/storage/postgres_catalog_set.hpp index cbda4c1cc..0c24441a4 100644 --- a/src/include/storage/postgres_catalog_set.hpp +++ b/src/include/storage/postgres_catalog_set.hpp @@ -23,9 +23,9 @@ class PostgresCatalogSet { public: PostgresCatalogSet(Catalog &catalog, bool is_loaded); - optional_ptr GetEntry(PostgresTransaction &transaction, const string &name); + optional_ptr GetEntry(ClientContext &context, PostgresTransaction &transaction, const string &name); void DropEntry(PostgresTransaction &transaction, DropInfo &info); - void Scan(PostgresTransaction &transaction, const std::function &callback); + void Scan(ClientContext& context, PostgresTransaction &transaction, const std::function &callback); virtual optional_ptr CreateEntry(PostgresTransaction &transaction, shared_ptr entry); void ClearEntries(); virtual bool SupportReload() const { @@ -34,13 +34,13 @@ class PostgresCatalogSet { virtual optional_ptr ReloadEntry(PostgresTransaction &transaction, const string &name); protected: - virtual void LoadEntries(PostgresTransaction &transaction) = 0; + virtual void LoadEntries(ClientContext &context, PostgresTransaction &transaction) = 0; //! Whether or not the catalog set contains dependencies to itself that have //! to be resolved WHILE loading virtual bool HasInternalDependencies() const { return false; } - void TryLoadEntries(PostgresTransaction &transaction); + void TryLoadEntries(ClientContext &context, PostgresTransaction &transaction); protected: Catalog &catalog; diff --git a/src/include/storage/postgres_delete.hpp b/src/include/storage/postgres_delete.hpp index e709ceed1..4e878de01 100644 --- a/src/include/storage/postgres_delete.hpp +++ b/src/include/storage/postgres_delete.hpp @@ -22,7 +22,7 @@ class PostgresDelete : public PhysicalOperator { public: // Source interface - SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; + SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; bool IsSource() const override { return true; diff --git a/src/include/storage/postgres_index.hpp b/src/include/storage/postgres_index.hpp index 47c46c0d4..76593a023 100644 --- a/src/include/storage/postgres_index.hpp +++ b/src/include/storage/postgres_index.hpp @@ -24,7 +24,7 @@ class PostgresCreateIndex : public PhysicalOperator { public: // Source interface - SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; + SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; bool IsSource() const override { return true; diff --git a/src/include/storage/postgres_index_set.hpp b/src/include/storage/postgres_index_set.hpp index 240a2ff42..26236670e 100644 --- a/src/include/storage/postgres_index_set.hpp +++ b/src/include/storage/postgres_index_set.hpp @@ -26,7 +26,7 @@ class PostgresIndexSet : public PostgresInSchemaSet { TableCatalogEntry &table); protected: - void LoadEntries(PostgresTransaction &transaction) override; + void LoadEntries(ClientContext &context, PostgresTransaction &transaction) override; protected: unique_ptr index_result; diff --git a/src/include/storage/postgres_insert.hpp b/src/include/storage/postgres_insert.hpp index d22b4f195..4bd0e2991 100644 --- a/src/include/storage/postgres_insert.hpp +++ b/src/include/storage/postgres_insert.hpp @@ -35,7 +35,7 @@ class PostgresInsert : public PhysicalOperator { public: // Source interface - SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; + SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; bool IsSource() const override { return true; diff --git a/src/include/storage/postgres_optimizer.hpp b/src/include/storage/postgres_optimizer.hpp index 1e573f066..832a5d86c 100644 --- a/src/include/storage/postgres_optimizer.hpp +++ b/src/include/storage/postgres_optimizer.hpp @@ -9,6 +9,7 @@ #pragma once #include "duckdb/main/config.hpp" +#include "duckdb/optimizer/optimizer_extension.hpp" namespace duckdb { diff --git a/src/include/storage/postgres_schema_set.hpp b/src/include/storage/postgres_schema_set.hpp index f9f49f6e7..979a545e5 100644 --- a/src/include/storage/postgres_schema_set.hpp +++ b/src/include/storage/postgres_schema_set.hpp @@ -24,7 +24,7 @@ class PostgresSchemaSet : public PostgresCatalogSet { static string GetInitializeQuery(const string &schema = string()); protected: - void LoadEntries(PostgresTransaction &transaction) override; + void LoadEntries(ClientContext &context, PostgresTransaction &transaction) override; protected: //! Schema to load - if empty loads all schemas (default behavior) diff --git a/src/include/storage/postgres_table_set.hpp b/src/include/storage/postgres_table_set.hpp index 422c2a181..a5993ba35 100644 --- a/src/include/storage/postgres_table_set.hpp +++ b/src/include/storage/postgres_table_set.hpp @@ -26,24 +26,24 @@ class PostgresTableSet : public PostgresInSchemaSet { static unique_ptr GetTableInfo(PostgresTransaction &transaction, PostgresSchemaEntry &schema, const string &table_name); - static unique_ptr GetTableInfo(PostgresConnection &connection, const string &schema_name, + static unique_ptr GetTableInfo(ClientContext &context, PostgresConnection &connection, const string &schema_name, const string &table_name); optional_ptr ReloadEntry(PostgresTransaction &transaction, const string &table_name) override; - void AlterTable(PostgresTransaction &transaction, AlterTableInfo &info); + void AlterTable(ClientContext &context, PostgresTransaction &transaction, AlterTableInfo &info); static string GetInitializeQuery(const string &schema = string(), const string &table = string()); protected: - void LoadEntries(PostgresTransaction &transaction) override; + void LoadEntries(ClientContext &context, PostgresTransaction &transaction) override; bool SupportReload() const override { return true; } - void AlterTable(PostgresTransaction &transaction, RenameTableInfo &info); - void AlterTable(PostgresTransaction &transaction, RenameColumnInfo &info); - void AlterTable(PostgresTransaction &transaction, AddColumnInfo &info); - void AlterTable(PostgresTransaction &transaction, RemoveColumnInfo &info); + void AlterTable(ClientContext &context, PostgresTransaction &transaction, RenameTableInfo &info); + void AlterTable(ClientContext &context, PostgresTransaction &transaction, RenameColumnInfo &info); + void AlterTable(ClientContext &context, PostgresTransaction &transaction, AddColumnInfo &info); + void AlterTable(ClientContext &context, PostgresTransaction &transaction, RemoveColumnInfo &info); static void AddColumn(optional_ptr transaction, optional_ptr schema, PostgresResult &result, idx_t row, PostgresTableInfo &table_info); @@ -55,7 +55,7 @@ class PostgresTableSet : public PostgresInSchemaSet { void CreateEntries(PostgresTransaction &transaction, PostgresResult &result, idx_t start, idx_t end); private: - string GetAlterTablePrefix(PostgresTransaction &transaction, const string &name); + string GetAlterTablePrefix(ClientContext &context, PostgresTransaction &transaction, const string &name); string GetAlterTablePrefix(const string &name, optional_ptr entry); string GetAlterTableColumnName(const string &name, optional_ptr entry); diff --git a/src/include/storage/postgres_transaction.hpp b/src/include/storage/postgres_transaction.hpp index 176fd71b6..77739c8c4 100644 --- a/src/include/storage/postgres_transaction.hpp +++ b/src/include/storage/postgres_transaction.hpp @@ -30,12 +30,12 @@ class PostgresTransaction : public Transaction { PostgresConnection &GetConnectionWithoutTransaction(); PostgresConnection &GetConnection(); - ClientContext &GetContext(); + optional_ptr GetContext(); string GetDSN(); unique_ptr Query(const string &query); unique_ptr QueryWithoutTransaction(const string &query); - vector> ExecuteQueries(const string &queries); + vector> ExecuteQueries(ClientContext &context, const string &queries); static PostgresTransaction &Get(ClientContext &context, Catalog &catalog); optional_ptr ReferenceEntry(shared_ptr &entry); diff --git a/src/include/storage/postgres_type_set.hpp b/src/include/storage/postgres_type_set.hpp index de489990a..9b6321b78 100644 --- a/src/include/storage/postgres_type_set.hpp +++ b/src/include/storage/postgres_type_set.hpp @@ -33,7 +33,7 @@ class PostgresTypeSet : public PostgresInSchemaSet { // composite types can refer to other types return true; } - void LoadEntries(PostgresTransaction &transaction) override; + void LoadEntries(ClientContext &context, PostgresTransaction &transaction) override; void CreateEnum(PostgresTransaction &transaction, PostgresResult &result, idx_t start_row, idx_t end_row); void CreateCompositeType(PostgresTransaction &transaction, PostgresResult &result, idx_t start_row, idx_t end_row); diff --git a/src/include/storage/postgres_update.hpp b/src/include/storage/postgres_update.hpp index 12b5e5e7a..dd25e7594 100644 --- a/src/include/storage/postgres_update.hpp +++ b/src/include/storage/postgres_update.hpp @@ -29,7 +29,7 @@ class PostgresUpdate : public PhysicalOperator { public: // Source interface - SourceResultType GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; + SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override; bool IsSource() const override { return true; diff --git a/src/postgres_attach.cpp b/src/postgres_attach.cpp index dbaac409c..a7aceafda 100644 --- a/src/postgres_attach.cpp +++ b/src/postgres_attach.cpp @@ -59,7 +59,7 @@ GROUP BY relname ORDER BY relname; )", KeywordHelper::WriteQuoted(data.source_schema)); - auto res = conn.Query(fetch_table_query); + auto res = conn.Query(context, fetch_table_query); for (idx_t row = 0; row < PQntuples(res->res); row++) { auto table_name = res->GetString(row, 0); string query; diff --git a/src/postgres_binary_reader.cpp b/src/postgres_binary_reader.cpp index 86ebe3043..0b85922bb 100644 --- a/src/postgres_binary_reader.cpp +++ b/src/postgres_binary_reader.cpp @@ -12,8 +12,8 @@ PostgresBinaryReader::~PostgresBinaryReader() { Reset(); } -void PostgresBinaryReader::BeginCopy(const string &sql) { - con.BeginCopyFrom(sql, PGRES_COPY_OUT); +void PostgresBinaryReader::BeginCopy(ClientContext &context, const string &sql) { + con.BeginCopyFrom(context, sql, PGRES_COPY_OUT); if (!Next()) { throw IOException("Failed to fetch header for COPY \"%s\"", sql); } diff --git a/src/postgres_connection.cpp b/src/postgres_connection.cpp index 4baf81b12..7c890ef54 100644 --- a/src/postgres_connection.cpp +++ b/src/postgres_connection.cpp @@ -2,6 +2,7 @@ #include "duckdb/parser/column_list.hpp" #include "duckdb/parser/parser.hpp" #include "postgres_connection.hpp" +#include "postgres_logging.hpp" #include "duckdb/common/types/uuid.hpp" #include "duckdb/common/shared_ptr.hpp" #include "duckdb/common/helper.hpp" @@ -60,16 +61,26 @@ static bool ResultHasError(PGresult *result) { } } -PGresult *PostgresConnection::PQExecute(const string &query) { +PGresult *PostgresConnection::PQExecute(optional_ptr context, const string &query) { if (PostgresConnection::DebugPrintQueries()) { Printer::Print(query + "\n"); } - return PQexec(GetConn(), query.c_str()); + int64_t start_time = std::chrono::time_point_cast(std::chrono::steady_clock::now()) + .time_since_epoch() + .count(); + auto res = PQexec(GetConn(), query.c_str()); + int64_t end_time = std::chrono::time_point_cast(std::chrono::steady_clock::now()) + .time_since_epoch() + .count(); + if (context) { + DUCKDB_LOG(*context, PostgresQueryLogType, query, end_time - start_time); + } + return res; } -unique_ptr PostgresConnection::TryQuery(const string &query, optional_ptr error_message) { +unique_ptr PostgresConnection::TryQuery(optional_ptr context, const string &query, optional_ptr error_message) { lock_guard guard(connection->connection_lock); - auto result = PQExecute(query.c_str()); + auto result = PQExecute(context, query.c_str()); if (ResultHasError(result)) { if (error_message) { *error_message = StringUtil::Format("Failed to execute query \"" + query + @@ -81,23 +92,26 @@ unique_ptr PostgresConnection::TryQuery(const string &query, opt return make_uniq(result); } -unique_ptr PostgresConnection::Query(const string &query) { +unique_ptr PostgresConnection::Query(optional_ptr context, const string &query) { string error_msg; - auto result = TryQuery(query, &error_msg); + auto result = TryQuery(context, query, &error_msg); if (!result) { throw std::runtime_error(error_msg); } return result; } -void PostgresConnection::Execute(const string &query) { - Query(query); +void PostgresConnection::Execute(optional_ptr context, const string &query) { + Query(context, query); } -vector> PostgresConnection::ExecuteQueries(const string &queries) { +vector> PostgresConnection::ExecuteQueries(ClientContext &context, const string &queries) { if (PostgresConnection::DebugPrintQueries()) { Printer::Print(queries + "\n"); } + int64_t start_time = std::chrono::time_point_cast(std::chrono::steady_clock::now()) + .time_since_epoch() + .count(); auto res = PQsendQuery(GetConn(), queries.c_str()); if (res == 0) { throw std::runtime_error("Failed to execute query \"" + queries + "\": " + string(PQerrorMessage(GetConn()))); @@ -118,11 +132,15 @@ vector> PostgresConnection::ExecuteQueries(const stri } results.push_back(std::move(result)); } + int64_t end_time = std::chrono::time_point_cast(std::chrono::steady_clock::now()) + .time_since_epoch() + .count(); + DUCKDB_LOG(context, PostgresQueryLogType, queries, end_time - start_time); return results; } -PostgresVersion PostgresConnection::GetPostgresVersion() { - auto result = TryQuery("SELECT version(), (SELECT COUNT(*) FROM pg_settings WHERE name LIKE 'rds%')"); +PostgresVersion PostgresConnection::GetPostgresVersion(ClientContext &context) { + auto result = TryQuery(context, "SELECT version(), (SELECT COUNT(*) FROM pg_settings WHERE name LIKE 'rds%')"); if (!result) { PostgresVersion version; version.type_v = PostgresInstanceType::UNKNOWN; diff --git a/src/postgres_copy_from.cpp b/src/postgres_copy_from.cpp index 7108ae2b4..a39fe419d 100644 --- a/src/postgres_copy_from.cpp +++ b/src/postgres_copy_from.cpp @@ -3,8 +3,8 @@ namespace duckdb { -void PostgresConnection::BeginCopyFrom(const string &query, ExecStatusType expected_result) { - PostgresResult pg_res(PQExecute(query.c_str())); +void PostgresConnection::BeginCopyFrom(ClientContext &context, const string &query, ExecStatusType expected_result) { + PostgresResult pg_res(PQExecute(context, query.c_str())); auto result = pg_res.res; if (!result || PQresultStatus(result) != expected_result) { throw std::runtime_error("Failed to prepare COPY \"" + query + "\": " + string(PQresultErrorMessage(result))); diff --git a/src/postgres_copy_to.cpp b/src/postgres_copy_to.cpp index a235b4591..ef8027c50 100644 --- a/src/postgres_copy_to.cpp +++ b/src/postgres_copy_to.cpp @@ -56,7 +56,7 @@ void PostgresConnection::BeginCopyTo(ClientContext &context, PostgresCopyState & } query += ")"; - PostgresResult pg_res(PQExecute(query.c_str())); + PostgresResult pg_res(PQExecute(context, query.c_str())); auto result = pg_res.res; if (!result || PQresultStatus(result) != PGRES_COPY_IN) { throw std::runtime_error("Failed to prepare COPY \"" + query + "\": " + string(PQresultErrorMessage(result))); diff --git a/src/postgres_extension.cpp b/src/postgres_extension.cpp index 42465559f..dce65b121 100644 --- a/src/postgres_extension.cpp +++ b/src/postgres_extension.cpp @@ -19,6 +19,7 @@ #include "duckdb/main/client_context_state.hpp" #include "duckdb/main/connection_manager.hpp" #include "duckdb/common/error_data.hpp" +#include "postgres_logging.hpp" using namespace duckdb; @@ -160,7 +161,7 @@ static void LoadInternal(ExtensionLoader &loader) { loader.RegisterFunction(postgres_secret_function); auto &config = DBConfig::GetConfig(loader.GetDatabaseInstance()); - config.storage_extensions["postgres_scanner"] = make_uniq(); + StorageExtension::Register(config, "postgres_scanner", make_shared_ptr()); config.AddExtensionOption("pg_use_binary_copy", "Whether or not to use BINARY copy to read data", LogicalType::BOOLEAN, Value::BOOLEAN(true)); @@ -190,12 +191,16 @@ static void LoadInternal(ExtensionLoader &loader) { OptimizerExtension postgres_optimizer; postgres_optimizer.optimize_function = PostgresOptimizer::Optimize; - config.optimizer_extensions.push_back(std::move(postgres_optimizer)); + OptimizerExtension::Register(config, std::move(postgres_optimizer)); - config.extension_callbacks.push_back(make_uniq()); + ExtensionCallback::Register(config, make_shared_ptr()); for (auto &connection : ConnectionManager::Get(loader.GetDatabaseInstance()).GetConnectionList()) { connection->registered_state->Insert("postgres_extension", make_shared_ptr()); } + +auto &instance = loader.GetDatabaseInstance(); +auto &log_manager = instance.GetLogManager(); + log_manager.RegisterLogType(make_uniq()); } void PostgresScannerExtension::Load(ExtensionLoader &loader) { diff --git a/src/postgres_logging.cpp b/src/postgres_logging.cpp new file mode 100644 index 000000000..55ab8ab66 --- /dev/null +++ b/src/postgres_logging.cpp @@ -0,0 +1,35 @@ +#include "duckdb/main/attached_database.hpp" +#include "duckdb/logging/file_system_logger.hpp" +#include "duckdb/logging/log_type.hpp" +#include "duckdb/common/file_opener.hpp" +#include "duckdb/common/http_util.hpp" +#include "duckdb/main/client_context.hpp" +#include "duckdb/main/database.hpp" +#include "duckdb/execution/physical_operator.hpp" +#include "postgres_logging.hpp" + +namespace duckdb { + +constexpr LogLevel PostgresQueryLogType::LEVEL; + +//===--------------------------------------------------------------------===// +// PostgresQueryLogType +//===--------------------------------------------------------------------===// +string PostgresQueryLogType::ConstructLogMessage(const string &str, int64_t duration) { + child_list_t child_list = { + {"query", str}, + {"duration_ms", duration}, + }; + + return Value::STRUCT(std::move(child_list)).ToString(); +} + +LogicalType PostgresQueryLogType::GetLogType() { + child_list_t child_list = { + {"query", LogicalType::VARCHAR}, + {"duration_ms", LogicalType::BIGINT}, + }; + return LogicalType::STRUCT(child_list); +} + +} // namespace diff --git a/src/postgres_scanner.cpp b/src/postgres_scanner.cpp index bb4136f41..829f4552d 100644 --- a/src/postgres_scanner.cpp +++ b/src/postgres_scanner.cpp @@ -64,7 +64,7 @@ struct PostgresGlobalState : public GlobalTableFunctionState { PostgresConnection connection; }; -static void PostgresGetSnapshot(PostgresVersion version, const PostgresBindData &bind_data, +static void PostgresGetSnapshot(ClientContext &context, PostgresVersion version, const PostgresBindData &bind_data, PostgresGlobalState &gstate) { unique_ptr result; // by default disable snapshotting @@ -79,7 +79,7 @@ static void PostgresGetSnapshot(PostgresVersion version, const PostgresBindData auto &con = gstate.GetConnection(); // pg_stat_wal_receiver was introduced in PostgreSQL 9.6 if (version < PostgresVersion(9, 6, 0)) { - result = con.TryQuery("SELECT pg_is_in_recovery(), pg_export_snapshot()"); + result = con.TryQuery(context, "SELECT pg_is_in_recovery(), pg_export_snapshot()"); if (result) { auto in_recovery = result->GetBool(0, 0); if (!in_recovery) { @@ -90,7 +90,7 @@ static void PostgresGetSnapshot(PostgresVersion version, const PostgresBindData } result = - con.TryQuery("SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver)"); + con.TryQuery(context, "SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver)"); if (result) { auto in_recovery = result->GetBool(0, 0) || result->GetInt64(0, 2) > 0; gstate.snapshot = ""; @@ -183,9 +183,9 @@ static unique_ptr PostgresBind(ClientContext &context, TableFuncti bind_data->attach_path = bind_data->dsn; auto con = PostgresConnection::Open(bind_data->dsn, bind_data->attach_path); - auto version = con.GetPostgresVersion(); + auto version = con.GetPostgresVersion(context); // query the table schema so we can interpret the bits in the pages - auto info = PostgresTableSet::GetTableInfo(con, bind_data->schema_name, bind_data->table_name); + auto info = PostgresTableSet::GetTableInfo(context, con, bind_data->schema_name, bind_data->table_name); bind_data->postgres_types = info->postgres_types; for (auto &col : info->create_info->columns.Logical()) { @@ -303,10 +303,10 @@ static idx_t PostgresMaxThreads(ClientContext &context, const FunctionData *bind static unique_ptr GetLocalState(ClientContext &context, TableFunctionInitInput &input, PostgresGlobalState &gstate); -static void PostgresScanConnect(PostgresConnection &conn, string snapshot) { - conn.Execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY"); +static void PostgresScanConnect(ClientContext &context, PostgresConnection &conn, string snapshot) { + conn.Execute(context, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY"); if (!snapshot.empty()) { - conn.Query(StringUtil::Format("SET TRANSACTION SNAPSHOT '%s'", snapshot)); + conn.Query(context, StringUtil::Format("SET TRANSACTION SNAPSHOT '%s'", snapshot)); } } @@ -323,7 +323,7 @@ static unique_ptr PostgresInitGlobalState(ClientContex } else { auto con = PostgresConnection::Open(bind_data.dsn, bind_data.attach_path); if (bind_data.use_transaction) { - PostgresScanConnect(con, string()); + PostgresScanConnect(context, con, string()); } result->SetConnection(std::move(con)); } @@ -353,7 +353,7 @@ static unique_ptr PostgresInitGlobalState(ClientContex result->collection->InitializeScan(result->scan_state); } else { // we create a transaction here, and get the snapshot id to enable transaction-safe parallelism - PostgresGetSnapshot(bind_data.version, bind_data, *result); + PostgresGetSnapshot(context, bind_data.version, bind_data, *result); } return std::move(result); } @@ -407,7 +407,7 @@ bool PostgresGlobalState::TryOpenNewConnection(ClientContext &context, PostgresL } else { lstate.connection = PostgresConnection::Open(bind_data.dsn, bind_data.attach_path); } - PostgresScanConnect(lstate.connection, snapshot); + PostgresScanConnect(context, lstate.connection, snapshot); return true; } @@ -458,7 +458,7 @@ void PostgresLocalState::ScanChunk(ClientContext &context, const PostgresBindDat return; } if (!exec) { - reader->BeginCopy(sql); + reader->BeginCopy(context, sql); exec = true; } auto read_result = reader->Read(output); diff --git a/src/postgres_storage.cpp b/src/postgres_storage.cpp index cfff7466e..bdafb2ba1 100644 --- a/src/postgres_storage.cpp +++ b/src/postgres_storage.cpp @@ -1,5 +1,6 @@ #include "duckdb.hpp" +#include "duckdb/main/settings.hpp" #include "postgres_storage.hpp" #include "storage/postgres_catalog.hpp" #include "duckdb/parser/parsed_data/attach_info.hpp" @@ -11,7 +12,7 @@ static unique_ptr PostgresAttach(optional_ptr sto AttachedDatabase &db, const string &name, AttachInfo &info, AttachOptions &attach_options) { auto &config = DBConfig::GetConfig(context); - if (!config.options.enable_external_access) { + if (!Settings::Get(context)) { throw PermissionException("Attaching Postgres databases is disabled through configuration"); } string attach_path = info.path; @@ -45,7 +46,7 @@ static unique_ptr PostgresAttach(optional_ptr sto } auto connection_string = PostgresCatalog::GetConnectionString(context, attach_path, secret_name); return make_uniq(db, std::move(connection_string), std::move(attach_path), - attach_options.access_mode, std::move(schema_to_load), isolation_level); + attach_options.access_mode, std::move(schema_to_load), isolation_level, context); } static unique_ptr PostgresCreateTransactionManager(optional_ptr storage_info, diff --git a/src/postgres_text_reader.cpp b/src/postgres_text_reader.cpp index 066e0721a..94db174ea 100644 --- a/src/postgres_text_reader.cpp +++ b/src/postgres_text_reader.cpp @@ -13,8 +13,8 @@ PostgresTextReader::~PostgresTextReader() { Reset(); } -void PostgresTextReader::BeginCopy(const string &sql) { - result = con.Query(sql); +void PostgresTextReader::BeginCopy(ClientContext &context, const string &sql) { + result = con.Query(context, sql); row_offset = 0; } diff --git a/src/postgres_utils.cpp b/src/postgres_utils.cpp index 5fc394418..ae3d02172 100644 --- a/src/postgres_utils.cpp +++ b/src/postgres_utils.cpp @@ -90,7 +90,7 @@ LogicalType PostgresUtils::TypeToLogicalType(optional_ptr t // postgres array types start with an _ if (StringUtil::StartsWith(pgtypename, "_")) { if (transaction) { - auto context = transaction->context.lock(); + auto context = transaction->GetContext(); if (!context) { throw InternalException("Context is destroyed!?"); } @@ -204,7 +204,7 @@ LogicalType PostgresUtils::TypeToLogicalType(optional_ptr t postgres_type.info = PostgresTypeAnnotation::CAST_TO_VARCHAR; return LogicalType::VARCHAR; } - auto context = transaction->context.lock(); + auto context = transaction->GetContext(); if (!context) { throw InternalException("Context is destroyed!?"); } diff --git a/src/storage/postgres_catalog.cpp b/src/storage/postgres_catalog.cpp index f91c1f245..888463b27 100644 --- a/src/storage/postgres_catalog.cpp +++ b/src/storage/postgres_catalog.cpp @@ -11,7 +11,7 @@ namespace duckdb { PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_string_p, string attach_path_p, - AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level) + AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level, ClientContext &context) : Catalog(db_p), connection_string(std::move(connection_string_p)), attach_path(std::move(attach_path_p)), access_mode(access_mode), isolation_level(isolation_level), schemas(*this, schema_to_load), connection_pool(*this), default_schema(schema_to_load) { @@ -25,7 +25,7 @@ PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, string connection_strin } auto connection = connection_pool.GetConnection(); - this->version = connection.GetConnection().GetPostgresVersion(); + this->version = connection.GetConnection().GetPostgresVersion(context); } string EscapeConnectionString(const string &input) { @@ -109,7 +109,7 @@ void PostgresCatalog::Initialize(bool load_builtin) { optional_ptr PostgresCatalog::CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) { auto &postgres_transaction = PostgresTransaction::Get(transaction.GetContext(), *this); - auto entry = schemas.GetEntry(postgres_transaction, info.schema); + auto entry = schemas.GetEntry(transaction.GetContext(), postgres_transaction, info.schema); if (entry) { switch (info.on_conflict) { case OnCreateConflict::REPLACE_ON_CONFLICT: { @@ -138,7 +138,7 @@ void PostgresCatalog::DropSchema(ClientContext &context, DropInfo &info) { void PostgresCatalog::ScanSchemas(ClientContext &context, std::function callback) { auto &postgres_transaction = PostgresTransaction::Get(context, *this); - schemas.Scan(postgres_transaction, [&](CatalogEntry &schema) { callback(schema.Cast()); }); + schemas.Scan(context, postgres_transaction, [&](CatalogEntry &schema) { callback(schema.Cast()); }); } optional_ptr PostgresCatalog::LookupSchema(CatalogTransaction transaction, @@ -149,7 +149,7 @@ optional_ptr PostgresCatalog::LookupSchema(CatalogTransactio if (schema_name == "pg_temp") { schema_name = postgres_transaction.GetTemporarySchema(); } - auto entry = schemas.GetEntry(postgres_transaction, schema_name); + auto entry = schemas.GetEntry(transaction.GetContext(), postgres_transaction, schema_name); if (!entry && if_not_found != OnEntryNotFound::RETURN_NULL) { throw BinderException("Schema with name \"%s\" not found", schema_name); } diff --git a/src/storage/postgres_catalog_set.cpp b/src/storage/postgres_catalog_set.cpp index ac13420ad..f0218f093 100644 --- a/src/storage/postgres_catalog_set.cpp +++ b/src/storage/postgres_catalog_set.cpp @@ -8,8 +8,8 @@ namespace duckdb { PostgresCatalogSet::PostgresCatalogSet(Catalog &catalog, bool is_loaded_p) : catalog(catalog), is_loaded(is_loaded_p) { } -optional_ptr PostgresCatalogSet::GetEntry(PostgresTransaction &transaction, const string &name) { - TryLoadEntries(transaction); +optional_ptr PostgresCatalogSet::GetEntry(ClientContext &context, PostgresTransaction &transaction, const string &name) { + TryLoadEntries(context, transaction); { lock_guard l(entry_lock); auto entry = entries.find(name); @@ -40,7 +40,7 @@ optional_ptr PostgresCatalogSet::GetEntry(PostgresTransaction &tra return nullptr; } -void PostgresCatalogSet::TryLoadEntries(PostgresTransaction &transaction) { +void PostgresCatalogSet::TryLoadEntries(ClientContext &context, PostgresTransaction &transaction) { if (HasInternalDependencies()) { if (is_loaded) { return; @@ -51,7 +51,7 @@ void PostgresCatalogSet::TryLoadEntries(PostgresTransaction &transaction) { return; } is_loaded = true; - LoadEntries(transaction); + LoadEntries(context, transaction); } optional_ptr PostgresCatalogSet::ReloadEntry(PostgresTransaction &transaction, const string &name) { @@ -78,8 +78,8 @@ void PostgresCatalogSet::DropEntry(PostgresTransaction &transaction, DropInfo &i entries.erase(info.name); } -void PostgresCatalogSet::Scan(PostgresTransaction &transaction, const std::function &callback) { - TryLoadEntries(transaction); +void PostgresCatalogSet::Scan(ClientContext &context, PostgresTransaction &transaction, const std::function &callback) { + TryLoadEntries(context, transaction); lock_guard l(entry_lock); for (auto &entry : entries) { callback(*entry.second); diff --git a/src/storage/postgres_delete.cpp b/src/storage/postgres_delete.cpp index 8e1c01437..b50fbc171 100644 --- a/src/storage/postgres_delete.cpp +++ b/src/storage/postgres_delete.cpp @@ -96,7 +96,7 @@ SinkFinalizeType PostgresDelete::Finalize(Pipeline &pipeline, Event &event, Clie //===--------------------------------------------------------------------===// // GetData //===--------------------------------------------------------------------===// -SourceResultType PostgresDelete::GetData(ExecutionContext &context, DataChunk &chunk, +SourceResultType PostgresDelete::GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const { auto &insert_gstate = sink_state->Cast(); chunk.SetCardinality(1); diff --git a/src/storage/postgres_index.cpp b/src/storage/postgres_index.cpp index d2de80e06..eae13de59 100644 --- a/src/storage/postgres_index.cpp +++ b/src/storage/postgres_index.cpp @@ -20,7 +20,7 @@ PostgresCreateIndex::PostgresCreateIndex(PhysicalPlan &physical_plan, unique_ptr //===--------------------------------------------------------------------===// // Source //===--------------------------------------------------------------------===// -SourceResultType PostgresCreateIndex::GetData(ExecutionContext &context, DataChunk &chunk, +SourceResultType PostgresCreateIndex::GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const { auto &catalog = table.catalog; auto &schema = table.schema; diff --git a/src/storage/postgres_index_set.cpp b/src/storage/postgres_index_set.cpp index aacdb83e6..2ccd309c5 100644 --- a/src/storage/postgres_index_set.cpp +++ b/src/storage/postgres_index_set.cpp @@ -1,6 +1,7 @@ #include "storage/postgres_index_set.hpp" #include "storage/postgres_schema_entry.hpp" #include "storage/postgres_transaction.hpp" +#include "duckdb/parser/expression/columnref_expression.hpp" #include "duckdb/parser/parsed_data/create_schema_info.hpp" #include "storage/postgres_index_entry.hpp" #include "duckdb/parser/parsed_expression_iterator.hpp" @@ -26,7 +27,7 @@ ORDER BY pg_namespace.oid; return StringUtil::Replace(base_query, "${CONDITION}", condition); } -void PostgresIndexSet::LoadEntries(PostgresTransaction &transaction) { +void PostgresIndexSet::LoadEntries(ClientContext &context, PostgresTransaction &transaction) { if (!index_result) { throw InternalException("PostgresIndexSet::LoadEntries called without an index result defined"); } diff --git a/src/storage/postgres_insert.cpp b/src/storage/postgres_insert.cpp index e3961e4f9..385351df7 100644 --- a/src/storage/postgres_insert.cpp +++ b/src/storage/postgres_insert.cpp @@ -148,7 +148,7 @@ SinkFinalizeType PostgresInsert::Finalize(Pipeline &pipeline, Event &event, Clie //===--------------------------------------------------------------------===// // GetData //===--------------------------------------------------------------------===// -SourceResultType PostgresInsert::GetData(ExecutionContext &context, DataChunk &chunk, +SourceResultType PostgresInsert::GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const { auto &insert_gstate = sink_state->Cast(); chunk.SetCardinality(1); diff --git a/src/storage/postgres_schema_entry.cpp b/src/storage/postgres_schema_entry.cpp index 55df3887b..5c1b9def9 100644 --- a/src/storage/postgres_schema_entry.cpp +++ b/src/storage/postgres_schema_entry.cpp @@ -154,7 +154,7 @@ void PostgresSchemaEntry::Alter(CatalogTransaction transaction, AlterInfo &info) } auto &postgres_transaction = GetPostgresTransaction(transaction); auto &alter = info.Cast(); - tables.AlterTable(postgres_transaction, alter); + tables.AlterTable(transaction.GetContext(), postgres_transaction, alter); } bool CatalogTypeIsSupported(CatalogType type) { @@ -175,7 +175,7 @@ void PostgresSchemaEntry::Scan(ClientContext &context, CatalogType type, return; } auto &postgres_transaction = PostgresTransaction::Get(context, catalog); - GetCatalogSet(type).Scan(postgres_transaction, callback); + GetCatalogSet(type).Scan(context, postgres_transaction, callback); } void PostgresSchemaEntry::Scan(CatalogType type, const std::function &callback) { throw NotImplementedException("Scan without context not supported"); @@ -194,7 +194,7 @@ optional_ptr PostgresSchemaEntry::LookupEntry(CatalogTransaction t return nullptr; } auto &postgres_transaction = GetPostgresTransaction(transaction); - return GetCatalogSet(catalog_type).GetEntry(postgres_transaction, lookup_info.GetEntryName()); + return GetCatalogSet(catalog_type).GetEntry(transaction.GetContext(), postgres_transaction, lookup_info.GetEntryName()); } PostgresCatalogSet &PostgresSchemaEntry::GetCatalogSet(CatalogType type) { diff --git a/src/storage/postgres_schema_set.cpp b/src/storage/postgres_schema_set.cpp index 15f6aef11..e6ef9db60 100644 --- a/src/storage/postgres_schema_set.cpp +++ b/src/storage/postgres_schema_set.cpp @@ -48,7 +48,7 @@ ORDER BY oid; return StringUtil::Replace(base_query, "${CONDITION}", condition); } -void PostgresSchemaSet::LoadEntries(PostgresTransaction &transaction) { +void PostgresSchemaSet::LoadEntries(ClientContext &context, PostgresTransaction &transaction) { auto &pg_catalog = catalog.Cast(); auto pg_version = pg_catalog.GetPostgresVersion(); string schema_query = PostgresSchemaSet::GetInitializeQuery(schema_to_load); @@ -59,7 +59,7 @@ void PostgresSchemaSet::LoadEntries(PostgresTransaction &transaction) { auto full_query = schema_query + tables_query + enum_types_query + composite_types_query + index_query; - auto results = transaction.ExecuteQueries(full_query); + auto results = transaction.ExecuteQueries(context, full_query); auto result = std::move(results[0]); results.erase(results.begin()); auto rows = result->Count(); diff --git a/src/storage/postgres_table_set.cpp b/src/storage/postgres_table_set.cpp index efe37a182..624e137f1 100644 --- a/src/storage/postgres_table_set.cpp +++ b/src/storage/postgres_table_set.cpp @@ -143,7 +143,7 @@ void PostgresTableSet::CreateEntries(PostgresTransaction &transaction, PostgresR } } -void PostgresTableSet::LoadEntries(PostgresTransaction &transaction) { +void PostgresTableSet::LoadEntries(ClientContext &context, PostgresTransaction &transaction) { if (table_result) { CreateEntries(transaction, table_result->GetResult(), table_result->start, table_result->end); table_result.reset(); @@ -173,10 +173,10 @@ unique_ptr PostgresTableSet::GetTableInfo(PostgresTransaction return table_info; } -unique_ptr PostgresTableSet::GetTableInfo(PostgresConnection &connection, const string &schema_name, +unique_ptr PostgresTableSet::GetTableInfo(ClientContext &context, PostgresConnection &connection, const string &schema_name, const string &table_name) { auto query = PostgresTableSet::GetInitializeQuery(schema_name, table_name); - auto result = connection.Query(query); + auto result = connection.Query(context, query); auto rows = result->Count(); if (rows == 0) { throw InvalidInputException("Table %s does not contain any columns.", table_name); @@ -340,20 +340,20 @@ string PostgresTableSet::GetAlterTableColumnName(const string &name, optional_pt return table.postgres_names[column_index.index]; } -string PostgresTableSet::GetAlterTablePrefix(PostgresTransaction &transaction, const string &name) { - auto entry = GetEntry(transaction, name); +string PostgresTableSet::GetAlterTablePrefix(ClientContext &context, PostgresTransaction &transaction, const string &name) { + auto entry = GetEntry(context, transaction, name); return GetAlterTablePrefix(name, entry); } -void PostgresTableSet::AlterTable(PostgresTransaction &transaction, RenameTableInfo &info) { - string sql = GetAlterTablePrefix(transaction, info.name); +void PostgresTableSet::AlterTable(ClientContext &context, PostgresTransaction &transaction, RenameTableInfo &info) { + string sql = GetAlterTablePrefix(context, transaction, info.name); sql += " RENAME TO "; sql += KeywordHelper::WriteQuoted(info.new_table_name, '"'); transaction.Query(sql); } -void PostgresTableSet::AlterTable(PostgresTransaction &transaction, RenameColumnInfo &info) { - auto entry = GetEntry(transaction, info.name); +void PostgresTableSet::AlterTable(ClientContext &context,PostgresTransaction &transaction, RenameColumnInfo &info) { + auto entry = GetEntry(context, transaction, info.name); string sql = GetAlterTablePrefix(info.name, entry); sql += " RENAME COLUMN "; string column_name = GetAlterTableColumnName(info.old_name, entry); @@ -364,8 +364,8 @@ void PostgresTableSet::AlterTable(PostgresTransaction &transaction, RenameColumn transaction.Query(sql); } -void PostgresTableSet::AlterTable(PostgresTransaction &transaction, AddColumnInfo &info) { - string sql = GetAlterTablePrefix(transaction, info.name); +void PostgresTableSet::AlterTable(ClientContext &context, PostgresTransaction &transaction, AddColumnInfo &info) { + string sql = GetAlterTablePrefix(context, transaction, info.name); sql += " ADD COLUMN "; if (info.if_column_not_exists) { sql += "IF NOT EXISTS "; @@ -376,8 +376,8 @@ void PostgresTableSet::AlterTable(PostgresTransaction &transaction, AddColumnInf transaction.Query(sql); } -void PostgresTableSet::AlterTable(PostgresTransaction &transaction, RemoveColumnInfo &info) { - auto entry = GetEntry(transaction, info.name); +void PostgresTableSet::AlterTable(ClientContext &context, PostgresTransaction &transaction, RemoveColumnInfo &info) { + auto entry = GetEntry(context, transaction, info.name); string sql = GetAlterTablePrefix(info.name, entry); sql += " DROP COLUMN "; if (info.if_column_exists) { @@ -388,19 +388,19 @@ void PostgresTableSet::AlterTable(PostgresTransaction &transaction, RemoveColumn transaction.Query(sql); } -void PostgresTableSet::AlterTable(PostgresTransaction &transaction, AlterTableInfo &alter) { +void PostgresTableSet::AlterTable(ClientContext &context, PostgresTransaction &transaction, AlterTableInfo &alter) { switch (alter.alter_table_type) { case AlterTableType::RENAME_TABLE: - AlterTable(transaction, alter.Cast()); + AlterTable(context, transaction, alter.Cast()); break; case AlterTableType::RENAME_COLUMN: - AlterTable(transaction, alter.Cast()); + AlterTable(context, transaction, alter.Cast()); break; case AlterTableType::ADD_COLUMN: - AlterTable(transaction, alter.Cast()); + AlterTable(context, transaction, alter.Cast()); break; case AlterTableType::REMOVE_COLUMN: - AlterTable(transaction, alter.Cast()); + AlterTable(context, transaction, alter.Cast()); break; default: throw BinderException("Unsupported ALTER TABLE type - Postgres tables only " diff --git a/src/storage/postgres_transaction.cpp b/src/storage/postgres_transaction.cpp index 7b198cab5..11cc301dc 100644 --- a/src/storage/postgres_transaction.cpp +++ b/src/storage/postgres_transaction.cpp @@ -16,8 +16,8 @@ PostgresTransaction::PostgresTransaction(PostgresCatalog &postgres_catalog, Tran PostgresTransaction::~PostgresTransaction() = default; -ClientContext &PostgresTransaction::GetContext() { - return *context.lock(); +optional_ptr PostgresTransaction::GetContext() { + return context.lock(); } void PostgresTransaction::Start() { @@ -26,13 +26,13 @@ void PostgresTransaction::Start() { void PostgresTransaction::Commit() { if (transaction_state == PostgresTransactionState::TRANSACTION_STARTED) { transaction_state = PostgresTransactionState::TRANSACTION_FINISHED; - GetConnectionRaw().Execute("COMMIT"); + GetConnectionRaw().Execute(GetContext(), "COMMIT"); } } void PostgresTransaction::Rollback() { if (transaction_state == PostgresTransactionState::TRANSACTION_STARTED) { transaction_state = PostgresTransactionState::TRANSACTION_FINISHED; - GetConnectionRaw().Execute("ROLLBACK"); + GetConnectionRaw().Execute(GetContext(), "ROLLBACK"); } } @@ -72,7 +72,7 @@ PostgresConnection &PostgresTransaction::GetConnection() { if (transaction_state == PostgresTransactionState::TRANSACTION_NOT_YET_STARTED) { transaction_state = PostgresTransactionState::TRANSACTION_STARTED; string query = GetBeginTransactionQuery(); - con.Execute(query); + con.Execute(GetContext(), query); } return con; } @@ -91,9 +91,9 @@ unique_ptr PostgresTransaction::Query(const string &query) { transaction_state = PostgresTransactionState::TRANSACTION_STARTED; string transaction_start = GetBeginTransactionQuery(); transaction_start += ";\n"; - return con.Query(transaction_start + query); + return con.Query(GetContext(), transaction_start + query); } - return con.Query(query); + return con.Query(GetContext(), query); } unique_ptr PostgresTransaction::QueryWithoutTransaction(const string &query) { @@ -104,18 +104,18 @@ unique_ptr PostgresTransaction::QueryWithoutTransaction(const st if (access_mode == AccessMode::READ_ONLY) { throw std::runtime_error("Execution without a Transaction is not possible in Read Only Mode"); } - return con.Query(query); + return con.Query(GetContext(), query); } -vector> PostgresTransaction::ExecuteQueries(const string &queries) { +vector> PostgresTransaction::ExecuteQueries(ClientContext& context, const string &queries) { auto &con = GetConnectionRaw(); if (transaction_state == PostgresTransactionState::TRANSACTION_NOT_YET_STARTED) { transaction_state = PostgresTransactionState::TRANSACTION_STARTED; string transaction_start = GetBeginTransactionQuery(); transaction_start += ";\n"; - return con.ExecuteQueries(transaction_start + queries); + return con.ExecuteQueries(context, transaction_start + queries); } - return con.ExecuteQueries(queries); + return con.ExecuteQueries(context, queries); } optional_ptr PostgresTransaction::ReferenceEntry(shared_ptr &entry) { diff --git a/src/storage/postgres_transaction_manager.cpp b/src/storage/postgres_transaction_manager.cpp index d546c294a..b1347ba1e 100644 --- a/src/storage/postgres_transaction_manager.cpp +++ b/src/storage/postgres_transaction_manager.cpp @@ -34,7 +34,7 @@ void PostgresTransactionManager::RollbackTransaction(Transaction &transaction) { void PostgresTransactionManager::Checkpoint(ClientContext &context, bool force) { auto &transaction = PostgresTransaction::Get(context, db.GetCatalog()); auto &db = transaction.GetConnection(); - db.Execute("CHECKPOINT"); + db.Execute(context, "CHECKPOINT"); } } // namespace duckdb diff --git a/src/storage/postgres_type_set.cpp b/src/storage/postgres_type_set.cpp index 9f32f7885..f2db80ce9 100644 --- a/src/storage/postgres_type_set.cpp +++ b/src/storage/postgres_type_set.cpp @@ -145,7 +145,7 @@ void PostgresTypeSet::InitializeCompositeTypes(PostgresTransaction &transaction, } } -void PostgresTypeSet::LoadEntries(PostgresTransaction &transaction) { +void PostgresTypeSet::LoadEntries(ClientContext &context, PostgresTransaction &transaction) { if (!enum_result || !composite_type_result) { throw InternalException("PostgresTypeSet::LoadEntries not defined without enum/composite type result"); } @@ -197,7 +197,7 @@ optional_ptr PostgresTypeSet::CreateType(PostgresTransaction &tran auto &conn = transaction.GetConnection(); auto create_sql = GetCreateTypeSQL(info); - conn.Execute(create_sql); + conn.Execute(transaction.GetContext(), create_sql); info.type.SetAlias(info.name); auto pg_type = PostgresUtils::CreateEmptyPostgresType(info.type); auto type_entry = make_shared_ptr(catalog, schema, info, pg_type); diff --git a/src/storage/postgres_update.cpp b/src/storage/postgres_update.cpp index 8c3b8a305..af167ef24 100644 --- a/src/storage/postgres_update.cpp +++ b/src/storage/postgres_update.cpp @@ -89,7 +89,7 @@ unique_ptr PostgresUpdate::GetGlobalSinkState(ClientContext &co auto &connection = transaction.GetConnection(); // create a temporary table to stream the update data into result->update_table_name = "update_data_" + UUID::ToString(UUID::GenerateRandomUUID()); - connection.Execute(CreateUpdateTable(result->update_table_name, postgres_table, columns)); + connection.Execute(context, CreateUpdateTable(result->update_table_name, postgres_table, columns)); // generate the final UPDATE sql result->update_sql = GetUpdateSQL(result->update_table_name, postgres_table, columns); // initialize the insertion chunk @@ -170,14 +170,14 @@ SinkFinalizeType PostgresUpdate::Finalize(Pipeline &pipeline, Event &event, Clie auto &connection = transaction.GetConnection(); gstate.FinishCopyTo(connection); // merge the update_info table into the actual table (i.e. perform the actual update) - connection.Execute(gstate.update_sql); + connection.Execute(context, gstate.update_sql); return SinkFinalizeType::READY; } //===--------------------------------------------------------------------===// // GetData //===--------------------------------------------------------------------===// -SourceResultType PostgresUpdate::GetData(ExecutionContext &context, DataChunk &chunk, +SourceResultType PostgresUpdate::GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const { auto &insert_gstate = sink_state->Cast(); chunk.SetCardinality(1); diff --git a/test/sql/storage/attach_types.test b/test/sql/storage/attach_types.test index 687238b7b..9791a62a8 100644 --- a/test/sql/storage/attach_types.test +++ b/test/sql/storage/attach_types.test @@ -56,19 +56,19 @@ FROM test_all_types(); statement ok CREATE OR REPLACE TABLE s.all_types AS FROM all_types_tbl -query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII +query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII SELECT COLUMNS(*)::VARCHAR FROM all_types_tbl ---- -false -128 -32768 -2147483648 -9223372036854775808 0 0 0 2000-01-01 00:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 -999.9 -99999.9999 -999999999999.999999 -9999999999999999999999999999.9999999999 00000000-0000-0000-0000-000000000000 00:00:00 🦆🦆🦆🦆🦆🦆 thisisalongblob\x00withnullbytes 0010001001011100010101011010111 DUCK_DUCK_ENUM enum_0 enum_0 [] [] [] [] [] [🦆🦆🦆🦆🦆🦆, goose, NULL] -true 127 32767 2147483647 9223372036854775807 255 65535 4294967295 2000-01-01 24:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 999.9 99999.9999 999999999999.999999 9999999999999999999999999999.9999999999 ffffffff-ffff-ffff-ffff-ffffffffffff 83 years 3 months 999 days 00:16:39.999999 goo se \x00\x00\x00a 10101 GOOSE enum_299 enum_69999 [42, 999, NULL, NULL, -42] [42.0, nan, inf, -inf, NULL, -42.0] [1970-01-01, infinity, -infinity, NULL, 2022-05-12] ['1970-01-01 00:00:00', infinity, -infinity, NULL, '2022-05-12 16:23:45'] ['1970-01-01 00:00:00+00', infinity, -infinity, NULL, '2022-05-12 23:23:45+00'] [🦆🦆🦆🦆🦆🦆, goose, NULL] -NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL +false -128 -32768 -2147483648 -9223372036854775808 0 0 0 2000-01-01 00:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 -999.9 -99999.9999 -999999999999.999999 -9999999999999999999999999999.9999999999 00000000-0000-0000-0000-000000000000 00:00:00 🦆🦆🦆🦆🦆🦆 thisisalongblob\x00withnullbytes 0010001001011100010101011010111 DUCK_DUCK_ENUM enum_0 enum_0 [] [] [] [] [] [🦆🦆🦆🦆🦆🦆, goose, NULL] 00:00:00 +true 127 32767 2147483647 9223372036854775807 255 65535 4294967295 2000-01-01 24:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 999.9 99999.9999 999999999999.999999 9999999999999999999999999999.9999999999 ffffffff-ffff-ffff-ffff-ffffffffffff 83 years 3 months 999 days 00:16:39.999999 goo se \x00\x00\x00a 10101 GOOSE enum_299 enum_69999 [42, 999, NULL, NULL, -42] [42.0, nan, inf, -inf, NULL, -42.0] [1970-01-01, infinity, -infinity, NULL, 2022-05-12] ['1970-01-01 00:00:00', infinity, -infinity, NULL, '2022-05-12 16:23:45'] ['1970-01-01 00:00:00+00', infinity, -infinity, NULL, '2022-05-12 23:23:45+00'] [🦆🦆🦆🦆🦆🦆, goose, NULL] 24:00:00 +NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL -query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII +query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII SELECT COLUMNS(*)::VARCHAR FROM s.all_types ---- -false -128 -32768 -2147483648 -9223372036854775808 0 0 0 2000-01-01 00:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 -999.9 -99999.9999 -999999999999.999999 -9999999999999999999999999999.9999999999 00000000-0000-0000-0000-000000000000 00:00:00 🦆🦆🦆🦆🦆🦆 thisisalongblob\x00withnullbytes 0010001001011100010101011010111 DUCK_DUCK_ENUM enum_0 enum_0 [] [] [] [] [] [🦆🦆🦆🦆🦆🦆, goose, NULL] -true 127 32767 2147483647 9223372036854775807 255 65535 4294967295 2000-01-01 24:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 999.9 99999.9999 999999999999.999999 9999999999999999999999999999.9999999999 ffffffff-ffff-ffff-ffff-ffffffffffff 83 years 3 months 999 days 00:16:39.999999 goo se \x00\x00\x00a 10101 GOOSE enum_299 enum_69999 [42, 999, NULL, NULL, -42] [42.0, nan, inf, -inf, NULL, -42.0] [1970-01-01, infinity, -infinity, NULL, 2022-05-12] ['1970-01-01 00:00:00', infinity, -infinity, NULL, '2022-05-12 16:23:45'] ['1970-01-01 00:00:00+00', infinity, -infinity, NULL, '2022-05-12 23:23:45+00'] [🦆🦆🦆🦆🦆🦆, goose, NULL] -NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL +false -128 -32768 -2147483648 -9223372036854775808 0 0 0 2000-01-01 00:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 -999.9 -99999.9999 -999999999999.999999 -9999999999999999999999999999.9999999999 00000000-0000-0000-0000-000000000000 00:00:00 🦆🦆🦆🦆🦆🦆 thisisalongblob\x00withnullbytes 0010001001011100010101011010111 DUCK_DUCK_ENUM enum_0 enum_0 [] [] [] [] [] [🦆🦆🦆🦆🦆🦆, goose, NULL] 00:00:00 +true 127 32767 2147483647 9223372036854775807 255 65535 4294967295 2000-01-01 24:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 999.9 99999.9999 999999999999.999999 9999999999999999999999999999.9999999999 ffffffff-ffff-ffff-ffff-ffffffffffff 83 years 3 months 999 days 00:16:39.999999 goo se \x00\x00\x00a 10101 GOOSE enum_299 enum_69999 [42, 999, NULL, NULL, -42] [42.0, nan, inf, -inf, NULL, -42.0] [1970-01-01, infinity, -infinity, NULL, 2022-05-12] ['1970-01-01 00:00:00', infinity, -infinity, NULL, '2022-05-12 16:23:45'] ['1970-01-01 00:00:00+00', infinity, -infinity, NULL, '2022-05-12 23:23:45+00'] [🦆🦆🦆🦆🦆🦆, goose, NULL] 24:00:00 +NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL # filter pushdown foreach column_name bool tinyint smallint int bigint utinyint usmallint uint date time timestamp timestamp_s timestamp_ms timestamp_ns time_tz timestamp_tz dec_4_1 dec_9_4 dec_18_6 dec38_10 uuid interval varchar blob bit small_enum medium_enum large_enum int_array double_array date_array timestamp_array timestamptz_array varchar_array @@ -87,9 +87,9 @@ endloop statement ok SET pg_use_text_protocol=true; -query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII +query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII SELECT COLUMNS(*)::VARCHAR FROM s.all_types ---- -false -128 -32768 -2147483648 -9223372036854775808 0 0 0 2000-01-01 00:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 -999.9 -99999.9999 -999999999999.999999 -9999999999999999999999999999.9999999999 00000000-0000-0000-0000-000000000000 00:00:00 🦆🦆🦆🦆🦆🦆 thisisalongblob\x00withnullbytes 0010001001011100010101011010111 DUCK_DUCK_ENUM enum_0 enum_0 [] [] [] [] [] [🦆🦆🦆🦆🦆🦆, goose, NULL] -true 127 32767 2147483647 9223372036854775807 255 65535 4294967295 2000-01-01 24:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 999.9 99999.9999 999999999999.999999 9999999999999999999999999999.9999999999 ffffffff-ffff-ffff-ffff-ffffffffffff 83 years 3 months 999 days 00:16:39.999999 goo se \x00\x00\x00a 10101 GOOSE enum_299 enum_69999 [42, 999, NULL, NULL, -42] [42.0, nan, inf, -inf, NULL, -42.0] [1970-01-01, infinity, -infinity, NULL, 2022-05-12] ['1970-01-01 00:00:00', infinity, -infinity, NULL, '2022-05-12 16:23:45'] ['1970-01-01 00:00:00+00', infinity, -infinity, NULL, '2022-05-12 23:23:45+00'] [🦆🦆🦆🦆🦆🦆, goose, NULL] -NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL +false -128 -32768 -2147483648 -9223372036854775808 0 0 0 2000-01-01 00:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 -999.9 -99999.9999 -999999999999.999999 -9999999999999999999999999999.9999999999 00000000-0000-0000-0000-000000000000 00:00:00 🦆🦆🦆🦆🦆🦆 thisisalongblob\x00withnullbytes 0010001001011100010101011010111 DUCK_DUCK_ENUM enum_0 enum_0 [] [] [] [] [] [🦆🦆🦆🦆🦆🦆, goose, NULL] 00:00:00 +true 127 32767 2147483647 9223372036854775807 255 65535 4294967295 2000-01-01 24:00:00 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 2000-01-01 01:02:03 00:00:00+15:00 2000-01-01 01:02:03 999.9 99999.9999 999999999999.999999 9999999999999999999999999999.9999999999 ffffffff-ffff-ffff-ffff-ffffffffffff 83 years 3 months 999 days 00:16:39.999999 goo se \x00\x00\x00a 10101 GOOSE enum_299 enum_69999 [42, 999, NULL, NULL, -42] [42.0, nan, inf, -inf, NULL, -42.0] [1970-01-01, infinity, -infinity, NULL, 2022-05-12] ['1970-01-01 00:00:00', infinity, -infinity, NULL, '2022-05-12 16:23:45'] ['1970-01-01 00:00:00+00', infinity, -infinity, NULL, '2022-05-12 23:23:45+00'] [🦆🦆🦆🦆🦆🦆, goose, NULL] 24:00:00 +NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL diff --git a/test/sql/storage/postgres_execute_transaction.test b/test/sql/storage/postgres_execute_transaction.test index d63887b68..1515facc1 100644 --- a/test/sql/storage/postgres_execute_transaction.test +++ b/test/sql/storage/postgres_execute_transaction.test @@ -52,3 +52,29 @@ FROM s.postgres_execute_attempt ---- 42 84 + +statement ok +CALL enable_logging('PostgresQueryLog'); + +statement ok +DESCRIBE + +query I +SELECT query FROM duckdb_logs_parsed('PostgresQueryLog'); +---- +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ +COMMIT +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ +COMMIT +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ +COMMIT +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ +COMMIT +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ +COMMIT +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ +COMMIT +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ +COMMIT +BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ +COMMIT