Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions packages/backend/src/QueryEngine.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { describe, expect, it } from 'vitest';
import { col, param } from '@synthql/queries';
import { queryEngine } from './tests/queryEngine';
import { from } from './tests/generated';

const params = {
'where.actor_id': 2,
'where.film_id': 47,
'include.film.where.language_id': 3,
'include.film.include.language.where.last_update': '2022-02-15 10:02:19+00',
};

describe('QueryEngine', () => {
it('registerQueries + executeRegisteredQuery', async () => {
queryEngine.registerQueries([findFilmActor(false).maybe()]);

const parameterizedQueryResult =
await queryEngine.executeRegisteredQueryAndWait({
queryId: findFilmActor(false).maybe().hash,
params,
});

const regularQueryResult = await queryEngine.executeAndWait(
findFilmActor(true).maybe(),
);

expect(parameterizedQueryResult).toEqual(regularQueryResult);
});
});

function findFilmActor(regular: boolean) {
return from('film_actor')
.where({
actor_id: regular ? params['where.actor_id'] : param(),
film_id: regular ? params['where.film_id'] : param(),
})
.include({
film: from('film')
.where({
film_id: col('film_actor.film_id'),
language_id: regular
? params['include.film.where.language_id']
: param(),
})
.include({
language: from('language')
.where({
language_id: col('film.language_id'),
last_update: regular
? params[
'include.film.include.language.where.last_update'
]
: param(),
})
.maybe(),
})
.maybe(),
});
}
160 changes: 129 additions & 31 deletions packages/backend/src/QueryEngine.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { Pool } from 'pg';
import { Query, QueryResult, Table } from '@synthql/queries';
import {
AnyQuery,
Query,
QueryResult,
RegisteredQueryRequestBody,
Table,
} from '@synthql/queries';
import { composeQuery } from './execution/executors/PgExecutor/composeQuery';
import { QueryPlan, collectLast } from '.';
import { QueryProvider } from './QueryProvider';
Expand All @@ -9,12 +15,15 @@ import { QueryProviderExecutor } from './execution/executors/QueryProviderExecut
import { PgExecutor } from './execution/executors/PgExecutor';
import { generateLast } from './util/generators/generateLast';
import { SynthqlError } from './SynthqlError';
import { QueryStore } from './QueryStore';

export interface QueryEngineProps<DB> {
/**
* The database connection string e.g. `postgresql://user:password@localhost:5432/db`.
* The database connection string.
* e.g. `postgresql://user:password@localhost:5432/db`.
*
* If you use this option, SynthQL will create a conection pool for you internally.
* If you use this option, SynthQL will create
* a conection pool for you internally.
*/
url?: string;
/**
Expand All @@ -25,14 +34,20 @@ export interface QueryEngineProps<DB> {
*/
schema?: string;
/**
* An optional SQL statement that will be sent before every SynthQL query.
* If true, the executor will execute queries that have not
* been registered via `QueryEngine.registerQueries()`.
*/
dangerouslyAllowUnregisteredQueries?: boolean;
/**
* An optional SQL statement that will
* be sent before every SynthQL query.
*
* e.g `SELECT version();`
*/
prependSql?: string;
/**
* A list of providers that you want to be used
* to execute your SynthQL queries against.
* A list of providers that you want to
* execute your SynthQL queries against.
*
* e.g:
*
Expand All @@ -55,9 +70,11 @@ export interface QueryEngineProps<DB> {
*/
providers?: Array<QueryProvider<DB, Table<DB>>>;
/**
* The connection pool to which the executor will send SQL queries to.
* The connection pool to which the
* executor will send SQL queries to.
*
* You can use this instead of passing a connection string.
* You can use this instead of
* passing a connection string.
*/
pool?: Pool;

Expand All @@ -70,18 +87,24 @@ export interface QueryEngineProps<DB> {
export class QueryEngine<DB> {
private pool: Pool;
private schema: string;
private dangerouslyAllowUnregisteredQueries: boolean;
private prependSql?: string;
private queryStore: QueryStore;
private executors: Array<QueryExecutor> = [];

constructor(config: QueryEngineProps<DB>) {
this.schema = config.schema ?? 'public';
this.prependSql = config.prependSql;
this.pool =
config.pool ??
new Pool({
connectionString: config.url,
max: 10,
});
this.schema = config.schema ?? 'public';
this.dangerouslyAllowUnregisteredQueries =
config.dangerouslyAllowUnregisteredQueries ?? false;
this.prependSql = config.prependSql;

this.queryStore = new QueryStore();

const qpe = new QueryProviderExecutor(config.providers ?? []);
this.executors = [
Expand All @@ -96,12 +119,24 @@ export class QueryEngine<DB> {
];
}

compile<T>(query: T extends Query<DB, infer TTable> ? T : never): {
sql: string;
params: any[];
} {
const { sqlBuilder } = composeQuery({
defaultSchema: this.schema,
query,
});

return sqlBuilder.build();
}

execute<TTable extends Table<DB>, TQuery extends Query<DB, TTable>>(
query: TQuery,
opts?: {
/**
* The name of the database schema to execute
* your SynthQL query against
* The name of the database schema to
* execute your SynthQL query against
*
* e.g `public`
*/
Expand All @@ -113,6 +148,22 @@ export class QueryEngine<DB> {
returnLastOnly?: boolean;
},
): AsyncGenerator<QueryResult<DB, TQuery>> {
if (!this.dangerouslyAllowUnregisteredQueries) {
if (!query.hash) {
throw SynthqlError.createQueryMissingHashError({
query,
});
}

const hasQueryFn = this.queryStore.has(query.hash);

if (!hasQueryFn) {
throw SynthqlError.createQueryNotRegisteredError({
queryId: query.hash,
});
}
}

const gen = execute<DB, TQuery>(query, {
executors: this.executors,
defaultSchema: opts?.schema ?? this.schema,
Expand All @@ -133,35 +184,77 @@ export class QueryEngine<DB> {
query: TQuery,
opts?: {
/**
* The name of the database schema to execute
* your SynthQL query against
* The name of the database schema to
* execute your SynthQL query against
*
* e.g `public`
*/
schema?: string;
},
): Promise<QueryResult<DB, TQuery>> {
return await collectLast(
generateLast(
execute<DB, TQuery>(query, {
executors: this.executors,
defaultSchema: opts?.schema ?? this.schema,
prependSql: this.prependSql,
}),
),
return collectLast(
this.execute(query, {
schema: opts?.schema ?? this.schema,
returnLastOnly: true,
}),
);
}

compile<T>(query: T extends Query<DB, infer TTable> ? T : never): {
sql: string;
params: any[];
} {
const { sqlBuilder } = composeQuery({
defaultSchema: this.schema,
query,
executeRegisteredQuery<
TTable extends Table<DB>,
TQuery extends Query<DB, TTable>,
>(
{ queryId, params }: RegisteredQueryRequestBody,
opts?: {
/**
* The name of the database schema to
* execute your SynthQL query against
*
* e.g `public`
*/
schema?: string;
/**
* If true, the query result generator will wait for query
* execution completion, and then return only the last result
*/
returnLastOnly?: boolean;
},
): AsyncGenerator<QueryResult<DB, TQuery>> {
const query = this.queryStore.get({
queryId,
params,
});

return sqlBuilder.build();
return this.execute<TTable, TQuery>(query as TQuery, {
schema: opts?.schema ?? this.schema,
returnLastOnly: true,
});
}

executeRegisteredQueryAndWait<
TTable extends Table<DB>,
TQuery extends Query<DB, TTable>,
>(
{ queryId, params }: RegisteredQueryRequestBody,
opts?: {
/**
* The name of the database schema to
* execute your SynthQL query against
*
* e.g `public`
*/
schema?: string;
},
): Promise<QueryResult<DB, TQuery>> {
return collectLast(
this.executeRegisteredQuery<TTable, TQuery>(
{ queryId, params },
{
schema: opts?.schema ?? this.schema,
returnLastOnly: true,
},
),
);
}

async explain<TTable extends Table<DB>>(
Expand All @@ -173,7 +266,6 @@ export class QueryEngine<DB> {
});

const { params, sql } = sqlBuilder.build();

const explainQuery: string = `explain (analyze, buffers, verbose, settings, format json) ${sql}`;

try {
Expand All @@ -186,4 +278,10 @@ export class QueryEngine<DB> {
});
}
}

registerQueries(queries: Array<AnyQuery>) {
for (const query of queries) {
this.queryStore.set(query);
}
}
}
Loading