Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"libsql-wal",
"libsql-storage",
"libsql-storage-server",
"libsql-sync",
]

exclude = [
Expand Down
79 changes: 79 additions & 0 deletions docs/WAL_SYNC.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# libSQL Sync Protocol Specification

## Overview

This is a protocol for supporting offline writes by allowing a database instance to sync its write-ahead log between clients and a remote server.

## Operations

### PushWAL

Push the local WAL to a remote server.

**Request:**

- `database_id`: The ID of the database.
- `checkpoint_seq_num`: The current checkpoint sequence number.
- `frame_num_start`: The number of the first frame to push.
- `frame_num_end`: The number of the first frame to push.
- `frames`: The WAL frames to push.

**Response:**

- `status`: SUCCESS, CONFLICT, ERROR, or NEED_FULL_SYNC
- `durable_frame_num`: The highest frame number the server acknowledges as durable.

A client uses the `PushWAL` operation to push its local WAL to the remote server. The operation is idempotent on frames, which means it is safe for the client to send the same frames multiple times. If the server already has them, it ignores them. As an optimization, the client can keep track of durable checkpoint sequence and frame number tuple acknowledged by a remote server to prevent sending duplicate frames.

**TODO:**

- Return remote WAL on conflict if client requests it.
- Allow client to request server to perform checkpointing.
- Checksum support in the WAL frames.

### PullWAL

Retrieve new WAL frames from the remote server.

**Request**:

- `database_id`: The ID of the database.
- `checkpoint_seq_num`: The current checkpoint sequence number.
- `max_frame_num`: The highest frame number in the local WAL.

**Response**:
- `status`: SUCCESS, CONFLICT, ERROR, or NEED_FULL_SYNC
- `frames`: List of new WAL frames

### FetchDatabase

Retrieve the full database file from the server.

**Request**:

- `database_id`: The ID of the database.

**Response**:

- Stream of database chunks

A client uses the `FetchDatabase` operation to bootstrap a database file locally and also for disaster recovery.

## Checkpointing Process

1. Client may request a checkpoint during PushWAL.
2. Server decides whether to initiate a checkpoint based on its state and the client's request.
3. If checkpoint is needed, server sets `perform_checkpoint` to true in the PushWAL response.
4. Client performs local checkpoint up to `checkpoint_frame_id` if instructed.
5. Server performs its own checkpoint after sending the response.

## Conflict Resolution

- The server returns `CONFLICT` error if the WAL on remote is more up-to-date than the client.
- The server sends its current WAL in the response for the client to merge and retry the push.

## Bootstrapping

1. New clients start by calling `FetchDatabase` to get the full database file.
2. Follow up with PullWAL to get any new changes since the database file was generated.
3. Apply received WAL frames to the database file to reach the current state.
122 changes: 119 additions & 3 deletions libsql-ffi/bundled/SQLite3MultipleCiphers/src/sqlite3.c
Original file line number Diff line number Diff line change
Expand Up @@ -10935,6 +10935,10 @@ SQLITE_API int sqlite3_preupdate_blobwrite(sqlite3 *);
*/
SQLITE_API void *libsql_close_hook(sqlite3 *db, void (*xClose)(void *pCtx, sqlite3 *db), void *arg);

SQLITE_API int libsql_wal_frame_count(sqlite3*, unsigned int*);

SQLITE_API int libsql_wal_get_frame(sqlite3*, unsigned int, void*, unsigned int);

/*
** CAPI3REF: Low-level system error code
** METHOD: sqlite3
Expand Down Expand Up @@ -13960,6 +13964,7 @@ typedef struct libsql_wal_methods {
/* Read a page from the write-ahead log, if it is present. */
int (*xFindFrame)(wal_impl* pWal, unsigned int, unsigned int *);
int (*xReadFrame)(wal_impl* pWal, unsigned int, int, unsigned char *);
int (*xReadFrameRaw)(wal_impl* pWal, unsigned int, int, unsigned char *);

/* If the WAL is not empty, return the size of the database. */
unsigned int (*xDbsize)(wal_impl* pWal);
Expand Down Expand Up @@ -16373,6 +16378,9 @@ SQLITE_PRIVATE int sqlite3PagerReadFileheader(Pager*, int, unsigned char*);
SQLITE_PRIVATE void sqlite3PagerSetBusyHandler(Pager*, int(*)(void *), void *);
SQLITE_PRIVATE int sqlite3PagerSetPagesize(Pager*, u32*, int);
SQLITE_PRIVATE Pgno sqlite3PagerMaxPageCount(Pager*, Pgno);
SQLITE_PRIVATE unsigned int sqlite3PagerWalFrameCount(Pager *);
SQLITE_PRIVATE int sqlite3PagerWalReadFrame(Pager *, unsigned int, void *, unsigned int);

SQLITE_PRIVATE void sqlite3PagerSetCachesize(Pager*, int);
SQLITE_PRIVATE int sqlite3PagerSetSpillsize(Pager*, int);
SQLITE_PRIVATE void sqlite3PagerSetMmapLimit(Pager *, sqlite3_int64);
Expand Down Expand Up @@ -57268,6 +57276,7 @@ typedef struct libsql_wal_methods {
/* Read a page from the write-ahead log, if it is present. */
int (*xFindFrame)(wal_impl* pWal, unsigned int, unsigned int *);
int (*xReadFrame)(wal_impl* pWal, unsigned int, int, unsigned char *);
int (*xReadFrameRaw)(wal_impl* pWal, unsigned int, int, unsigned char *);

/* If the WAL is not empty, return the size of the database. */
unsigned int (*xDbsize)(wal_impl* pWal);
Expand Down Expand Up @@ -65212,6 +65221,33 @@ SQLITE_PRIVATE int sqlite3PagerCloseWal(Pager *pPager, sqlite3 *db){
return rc;
}

SQLITE_PRIVATE unsigned int sqlite3PagerWalFrameCount(Pager *pPager){
if( pagerUseWal(pPager) ){
// TODO: We are under sqlite3 mutex, but do we need something else?
struct sqlite3_wal* pWal = (void*) pPager->wal->pData;
return pWal->hdr.mxFrame;
}else{
return 0;
}
}

SQLITE_PRIVATE int sqlite3PagerWalReadFrameRaw(
Pager *pPager,
unsigned int iFrame,
void *pFrameOut,
unsigned int nFrameOutLen
){
if( pagerUseWal(pPager) ){
unsigned int nFrameLen = 24+pPager->pageSize;
if( nFrameOutLen!=nFrameLen ) return SQLITE_MISUSE;
return pPager->wal->methods.xReadFrameRaw(pPager->wal->pData, iFrame, pPager->pageSize, pFrameOut);
}else{
return SQLITE_ERROR;
}
}

int (*xReadFrame)(wal_impl* pWal, unsigned int, int, unsigned char *);

#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
/*
** If pager pPager is a wal-mode database not in exclusive locking mode,
Expand Down Expand Up @@ -67599,9 +67635,10 @@ static int sqlite3WalClose(
if( pWal->exclusiveMode==WAL_NORMAL_MODE ){
pWal->exclusiveMode = WAL_EXCLUSIVE_MODE;
}
rc = sqlite3WalCheckpoint(pWal, db,
SQLITE_CHECKPOINT_PASSIVE, 0, 0, sync_flags, nBuf, zBuf, 0, 0, NULL, NULL
);
rc = SQLITE_ERROR;
//rc = sqlite3WalCheckpoint(pWal, db,
// SQLITE_CHECKPOINT_PASSIVE, 0, 0, sync_flags, nBuf, zBuf, 0, 0, NULL, NULL
//);
if( rc==SQLITE_OK ){
int bPersist = -1;
sqlite3OsFileControlHint(
Expand Down Expand Up @@ -68729,6 +68766,28 @@ static int sqlite3WalReadFrame(
return sqlite3OsRead(pWal->pWalFd, pOut, (nOut>sz ? sz : nOut), iOffset);
}

/*
** Read the contents of frame iRead from the wal file into buffer pOut
** (which is nOut bytes in size). Return SQLITE_OK if successful, or an
** error code otherwise.
*/
static int sqlite3WalReadFrameRaw(
Wal *pWal, /* WAL handle */
u32 iRead, /* Frame to read */
int nOut, /* Size of buffer pOut in bytes */
u8 *pOut /* Buffer to write page data to */
){
int sz;
i64 iOffset;
sz = pWal->hdr.szPage;
sz = (sz&0xfe00) + ((sz&0x0001)<<16);
testcase( sz<=32768 );
testcase( sz>=65536 );
iOffset = walFrameOffset(iRead, sz);
/* testcase( IS_BIG_INT(iOffset) ); // requires a 4GiB WAL */
return sqlite3OsRead(pWal->pWalFd, pOut, (nOut>sz ? sz : nOut), iOffset);
}

/*
** Return the size of the database in pages (or zero, if unknown).
*/
Expand Down Expand Up @@ -69838,6 +69897,7 @@ static int sqlite3WalOpen(
out->methods.xEndReadTransaction = (void (*)(wal_impl *))sqlite3WalEndReadTransaction;
out->methods.xFindFrame = (int (*)(wal_impl *, unsigned int, unsigned int *))sqlite3WalFindFrame;
out->methods.xReadFrame = (int (*)(wal_impl *, unsigned int, int, unsigned char *))sqlite3WalReadFrame;
out->methods.xReadFrameRaw = (int (*)(wal_impl *, unsigned int, int, unsigned char *))sqlite3WalReadFrameRaw;
out->methods.xDbsize = (unsigned int (*)(wal_impl *))sqlite3WalDbsize;
out->methods.xBeginWriteTransaction = (int (*)(wal_impl *))sqlite3WalBeginWriteTransaction;
out->methods.xEndWriteTransaction = (int (*)(wal_impl *))sqlite3WalEndWriteTransaction;
Expand Down Expand Up @@ -182863,6 +182923,62 @@ void *libsql_close_hook(
return pRet;
}

/*
** Return the number of frames in the WAL of the given database.
*/
int libsql_wal_frame_count(
sqlite3* db,
unsigned int *pnFrame
){
int rc = SQLITE_OK;
Pager *pPager;

#ifdef SQLITE_OMIT_WAL
*pnFrame = 0;
return SQLITE_OK;
#else
#ifdef SQLITE_ENABLE_API_ARMOR
if( !sqlite3SafetyCheckOk(db) ) return SQLITE_MISUSE_BKPT;
#endif

sqlite3_mutex_enter(db->mutex);
pPager = sqlite3BtreePager(db->aDb[0].pBt);
*pnFrame = sqlite3PagerWalFrameCount(pPager);
sqlite3_mutex_leave(db->mutex);

return rc;
#endif
}

int libsql_wal_get_frame(
sqlite3* db,
unsigned int iFrame,
void *pBuf,
unsigned int nBuf
){
int rc = SQLITE_OK;
Pager *pPager;

#ifdef SQLITE_OMIT_WAL
UNUSED_PARAMETER(iFrame);
UNUSED_PARAMETER(nBuf);
UNUSED_PARAMETER(pBuf);
return SQLITE_OK;
#else

#ifdef SQLITE_ENABLE_API_ARMOR
if( !sqlite3SafetyCheckOk(db) ) return SQLITE_MISUSE_BKPT;
#endif

sqlite3_mutex_enter(db->mutex);
pPager = sqlite3BtreePager(db->aDb[0].pBt);
rc = sqlite3PagerWalReadFrameRaw(pPager, iFrame, pBuf, nBuf);
sqlite3_mutex_leave(db->mutex);

return rc;
#endif
}

/*
** Register a function to be invoked prior to each autovacuum that
** determines the number of pages to vacuum.
Expand Down
38 changes: 26 additions & 12 deletions libsql-ffi/bundled/bindings/bindgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ extern "C" {
extern "C" {
pub fn sqlite3_vmprintf(
arg1: *const ::std::os::raw::c_char,
arg2: *mut __va_list_tag,
arg2: va_list,
) -> *mut ::std::os::raw::c_char;
}
extern "C" {
Expand All @@ -954,7 +954,7 @@ extern "C" {
arg1: ::std::os::raw::c_int,
arg2: *mut ::std::os::raw::c_char,
arg3: *const ::std::os::raw::c_char,
arg4: *mut __va_list_tag,
arg4: va_list,
) -> *mut ::std::os::raw::c_char;
}
extern "C" {
Expand Down Expand Up @@ -2501,7 +2501,7 @@ extern "C" {
pub fn sqlite3_str_vappendf(
arg1: *mut sqlite3_str,
zFormat: *const ::std::os::raw::c_char,
arg2: *mut __va_list_tag,
arg2: va_list,
);
}
extern "C" {
Expand Down Expand Up @@ -2861,6 +2861,20 @@ extern "C" {
arg: *mut ::std::os::raw::c_void,
) -> *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn libsql_wal_frame_count(
arg1: *mut sqlite3,
arg2: *mut ::std::os::raw::c_uint,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn libsql_wal_get_frame(
arg1: *mut sqlite3,
arg2: ::std::os::raw::c_uint,
arg3: *mut ::std::os::raw::c_void,
arg4: ::std::os::raw::c_uint,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn sqlite3_system_errno(arg1: *mut sqlite3) -> ::std::os::raw::c_int;
}
Expand Down Expand Up @@ -3269,6 +3283,14 @@ pub struct libsql_wal_methods {
arg3: *mut ::std::os::raw::c_uchar,
) -> ::std::os::raw::c_int,
>,
pub xReadFrameRaw: ::std::option::Option<
unsafe extern "C" fn(
pWal: *mut wal_impl,
arg1: ::std::os::raw::c_uint,
arg2: ::std::os::raw::c_int,
arg3: *mut ::std::os::raw::c_uchar,
) -> ::std::os::raw::c_int,
>,
pub xDbsize:
::std::option::Option<unsafe extern "C" fn(pWal: *mut wal_impl) -> ::std::os::raw::c_uint>,
pub xBeginWriteTransaction:
Expand Down Expand Up @@ -3504,12 +3526,4 @@ extern "C" {
extern "C" {
pub static sqlite3_wal_manager: libsql_wal_manager;
}
pub type __builtin_va_list = [__va_list_tag; 1usize];
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct __va_list_tag {
pub gp_offset: ::std::os::raw::c_uint,
pub fp_offset: ::std::os::raw::c_uint,
pub overflow_arg_area: *mut ::std::os::raw::c_void,
pub reg_save_area: *mut ::std::os::raw::c_void,
}
pub type __builtin_va_list = *mut ::std::os::raw::c_char;
Loading