Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ add_library(
postgres_execute.cpp
postgres_extension.cpp
postgres_filter_pushdown.cpp
postgres_parameters.cpp
postgres_query.cpp
postgres_scanner.cpp
postgres_storage.cpp
Expand Down
14 changes: 10 additions & 4 deletions src/include/postgres_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma once

#include "postgres_utils.hpp"
#include "postgres_parameters.hpp"
#include "postgres_result.hpp"
#include "duckdb/common/shared_ptr.hpp"

Expand Down Expand Up @@ -45,9 +46,13 @@ class PostgresConnection {

public:
static PostgresConnection Open(const string &dsn, const string &attach_path);
void Execute(optional_ptr<ClientContext> context, const string &query);
unique_ptr<PostgresResult> TryQuery(optional_ptr<ClientContext> context, const string &query, optional_ptr<string> error_message = nullptr);
unique_ptr<PostgresResult> Query(optional_ptr<ClientContext> context, const string &query);
void Execute(optional_ptr<ClientContext> context, const string &query,
const PostgresParameters &params = PostgresParameters());
unique_ptr<PostgresResult> TryQuery(optional_ptr<ClientContext> context, const string &query,
optional_ptr<string> error_message = nullptr,
const PostgresParameters &params = PostgresParameters());
unique_ptr<PostgresResult> Query(optional_ptr<ClientContext> context, const string &query,
const PostgresParameters &params = PostgresParameters());

//! Submits a set of queries to be executed in the connection.
vector<unique_ptr<PostgresResult>> ExecuteQueries(ClientContext &context, const string &queries);
Expand Down Expand Up @@ -87,7 +92,8 @@ class PostgresConnection {
static bool DebugPrintQueries();

private:
PGresult *PQExecute(optional_ptr<ClientContext> context, const string &query);
PGresult *PQExecute(optional_ptr<ClientContext> context, const string &query,
const PostgresParameters &params = PostgresParameters());

shared_ptr<OwnedPostgresConnection> connection;
string dsn;
Expand Down
2 changes: 1 addition & 1 deletion src/include/postgres_logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ class PostgresQueryLogType : public LogType {
static LogicalType GetLogType();
};

} // namespace
} // namespace duckdb
56 changes: 56 additions & 0 deletions src/include/postgres_parameters.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// postgres_parameters.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb.hpp"
#include <libpq-fe.h>
#include "postgres_version.hpp"

namespace duckdb {

class PostgresParameters {
vector<Oid> types;
vector<Value> values;
vector<vector<char>> copied_values;
vector<const char *> value_ptrs;
vector<int> lengths;
vector<int> formats;

public:
PostgresParameters() {
}

PostgresParameters(vector<Oid> types_p, vector<Value> values_p);

bool Empty() const {
return types.empty();
}

int Count() const {
return static_cast<int>(types.size());
}

const Oid *Types() const {
return types.data();
}

const char *const *Values() const {
return value_ptrs.data();
}

const int *Lengths() const {
return lengths.data();
}

const int *Formats() const {
return formats.data();
}
};

} // namespace duckdb
2 changes: 2 additions & 0 deletions src/include/postgres_scanner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "duckdb.hpp"
#include "postgres_utils.hpp"
#include "postgres_connection.hpp"
#include "postgres_parameters.hpp"
#include "storage/postgres_connection_pool.hpp"

namespace duckdb {
Expand All @@ -29,6 +30,7 @@ struct PostgresBindData : public FunctionData {
string schema_name;
string table_name;
string sql;
PostgresParameters params;
string limit;
idx_t pages_approx = 0;

Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/postgres_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class PostgresSchemaEntry;
class PostgresCatalog : public Catalog {
public:
explicit PostgresCatalog(AttachedDatabase &db_p, string connection_string, string attach_path,
AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level, ClientContext &context);
AccessMode access_mode, string schema_to_load, PostgresIsolationLevel isolation_level,
ClientContext &context);
~PostgresCatalog();

string connection_string;
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/postgres_catalog_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class PostgresCatalogSet {

optional_ptr<CatalogEntry> GetEntry(ClientContext &context, PostgresTransaction &transaction, const string &name);
void DropEntry(PostgresTransaction &transaction, DropInfo &info);
void Scan(ClientContext& context, PostgresTransaction &transaction, const std::function<void(CatalogEntry &)> &callback);
void Scan(ClientContext &context, PostgresTransaction &transaction,
const std::function<void(CatalogEntry &)> &callback);
virtual optional_ptr<CatalogEntry> CreateEntry(PostgresTransaction &transaction, shared_ptr<CatalogEntry> entry);
void ClearEntries();
virtual bool SupportReload() const {
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/postgres_delete.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class PostgresDelete : public PhysicalOperator {

public:
// Source interface
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override;
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const override;

bool IsSource() const override {
return true;
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/postgres_index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class PostgresCreateIndex : public PhysicalOperator {

public:
// Source interface
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override;
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const override;

bool IsSource() const override {
return true;
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/postgres_insert.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class PostgresInsert : public PhysicalOperator {

public:
// Source interface
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override;
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const override;

bool IsSource() const override {
return true;
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/postgres_table_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class PostgresTableSet : public PostgresInSchemaSet {

static unique_ptr<PostgresTableInfo> GetTableInfo(PostgresTransaction &transaction, PostgresSchemaEntry &schema,
const string &table_name);
static unique_ptr<PostgresTableInfo> GetTableInfo(ClientContext &context, PostgresConnection &connection, const string &schema_name,
const string &table_name);
static unique_ptr<PostgresTableInfo> GetTableInfo(ClientContext &context, PostgresConnection &connection,
const string &schema_name, const string &table_name);
optional_ptr<CatalogEntry> ReloadEntry(PostgresTransaction &transaction, const string &table_name) override;

void AlterTable(ClientContext &context, PostgresTransaction &transaction, AlterTableInfo &info);
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/postgres_update.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class PostgresUpdate : public PhysicalOperator {

public:
// Source interface
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const override;
SourceResultType GetDataInternal(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const override;

bool IsSource() const override {
return true;
Expand Down
58 changes: 38 additions & 20 deletions src/postgres_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,42 @@ static bool ResultHasError(PGresult *result) {
}
}

PGresult *PostgresConnection::PQExecute(optional_ptr<ClientContext> context, const string &query) {
PGresult *PostgresConnection::PQExecute(optional_ptr<ClientContext> context, const string &query,
const PostgresParameters &params) {
if (PostgresConnection::DebugPrintQueries()) {
Printer::Print(query + "\n");
}
int64_t start_time = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now())
.time_since_epoch()
.count();
auto res = PQexec(GetConn(), query.c_str());
int64_t end_time = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now())
.time_since_epoch()
.count();
int64_t start_time = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now())
.time_since_epoch()
.count();

PGconn *conn = GetConn();
PGresult *res = nullptr;

if (params.Empty()) {
res = PQexec(GetConn(), query.c_str());
} else {
// Unlike PQexec, PQexecParams allows at most one SQL command in the given string.
int format = 0; // text format
res = PQexecParams(conn, query.c_str(), params.Count(), params.Types(), params.Values(), params.Lengths(),
params.Formats(), format);
}

int64_t end_time = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now())
.time_since_epoch()
.count();
if (context) {
DUCKDB_LOG(*context, PostgresQueryLogType, query, end_time - start_time);
}
return res;
}

unique_ptr<PostgresResult> PostgresConnection::TryQuery(optional_ptr<ClientContext> context, const string &query, optional_ptr<string> error_message) {
unique_ptr<PostgresResult> PostgresConnection::TryQuery(optional_ptr<ClientContext> context, const string &query,
optional_ptr<string> error_message,
const PostgresParameters &params) {
lock_guard<mutex> guard(connection->connection_lock);
auto result = PQExecute(context, query.c_str());
auto result = PQExecute(context, query.c_str(), params);

if (ResultHasError(result)) {
if (error_message) {
*error_message = StringUtil::Format("Failed to execute query \"" + query +
Expand All @@ -92,26 +108,28 @@ unique_ptr<PostgresResult> PostgresConnection::TryQuery(optional_ptr<ClientConte
return make_uniq<PostgresResult>(result);
}

unique_ptr<PostgresResult> PostgresConnection::Query(optional_ptr<ClientContext> context, const string &query) {
unique_ptr<PostgresResult> PostgresConnection::Query(optional_ptr<ClientContext> context, const string &query,
const PostgresParameters &params) {
string error_msg;
auto result = TryQuery(context, query, &error_msg);
auto result = TryQuery(context, query, &error_msg, params);
if (!result) {
throw std::runtime_error(error_msg);
}
return result;
}

void PostgresConnection::Execute(optional_ptr<ClientContext> context, const string &query) {
Query(context, query);
void PostgresConnection::Execute(optional_ptr<ClientContext> context, const string &query,
const PostgresParameters &params) {
Query(context, query, params);
}

vector<unique_ptr<PostgresResult>> PostgresConnection::ExecuteQueries(ClientContext &context, const string &queries) {
if (PostgresConnection::DebugPrintQueries()) {
Printer::Print(queries + "\n");
}
int64_t start_time = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now())
.time_since_epoch()
.count();
int64_t start_time = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now())
.time_since_epoch()
.count();
auto res = PQsendQuery(GetConn(), queries.c_str());
if (res == 0) {
throw std::runtime_error("Failed to execute query \"" + queries + "\": " + string(PQerrorMessage(GetConn())));
Expand All @@ -132,9 +150,9 @@ vector<unique_ptr<PostgresResult>> PostgresConnection::ExecuteQueries(ClientCont
}
results.push_back(std::move(result));
}
int64_t end_time = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now())
.time_since_epoch()
.count();
int64_t end_time = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now())
.time_since_epoch()
.count();
DUCKDB_LOG(context, PostgresQueryLogType, queries, end_time - start_time);
return results;
}
Expand Down
4 changes: 2 additions & 2 deletions src/postgres_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ static void LoadInternal(ExtensionLoader &loader) {
connection->registered_state->Insert("postgres_extension", make_shared_ptr<PostgresExtensionState>());
}

auto &instance = loader.GetDatabaseInstance();
auto &log_manager = instance.GetLogManager();
auto &instance = loader.GetDatabaseInstance();
auto &log_manager = instance.GetLogManager();
log_manager.RegisterLogType(make_uniq<PostgresQueryLogType>());
}

Expand Down
22 changes: 11 additions & 11 deletions src/postgres_logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ constexpr LogLevel PostgresQueryLogType::LEVEL;
// PostgresQueryLogType
//===--------------------------------------------------------------------===//
string PostgresQueryLogType::ConstructLogMessage(const string &str, int64_t duration) {
child_list_t<Value> child_list = {
{"query", str},
{"duration_ms", duration},
};
child_list_t<Value> child_list = {
{"query", str},
{"duration_ms", duration},
};

return Value::STRUCT(std::move(child_list)).ToString();
return Value::STRUCT(std::move(child_list)).ToString();
}

LogicalType PostgresQueryLogType::GetLogType() {
child_list_t<LogicalType> child_list = {
{"query", LogicalType::VARCHAR},
{"duration_ms", LogicalType::BIGINT},
};
return LogicalType::STRUCT(child_list);
child_list_t<LogicalType> child_list = {
{"query", LogicalType::VARCHAR},
{"duration_ms", LogicalType::BIGINT},
};
return LogicalType::STRUCT(child_list);
}

} // namespace
} // namespace duckdb
Loading
Loading