Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions include/arrow_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
#define ARROW_UTILS_HPP

#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/compute/registry.h>
#include <llvm/ADT/DenseSet.h>
#include <llvm/ADT/SmallVector.h>

namespace tundradb {

arrow::Result<llvm::DenseSet<int64_t>> get_ids_from_table(
const std::shared_ptr<arrow::Table>& table);

// Initialize Arrow Compute module - should be called once at startup
bool initialize_arrow_compute();

Expand Down
114 changes: 114 additions & 0 deletions include/query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/result.h>
#include <llvm/ADT/DenseMap.h>
#include <llvm/ADT/DenseSet.h>
#include <llvm/ADT/SmallVector.h>
#include <llvm/ADT/StringMap.h>

#include <atomic>
#include <memory>
#include <optional>
#include <set>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>

#include "node.hpp"
Expand Down Expand Up @@ -1012,6 +1018,114 @@ class QueryResult {
QueryExecutionStats stats_;
};

// Forward declarations
class NodeManager;
class SchemaRegistry;

/**
* @brief Query execution state container
*
* Holds all state needed during query execution including tables, node IDs,
* schema mappings, and graph connections. Optimized for performance with
* LLVM containers and object pooling.
*/
/**
 * @brief Query execution state container
 *
 * Holds all state needed during query execution including tables, node IDs,
 * schema mappings, and graph connections. Optimized for performance with
 * LLVM containers and object pooling.
 */
struct QueryState {
  SchemaRef from;
  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> tables;
  llvm::StringMap<llvm::DenseSet<int64_t>> ids;
  std::unordered_map<std::string, std::string> aliases;

  // Precomputed fully-qualified field names per alias (SchemaRef::value())
  llvm::StringMap<std::vector<std::string>> fq_field_names;

  // Field index optimization: replace string-based field lookups with integer
  // indices
  llvm::StringMap<std::vector<int>>
      schema_field_indices;  // "User" -> [0, 1, 2]
  llvm::SmallDenseMap<int, std::string, 64>
      field_id_to_name;                      // 0 -> "user.name"
  llvm::StringMap<int> field_name_to_index;  // "user.name" -> 0
  std::atomic<int> next_field_id{0};         // Global field ID counter

  llvm::StringMap<
      llvm::DenseMap<int64_t, llvm::SmallVector<GraphConnection, 4>>>
      connections;  // outgoing, keyed by alias then source node id
  llvm::DenseMap<int64_t, llvm::SmallVector<GraphConnection, 4>> incoming;

  std::shared_ptr<NodeManager> node_manager;
  std::shared_ptr<SchemaRegistry> schema_registry;
  std::vector<Traverse> traversals;

  // Temporal context for time-travel queries (nullptr = current version)
  std::unique_ptr<TemporalContext> temporal_context;

  // Connection object pooling to avoid repeated allocations
  class ConnectionPool {
   private:
    std::vector<GraphConnection> pool_;
    size_t next_index_ = 0;

   public:
    explicit ConnectionPool(size_t initial_size = 1000) : pool_(initial_size) {}

    // Hands out the next pooled object, growing the pool when exhausted.
    // NOTE(review): resize() reallocates the backing vector, which invalidates
    // every reference previously returned by get(). Callers must not hold a
    // returned reference across a later get() call — confirm call sites, or
    // switch the backing store to std::deque to make references stable.
    GraphConnection& get() {
      if (next_index_ >= pool_.size()) {
        pool_.resize(pool_.size() * 2);  // Grow pool if needed
      }
      return pool_[next_index_++];
    }

    // Rewinds the pool; pooled objects are reused, not destroyed.
    void reset() { next_index_ = 0; }
    // Number of objects handed out since the last reset().
    size_t size() const { return next_index_; }
  };

  mutable ConnectionPool connection_pool_;

  // Simple inline methods

  /// Mutable access to the id set for an alias; creates an empty set on first
  /// use of a new alias (StringMap operator[] default-constructs the value).
  [[nodiscard]] llvm::DenseSet<int64_t>& get_ids(const SchemaRef& schema_ref) {
    return ids[schema_ref.value()];
  }

  /// Const access to the id set for an alias. Unknown aliases yield a shared
  /// static empty set instead of inserting (keeps the method const-correct).
  [[nodiscard]] const llvm::DenseSet<int64_t>& get_ids(
      const SchemaRef& schema_ref) const {
    // For const access, use find() to avoid returning temporary from lookup()
    auto it = ids.find(schema_ref.value());
    if (it != ids.end()) {
      return it->second;
    }
    // Return reference to static empty set for non-existent keys
    static const llvm::DenseSet<int64_t> empty_set;
    return empty_set;
  }

  /// True when node_id has at least one outgoing connection under schema_ref.
  /// Uses a single find() per map level instead of contains()+at() chains,
  /// so each hash lookup is performed exactly once.
  [[nodiscard]] bool has_outgoing(const SchemaRef& schema_ref,
                                  int64_t node_id) const {
    const auto outer = connections.find(schema_ref.value());
    if (outer == connections.end()) {
      return false;
    }
    const auto inner = outer->second.find(node_id);
    return inner != outer->second.end() && !inner->second.empty();
  }

  // Complex methods - implemented in query.cpp
  void reserve_capacity(const Query& query);

  arrow::Result<std::string> register_schema(const SchemaRef& schema_ref);

  arrow::Result<std::string> resolve_schema(const SchemaRef& schema_ref) const;

  arrow::Result<bool> compute_fully_qualified_names(
      const SchemaRef& schema_ref);

  arrow::Result<bool> compute_fully_qualified_names(
      const SchemaRef& schema_ref, const std::string& resolved_schema);

  void remove_node(int64_t node_id, const SchemaRef& schema_ref);

  arrow::Result<bool> update_table(const std::shared_ptr<arrow::Table>& table,
                                   const SchemaRef& schema_ref);

  std::string ToString() const;
};

} // namespace tundradb

#endif // QUERY_HPP
27 changes: 0 additions & 27 deletions include/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,33 +85,6 @@ static arrow::Result<std::shared_ptr<arrow::Table>> filter_table_by_id(
return filtered_table.table();
}

// Extracts the distinct values of the "id" column into a DenseSet.
// Takes the table by const reference: copying a shared_ptr by value costs an
// atomic refcount round-trip per call and no ownership is taken here (this
// also matches the replacement declared in arrow_utils.hpp).
// Returns arrow::Status::Invalid if the table has no "id" column.
// NOTE(review): the column is cast to Int64Array unchecked and null slots are
// not filtered — assumes "id" is a non-null int64 column; confirm upstream.
static arrow::Result<llvm::DenseSet<int64_t>> get_ids_from_table(
    const std::shared_ptr<arrow::Table>& table) {
  log_debug("Extracting IDs from table with {} rows", table->num_rows());

  const auto id_idx = table->schema()->GetFieldIndex("id");
  if (id_idx == -1) {
    log_error("Table does not have an 'id' column");
    return arrow::Status::Invalid("table does not have an 'id' column");
  }

  const auto id_column = table->column(id_idx);
  llvm::DenseSet<int64_t> result_ids;
  result_ids.reserve(table->num_rows());

  for (int chunk_idx = 0; chunk_idx < id_column->num_chunks(); chunk_idx++) {
    const auto chunk = std::static_pointer_cast<arrow::Int64Array>(
        id_column->chunk(chunk_idx));
    log_debug("Processing chunk {} with {} rows", chunk_idx, chunk->length());
    // Array::length() returns int64_t; use int64_t to avoid truncation on
    // very large chunks (and a signed/unsigned-style comparison mismatch).
    for (int64_t i = 0; i < chunk->length(); i++) {
      result_ids.insert(chunk->Value(i));
    }
  }

  log_debug("Extracted {} unique IDs from table", result_ids.size());
  return result_ids;
}

static arrow::Result<std::shared_ptr<arrow::Table>> create_table(
const std::shared_ptr<Schema>& schema,
const std::vector<std::shared_ptr<Node>>& nodes, size_t chunk_size,
Expand Down
36 changes: 36 additions & 0 deletions src/arrow_utils.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,47 @@
#include "../include/arrow_utils.hpp"

#include <arrow/compute/api.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/scanner.h>
#include <arrow/datum.h>
#include <arrow/table.h>
#include <llvm/ADT/DenseSet.h>
#include <llvm/ADT/SmallVector.h>
#include <llvm/ADT/StringMap.h>

#include <algorithm>

#include "../include/logger.hpp"

namespace tundradb {

/**
 * @brief Collect the distinct values of a table's "id" column.
 *
 * Iterates every chunk of the "id" column and inserts each value into a
 * llvm::DenseSet, deduplicating as it goes.
 *
 * @param table Source table; borrowed via const reference (no ownership).
 * @return The set of unique ids, or arrow::Status::Invalid when the table
 *         has no "id" column.
 *
 * NOTE(review): the column is static_pointer_cast to Int64Array and null
 * slots are not skipped — assumes "id" is a non-null int64 column; confirm
 * against the schema registry.
 */
arrow::Result<llvm::DenseSet<int64_t>> get_ids_from_table(
    const std::shared_ptr<arrow::Table>& table) {
  log_debug("Extracting IDs from table with {} rows", table->num_rows());

  const auto id_idx = table->schema()->GetFieldIndex("id");
  if (id_idx == -1) {
    log_error("Table does not have an 'id' column");
    return arrow::Status::Invalid("table does not have an 'id' column");
  }

  const auto id_column = table->column(id_idx);
  llvm::DenseSet<int64_t> result_ids;
  // One reservation up front: the id count can't exceed the row count.
  result_ids.reserve(table->num_rows());

  for (int chunk_idx = 0; chunk_idx < id_column->num_chunks(); chunk_idx++) {
    const auto chunk = std::static_pointer_cast<arrow::Int64Array>(
        id_column->chunk(chunk_idx));
    log_debug("Processing chunk {} with {} rows", chunk_idx, chunk->length());
    // Array::length() is int64_t; an int index would truncate on chunks
    // longer than INT_MAX rows.
    for (int64_t i = 0; i < chunk->length(); i++) {
      result_ids.insert(chunk->Value(i));
    }
  }

  log_debug("Extracted {} unique IDs from table", result_ids.size());
  return result_ids;
}

// Initialize Arrow Compute module - should be called once at startup
bool initialize_arrow_compute() {
static bool initialized = false;
Expand Down
Loading