Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 112 additions & 3 deletions lib/src/crdt_executor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,30 @@ class CrdtExecutor extends CrdtWriteExecutor implements CrdtApi {
CrdtExecutor(ReadWriteApi super._db, super.hlc);

@override
Future<List<Map<String, Object?>>> query(String sql, [List<Object?>? args]) =>
(_db as ReadWriteApi)
.query(SqlUtil.transformAutomaticExplicitSql(sql), args);
Future<List<Map<String, Object?>>> query(String sql,
[List<Object?>? args]) async {
// Handle INSERT...RETURNING statements by processing them through CRDT transformation
if (SqlUtil.isInsertWithReturning(sql)) {
final statements = (_sqlEngine.parseMultiple(sql).rootNode
as SemicolonSeparatedStatements)
.statements;
assert(statements.length == 1,
'This package does not support compound statements:\n$sql');

final statement = statements.first as InsertStatement;
SqlUtil.transformAutomaticExplicit(statement);

// Transform the INSERT statement to add CRDT metadata
final result = await _insertForQuery(statement, args);
affectedTables.add(result.tableName);

// Return the results from the transformed query
return result.queryResult;
}

return (_db as ReadWriteApi)
.query(SqlUtil.transformAutomaticExplicitSql(sql), args);
}
}

class CrdtWriteExecutor extends _CrdtTableExecutor {
Expand Down Expand Up @@ -237,6 +258,94 @@ class CrdtWriteExecutor extends _CrdtTableExecutor {
return newStatement.table.tableName;
}

Future<({String tableName, List<Map<String, Object?>> queryResult})>
_insertForQuery(InsertStatement statement, List<Object?>? args) async {
// Force explicit column description in insert statements
assert(statement.targetColumns.isNotEmpty,
'Unsupported statement: target columns must be explicitly stated.\n${statement.toSql()}');

// Disallow star select statements
assert(
statement.source is! SelectInsertSource ||
((statement.source as SelectInsertSource).stmt as SelectStatement)
.columns
.whereType<StarResultColumn>()
.isEmpty,
'Unsupported statement: select columns must be explicitly stated.\n${statement.toSql()}');

final argCount = args?.length ?? 0;
final source = switch (statement.source) {
ValuesSource s => ValuesSource([
Tuple(expressions: [
...s.values.first.expressions,
NumberedVariable(argCount + 1),
NumberedVariable(argCount + 2),
NumberedVariable(argCount + 3),
])
]),
SelectInsertSource s => SelectInsertSource(SelectStatement(
withClause: (s.stmt as SelectStatement).withClause,
distinct: (s.stmt as SelectStatement).distinct,
columns: [
...(s.stmt as SelectStatement).columns,
ExpressionResultColumn(expression: NumberedVariable(argCount + 1)),
ExpressionResultColumn(expression: NumberedVariable(argCount + 2)),
ExpressionResultColumn(expression: NumberedVariable(argCount + 3)),
],
from: (s.stmt as SelectStatement).from,
where: (s.stmt as SelectStatement).where,
groupBy: (s.stmt as SelectStatement).groupBy,
windowDeclarations: (s.stmt as SelectStatement).windowDeclarations,
orderBy: (s.stmt as SelectStatement).orderBy,
limit: (s.stmt as SelectStatement).limit,
)),
_ => throw UnimplementedError(
'Unsupported data source: ${statement.source.runtimeType}, please file an issue in the sql_crdt project.')
};

final newStatement = InsertStatement(
mode: statement.mode,
upsert: statement.upsert,
returning: statement.returning,
withClause: statement.withClause,
table: statement.table,
targetColumns: [
...statement.targetColumns,
Reference(columnName: 'hlc'),
Reference(columnName: 'node_id'),
Reference(columnName: 'modified'),
],
source: source,
);

// Touch
if (statement.upsert is UpsertClause) {
final action = statement.upsert!.entries.first.action;
if (action is DoUpdate) {
action.set.addAll([
SingleColumnSetComponent(
column: Reference(columnName: 'hlc'),
expression: NumberedVariable(argCount + 1),
),
SingleColumnSetComponent(
column: Reference(columnName: 'node_id'),
expression: NumberedVariable(argCount + 2),
),
SingleColumnSetComponent(
column: Reference(columnName: 'modified'),
expression: NumberedVariable(argCount + 3),
),
]);
}
}

final newArgs = [...args ?? [], _hlcString, hlc.nodeId, _hlcString];
final result =
await (_db as ReadWriteApi).query(newStatement.toSql(), newArgs);

return (tableName: newStatement.table.tableName, queryResult: result);
}

Future<String> _update(UpdateStatement statement, List<Object?>? args) async {
final argCount = args?.length ?? 0;
final newStatement = UpdateStatement(
Expand Down
16 changes: 16 additions & 0 deletions lib/src/sql_util.dart
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,20 @@ class SqlUtil {

static BinaryExpression _joinClauses(Expression left, Expression right) =>
BinaryExpression(left, Token(TokenType.and, _span), right);

/// Checks if a SQL statement is an INSERT with RETURNING clause
static bool isInsertWithReturning(String sql) {
// Quick string-based pre-check to avoid parsing non-INSERT statements
final trimmed = sql.trim().toLowerCase();
if (!trimmed.startsWith('insert')) return false;
if (!trimmed.contains('returning')) return false;

// Only parse if it looks like INSERT...RETURNING
try {
final statement = _sqlEngine.parse(sql).rootNode as Statement;
return statement is InsertStatement && statement.returning != null;
} catch (_) {
return false;
}
}
}
113 changes: 113 additions & 0 deletions test/crdt_executor_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import 'package:sql_crdt/sql_crdt.dart';
import 'package:test/test.dart';

class MockDatabase implements ReadWriteApi {
final List<String> executedSql = [];
final List<List<Object?>?> executedArgs = [];
final List<String> queriedSql = [];
final List<List<Object?>?> queriedArgs = [];

List<Map<String, Object?>> _queryResult = [];

void setQueryResult(List<Map<String, Object?>> result) {
_queryResult = result;
}

@override
Future<void> execute(String sql, [List<Object?>? args]) async {
executedSql.add(sql);
executedArgs.add(args);
}

@override
Future<List<Map<String, Object?>>> query(String sql,
[List<Object?>? args]) async {
queriedSql.add(sql);
queriedArgs.add(args);
return _queryResult;
}

void reset() {
executedSql.clear();
executedArgs.clear();
queriedSql.clear();
queriedArgs.clear();
_queryResult = [];
}
}

void main() {
group('CrdtExecutor INSERT INTO...SELECT tests', () {
late MockDatabase mockDb;
late CrdtExecutor executor;
late Hlc hlc;

setUp(() {
mockDb = MockDatabase();
hlc = Hlc.now('test-node');
executor = CrdtExecutor(mockDb, hlc);
});

test('INSERT INTO...SELECT without RETURNING uses execute', () async {
await executor.execute(
'INSERT INTO target (name, value) SELECT name, value FROM source WHERE active = 1');

expect(mockDb.executedSql.length, 1);
expect(mockDb.queriedSql.length, 0);

// Should transform to add CRDT columns
final executedSql = mockDb.executedSql.first;
expect(executedSql, contains('hlc'));
expect(executedSql, contains('node_id'));
expect(executedSql, contains('modified'));
});

test('INSERT INTO...SELECT with RETURNING should use query', () async {
// Set up mock result
mockDb.setQueryResult([
{'id': 1, 'name': 'test', 'hlc': hlc.toString()}
]);

final result = await executor.query(
'INSERT INTO target (name, value) SELECT name, value FROM source WHERE active = 1 RETURNING id, name');

expect(mockDb.queriedSql.length, 1);
expect(mockDb.executedSql.length, 0);
expect(result.length, 1);
expect(result.first['name'], 'test');

// Should transform to add CRDT columns
final queriedSql = mockDb.queriedSql.first;
expect(queriedSql, contains('hlc'));
expect(queriedSql, contains('node_id'));
expect(queriedSql, contains('modified'));
});

test('Regular INSERT with RETURNING should use query', () async {
mockDb.setQueryResult([
{'id': 1, 'name': 'test'}
]);

final result = await executor.query(
'INSERT INTO users (name, age) VALUES (?, ?) RETURNING id, name',
['John', 25]);

expect(mockDb.queriedSql.length, 1);
expect(mockDb.executedSql.length, 0);
expect(result.length, 1);
});

test('Regular SELECT continues to use query', () async {
mockDb.setQueryResult([
{'name': 'test', 'age': 30}
]);

final result =
await executor.query('SELECT name, age FROM users WHERE active = 1');

expect(mockDb.queriedSql.length, 1);
expect(mockDb.executedSql.length, 0);
expect(result.length, 1);
});
});
}
62 changes: 62 additions & 0 deletions test/sql_util_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,66 @@ void main() {
expect(transformed, 'UPDATE users SET name = ?1, age = ?2 WHERE id = ?3');
});
});

group('INSERT with RETURNING detection', () {
test('Regular INSERT without RETURNING', () {
final sql = 'INSERT INTO users (name, age) VALUES (?, ?)';
expect(SqlUtil.isInsertWithReturning(sql), false);
});

test('INSERT with RETURNING', () {
final sql =
'INSERT INTO users (name, age) VALUES (?, ?) RETURNING id, name';
expect(SqlUtil.isInsertWithReturning(sql), true);
});

test('INSERT INTO...SELECT without RETURNING', () {
final sql = 'INSERT INTO target (name) SELECT name FROM source';
expect(SqlUtil.isInsertWithReturning(sql), false);
});

test('INSERT INTO...SELECT with RETURNING', () {
final sql =
'INSERT INTO target (name) SELECT name FROM source RETURNING id, name';
expect(SqlUtil.isInsertWithReturning(sql), true);
});

test('Regular SELECT statement', () {
final sql = 'SELECT name, age FROM users WHERE active = 1';
expect(SqlUtil.isInsertWithReturning(sql), false);
});

test('Invalid SQL returns false', () {
final sql = 'INVALID SQL STATEMENT';
expect(SqlUtil.isInsertWithReturning(sql), false);
});

test('Performance test - SELECT statements avoid parsing', () {
// This test verifies that SELECT statements are quickly rejected
// without expensive parsing. We can't easily measure time in tests,
// but we can verify behavior doesn't change.
final selectQueries = [
'SELECT * FROM users',
'SELECT id, name FROM users WHERE active = 1',
'SELECT COUNT(*) FROM products',
'SELECT u.name, p.title FROM users u JOIN posts p ON u.id = p.user_id',
];

for (final sql in selectQueries) {
expect(SqlUtil.isInsertWithReturning(sql), false);
}
});

test('Edge cases with RETURNING in different contexts', () {
// RETURNING in comments or strings should not trigger false positives
expect(SqlUtil.isInsertWithReturning('SELECT * FROM returning_table'),
false);
expect(SqlUtil.isInsertWithReturning('SELECT "returning" FROM users'),
false);
expect(
SqlUtil.isInsertWithReturning(
'UPDATE users SET name = ? /* returning comment */'),
false);
});
});
}
Loading