From dc4fd88e2636340149d7b384b3420545dd7a0f1a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 17 Feb 2026 14:04:34 +0100 Subject: [PATCH] Raw table improvements --- .../lib/src/attachments_queue_table.dart | 3 +- .../lib/src/attachments/attachment.dart | 38 ++-- packages/powersync_core/lib/src/schema.dart | 207 +++++++++++++----- packages/powersync_core/test/crud_test.dart | 66 ++++++ .../test/powersync_shared_test.dart | 20 ++ .../test/sync/in_memory_sync_test.dart | 104 ++++++++- 6 files changed, 361 insertions(+), 77 deletions(-) diff --git a/packages/powersync_attachments_helper/lib/src/attachments_queue_table.dart b/packages/powersync_attachments_helper/lib/src/attachments_queue_table.dart index 938ad4b1..4fcc69a3 100644 --- a/packages/powersync_attachments_helper/lib/src/attachments_queue_table.dart +++ b/packages/powersync_attachments_helper/lib/src/attachments_queue_table.dart @@ -68,7 +68,8 @@ class Attachment { /// 4. Attachment to be archived enum AttachmentState { queuedUpload, queuedDownload, queuedDelete, archived } -class AttachmentsQueueTable extends Table { +// ignore: deprecated_subclass +final class AttachmentsQueueTable extends Table { AttachmentsQueueTable( {String attachmentsQueueTableName = defaultAttachmentsQueueTableName, List additionalColumns = const [], diff --git a/packages/powersync_core/lib/src/attachments/attachment.dart b/packages/powersync_core/lib/src/attachments/attachment.dart index 00f0032a..9d15e6fa 100644 --- a/packages/powersync_core/lib/src/attachments/attachment.dart +++ b/packages/powersync_core/lib/src/attachments/attachment.dart @@ -170,28 +170,30 @@ final class Attachment { /// /// {@category attachments} @experimental -final class AttachmentsQueueTable extends Table { - AttachmentsQueueTable({ +extension type AttachmentsQueueTable._(Table _) implements Table { + factory AttachmentsQueueTable({ String attachmentsQueueTableName = defaultTableName, List additionalColumns = const [], List indexes = const [], String? viewName, - }) : super.localOnly( - attachmentsQueueTableName, - [ - const Column.text('filename'), - const Column.text('local_uri'), - const Column.integer('timestamp'), - const Column.integer('size'), - const Column.text('media_type'), - const Column.integer('state'), - const Column.integer('has_synced'), - const Column.text('meta_data'), - ...additionalColumns, - ], - viewName: viewName, - indexes: indexes, - ); + }) { + return AttachmentsQueueTable._(Table.localOnly( + attachmentsQueueTableName, + [ + const Column.text('filename'), + const Column.text('local_uri'), + const Column.integer('timestamp'), + const Column.integer('size'), + const Column.text('media_type'), + const Column.integer('state'), + const Column.integer('has_synced'), + const Column.text('meta_data'), + ...additionalColumns, + ], + viewName: viewName, + indexes: indexes, + )); + } static const defaultTableName = 'attachments_queue'; } diff --git a/packages/powersync_core/lib/src/schema.dart b/packages/powersync_core/lib/src/schema.dart index 1289cae0..80561e94 100644 --- a/packages/powersync_core/lib/src/schema.dart +++ b/packages/powersync_core/lib/src/schema.dart @@ -1,3 +1,6 @@ +/// @docImport 'database/powersync_db_mixin.dart'; +library; + import 'crud.dart'; import 'schema_logic.dart'; @@ -59,19 +62,9 @@ final class TrackPreviousValuesOptions { {this.columnFilter, this.onlyWhenChanged = false}); } -/// A single table in the schema. -class Table { - static const _maxNumberOfColumns = 1999; - - /// The synced table name, matching sync rules. - final String name; - - /// List of columns. - final List columns; - - /// List of indexes. - final List indexes; - +/// Common options that can be applied on [Table] and [RawTable] (through +/// [RawTableSchema]). +final class TableOptions { /// Whether to add a hidden `_metadata` column that will be enabled for /// updates to attach custom information about writes that will be reported /// through [CrudEntry.metadata]. @@ -92,6 +85,53 @@ class Table { /// ignored when creating CRUD entries. final bool ignoreEmptyUpdates; + const TableOptions({ + this.trackMetadata = false, + this.trackPreviousValues, + this.localOnly = false, + this.insertOnly = false, + this.ignoreEmptyUpdates = false, + }); + + void _validateOptions() { + if (trackMetadata && localOnly) { + throw AssertionError("Local-only tables can't track metadata"); + } + + if (trackPreviousValues != null && localOnly) { + throw AssertionError("Local-only tables can't track old values"); + } + } + + Map _optionsToJson() { + return { + 'local_only': localOnly, + 'insert_only': insertOnly, + 'ignore_empty_update': ignoreEmptyUpdates, + 'include_metadata': trackMetadata, + if (trackPreviousValues case final trackPreviousValues?) ...{ + 'include_old': trackPreviousValues.columnFilter ?? true, + 'include_old_only_when_changed': trackPreviousValues.onlyWhenChanged, + }, + }; + } +} + +/// A single table in the schema. +@Deprecated.subclass( + 'Avoid extending table, create an instance or extension type around it instead.') +base class Table extends TableOptions { + static const _maxNumberOfColumns = 1999; + + /// The synced table name, matching sync rules. + final String name; + + /// List of columns. + final List columns; + + /// List of indexes. + final List indexes; + /// Override the name for the view final String? _viewNameOverride; @@ -120,24 +160,20 @@ class Table { this.columns, { this.indexes = const [], String? viewName, - this.localOnly = false, - this.ignoreEmptyUpdates = false, - this.trackMetadata = false, - this.trackPreviousValues, - }) : insertOnly = false, - _viewNameOverride = viewName; + super.localOnly, + super.ignoreEmptyUpdates, + super.trackMetadata, + super.trackPreviousValues, + }) : _viewNameOverride = viewName, + super(insertOnly: false); /// Create a table that only exists locally. /// /// This table does not record changes, and is not synchronized from the service. const Table.localOnly(this.name, this.columns, {this.indexes = const [], String? viewName}) - : localOnly = true, - insertOnly = false, - trackMetadata = false, - trackPreviousValues = null, - ignoreEmptyUpdates = false, - _viewNameOverride = viewName; + : _viewNameOverride = viewName, + super(localOnly: true); /// Create a table that only supports inserts. /// @@ -151,13 +187,12 @@ class Table { this.name, this.columns, { String? viewName, - this.ignoreEmptyUpdates = false, - this.trackMetadata = false, - this.trackPreviousValues, - }) : localOnly = false, - insertOnly = true, - indexes = const [], - _viewNameOverride = viewName; + super.ignoreEmptyUpdates, + super.trackMetadata, + super.trackPreviousValues, + }) : indexes = const [], + _viewNameOverride = viewName, + super(localOnly: false, insertOnly: true); Column operator [](String columnName) { return columns.firstWhere((element) => element.name == columnName); @@ -186,13 +221,7 @@ class Table { "Invalid characters in view name: $_viewNameOverride"); } - if (trackMetadata && localOnly) { - throw AssertionError("Local-only tables can't track metadata"); - } - - if (trackPreviousValues != null && localOnly) { - throw AssertionError("Local-only tables can't track old values"); - } + _validateOptions(); Set columnNames = {"id"}; for (var column in columns) { @@ -238,16 +267,9 @@ class Table { Map toJson() => { 'name': name, 'view_name': _viewNameOverride, - 'local_only': localOnly, - 'insert_only': insertOnly, 'columns': columns, 'indexes': indexes.map((e) => e.toJson(this)).toList(growable: false), - 'ignore_empty_update': ignoreEmptyUpdates, - 'include_metadata': trackMetadata, - if (trackPreviousValues case final trackPreviousValues?) ...{ - 'include_old': trackPreviousValues.columnFilter ?? true, - 'include_old_only_when_changed': trackPreviousValues.onlyWhenChanged, - }, + ..._optionsToJson(), }; } @@ -360,24 +382,89 @@ final class RawTable { /// based on data from the sync service. /// /// See [PendingStatement] for details. - final PendingStatement put; + final PendingStatement? put; /// A statement responsible for deleting a row based on its PowerSync id. /// /// See [PendingStatement] for details. Note that [PendingStatementValue]s /// used here must all be [PendingStatementValue.id]. - final PendingStatement delete; + final PendingStatement? delete; + /// For [RawTable.inferred] tables, the schema from which [put] and [delete] + /// statemenst are inferred. + final RawTableSchema? schema; + + /// An optional statement to run when [PowerSyncDatabaseMixin.disconnectAndClear] + /// is called. + final String? clear; + + /// Creates a raw table with explicit [put] and [delete] statements. + /// + /// These can also be [RawTable.inferred] when providing a [RawTableSchema]. const RawTable({ required this.name, - required this.put, - required this.delete, + required PendingStatement this.put, + required PendingStatement this.delete, + this.clear, + }) : schema = null; + + /// Creates a raw table where [put] and [delete] statements are optional + /// because the sync client can infer defaults from the [schema] of the table + /// in the local database. + const RawTable.inferred({ + required this.name, + required RawTableSchema this.schema, + this.put, + this.delete, + this.clear, }); Map toJson() => { 'name': name, 'put': put, 'delete': delete, + 'clear': clear, + ...?schema?._toJson(), + }; +} + +/// The schema of a [RawTable] in the local database. +/// +/// This information is optional when declaring raw tables with [RawTable.new]. +/// However, providing it allows the sync client to infer [RawTable.put] and +/// [RawTable.delete] statements automatically. +final class RawTableSchema { + /// The actual name of the raw table in the local schema. + /// + /// Unlike [RawTable.name], which describes the name of _synced_ tables to + /// match, this reflects the local SQLite table name. This is used to infer + /// [RawTable.put] and [RawTable.delete] statements for the sync client. It + /// can also be used to auto-generate triggers forwarding writes on raw tables + /// into the CRUD upload queue (using the `powersync_create_raw_table_crud_trigger` + /// SQL function). + final String tableName; + + /// An optional filter of columns that should be synced. + /// + /// By default, all columns in raw tables are considered for sync. If a filter + /// is specified, PowerSync treats unmatched columns as local-only and will + /// not attempt to sync them. + final List? syncedColumns; + + /// Common options affecting how the `powersync_create_raw_table_crud_trigger` + /// SQL function generates triggers. + final TableOptions options; + + const RawTableSchema({ + required this.tableName, + this.syncedColumns, + this.options = const TableOptions(), + }); + + Map _toJson() => { + 'table_name': tableName, + if (syncedColumns != null) 'synced_columns': syncedColumns, + ...options._optionsToJson(), }; } @@ -399,7 +486,7 @@ final class PendingStatement { /// [PendingStatementValue.id]. final List params; - PendingStatement({required this.sql, required this.params}); + const PendingStatement({required this.sql, required this.params}); Map toJson() => { 'sql': sql, @@ -411,7 +498,12 @@ final class PendingStatement { /// running a [PendingStatement] for a [RawTable]. sealed class PendingStatementValue { /// A value that is bound to the textual id used in the PowerSync protocol. - factory PendingStatementValue.id() = _PendingStmtValueId; + const factory PendingStatementValue.id() = _PendingStmtValueId; + + /// A value that is bound to a JSON object containing all columns from the + /// synced row that haven't been matched by a [PendingStatementValue.column] + /// value in the same statement. + const factory PendingStatementValue.rest() = _PendingStmtValueRest; /// A value that is bound to the value of a column in a replace (`PUT`) /// operation of the PowerSync protocol. @@ -441,6 +533,15 @@ class _PendingStmtValueId implements PendingStatementValue { } } +class _PendingStmtValueRest implements PendingStatementValue { + const _PendingStmtValueRest(); + + @override + dynamic toJson() { + return 'Rest'; + } +} + /// Type of column. enum ColumnType { /// TEXT column. diff --git a/packages/powersync_core/test/crud_test.dart b/packages/powersync_core/test/crud_test.dart index 0e188075..f4d0e1c1 100644 --- a/packages/powersync_core/test/crud_test.dart +++ b/packages/powersync_core/test/crud_test.dart @@ -1,3 +1,5 @@ +import 'dart:convert'; + import 'package:powersync_core/powersync_core.dart'; import 'package:sqlite_async/sqlite3_common.dart'; import 'package:test/test.dart'; @@ -19,6 +21,8 @@ void main() { powersync = await testUtils.setupPowerSync(path: path); }); + tearDown(() => powersync.close()); + test('INSERT', () async { expect(await powersync.getAll('SELECT * FROM ps_crud'), equals([])); await powersync.execute( @@ -397,5 +401,67 @@ void main() { await powersync.execute('UPDATE lists SET name = ?;', ['name']); expect(await powersync.getNextCrudTransaction(), isNull); }); + + test('inferred crud trigger for raw tables', () async { + const table = RawTable.inferred( + name: 'sync_name', schema: RawTableSchema(tableName: 'users')); + await powersync.execute('CREATE TABLE users (id TEXT, name TEXT);'); + await powersync.execute( + 'SELECT powersync_create_raw_table_crud_trigger(?, ?, ?)', + [json.encode(table), 'users_insert', 'INSERT'], + ); + await powersync.execute( + 'INSERT INTO users (id, name) VALUES (?, ?);', ['id', 'user']); + + final tx = await powersync.getNextCrudTransaction(); + expect(tx!.crud, [ + isA() + .having((e) => e.op, 'op', UpdateType.put) + .having((e) => e.table, 'table', 'sync_name') + .having((e) => e.id, 'id', 'id') + .having((e) => e.opData, 'opData', {'name': 'user'}), + ]); + }); + + test('raw tables with options', () async { + const table = RawTable.inferred( + name: 'sync_name', + schema: RawTableSchema( + tableName: 'users', + syncedColumns: ['name'], + options: TableOptions( + ignoreEmptyUpdates: true, + trackPreviousValues: TrackPreviousValuesOptions(), + ), + ), + ); + + await powersync + .execute('CREATE TABLE users (id TEXT, name TEXT, local TEXT);'); + await powersync.execute( + 'INSERT INTO users (id, name, local) VALUES (?, ?, ?);', + ['id', 'name', 'local']); + + await powersync.execute( + 'SELECT powersync_create_raw_table_crud_trigger(?, ?, ?)', + [json.encode(table), 'users_update', 'UPDATE'], + ); + + await powersync.execute('UPDATE users SET name = ?, local = ?', + ['updated_name', 'updated_local']); + // This should not generate a CRUD entry because the only syned column is + // not affected. + await powersync.execute('UPDATE users SET name = ?, local = ?', + ['updated_name', 'updated_local_2']); + + final tx = await powersync.getNextCrudTransaction(); + expect(tx!.crud, [ + isA() + .having((e) => e.op, 'op', UpdateType.patch) + .having((e) => e.id, 'id', 'id') + .having((e) => e.opData, 'opData', {'name': 'updated_name'}).having( + (e) => e.previousValues, 'previousValues', {'name': 'name'}), + ]); + }); }); } diff --git a/packages/powersync_core/test/powersync_shared_test.dart b/packages/powersync_core/test/powersync_shared_test.dart index b4317d89..2ee287fc 100644 --- a/packages/powersync_core/test/powersync_shared_test.dart +++ b/packages/powersync_core/test/powersync_shared_test.dart @@ -146,5 +146,25 @@ void main() { await db.close(); }); + + test('can clear raw tables', () async { + final db = await testUtils.setupPowerSync(path: path); + await db.updateSchema(const Schema([], rawTables: [ + RawTable( + name: 'unused', + put: PendingStatement(sql: '', params: []), + delete: PendingStatement(sql: '', params: []), + clear: 'DELETE FROM lists', + ) + ])); + await db.execute( + 'CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT)'); + await db + .execute('INSERT INTO lists (id, name) VALUES (uuid(), ?)', ['list']); + + expect(await db.getAll('SELECT * FROM lists'), hasLength(1)); + await db.disconnectAndClear(); + expect(await db.getAll('SELECT * FROM lists'), isEmpty); + }); }); } diff --git a/packages/powersync_core/test/sync/in_memory_sync_test.dart b/packages/powersync_core/test/sync/in_memory_sync_test.dart index 6a465dab..2349bfd4 100644 --- a/packages/powersync_core/test/sync/in_memory_sync_test.dart +++ b/packages/powersync_core/test/sync/in_memory_sync_test.dart @@ -219,15 +219,104 @@ void _declareTests(String name, SyncOptions options, bool bson) { }); } else { // raw tables are only supported by the rust sync client - test('raw tables', () async { + test('raw tables with implicit statements', () async { + final schema = const Schema([], rawTables: [ + RawTable.inferred( + name: 'lists', + schema: RawTableSchema(tableName: 'lists'), + ), + ]); + + await database.execute( + 'CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT);'); + final query = StreamQueue( + database.watch('SELECT * FROM lists', throttle: Duration.zero)); + await expectLater(query, emits(isEmpty)); + + await database.updateSchema(schema); + await waitForConnection(); + + syncService + ..addLine({ + 'checkpoint': Checkpoint( + lastOpId: '1', + writeCheckpoint: null, + checksums: [ + BucketChecksum(bucket: 'a', priority: 3, checksum: 0) + ], + ) + }) + ..addLine({ + 'data': { + 'bucket': 'a', + 'data': [ + { + 'checksum': 0, + 'data': json.encode({'name': 'custom list'}), + 'op': 'PUT', + 'op_id': '1', + 'object_id': 'my_list', + 'object_type': 'lists' + } + ] + } + }) + ..addLine({ + 'checkpoint_complete': {'last_op_id': '1'} + }); + + await expectLater( + query, + emits([ + { + 'id': 'my_list', + 'name': 'custom list', + } + ]), + ); + + syncService + ..addLine({ + 'checkpoint': Checkpoint( + lastOpId: '2', + writeCheckpoint: null, + checksums: [ + BucketChecksum(bucket: 'a', priority: 3, checksum: 0) + ], + ) + }) + ..addLine({ + 'data': { + 'bucket': 'a', + 'data': [ + { + 'checksum': 0, + 'op': 'REMOVE', + 'op_id': '2', + 'object_id': 'my_list', + 'object_type': 'lists' + } + ] + } + }) + ..addLine({ + 'checkpoint_complete': {'last_op_id': '2'} + }); + + await expectLater(query, emits(isEmpty)); + }); + + test('raw tables with explicit statements', () async { final schema = Schema(const [], rawTables: [ RawTable( name: 'lists', put: PendingStatement( - sql: 'INSERT OR REPLACE INTO lists (id, name) VALUES (?, ?)', + sql: + 'INSERT OR REPLACE INTO lists (id, name, _rest) VALUES (?, ?, ?)', params: [ PendingStatementValue.id(), PendingStatementValue.column('name'), + PendingStatementValue.rest(), ], ), delete: PendingStatement( @@ -240,7 +329,7 @@ void _declareTests(String name, SyncOptions options, bool bson) { ]); await database.execute( - 'CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT);'); + 'CREATE TABLE lists (id TEXT NOT NULL PRIMARY KEY, name TEXT, _rest TEXT);'); final query = StreamQueue( database.watch('SELECT * FROM lists', throttle: Duration.zero)); await expectLater(query, emits(isEmpty)); @@ -264,7 +353,8 @@ void _declareTests(String name, SyncOptions options, bool bson) { 'data': [ { 'checksum': 0, - 'data': json.encode({'name': 'custom list'}), + 'data': json.encode( + {'name': 'custom list', 'additional_column': 'foo'}), 'op': 'PUT', 'op_id': '1', 'object_id': 'my_list', @@ -280,7 +370,11 @@ void _declareTests(String name, SyncOptions options, bool bson) { await expectLater( query, emits([ - {'id': 'my_list', 'name': 'custom list'} + { + 'id': 'my_list', + 'name': 'custom list', + '_rest': json.encode({'additional_column': 'foo'}) + } ]), );