diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 24c5ab5..36409e9 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -107,6 +107,11 @@ impl SyncClient { SyncControlRequest::StopSyncStream => self.state.tear_down(), } } + + /// Whether a sync iteration is currently active on the connection. + pub fn has_sync_iteration(&self) -> bool { + matches!(self.state, ClientState::IterationActive(_)) + } } enum ClientState { diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs index a185063..2d1c483 100644 --- a/crates/core/src/view_admin.rs +++ b/crates/core/src/view_admin.rs @@ -10,7 +10,7 @@ use powersync_sqlite_nostd as sqlite; use powersync_sqlite_nostd::{Connection, Context}; use sqlite::{ResultCode, Value}; -use crate::error::PowerSyncError; +use crate::error::{PSResult, PowerSyncError}; use crate::migrations::{LATEST_VERSION, powersync_migrate}; use crate::schema::inspection::ExistingView; use crate::state::DatabaseState; @@ -75,7 +75,7 @@ fn powersync_clear_impl( // speed up the next sync. local_db.exec_safe("DELETE FROM ps_oplog; DELETE FROM ps_buckets")?; } else { - local_db.exec_safe("UPDATE ps_buckets SET last_applied_op = 0")?; + trigger_resync(local_db, state)?; local_db.exec_safe("DELETE FROM ps_buckets WHERE name = '$local'")?; } @@ -86,10 +86,10 @@ DELETE FROM ps_crud; DELETE FROM ps_untyped; DELETE FROM ps_updated_rows; DELETE FROM ps_kv WHERE key != 'client_id'; -DELETE FROM ps_sync_state; DELETE FROM ps_stream_subscriptions; ", )?; + clear_has_synced(local_db)?; let table_glob = if flags.clear_local() { "ps_data_*" @@ -138,6 +138,52 @@ DELETE FROM {table};", Ok(String::from("")) } +fn trigger_resync(db: *mut sqlite::sqlite3, state: &DatabaseState) -> Result<(), PowerSyncError> { + { + let client = state.sync_client.borrow(); + if let Some(client) = client.as_ref() + && client.has_sync_iteration() + { + return Err(PowerSyncError::argument_error( + "Cannot clear or trigger resync while a sync iteration is active.", + )); + } + } + + db.exec_safe("UPDATE ps_buckets SET last_applied_op = 0 WHERE name != '$local'") + .into_db_result(db)?; + Ok(Default::default()) +} + +fn clear_has_synced(db: *mut sqlite::sqlite3) -> Result<(), PowerSyncError> { + db.exec_safe("DELETE FROM ps_sync_state;")?; + db.exec_safe("UPDATE ps_stream_subscriptions SET last_synced_at = NULL")?; + Ok(()) +} + +fn powersync_trigger_resync_impl( + ctx: *mut sqlite::context, + args: &[*mut sqlite::value], +) -> Result { + let local_db = ctx.db_handle(); + let state = unsafe { DatabaseState::from_context(&ctx) }; + trigger_resync(local_db, state)?; + + let clear_progress = args[0].int() != 0; + if clear_progress { + clear_has_synced(local_db)?; + } + + Ok(Default::default()) +} + +create_auto_tx_function!(powersync_trigger_resync_tx, powersync_trigger_resync_impl); +create_sqlite_text_fn!( + powersync_trigger_resync, + powersync_trigger_resync_tx, + "powersync_trigger_resync" +); + #[derive(Clone, Copy)] struct PowerSyncClearFlags(i32); @@ -199,12 +245,23 @@ pub fn register(db: *mut sqlite::sqlite3, state: Rc) -> Result<() "powersync_clear", 1, sqlite::UTF8, - Some(Rc::into_raw(state) as *mut c_void), + Some(Rc::into_raw(state.clone()) as *mut c_void), Some(powersync_clear), None, None, Some(DatabaseState::destroy_rc), )?; + db.create_function_v2( + "powersync_trigger_resync", + 1, + sqlite::UTF8, + Some(Rc::into_raw(state) as *mut c_void), + Some(powersync_trigger_resync), + None, + None, + Some(DatabaseState::destroy_rc), + )?; + Ok(()) } diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 4145fcf..92181bf 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -400,6 +400,7 @@ void _syncTests({ db.execute( 'insert into items (id, col) values (uuid(), ?)', ['local item']); expect(db.select('SELECT * FROM items'), hasLength(2)); + invokeControl('stop', null); // Soft clear db.execute('SELECT powersync_clear(2)'); @@ -427,6 +428,70 @@ void _syncTests({ expect(db.select('SELECT * FROM items'), hasLength(1)); }); + group('trigger resync', () { + test('forbidden during sync', () { + invokeControl('start', null); + + expect( + () => db.select('SELECT powersync_trigger_resync(1)'), + throwsA( + isSqliteException( + 3091, + contains( + 'Cannot clear or trigger resync while a sync iteration is active.'), + ), + ), + ); + }); + + test('re-applies data', () { + invokeControl('start', null); + pushCheckpoint(buckets: [bucketDescription('a', count: 1)]); + pushSyncData('a', '1', 'row-0', 'PUT', {'col': 'hi'}); + pushCheckpointComplete(); + invokeControl('stop', null); + + db.execute('delete from ps_data__items'); + db.execute('select powersync_trigger_resync(0)'); + + final instructions = invokeControl('start', null); + expect( + instructions, + contains( + containsPair( + 'EstablishSyncStream', + containsPair( + 'request', + containsPair('buckets', [ + {'name': 'a', 'after': '1'} + ]), + ), + ), + ), + ); + + pushCheckpoint(buckets: [bucketDescription('a', count: 1)]); + pushCheckpointComplete(); + + expect(db.select('select * from items'), [ + {'id': 'row-0', 'col': 'hi'} + ]); + }); + + test('can clear has synced', () { + invokeControl('start', null); + pushCheckpoint(buckets: [bucketDescription('a', count: 1)]); + pushSyncData('a', '1', 'row-0', 'PUT', {'col': 'hi'}); + pushCheckpointComplete(); + invokeControl('stop', null); + + db.execute('select powersync_trigger_resync(1)'); + final [row] = db.select('select powersync_offline_sync_status()'); + expect(json.decode(row.columnAt(0)), + containsPair('priority_status', isEmpty)); + }); + }); + test('persists download progress', () { const bucket = 'bkt'; void expectProgress(int atLast, int sinceLast) {