Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 41 additions & 26 deletions include/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,39 +528,48 @@ class ShardManager {
" not found in schema '", schema_name, "'");
}

// Updates one field (given as a Field handle) of node `id` by delegating to
// the first shard whose [min_id, max_id] range contains `id`, and returns that
// shard's update() result.
// NOTE(review): this span is a diff rendering without +/- markers, so lines of
// two variants are interleaved. One variant scans the shards of every schema
// in shards_; the other looks up only the shards stored under `schema_name`.
arrow::Result<bool> update_node(const int64_t id,
arrow::Result<bool> update_node(const std::string &schema_name,
const int64_t id,
const std::shared_ptr<Field> &field,
const Value &value,
const UpdateType update_type) {
// Variant A: walk every schema's shard list.
for (auto &schema_shards : shards_ | std::views::values) {
for (const auto &shard : schema_shards) {
if (id >= shard->min_id && id <= shard->max_id) {
return shard->update(id, field, value, update_type);
}
// Variant B: restrict the search to the shard list of `schema_name`;
// a schema with no entry in shards_ is reported as a KeyError.
auto schema_it = shards_.find(schema_name);
if (schema_it == shards_.end()) {
return arrow::Status::KeyError("Schema not found: ", schema_name);
}

for (const auto &shard : schema_it->second) {
// The id range check is inclusive on both ends.
if (id >= shard->min_id && id <= shard->max_id) {
return shard->update(id, field, value, update_type);
}
}

// No shard range covered `id`: report a KeyError (two message variants shown).
return arrow::Status::KeyError("Node with id ", id,
" not found in any schema");
return arrow::Status::KeyError("Node with id ", id, " not found in schema ",
schema_name);
}

// Updates one field (given by name) of node `id`: resolves `field_name` to a
// field through schema_registry_, then delegates to the first shard whose
// [min_id, max_id] range contains `id` and returns that shard's update() result.
// NOTE(review): this span is a diff rendering without +/- markers, so lines of
// two variants are interleaved. One variant scans the shards of every schema
// in shards_; the other looks up only the shards stored under `schema_name`.
arrow::Result<bool> update_node(const int64_t id,
arrow::Result<bool> update_node(const std::string &schema_name,
const int64_t id,
const std::string &field_name,
const Value &value,
const UpdateType update_type) {
// Variant A: walk every schema's shard list; the field is resolved from the
// shard's own schema on every iteration, before the id range check.
for (auto &schema_shards : shards_ | std::views::values) {
for (const auto &shard : schema_shards) {
auto field = schema_registry_->get(shard->schema_name)
.ValueOrDie()
->get_field(field_name);
if (id >= shard->min_id && id <= shard->max_id) {
return shard->update(id, field, value, update_type);
}
// Variant B: restrict the search to the shard list of `schema_name`;
// a schema with no entry in shards_ is reported as a KeyError.
auto schema_it = shards_.find(schema_name);
if (schema_it == shards_.end()) {
return arrow::Status::KeyError("Schema not found: ", schema_name,
" in shards");
}

// Resolve the field once, from the requested schema, before scanning shards.
// NOTE(review): ValueOrDie() does not propagate a Status; per the Arrow docs
// it aborts on a non-OK Result, so a schema present in shards_ but missing
// from schema_registry_ would presumably terminate the process — confirm, and
// consider ARROW_ASSIGN_OR_RAISE, which this file already uses elsewhere.
auto field =
schema_registry_->get(schema_name).ValueOrDie()->get_field(field_name);

for (const auto &shard : schema_it->second) {
// The id range check is inclusive on both ends.
if (id >= shard->min_id && id <= shard->max_id) {
return shard->update(id, field, value, update_type);
}
}

// No shard range covered `id`: report a KeyError (two message variants shown).
return arrow::Status::KeyError("Node with id ", id,
" not found in any schema");
return arrow::Status::KeyError("Node with id ", id, " not found in schema ",
schema_name);
}

arrow::Result<std::vector<std::shared_ptr<Node>>> get_nodes(
Expand Down Expand Up @@ -758,24 +767,29 @@ class Database {
return node;
}

// Thin pass-through: forwards a field update (Field handle overload) to
// shard_manager_->update_node and returns its result unchanged.
// NOTE(review): diff rendering without +/- markers — both the signature and
// the forwarding call appear twice, once without and once with `schema_name`.
arrow::Result<bool> update_node(const int64_t id,
arrow::Result<bool> update_node(const std::string &schema_name,
const int64_t id,
const std::shared_ptr<Field> &field,
const Value &value,
const UpdateType update_type) {
return shard_manager_->update_node(id, field, value, update_type);
return shard_manager_->update_node(schema_name, id, field, value,
update_type);
}

// Thin pass-through: forwards a field update (field-name overload) to
// shard_manager_->update_node and returns its result unchanged.
// NOTE(review): diff rendering without +/- markers — both the signature and
// the forwarding call appear twice, once without and once with `schema_name`.
arrow::Result<bool> update_node(const int64_t id,
arrow::Result<bool> update_node(const std::string &schema_name,
const int64_t id,
const std::string &field_name,
const Value &value,
const UpdateType update_type) {
return shard_manager_->update_node(id, field_name, value, update_type);
return shard_manager_->update_node(schema_name, id, field_name, value,
update_type);
}

// Removes a node in two steps: first from node_manager_, then from
// shard_manager_. If node_manager_ reports failure (falsy result) the function
// returns Status::Invalid and shard_manager_ is never called; otherwise the
// result of shard_manager_->remove_node is returned.
// NOTE(review): diff rendering without +/- markers — the guard and its error
// return appear twice (id-only variant, then schema-scoped variant).
arrow::Result<bool> remove_node(const std::string &schema_name,
int64_t node_id) {
if (auto res = node_manager_->remove_node(node_id); !res) {
// NOTE(review): Arrow Status factories presumably concatenate their
// arguments rather than format them, so "{}" would be emitted literally here
// — confirm against the Arrow Status documentation.
return arrow::Status::Invalid("Failed to remove node: {}", node_id);
if (auto res = node_manager_->remove_node(schema_name, node_id); !res) {
return arrow::Status::Invalid("Failed to remove node: ", schema_name, ":",
node_id);
}
return shard_manager_->remove_node(schema_name, node_id);
}
Expand Down Expand Up @@ -810,7 +824,8 @@ class Database {
const std::string &schema_name,
TemporalContext *temporal_context = nullptr,
size_t chunk_size = 10000) const {
ARROW_ASSIGN_OR_RAISE(auto schema, schema_registry_->get(schema_name));
ARROW_ASSIGN_OR_RAISE(const auto schema,
schema_registry_->get(schema_name));
auto arrow_schema = schema->arrow();
ARROW_ASSIGN_OR_RAISE(auto all_nodes,
shard_manager_->get_nodes(schema_name));
Expand Down
18 changes: 13 additions & 5 deletions include/metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,14 @@ struct Manifest {
std::string id;
std::vector<ShardMetadata> shards;
std::vector<EdgeMetadata> edges;
int64_t node_id_seq = 0;
std::unordered_map<std::string, int64_t>
node_id_seq_per_schema; // Per-schema ID counters
int64_t edge_id_seq = 0;
int64_t shard_id_seq = 0;

NLOHMANN_DEFINE_TYPE_INTRUSIVE(Manifest, id, shards, edges, node_id_seq,
edge_id_seq, shard_id_seq);
NLOHMANN_DEFINE_TYPE_INTRUSIVE(Manifest, id, shards, edges,
node_id_seq_per_schema, edge_id_seq,
shard_id_seq);

std::string toString() const {
std::stringstream ss;
Expand All @@ -333,8 +335,14 @@ struct Manifest {
<< "}";
if (i < edges.size() - 1) ss << ", ";
}
ss << "], node_id_seq=" << node_id_seq << ", edge_id_seq=" << edge_id_seq
<< ", shard_id_seq=" << shard_id_seq << "}";
ss << "], node_id_seq_per_schema={";
size_t idx = 0;
for (const auto &[schema_name, counter] : node_id_seq_per_schema) {
ss << "'" << schema_name << "':" << counter;
if (idx++ < node_id_seq_per_schema.size() - 1) ss << ", ";
}
ss << "}, edge_id_seq=" << edge_id_seq << ", shard_id_seq=" << shard_id_seq
<< "}";
return ss.str();
}

Expand Down
73 changes: 63 additions & 10 deletions include/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,28 @@ class NodeManager {

// Destructor: clears the node arena held in node_arena_.
~NodeManager() {
  node_arena_->clear();
}

arrow::Result<std::shared_ptr<Node>> get_node(const int64_t id) {
return nodes[id];
// Looks up node `id` inside the per-schema bucket stored under `schema_name`.
// Returns KeyError("Schema not found: ...") when nodes_ has no bucket for the
// schema, and KeyError("Node not found: <schema>:<id>") when the bucket exists
// but does not contain the id. The lookup never inserts into nodes_.
arrow::Result<std::shared_ptr<Node>> get_node(const std::string &schema_name,
                                              const int64_t id) {
  const auto bucket = nodes_.find(schema_name);
  if (bucket == nodes_.end()) {
    return arrow::Status::KeyError("Schema not found: ", schema_name);
  }

  const auto &by_id = bucket->second;
  if (const auto hit = by_id.find(id); hit != by_id.end()) {
    return hit->second;
  }
  return arrow::Status::KeyError("Node not found: ", schema_name, ":", id);
}

// Erases node `id` from the flat `nodes` map; true when an entry was removed.
// NOTE(review): appears to be the pre-change (id-only) side of the diff.
bool remove_node(const int64_t id) {
  return nodes.erase(id) != 0;
}
// Erases node `id` from the bucket stored under `schema_name`.
// Returns true only when the schema bucket exists and an entry was removed;
// an unknown schema yields false without modifying nodes_.
bool remove_node(const std::string &schema_name, const int64_t id) {
  if (const auto bucket = nodes_.find(schema_name); bucket != nodes_.end()) {
    return bucket->second.erase(id) != 0;
  }
  return false;
}

arrow::Result<std::shared_ptr<Node>> create_node(
const std::string &schema_name,
Expand Down Expand Up @@ -216,7 +233,11 @@ class NodeManager {

int64_t id = 0;
if (!add) {
id = id_counter.fetch_add(1);
// Get or create per-schema ID counter
if (id_counters_.find(schema_name) == id_counters_.end()) {
id_counters_[schema_name].store(0);
}
id = id_counters_[schema_name].fetch_add(1);
} else {
id = data.at("id").as_int64();
}
Expand Down Expand Up @@ -249,7 +270,7 @@ class NodeManager {
id, schema_name, EMPTY_DATA,
std::make_unique<NodeHandle>(std::move(node_handle)), node_arena_,
schema_, layout_);
nodes[id] = node;
nodes_[schema_name][id] = node;
return node;
} else {
std::unordered_map<std::string, Value> normalized_data;
Expand All @@ -270,17 +291,49 @@ class NodeManager {
auto node = std::make_shared<Node>(id, schema_name, normalized_data,
std::unique_ptr<NodeHandle>{}, nullptr,
schema_);
nodes[id] = node;
nodes_[schema_name][id] = node;
return node;
}
}

// Overwrites the single `id_counter` with `value`.
// NOTE(review): appears to be the pre-change (single-counter) side of the diff.
void set_id_counter(const int64_t value) {
  id_counter.store(value);
}
// Returns the current value of the single `id_counter`.
// NOTE(review): appears to be the pre-change (single-counter) side of the diff.
int64_t get_id_counter() const {
  return id_counter.load();
}
// Sets the ID counter of `schema_name` to `value`. operator[] is used, so a
// counter entry is created for the schema when none exists yet.
void set_id_counter(const std::string &schema_name, const int64_t value) {
  auto &counter = id_counters_[schema_name];
  counter.store(value);
}

// Returns the current ID counter of `schema_name`, or 0 when no counter has
// been recorded for that schema. Read-only: never inserts into id_counters_.
int64_t get_id_counter(const std::string &schema_name) const {
  if (const auto entry = id_counters_.find(schema_name);
      entry != id_counters_.end()) {
    return entry->second.load();
  }
  return 0;
}

// Copies every per-schema ID counter into a plain schema_name -> value map
// (the snapshot code stores this map in the manifest).
std::unordered_map<std::string, int64_t> get_all_id_counters() const {
  std::unordered_map<std::string, int64_t> snapshot;
  for (auto it = id_counters_.cbegin(); it != id_counters_.cend(); ++it) {
    // Each atomic is loaded individually.
    snapshot.insert_or_assign(it->first, it->second.load());
  }
  return snapshot;
}

// Stores every (schema_name, value) pair of `counters` into the per-schema ID
// counters (the snapshot code calls this when restoring from the manifest).
// Schemas absent from `counters` keep their current counter value.
void set_all_id_counters(
    const std::unordered_map<std::string, int64_t> &counters) {
  for (const auto &entry : counters) {
    id_counters_[entry.first].store(entry.second);
  }
}

private:
std::atomic<int64_t> id_counter{0};
std::unordered_map<int64_t, std::shared_ptr<Node>> nodes;
// Per-schema ID counters (schema_name -> counter)
std::unordered_map<std::string, std::atomic<int64_t>> id_counters_;

// Per-schema node storage (schema_name -> (node_id -> Node))
std::unordered_map<std::string,
std::unordered_map<int64_t, std::shared_ptr<Node>>>
nodes_;

std::shared_ptr<SchemaRegistry> schema_registry_;
std::shared_ptr<LayoutRegistry> layout_registry_;
std::shared_ptr<NodeArena> node_arena_;
Expand Down
55 changes: 42 additions & 13 deletions src/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,6 @@ struct QueryState {
for (const auto& f : schema->fields()) {
std::string fq_name = alias + "." + f->name();
int field_id = next_field_id.fetch_add(1);

names.emplace_back(fq_name);
indices.emplace_back(field_id);
field_id_to_name[field_id] = fq_name;
Expand Down Expand Up @@ -1312,7 +1311,11 @@ populate_rows_bfs(int64_t node_id, const SchemaRef& start_schema,
while (size-- > 0) {
auto item = queue.front();
queue.pop();
auto node = query_state.node_manager->get_node(item.node_id).ValueOrDie();
auto item_schema = item.schema_ref.is_declaration()
? item.schema_ref.schema()
: query_state.aliases.at(item.schema_ref.value());
auto node = query_state.node_manager->get_node(item_schema, item.node_id)
.ValueOrDie();
const auto& it_fq =
query_state.schema_field_indices.find(item.schema_ref.value());
if (it_fq == query_state.schema_field_indices.end()) {
Expand Down Expand Up @@ -2170,7 +2173,8 @@ arrow::Result<std::shared_ptr<QueryResult>> Database::query(
.contains(target_id)) {
continue;
}
auto node_result = node_manager_->get_node(target_id);
auto node_result =
node_manager_->get_node(target_schema, target_id);
if (node_result.ok()) {
const auto target_node = node_result.ValueOrDie();
if (target_node->schema_name == target_schema) {
Expand Down Expand Up @@ -2275,27 +2279,52 @@ arrow::Result<std::shared_ptr<QueryResult>> Database::query(
} else if (traverse->traverse_type() == TraverseType::Left) {
query_state.ids[traverse->target().value()].insert(
matched_target_ids.begin(), matched_target_ids.end());
} else { // Right, Full remove nodes with incoming connections
} else { // Right, Full: matched targets + unmatched targets
auto target_ids =
get_ids_from_table(
get_table(target_schema, query_state.temporal_context.get())
.ValueOrDie())
.ValueOrDie();
IF_DEBUG_ENABLED {
log_debug(
"traverse type: '{}', matched_source_ids=[{}], "
"target_ids=[{}]",
traverse->target().value(), join_container(matched_source_ids),
join_container(target_ids));
}

llvm::DenseSet<int64_t> result;
dense_difference(target_ids, matched_source_ids, result);

// Check if this is a self-join (same schema for source and target)
if (source_schema == target_schema) {
// Self-join: Exclude nodes that were sources with matches
// (prevents same node appearing as both source and unmatched
// target)
dense_difference(target_ids, matched_source_ids, result);
IF_DEBUG_ENABLED {
log_debug(
"traverse type: '{}' (Right/Full, self-join), "
"matched_source_ids=[{}], unmatched_targets=[{}], total={}",
traverse->target().value(),
join_container(matched_source_ids), join_container(result),
result.size());
}
} else {
// Cross-schema join: Include matched targets + unmatched targets
// (compare IDs within same schema to avoid ID collision)
result = matched_target_ids;
llvm::DenseSet<int64_t> unmatched_targets;
dense_difference(target_ids, matched_target_ids, unmatched_targets);
result.insert(unmatched_targets.begin(), unmatched_targets.end());
IF_DEBUG_ENABLED {
log_debug(
"traverse type: '{}' (Right/Full, cross-schema), "
"matched_targets=[{}], unmatched_targets=[{}], total={}",
traverse->target().value(),
join_container(matched_target_ids),
join_container(unmatched_targets), result.size());
}
}

query_state.ids[traverse->target().value()] = result;
}

std::vector<std::shared_ptr<Node>> neighbors;
for (auto id : query_state.ids[traverse->target().value()]) {
auto node_res = node_manager_->get_node(id);
auto node_res = node_manager_->get_node(target_schema, id);
if (node_res.ok()) {
neighbors.push_back(node_res.ValueOrDie());
}
Expand Down
18 changes: 15 additions & 3 deletions src/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ arrow::Result<bool> SnapshotManager::initialize() {
log_info(this->manifest_->toString());

edge_store_->set_id_seq(manifest_->edge_id_seq);
node_manager_->set_id_counter(manifest_->node_id_seq);
node_manager_->set_all_id_counters(manifest_->node_id_seq_per_schema);
shard_manager_->set_id_counter(manifest_->shard_id_seq);

// Restore index counters for each schema
Expand Down Expand Up @@ -197,12 +197,24 @@ arrow::Result<Snapshot> SnapshotManager::commit() {
Manifest new_manifest;
new_manifest.id = generate_uuid();
new_manifest.edge_id_seq = edge_store_->get_edge_id_counter();
new_manifest.node_id_seq = node_manager_->get_id_counter();
new_manifest.node_id_seq_per_schema = node_manager_->get_all_id_counters();
new_manifest.shard_id_seq = shard_manager_->get_id_counter();

std::stringstream node_counters_str;
node_counters_str << "{";
size_t idx = 0;
for (const auto &[schema_name, counter] :
new_manifest.node_id_seq_per_schema) {
node_counters_str << schema_name << ":" << counter;
if (idx++ < new_manifest.node_id_seq_per_schema.size() - 1) {
node_counters_str << ", ";
}
}
node_counters_str << "}";

log_info("Saving counters: edge_id_seq=" +
std::to_string(new_manifest.edge_id_seq) +
", node_id_seq=" + std::to_string(new_manifest.node_id_seq) +
", node_id_seq_per_schema=" + node_counters_str.str() +
", shard_id_seq=" + std::to_string(new_manifest.shard_id_seq));

for (const auto &edge_type : edge_store_->get_edge_types()) {
Expand Down
Loading