diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index de3a28d..3c98abe 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @netanelC @CptSchnitz @shimoncohen +* @MapColonies/vector-team \ No newline at end of file diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index 1ce869f..2b22909 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -64,7 +64,7 @@ jobs: services: postgres: - image: postgres:14 + image: postgres:15 env: POSTGRES_PASSWORD: ${{ env.DB_PASSWORD }} POSTGRES_USER: ${{ env.DB_USERNAME }} diff --git a/README.md b/README.md index f47a6bb..9babc23 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,16 @@ each item on this queue is a metatile in the size of metatile property located i - `QUEUE_JOBS_RETRY_BACKOFF` - Default: true. Enables exponential backoff retries based on retryDelay instead of a fixed delay. Sets initial retryDelay to 1 if not set. - `QUEUE_JOBS_RETRY_LIMIT` - Default: 3. Max number of retries of failed jobs. - `QUEUE_JOBS_RETRY_DELAY_SECONDS` - Default: 60. Delay between retries of failed jobs, in seconds. -- `QUEUE_JOBS_RETENTION_HOURS` - Default: 87660. How many hours a job may be in created or retry state before it's archived. Must be >=1 +- `QUEUE_JOBS_RETENTION_SECONDS` - Default: 315576000. How many seconds a job may be in created or retry state before it's archived. Must be >=1 +- `DB_CREATE_SCHEMA` - Default: false. enable pg-boss schema creation at startup +- `DB_RUN_MIGRATE` - Default: false. enable pg-boss schema migrations at startup - `APP_CONSUME_CONDITION_ENABLED` - Pre-request consumption flag for conditions validation, current condition is tiles queue size (see `APP_CONSUME_CONDITION_TILES_QUEUE_SIZE_LIMIT`) - `APP_CONSUME_CONDITION_TILES_QUEUE_SIZE_LIMIT` - The max number of tiles in the tiles queue allowed before the request consumption -- `APP_CONSUME_CONDITION_CHECK_INTERVAL_SEC` - upon invalid match to the consume condition, the duration in seconds to wait until the next consume condition validation \ No newline at end of file +- `APP_CONSUME_CONDITION_CHECK_INTERVAL_SEC` - upon invalid match to the consume condition, the duration in seconds to wait until the next consume condition validation + +## pg-boss migrations +pg-boss v12 requires its database schema to be created and migrated before starting the service. Run migrations with the pg-boss CLI using your database connection (or `PGBOSS_*` environment variables): + +```bash +npx pg-boss migrate --connection-string postgres://user:pass@host:5432/database --schema pgboss +``` diff --git a/config/default.json b/config/default.json index 3900904..d621731 100644 --- a/config/default.json +++ b/config/default.json @@ -52,7 +52,7 @@ "retryBackoff": true, "retryLimit": 3, "retryDelaySeconds": 60, - "retentionHours": 87660 + "retentionSeconds": 315576000 }, "db": { "host": "localhost", @@ -60,6 +60,8 @@ "username": "postgres", "password": "postgres", "schema": "pgboss", + "createSchema": true, + "migrate": true, "ssl": { "enabled": false, "ca": "", diff --git a/helm/templates/configmap.yaml b/helm/templates/configmap.yaml index c8a119e..77137e3 100644 --- a/helm/templates/configmap.yaml +++ b/helm/templates/configmap.yaml @@ -24,6 +24,8 @@ data: DB_HOST: {{ .host }} DB_NAME: {{ .database }} DB_SCHEMA: {{ .schema }} + DB_CREATE_SCHEMA: {{ .createSchema | quote }} + DB_RUN_MIGRATE: {{ .migrate | quote }} DB_PORT: {{ .port | quote }} {{- end -}} {{- with .Values.queueConfig }} @@ -31,7 +33,7 @@ data: QUEUE_JOBS_RETRY_BACKOFF: {{ .retryBackoff | quote }} QUEUE_JOBS_RETRY_LIMIT: {{ .retryLimit | quote }} QUEUE_JOBS_RETRY_DELAY_SECONDS: {{ .retryDelaySeconds | quote }} - QUEUE_JOBS_RETENTION_HOURS: {{ .retentionHours | quote }} + QUEUE_JOBS_RETENTION_SECONDS: {{ .retentionSeconds | quote }} {{- end -}} FORCE_API_TILES: {{ .Values.env.force.api | quote }} FORCE_EXPIRED_TILES: {{ .Values.env.force.expiredTiles | quote }} diff --git a/helm/values.yaml b/helm/values.yaml index a83a304..2b0bed5 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -61,7 +61,7 @@ queueConfig: retryBackoff: true retryLimit: 3 retryDelaySeconds: 60 - retentionHours: 87660 + retentionSeconds: 315576000 dbConfig: host: localhost @@ -69,6 +69,8 @@ dbConfig: password: postgres database: metatile-queue-populator schema: pgboss + createSchema: true + migrate: true port: 5432 sslAuth: enabled: false diff --git a/package-lock.json b/package-lock.json index e8d3bfd..5ba4bb4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,7 +18,7 @@ "@map-colonies/js-logger": "^1.0.1", "@map-colonies/openapi-express-viewer": "^3.0.0", "@map-colonies/read-pkg": "0.0.1", - "@map-colonies/schemas": "^1.17.0", + "@map-colonies/schemas": "https://ghatmpstorage.blob.core.windows.net/npm-packages/schemas-36806c74374e95e9771f7673691d2fd33cb2337e.tgz", "@map-colonies/telemetry": "^10.0.1", "@map-colonies/tile-calc": "0.1.3", "@map-colonies/tsconfig": "^1.0.1", @@ -30,7 +30,7 @@ "express": "^4.19.2", "express-openapi-validator": "^5.0.4", "http-status-codes": "^2.2.0", - "pg-boss": "^7.1.0", + "pg-boss": "^12.8.0", "prom-client": "^15.1.2", "reflect-metadata": "^0.1.13", "snake-case": "^3.0.4", @@ -4188,8 +4188,8 @@ }, "node_modules/@map-colonies/schemas": { "version": "1.17.0", - "resolved": "https://registry.npmjs.org/@map-colonies/schemas/-/schemas-1.17.0.tgz", - "integrity": "sha512-lfp27EkpXM2zlHDHtVNkjScbRZDjcn4HHfYtl8tD2lOPMPoR55wTRBsh5OhUllbLbv7O2+youA1w3Ny+iUlW9w==", + "resolved": "https://ghatmpstorage.blob.core.windows.net/npm-packages/schemas-36806c74374e95e9771f7673691d2fd33cb2337e.tgz", + "integrity": "sha512-qmcpoI5yUlEVRxVMLzfllKpuQRb0BvfzmDT/pyl7kE8dypqO4e39G2OkkKOgXT09FPyvwArws/1UXrxJmid1tA==", "license": "MIT", "peer": true }, @@ -9519,19 +9519,6 @@ "node": ">= 14" } }, - "node_modules/aggregate-error": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-3.1.0.tgz", - "integrity": "sha512-4I7Td01quW/RpocfNayFdFVk1qSuoh0E7JrbRJ16nH01HhKFQ88INq9Sd+nd72zqRySlr9BmDA8xlEJ6vJMrYA==", - "license": "MIT", - "dependencies": { - "clean-stack": "^2.0.0", - "indent-string": "^4.0.0" - }, - "engines": { - "node": ">=8" - } - }, "node_modules/ajv": { "version": "8.17.1", "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.17.1.tgz", @@ -10718,15 +10705,6 @@ "dev": true, "license": "MIT" }, - "node_modules/clean-stack": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-2.2.0.tgz", - "integrity": "sha512-4diC9HaTE+KRAMWhDhrGOECgWZxoevMc5TlkObMqNSsVU62PYzXZ/SMTjzyGAFF1YusgxGcSWTEXBhp0CPwQ1A==", - "license": "MIT", - "engines": { - "node": ">=6" - } - }, "node_modules/cliui": { "version": "8.0.1", "resolved": "https://registry.npmjs.org/cliui/-/cliui-8.0.1.tgz", @@ -11293,15 +11271,15 @@ } }, "node_modules/cron-parser": { - "version": "4.9.0", - "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-4.9.0.tgz", - "integrity": "sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==", + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/cron-parser/-/cron-parser-5.5.0.tgz", + "integrity": "sha512-oML4lKUXxizYswqmxuOCpgFS8BNUJpIu6k/2HVHyaL8Ynnf3wdf9tkns0yRdJLSIjkJ+b0DXHMZEHGpMwjnPww==", "license": "MIT", "dependencies": { - "luxon": "^3.2.1" + "luxon": "^3.7.1" }, "engines": { - "node": ">=12.0.0" + "node": ">=18" } }, "node_modules/cross-env": { @@ -11604,18 +11582,6 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/delay": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/delay/-/delay-5.0.0.tgz", - "integrity": "sha512-ReEBKkIfe4ya47wlPYf/gu5ib6yUG0/Aez0JQZQz94kiWtRQvZIQbTiehsnwHvLSWJnQdhVeqYue7Id1dKr0qw==", - "license": "MIT", - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/delayed-stream": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", @@ -14673,15 +14639,6 @@ "node": ">=0.8.19" } }, - "node_modules/indent-string": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/indent-string/-/indent-string-4.0.0.tgz", - "integrity": "sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==", - "license": "MIT", - "engines": { - "node": ">=8" - } - }, "node_modules/inflight": { "version": "1.0.6", "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", @@ -17738,6 +17695,7 @@ "version": "4.0.8", "resolved": "https://registry.npmjs.org/lodash.debounce/-/lodash.debounce-4.0.8.tgz", "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", + "dev": true, "license": "MIT" }, "node_modules/lodash.get": { @@ -18377,6 +18335,18 @@ "dev": true, "license": "MIT" }, + "node_modules/non-error": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/non-error/-/non-error-0.1.0.tgz", + "integrity": "sha512-TMB1uHiGsHRGv1uYclfhivcnf0/PdFp2pNqRxXjncaAsjYMoisaQJI+SSZCqRq+VliwRTC8tsMQfmrWjDMhkPQ==", + "license": "MIT", + "engines": { + "node": ">=20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/normalize-path": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/normalize-path/-/normalize-path-3.0.0.tgz", @@ -18890,21 +18860,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/p-map": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/p-map/-/p-map-4.0.0.tgz", - "integrity": "sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==", - "license": "MIT", - "dependencies": { - "aggregate-error": "^3.0.0" - }, - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/p-try": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", @@ -19100,29 +19055,20 @@ } }, "node_modules/pg-boss": { - "version": "7.4.0", - "resolved": "https://registry.npmjs.org/pg-boss/-/pg-boss-7.4.0.tgz", - "integrity": "sha512-thjVCqLvFINqDJq/fS3dsH2lglZ9owqJi6waTNO/Bmdr0zDbRbrgEsAkzi50riYPaDx5EUvmpsFHipmR1XjxWA==", + "version": "12.8.0", + "resolved": "https://registry.npmjs.org/pg-boss/-/pg-boss-12.8.0.tgz", + "integrity": "sha512-JiOu1Y+/LgOf9saIfnsnU/fOc6OVojYwDQAa7vluAIkJJuHeyCeFHw5H4XAfvzcVGOSfBE2SIRV6wmyZY1LZvA==", "license": "MIT", "dependencies": { - "cron-parser": "^4.0.0", - "delay": "^5.0.0", - "lodash.debounce": "^4.0.8", - "p-map": "^4.0.0", - "pg": "^8.5.1", - "uuid": "^8.3.2" + "cron-parser": "^5.5.0", + "pg": "^8.17.2", + "serialize-error": "^13.0.1" }, - "engines": { - "node": ">=12.0.0" - } - }, - "node_modules/pg-boss/node_modules/uuid": { - "version": "8.3.2", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", - "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", - "license": "MIT", "bin": { - "uuid": "dist/bin/uuid" + "pg-boss": "dist/cli.js" + }, + "engines": { + "node": ">=22.12.0" } }, "node_modules/pg-cloudflare": { @@ -20555,6 +20501,37 @@ "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==", "license": "MIT" }, + "node_modules/serialize-error": { + "version": "13.0.1", + "resolved": "https://registry.npmjs.org/serialize-error/-/serialize-error-13.0.1.tgz", + "integrity": "sha512-bBZaRwLH9PN5HbLCjPId4dP5bNGEtumcErgOX952IsvOhVPrm3/AeK1y0UHA/QaPG701eg0yEnOKsCOC6X/kaA==", + "license": "MIT", + "dependencies": { + "non-error": "^0.1.0", + "type-fest": "^5.4.1" + }, + "engines": { + "node": ">=20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/serialize-error/node_modules/type-fest": { + "version": "5.4.3", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-5.4.3.tgz", + "integrity": "sha512-AXSAQJu79WGc79/3e9/CR77I/KQgeY1AhNvcShIH4PTcGYyC4xv6H4R4AUOwkPS5799KlVDAu8zExeCrkGquiA==", + "license": "(MIT OR CC0-1.0)", + "dependencies": { + "tagged-tag": "^1.0.0" + }, + "engines": { + "node": ">=20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/serve-static": { "version": "1.16.3", "resolved": "https://registry.npmjs.org/serve-static/-/serve-static-1.16.3.tgz", @@ -21452,6 +21429,18 @@ "tinyqueue": "^2.0.0" } }, + "node_modules/tagged-tag": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/tagged-tag/-/tagged-tag-1.0.0.tgz", + "integrity": "sha512-yEFYrVhod+hdNyx7g5Bnkkb0G6si8HJurOoOEgC8B/O0uXLHlaey/65KRv6cuWBNhBgHKAROVpc7QyYqE5gFng==", + "license": "MIT", + "engines": { + "node": ">=20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/tdigest": { "version": "0.1.2", "resolved": "https://registry.npmjs.org/tdigest/-/tdigest-0.1.2.tgz", diff --git a/package.json b/package.json index ea9e791..554c5a5 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "@map-colonies/js-logger": "^1.0.1", "@map-colonies/openapi-express-viewer": "^3.0.0", "@map-colonies/read-pkg": "0.0.1", - "@map-colonies/schemas": "^1.17.0", + "@map-colonies/schemas": "https://ghatmpstorage.blob.core.windows.net/npm-packages/schemas-36806c74374e95e9771f7673691d2fd33cb2337e.tgz", "@map-colonies/telemetry": "^10.0.1", "@map-colonies/tile-calc": "0.1.3", "@map-colonies/tsconfig": "^1.0.1", @@ -55,7 +55,7 @@ "express": "^4.19.2", "express-openapi-validator": "^5.0.4", "http-status-codes": "^2.2.0", - "pg-boss": "^7.1.0", + "pg-boss": "^12.8.0", "prom-client": "^15.1.2", "reflect-metadata": "^0.1.13", "snake-case": "^3.0.4", diff --git a/src/common/config.ts b/src/common/config.ts index 18b0c0d..2a57336 100644 --- a/src/common/config.ts +++ b/src/common/config.ts @@ -1,8 +1,8 @@ import { type ConfigInstance, config } from '@map-colonies/config'; -import { vectorMetatileQueuePopulatorFullV1, type vectorMetatileQueuePopulatorFullV1Type } from '@map-colonies/schemas'; +import { vectorMetatileQueuePopulatorFullV2, type vectorMetatileQueuePopulatorFullV2Type } from '@map-colonies/schemas'; // Choose here the type of the config instance and import this type from the entire application -type ConfigType = ConfigInstance; +type ConfigType = ConfigInstance; let configInstance: ConfigType | undefined; @@ -13,7 +13,7 @@ let configInstance: ConfigType | undefined; */ async function initConfig(offlineMode?: boolean): Promise { configInstance = await config({ - schema: vectorMetatileQueuePopulatorFullV1, + schema: vectorMetatileQueuePopulatorFullV2, offlineMode, }); } diff --git a/src/common/constants.ts b/src/common/constants.ts index da20b47..ed1302c 100644 --- a/src/common/constants.ts +++ b/src/common/constants.ts @@ -21,5 +21,6 @@ export const ON_SIGNAL = Symbol('onSignal'); export const CONSUME_AND_POPULATE_FACTORY = Symbol('consumeAndPopulateFactory'); export const JOB_QUEUE_PROVIDER = Symbol('JobQueueProvider'); +export const QUEUE_NAMES = Symbol('QueueNames'); export const MILLISECONDS_IN_SECOND = 1000; diff --git a/src/common/interfaces.ts b/src/common/interfaces.ts index c531dc1..83f4196 100644 --- a/src/common/interfaces.ts +++ b/src/common/interfaces.ts @@ -1,5 +1,5 @@ -import { type vectorMetatileQueuePopulatorFullV1Type } from '@map-colonies/schemas'; +import { type vectorMetatileQueuePopulatorFullV2Type } from '@map-colonies/schemas'; -export interface JobInsertConfig extends Partial> { +export interface JobInsertConfig extends Partial> { retryDelay?: number; } diff --git a/src/containerConfig.ts b/src/containerConfig.ts index 590c49d..430fc96 100644 --- a/src/containerConfig.ts +++ b/src/containerConfig.ts @@ -3,17 +3,18 @@ import { trace } from '@opentelemetry/api'; import { Registry } from 'prom-client'; import jsLogger, { Logger } from '@map-colonies/js-logger'; import { InjectionObject, registerDependencies } from '@common/dependencyRegistration'; -import { CONSUME_AND_POPULATE_FACTORY, HEALTHCHECK, JOB_QUEUE_PROVIDER, ON_SIGNAL, SERVICES, SERVICE_NAME } from '@common/constants'; +import { CONSUME_AND_POPULATE_FACTORY, HEALTHCHECK, JOB_QUEUE_PROVIDER, ON_SIGNAL, QUEUE_NAMES, SERVICES, SERVICE_NAME } from '@common/constants'; import { getTracing } from '@common/tracing'; import { ConfigType, getConfig } from '@common/config'; import { CleanupRegistry } from '@map-colonies/cleanup-registry'; import { DependencyContainer, instanceCachingFactory, instancePerContainerCachingFactory, Lifecycle } from 'tsyringe'; -import PgBoss from 'pg-boss'; +import { type PgBoss } from 'pg-boss'; import { PGBOSS_PROVIDER, pgBossFactory } from './tiles/jobQueueProvider/pgbossFactory'; import { TILES_ROUTER_SYMBOL, tilesRouterFactory } from './tiles/routes/tilesRouter'; import { PgBossJobQueueProvider } from './tiles/jobQueueProvider/pgBossJobQueue'; import { TilesManager } from './tiles/models/tilesManager'; import { consumeAndPopulateFactory } from './requestConsumer'; +import { queuesNameFactory, type QueueNames } from './tiles/jobQueueProvider/queuesNameFactory'; export interface RegisterOptions { override?: InjectionObject[]; @@ -71,6 +72,12 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise }), }, }, + { + token: QUEUE_NAMES, + provider: { + useFactory: instanceCachingFactory(queuesNameFactory), + }, + }, { token: TilesManager, provider: { @@ -94,8 +101,12 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise }, postInjectionHook: async (container): Promise => { const pgBoss = container.resolve(PGBOSS_PROVIDER); + const queuesName = container.resolve(QUEUE_NAMES); await pgBoss.start(); + + const promise = Object.values(queuesName).map(async (queue: string) => pgBoss.createQueue(queue)); + await Promise.all(promise); }, }, { token: TILES_ROUTER_SYMBOL, provider: { useFactory: tilesRouterFactory } }, @@ -103,6 +114,18 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise token: JOB_QUEUE_PROVIDER, provider: { useClass: PgBossJobQueueProvider }, options: { lifecycle: Lifecycle.Singleton }, + postInjectionHook: (container): void => { + const queueProv = container.resolve(JOB_QUEUE_PROVIDER); + const cleanupRegistry = container.resolve(SERVICES.CLEANUP_REGISTRY); + cleanupRegistry.register({ + id: JOB_QUEUE_PROVIDER, + func: async () => + new Promise((resolve) => { + queueProv.stopQueue(); + resolve(true); + }), + }); + }, }, { token: HEALTHCHECK, diff --git a/src/instrumentation.mts b/src/instrumentation.mts index b1bd0c2..47b0dcd 100644 --- a/src/instrumentation.mts +++ b/src/instrumentation.mts @@ -4,7 +4,7 @@ import { tracingFactory } from './common/tracing.js'; import { getConfig, initConfig } from './common/config.js'; -await initConfig(); +await initConfig(true); const config = getConfig(); diff --git a/src/requestConsumer.ts b/src/requestConsumer.ts index f36ede3..7af01ef 100644 --- a/src/requestConsumer.ts +++ b/src/requestConsumer.ts @@ -1,5 +1,5 @@ import { Logger } from '@map-colonies/js-logger'; -import PgBoss from 'pg-boss'; +import { type PgBoss } from 'pg-boss'; import { FactoryFunction } from 'tsyringe'; import { JOB_QUEUE_PROVIDER, SERVICES } from './common/constants'; import { PgBossJobQueueProvider } from './tiles/jobQueueProvider/pgBossJobQueue'; @@ -21,9 +21,10 @@ export const consumeAndPopulateFactory: FactoryFunction<() => Promise> = ( if (appConfig.consumeCondition.enabled) { conditionFn = async (): Promise => { - const currentSize = await pgBoss.getQueueSize(tilesManager.tilesQueueName, { before: 'completed' }); - logger.debug({ msg: 'condition function', queueName: tilesManager.tilesQueueName, size: currentSize }); - return currentSize <= (appConfig.consumeCondition.tilesQueueSizeLimit ?? Number.NEGATIVE_INFINITY); + const { totalCount } = await pgBoss.getQueueStats(tilesManager.tilesQueueName); + + logger.debug({ msg: 'condition function', queueName: tilesManager.tilesQueueName, size: totalCount }); + return totalCount <= (appConfig.consumeCondition.tilesQueueSizeLimit ?? Number.NEGATIVE_INFINITY); }; } diff --git a/src/tiles/jobQueueProvider/pgBossJobQueue.ts b/src/tiles/jobQueueProvider/pgBossJobQueue.ts index e3a4d98..530a983 100644 --- a/src/tiles/jobQueueProvider/pgBossJobQueue.ts +++ b/src/tiles/jobQueueProvider/pgBossJobQueue.ts @@ -1,18 +1,17 @@ import { setTimeout as setTimeoutPromise } from 'node:timers/promises'; import { type Logger } from '@map-colonies/js-logger'; -import PgBoss, { JobWithMetadata } from 'pg-boss'; +import { type PgBoss, JobWithMetadata } from 'pg-boss'; import { inject, injectable } from 'tsyringe'; import { type ConfigType } from '@src/common/config'; import { vectorMetatileQueuePopulatorSharedV1Type } from '@map-colonies/schemas'; -import { MILLISECONDS_IN_SECOND, SERVICES } from '../../common/constants'; -import { TILE_REQUEST_QUEUE_NAME_PREFIX } from '../models/constants'; +import { MILLISECONDS_IN_SECOND, QUEUE_NAMES, SERVICES } from '../../common/constants'; import { type ConditionFn, JobQueueProvider } from './intefaces'; import { PGBOSS_PROVIDER } from './pgbossFactory'; +import { type QueueNames } from './queuesNameFactory'; @injectable() export class PgBossJobQueueProvider implements JobQueueProvider { private isRunning = true; - private readonly queueName: string; private readonly queueCheckTimeout: number; private readonly consumeCondition: vectorMetatileQueuePopulatorSharedV1Type['app']['consumeCondition']; @@ -21,31 +20,31 @@ export class PgBossJobQueueProvider implements JobQueueProvider { public constructor( @inject(PGBOSS_PROVIDER) private readonly pgBoss: PgBoss, @inject(SERVICES.CONFIG) config: ConfigType, - @inject(SERVICES.LOGGER) private readonly logger: Logger + @inject(SERVICES.LOGGER) private readonly logger: Logger, + @inject(QUEUE_NAMES) private readonly queuesName: QueueNames ) { const appConfig = config.get('app'); - this.queueName = `${TILE_REQUEST_QUEUE_NAME_PREFIX}-${appConfig.projectName}`; this.queueCheckTimeout = appConfig.requestQueueCheckIntervalSec * MILLISECONDS_IN_SECOND; this.consumeCondition = appConfig.consumeCondition; } public get activeQueueName(): string { - return this.queueName; + return this.queuesName.requestQueue; } public startQueue(): void { - this.logger.debug({ msg: 'starting queue', queueName: this.queueName }); + this.logger.debug({ msg: 'starting queue', queueName: this.activeQueueName }); this.pgBoss.on('error', (err) => this.logger.error({ msg: 'pg-boss error event', err })); - this.isRunning = true; } public stopQueue(): void { - this.logger.debug({ msg: 'stopping queue', queueName: this.queueName }); + this.logger.debug({ msg: 'stopping queue', queueName: this.activeQueueName }); this.isRunning = false; } public async consumeQueue(fn: (job: JobWithMetadata) => Promise, conditionFn?: ConditionFn): Promise { + this.startQueue(); this.logger.info({ msg: 'started consuming queue' }); for await (const job of this.getJobsIterator(conditionFn)) { @@ -65,17 +64,17 @@ export class PgBossJobQueueProvider implements JobQueueProvider { this.logger.debug({ msg: 'job fetched from queue', jobId: job.id }); await fn(job); this.logger.debug({ msg: 'job completed successfully', jobId: job.id }); - await this.pgBoss.complete(job.id); + await this.pgBoss.complete(this.activeQueueName, job.id); } catch (err) { const error = err as Error; this.logger.error({ err: error, jobId: job.id, job }); - await this.pgBoss.fail(job.id, error); + await this.pgBoss.fail(this.activeQueueName, job.id, error); } finally { this.runningJobs--; } } - private async *getJobsIterator(conditionFn?: ConditionFn): AsyncGenerator> { + private async *getJobsIterator(conditionFn?: ConditionFn): AsyncGenerator> { const timeout = this.consumeCondition.enabled ? this.consumeCondition.conditionCheckIntervalSec * MILLISECONDS_IN_SECOND : 0; while (this.isRunning) { @@ -87,17 +86,17 @@ export class PgBossJobQueueProvider implements JobQueueProvider { continue; } - const jobs = await this.pgBoss.fetch(this.queueName, 1, { includeMetadata: true }); + const jobs = await this.pgBoss.fetch(this.activeQueueName, { batchSize: 1, includeMetadata: true }); - if (jobs === null || jobs.length === 0) { - this.logger.info({ msg: 'queue is empty, waiting for data', queueName: this.queueName, timeout: this.queueCheckTimeout }); + if (jobs.length === 0) { + this.logger.info({ msg: 'queue is empty, waiting for data', queueName: this.activeQueueName, timeout: this.queueCheckTimeout }); await setTimeoutPromise(this.queueCheckTimeout); continue; } yield jobs[0]; - this.logger.info({ msg: 'next queue check after timeout', queueName: this.queueName, timeout: this.queueCheckTimeout }); + this.logger.info({ msg: 'next queue check after timeout', queueName: this.activeQueueName, timeout: this.queueCheckTimeout }); await setTimeoutPromise(this.queueCheckTimeout); } } diff --git a/src/tiles/jobQueueProvider/pgbossFactory.ts b/src/tiles/jobQueueProvider/pgbossFactory.ts index 7fc1b2a..230e1df 100644 --- a/src/tiles/jobQueueProvider/pgbossFactory.ts +++ b/src/tiles/jobQueueProvider/pgbossFactory.ts @@ -1,13 +1,13 @@ import { readFileSync } from 'fs'; import { TlsOptions } from 'tls'; import { hostname } from 'os'; -import { vectorMetatileQueuePopulatorFullV1Type } from '@map-colonies/schemas'; -import PgBoss from 'pg-boss'; +import { vectorMetatileQueuePopulatorFullV2Type } from '@map-colonies/schemas'; +import { PgBoss, type ConstructorOptions } from 'pg-boss'; import { SERVICE_NAME } from '@src/common/constants'; -type DbConfig = vectorMetatileQueuePopulatorFullV1Type['db']; +type DbConfig = vectorMetatileQueuePopulatorFullV2Type['db']; -const createDatabaseOptions = (dbConfig: DbConfig): PgBoss.ConstructorOptions => { +const createDatabaseOptions = (dbConfig: DbConfig): ConstructorOptions => { let ssl: TlsOptions | undefined = undefined; const { ssl: inputSsl, username: user, ...dataSourceOptions } = dbConfig; @@ -25,7 +25,11 @@ const createDatabaseOptions = (dbConfig: DbConfig): PgBoss.ConstructorOptions => export const pgBossFactory = (dbConfig: DbConfig): PgBoss => { const databaseOptions = createDatabaseOptions(dbConfig); - return new PgBoss({ ...databaseOptions, noScheduling: true, noSupervisor: true, uuid: 'v4' }); + return new PgBoss({ + ...databaseOptions, + supervise: false, //TODO: check if we can enable supervise + schedule: false, + }); }; export const PGBOSS_PROVIDER = Symbol('PgBoss'); diff --git a/src/tiles/jobQueueProvider/queuesNameFactory.ts b/src/tiles/jobQueueProvider/queuesNameFactory.ts new file mode 100644 index 0000000..c9821a3 --- /dev/null +++ b/src/tiles/jobQueueProvider/queuesNameFactory.ts @@ -0,0 +1,18 @@ +import { ConfigType } from '@src/common/config'; +import { SERVICES } from '@src/common/constants'; +import { FactoryFunction } from 'tsyringe'; +import { TILE_REQUEST_QUEUE_NAME_PREFIX, TILES_QUEUE_NAME_PREFIX } from '../models/constants'; + +export interface QueueNames { + requestQueue: string; + tilesQueue: string; +} + +export const queuesNameFactory: FactoryFunction<{ requestQueue: string; tilesQueue: string }> = (container) => { + const appConfig = container.resolve(SERVICES.CONFIG).get('app'); + + const requestQueue = `${TILE_REQUEST_QUEUE_NAME_PREFIX}-${appConfig.projectName}`; + const tilesQueue = `${TILES_QUEUE_NAME_PREFIX}-${appConfig.projectName}`; + + return { requestQueue, tilesQueue }; +}; diff --git a/src/tiles/models/tilesManager.ts b/src/tiles/models/tilesManager.ts index 31b94ba..ecc2259 100644 --- a/src/tiles/models/tilesManager.ts +++ b/src/tiles/models/tilesManager.ts @@ -2,17 +2,18 @@ import { randomUUID as uuidv4 } from 'crypto'; import { type Logger } from '@map-colonies/js-logger'; import { BoundingBox, boundingBoxToTiles as boundingBoxToTilesGenerator, Tile, tileToBoundingBox } from '@map-colonies/tile-calc'; import { API_STATE } from '@map-colonies/detiler-common'; -import PgBoss, { JobInsert, JobWithMetadata } from 'pg-boss'; +import { type PgBoss, JobInsert, JobWithMetadata } from 'pg-boss'; import { inject, injectable } from 'tsyringe'; import client from 'prom-client'; import booleanIntersects from '@turf/boolean-intersects'; import { Feature } from 'geojson'; import { type ConfigType } from '@src/common/config'; import { snakeCase } from 'snake-case'; -import { SERVICES } from '../../common/constants'; +import { QUEUE_NAMES, SERVICES } from '../../common/constants'; import { JobInsertConfig } from '../../common/interfaces'; import { hashValue } from '../../common/util'; import { PGBOSS_PROVIDER } from '../jobQueueProvider/pgbossFactory'; +import { type QueueNames } from '../jobQueueProvider/queuesNameFactory'; import { Source, TileQueuePayload, TileRequestQueuePayload, TilesByAreaRequest } from './tiles'; import { RequestAlreadyInQueueError } from './errors'; import { TILE_REQUEST_QUEUE_NAME_PREFIX, TILES_QUEUE_NAME_PREFIX } from './constants'; @@ -38,11 +39,12 @@ export class TilesManager { @inject(PGBOSS_PROVIDER) private readonly pgboss: PgBoss, @inject(SERVICES.CONFIG) config: ConfigType, @inject(SERVICES.LOGGER) private readonly logger: Logger, + @inject(QUEUE_NAMES) private readonly queuesName: QueueNames, @inject(SERVICES.METRICS) registry?: client.Registry ) { const appConfig = config.get('app'); - this.requestQueueName = `${TILE_REQUEST_QUEUE_NAME_PREFIX}-${appConfig.projectName}`; - this.tilesQueueName = `${TILES_QUEUE_NAME_PREFIX}-${appConfig.projectName}`; + this.requestQueueName = this.queuesName.requestQueue; + this.tilesQueueName = this.queuesName.tilesQueue; this.batchSize = appConfig.tilesBatchSize; this.metatile = appConfig.metatileSize; this.shouldForceApiTiles = appConfig.force.api; @@ -72,8 +74,8 @@ export class TilesManager { name: `metatile_queue_populator_${queue.type}_queue_current_count`, help: `The number of jobs currently in the ${queue.type} queue`, async collect(): Promise { - const currentQueueSize = await self.pgboss.getQueueSize(queue.name); - this.set(currentQueueSize); + const { totalCount } = await self.pgboss.getQueueStats(queue.name); + this.set(totalCount); }, registers: [registry], }); @@ -130,10 +132,12 @@ export class TilesManager { }; const key = hashValue(payload); + //TODO: fix typescript error + const singletonSeconds = this.baseQueueConfig.expireInSeconds as number; this.logger.debug({ msg: 'pushing payload to queue', queueName: this.requestQueueName, key, payload, itemCount: payload.items.length }); - const res = await this.pgboss.sendOnce(this.requestQueueName, payload, { ...this.baseQueueConfig }, key); + const res = await this.pgboss.send(this.requestQueueName, payload, { ...this.baseQueueConfig, singletonKey: key, singletonSeconds }); if (res === null) { this.logger.error({ msg: 'request already in queue', queueName: this.requestQueueName, key, payload }); @@ -148,14 +152,13 @@ export class TilesManager { const tileJobsArr = tiles.map((tile) => ({ ...this.baseQueueConfig, - name: this.tilesQueueName, data: { ...tile, parent: requestId, state: API_STATE, force: this.shouldForceApiTiles ? this.shouldForceApiTiles : force }, })); await this.populateTilesQueue(tileJobsArr, 'api'); } public async isAlive(): Promise { - await this.pgboss.getQueueSize(this.requestQueueName); + await this.pgboss.getQueueStats(this.requestQueueName); } public async handleTileRequest(job: JobWithMetadata): Promise { @@ -165,7 +168,7 @@ export class TilesManager { jobId: job.id, itemCount: job.data.items.length, source: job.data.source, - retryCount: job.retrycount, + retryCount: job.retryCount, retryLimit: this.baseQueueConfig.retryLimit, state: job.data.state, isForced: job.data.force, @@ -185,7 +188,7 @@ export class TilesManager { fetchTimerEnd(); } - this.requestsHandledCounter?.inc({ source: job.data.source, retrycount: job.retrycount }); + this.requestsHandledCounter?.inc({ source: job.data.source, retrycount: job.retryCount }); } private async handleApiTileRequest(job: JobWithMetadata): Promise { @@ -208,8 +211,7 @@ export class TilesManager { continue; } } - - tileArr.push({ ...this.baseQueueConfig, name: this.tilesQueueName, data: { ...tile, parent: id, state, force: isTileForced } }); + tileArr.push({ ...this.baseQueueConfig, data: { ...tile, parent: id, state, force: isTileForced } }); if (tileArr.length >= this.batchSize) { await this.populateTilesQueue(tileArr, 'api'); tileArr = []; @@ -238,7 +240,6 @@ export class TilesManager { for await (const tile of tilesGenerator) { tileMap.set(stringifyTile(tile), { ...this.baseQueueConfig, - name: this.tilesQueueName, data: { ...tile, parent: id, state, force: isTileForced }, }); if (tileMap.size >= this.batchSize) { @@ -257,7 +258,7 @@ export class TilesManager { private async populateTilesQueue(tiles: JobInsert[], source: Source): Promise { this.logger.info({ msg: 'populating tiles queue', queueName: this.tilesQueueName, itemCount: tiles.length, source }); - await this.pgboss.insert(tiles); + await this.pgboss.insert(this.tilesQueueName, tiles); tiles.forEach((tile) => this.metatilesPopulatedCounter?.inc({ source, z: tile.data?.z })); this.requestBatchesHandledCounter?.inc({ source }); diff --git a/tests/configurations/integration/jest.config.js b/tests/configurations/integration/jest.config.js index 5bbad89..7e1e6ba 100644 --- a/tests/configurations/integration/jest.config.js +++ b/tests/configurations/integration/jest.config.js @@ -6,7 +6,7 @@ module.exports = { transform: { '^.+\\.(t|j)s$': ['@swc/jest'], }, - transformIgnorePatterns: ['/node_modules/(?!(?:@turf|kdbush|geokdbush|tinyqueue)/)'], + transformIgnorePatterns: ['/node_modules/(?!(?:@turf|kdbush|geokdbush|tinyqueue|pg-boss|serialize-error|non-error)/)'], coverageReporters: ['text', 'html'], moduleNameMapper: pathsToModuleNameMapper(compilerOptions.paths, { prefix: '/' }), collectCoverage: true, @@ -22,6 +22,7 @@ module.exports = { coverageDirectory: '/coverage/integration', rootDir: '../../../.', testMatch: ['/tests/integration/**/*.spec.ts'], + modulePathIgnorePatterns: ['/dist/'], setupFiles: ['/tests/configurations/jest.setup.ts'], setupFilesAfterEnv: [ 'jest-openapi', diff --git a/tests/configurations/jest.unit.setup.ts b/tests/configurations/jest.unit.setup.ts new file mode 100644 index 0000000..7e785a0 --- /dev/null +++ b/tests/configurations/jest.unit.setup.ts @@ -0,0 +1,12 @@ +import 'reflect-metadata'; + +jest.mock('pg-boss', () => { + class PgBossMock {} + return { + /* eslint-disable @typescript-eslint/naming-convention */ + __esModule: true, + PgBoss: PgBossMock, + /* eslint-enable @typescript-eslint/naming-convention */ + default: PgBossMock, + }; +}); diff --git a/tests/configurations/unit/jest.config.js b/tests/configurations/unit/jest.config.js index 39ac592..ff68988 100644 --- a/tests/configurations/unit/jest.config.js +++ b/tests/configurations/unit/jest.config.js @@ -6,9 +6,10 @@ module.exports = { transform: { '^.+\\.(t|j)s$': ['@swc/jest'], }, - transformIgnorePatterns: ['/node_modules/(?!(?:@turf|kdbush|geokdbush|tinyqueue)/)'], + transformIgnorePatterns: ['/node_modules/(?!(?:@turf|kdbush|geokdbush|tinyqueue|pg-boss|serialize-error|non-error)/)'], moduleNameMapper: pathsToModuleNameMapper(compilerOptions.paths, { prefix: '/' }), testMatch: ['/tests/unit/**/*.spec.ts'], + modulePathIgnorePatterns: ['/dist/'], coverageReporters: ['text', 'html'], collectCoverage: true, collectCoverageFrom: [ @@ -27,7 +28,7 @@ module.exports = { ['jest-html-reporters', { multipleReportsUnitePath: './reports', pageTitle: 'unit', publicPath: './reports', filename: 'unit.html' }], ], rootDir: '../../../.', - setupFiles: ['/tests/configurations/jest.setup.ts'], + setupFiles: ['/tests/configurations/jest.unit.setup.ts'], setupFilesAfterEnv: ['/tests/matchers.js', '/tests/configurations/jest.setupAfterEnv.js'], testEnvironment: 'node', coverageThreshold: { diff --git a/tests/helpers/constants.ts b/tests/helpers/constants.ts new file mode 100644 index 0000000..9604207 --- /dev/null +++ b/tests/helpers/constants.ts @@ -0,0 +1,7 @@ +import { QueueNames } from '@src/tiles/jobQueueProvider/queuesNameFactory'; + +export const queueNames: QueueNames & { [x: string]: string } = { + requestTestQueue: 'tiles-requests-test-requests', + requestQueue: 'tiles-requests-test', + tilesQueue: 'tiles-test', +}; diff --git a/tests/integration/docs/docs.spec.ts b/tests/integration/docs/docs.spec.ts index c8f5b9d..26d534e 100644 --- a/tests/integration/docs/docs.spec.ts +++ b/tests/integration/docs/docs.spec.ts @@ -5,6 +5,7 @@ import { DependencyContainer } from 'tsyringe'; import httpStatusCodes from 'http-status-codes'; import { getApp } from '@src/app'; import { SERVICES } from '@src/common/constants'; +import { PGBOSS_PROVIDER } from '@src/tiles/jobQueueProvider/pgbossFactory'; import { DocsRequestSender } from './helpers/docsRequestSender'; describe('docs', function () { @@ -16,6 +17,10 @@ describe('docs', function () { override: [ { token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } }, { token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } }, + { + token: PGBOSS_PROVIDER, + provider: { useValue: { start: jest.fn().mockResolvedValue(undefined), createQueue: jest.fn().mockResolvedValue(undefined) } }, + }, ], useChild: true, }); diff --git a/tests/integration/tiles/tiles.spec.ts b/tests/integration/tiles/tiles.spec.ts index 920fb3b..1959a25 100644 --- a/tests/integration/tiles/tiles.spec.ts +++ b/tests/integration/tiles/tiles.spec.ts @@ -5,14 +5,15 @@ import { trace } from '@opentelemetry/api'; import { CleanupRegistry } from '@map-colonies/cleanup-registry'; import httpStatusCodes from 'http-status-codes'; import { DependencyContainer } from 'tsyringe'; -import PgBoss from 'pg-boss'; +import { type PgBoss, JobWithMetadata } from 'pg-boss'; import { Tile } from '@map-colonies/tile-calc'; -import { type vectorMetatileQueuePopulatorFullV1Type } from '@map-colonies/schemas'; +import { type vectorMetatileQueuePopulatorFullV2Type } from '@map-colonies/schemas'; import { type FeatureCollection } from 'geojson'; import { bbox } from '@turf/turf'; +import { queueNames } from '@tests/helpers/constants'; import { ConfigType, getConfig } from '../../../src/common/config'; import { getApp } from '../../../src/app'; -import { CONSUME_AND_POPULATE_FACTORY, JOB_QUEUE_PROVIDER, SERVICES } from '../../../src/common/constants'; +import { CONSUME_AND_POPULATE_FACTORY, JOB_QUEUE_PROVIDER, QUEUE_NAMES, SERVICES } from '../../../src/common/constants'; import { PgBossJobQueueProvider } from '../../../src/tiles/jobQueueProvider/pgBossJobQueue'; import { consumeAndPopulateFactory } from '../../../src/requestConsumer'; import { BAD_FEATURE, BBOX1, BBOX2, GOOD_FEATURE, GOOD_LARGE_FEATURE } from '../../helpers/samples'; @@ -21,11 +22,11 @@ import { PGBOSS_PROVIDER } from '../../../src/tiles/jobQueueProvider/pgbossFacto import { TilesRequestSender } from './helpers/requestSender'; import { getBbox } from './helpers/generator'; -async function waitForJobToBeResolved(boss: PgBoss, jobId: string): Promise { +async function waitForJobToBeResolved(boss: PgBoss, queueName: string, jobId: string): Promise | null> { // eslint-disable-next-line @typescript-eslint/naming-convention, @typescript-eslint/no-unused-vars for await (const _unused of setIntervalPromise(10)) { - const job = await boss.getJobById(jobId); - if (job?.completedon) { + const [job] = await boss.findJobs(queueName, { id: jobId }); + if (job.completedOn) { return job; } } @@ -57,7 +58,7 @@ describe('tiles', function () { ...config.get('app'), projectName: 'app-api', enableRequestQueueHandling: false, - } satisfies Partial; + } satisfies Partial; } return config.get(key); }) as ConfigType['get'], @@ -70,6 +71,7 @@ describe('tiles', function () { }, { token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } }, { token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } }, + { token: QUEUE_NAMES, provider: { useValue: queueNames } }, ], }); container = depContainer; @@ -334,7 +336,7 @@ describe('tiles', function () { describe('POST /tiles/area', function () { it('should return 500 if the queue is not available', async function () { const boss = container.resolve(PGBOSS_PROVIDER); - jest.spyOn(boss, 'sendOnce').mockRejectedValueOnce(new Error('failed')); + jest.spyOn(boss, 'send').mockRejectedValueOnce(new Error('failed')); const bbox = getBbox(); @@ -378,7 +380,7 @@ describe('tiles', function () { consumeCondition: { enabled: false, }, - } satisfies Partial; + } satisfies Partial; } return config.get(key); }) as ConfigType['get'], @@ -391,6 +393,7 @@ describe('tiles', function () { }, { token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } }, { token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } }, + { token: QUEUE_NAMES, provider: { useValue: queueNames } }, ], }); container = depContainer; @@ -398,7 +401,7 @@ describe('tiles', function () { beforeEach(async function () { const boss = container.resolve(PGBOSS_PROVIDER); - await boss.clearStorage(); + await boss.deleteAllJobs(); }); afterAll(async function () { @@ -426,17 +429,20 @@ describe('tiles', function () { const jobId = await boss.send('tiles-requests-test-requests', request); - await waitForJobToBeResolved(boss, jobId as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId as string); provider.stopQueue(); await expect(consumeAndPopulatePromise).resolves.not.toThrow(); - const result = await boss.fetch('tiles-test-requests', 14); + const result = await boss.fetch('tiles-test-requests', { batchSize: 14 }); expect(result).not.toBeNull(); - await boss.complete(result!.map((r) => r.id)); - expect(result!.map((r) => r.data)).toContainSameTiles([ + await boss.complete( + 'tiles-test-requests', + result.map((r) => r.id) + ); + expect(result.map((r) => r.data)).toContainSameTiles([ { x: 39177, y: 10594, z: 18, metatile: 8 }, { x: 39176, y: 10594, z: 18, metatile: 8 }, { x: 39176, y: 10595, z: 18, metatile: 8 }, @@ -476,17 +482,20 @@ describe('tiles', function () { const jobId = await boss.send('tiles-requests-test-requests', request); - await waitForJobToBeResolved(boss, jobId as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId as string); provider.stopQueue(); await expect(consumeAndPopulatePromise).resolves.not.toThrow(); - const result = await boss.fetch('tiles-test-requests', 14); + const result = await boss.fetch('tiles-test-requests', { batchSize: 14 }); expect(result).not.toBeNull(); - await boss.complete(result!.map((r) => r.id)); - expect(result!.map((r) => r.data)).toContainSameTiles([ + await boss.complete( + 'tiles-test-requests', + result.map((r) => r.id) + ); + expect(result.map((r) => r.data)).toContainSameTiles([ { x: 39177, y: 10594, z: 18, metatile: 8, force: true, state: 100 }, { x: 39176, y: 10594, z: 18, metatile: 8, force: true, state: 100 }, { x: 39176, y: 10595, z: 18, metatile: 8, force: true, state: 100 }, @@ -524,17 +533,20 @@ describe('tiles', function () { const jobId = await boss.send('tiles-requests-test-requests', request); - await waitForJobToBeResolved(boss, jobId as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId as string); provider.stopQueue(); await expect(consumeAndPopulatePromise).resolves.not.toThrow(); - const result = await boss.fetch('tiles-test-requests', 14); + const result = await boss.fetch('tiles-test-requests', { batchSize: 14 }); expect(result).not.toBeNull(); - await boss.complete(result!.map((r) => r.id)); - expect(result!.map((r) => r.data)).toContainSameTiles([ + await boss.complete( + 'tiles-test-requests', + result.map((r) => r.id) + ); + expect(result.map((r) => r.data)).toContainSameTiles([ { x: 39177, y: 10594, z: 18, metatile: 8 }, { x: 39176, y: 10594, z: 18, metatile: 8 }, { x: 39176, y: 10595, z: 18, metatile: 8 }, @@ -573,17 +585,20 @@ describe('tiles', function () { const jobId = await boss.send('tiles-requests-test-requests', request); - await waitForJobToBeResolved(boss, jobId as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId as string); provider.stopQueue(); await expect(consumeAndPopulatePromise).resolves.not.toThrow(); - const result = await boss.fetch('tiles-test-requests', 14); + const result = await boss.fetch('tiles-test-requests', { batchSize: 14 }); expect(result).not.toBeNull(); - await boss.complete(result!.map((r) => r.id)); - expect(result!.map((r) => r.data)).toContainSameTiles([ + await boss.complete( + 'tiles-test-requests', + result.map((r) => r.id) + ); + expect(result.map((r) => r.data)).toContainSameTiles([ { x: 39177, y: 10594, z: 18, metatile: 8, force: true }, { x: 39176, y: 10594, z: 18, metatile: 8, force: true }, { x: 39176, y: 10595, z: 18, metatile: 8, force: true }, @@ -621,17 +636,20 @@ describe('tiles', function () { const jobId = await boss.send('tiles-requests-test-requests', request); - await waitForJobToBeResolved(boss, jobId as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId as string); provider.stopQueue(); await expect(consumeAndPopulatePromise).resolves.not.toThrow(); - const result = await boss.fetch('tiles-test-requests', 14); + const result = await boss.fetch('tiles-test-requests', { batchSize: 14 }); expect(result).not.toBeNull(); - await boss.complete(result!.map((r) => r.id)); - expect(result!.map((r) => r.data)).toContainSameTiles([ + await boss.complete( + 'tiles-test-requests', + result.map((r) => r.id) + ); + expect(result.map((r) => r.data)).toContainSameTiles([ { x: 39177, y: 10594, z: 18, metatile: 8 }, { x: 39176, y: 10594, z: 18, metatile: 8 }, { x: 39176, y: 10595, z: 18, metatile: 8 }, @@ -674,17 +692,20 @@ describe('tiles', function () { const jobId = await boss.send('tiles-requests-test-requests', request); - await waitForJobToBeResolved(boss, jobId as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId as string); provider.stopQueue(); await expect(consumeAndPopulatePromise).resolves.not.toThrow(); - const result = await boss.fetch('tiles-test-requests', 15); + const result = await boss.fetch('tiles-test-requests', { batchSize: 15 }); expect(result).not.toBeNull(); - await boss.complete(result!.map((r) => r.id)); - expect(result!.map((r) => r.data)).toContainSameTiles([ + await boss.complete( + 'tiles-test-requests', + result.map((r) => r.id) + ); + expect(result.map((r) => r.data)).toContainSameTiles([ { x: 39176, y: 10600, z: 18, metatile: 8 }, { x: 39177, y: 10594, z: 18, metatile: 8 }, { x: 39176, y: 10594, z: 18, metatile: 8 }, @@ -726,9 +747,9 @@ describe('tiles', function () { const jobId1 = await boss.send('tiles-requests-test-requests', geojsonRequest); - await waitForJobToBeResolved(boss, jobId1 as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId1 as string); - const geojsonResult = await boss.fetch('tiles-test-requests', 1000); + const geojsonResult = await boss.fetch('tiles-test-requests', { batchSize: 1000 }); const bboxRequest = { items: [ @@ -743,17 +764,17 @@ describe('tiles', function () { const jobId2 = await boss.send('tiles-requests-test-requests', bboxRequest); - await waitForJobToBeResolved(boss, jobId2 as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId2 as string); provider.stopQueue(); await expect(consumeAndPopulatePromise).resolves.not.toThrow(); - const bboxResult = await boss.fetch('tiles-test-requests', 1000); + const bboxResult = await boss.fetch('tiles-test-requests', { batchSize: 1000 }); expect(geojsonResult).not.toBeNull(); expect(bboxResult).not.toBeNull(); - expect(geojsonResult!.length).toBeLessThan(bboxResult!.length); + expect(geojsonResult.length).toBeLessThan(bboxResult.length); }); }); @@ -782,7 +803,7 @@ describe('tiles', function () { conditionCheckIntervalSec: 1, tilesQueueSizeLimit: 2, }, - } satisfies Partial; + } satisfies Partial; } return config.get(key); }) as ConfigType['get'], @@ -795,6 +816,7 @@ describe('tiles', function () { }, { token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } }, { token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } }, + { token: QUEUE_NAMES, provider: { useValue: queueNames } }, ], }); container = depContainer; @@ -802,7 +824,7 @@ describe('tiles', function () { beforeEach(async function () { const boss = container.resolve(PGBOSS_PROVIDER); - await boss.clearStorage(); + await boss.deleteAllJobs(); }); afterAll(async function () { @@ -830,9 +852,9 @@ describe('tiles', function () { const jobId1 = await boss.send('tiles-requests-test-requests', request1); - await waitForJobToBeResolved(boss, jobId1 as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId1 as string); - const geojsonResult = await boss.fetch('tiles-test-requests', 5000); + const geojsonResult = await boss.fetch('tiles-test-requests', { batchSize: 5000 }); const bboxRequest = { items: [ @@ -853,11 +875,11 @@ describe('tiles', function () { await expect(consumeAndPopulatePromise).resolves.not.toThrow(); - const bboxResult = await boss.fetch('tiles-test-requests', 1000); + const bboxResult = await boss.fetch('tiles-test-requests', { batchSize: 1000 }); expect(geojsonResult).not.toBeNull(); - expect(bboxResult).toBeNull(); - const currentRequestsQueueSize = await boss.getQueueSize('tiles-requests-test-requests'); + expect(bboxResult).toEqual([]); + const { queuedCount: currentRequestsQueueSize } = await boss.getQueueStats('tiles-requests-test-requests'); expect(currentRequestsQueueSize).toBe(1); }); }); @@ -889,7 +911,7 @@ describe('tiles', function () { api: true, expiredTiles: true, }, - } satisfies Partial; + } satisfies Partial; } return config.get(key); }) as ConfigType['get'], @@ -902,6 +924,7 @@ describe('tiles', function () { }, { token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } }, { token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } }, + { token: QUEUE_NAMES, provider: { useValue: queueNames } }, ], }); container = depContainer; @@ -909,7 +932,7 @@ describe('tiles', function () { beforeEach(async function () { const boss = container.resolve(PGBOSS_PROVIDER); - await boss.clearStorage(); + await boss.deleteAllJobs(); }); afterAll(async function () { @@ -938,17 +961,20 @@ describe('tiles', function () { const jobId = await boss.send('tiles-requests-test-requests', request); - await waitForJobToBeResolved(boss, jobId as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId as string); provider.stopQueue(); await expect(consumeAndPopulatePromise).resolves.not.toThrow(); - const result = await boss.fetch('tiles-test-requests', 14); + const result = await boss.fetch('tiles-test-requests', { batchSize: 14 }); expect(result).not.toBeNull(); - await boss.complete(result!.map((r) => r.id)); - expect(result!.map((r) => r.data)).toContainSameTiles([ + await boss.complete( + 'tiles-test-requests', + result.map((r) => r.id) + ); + expect(result.map((r) => r.data)).toContainSameTiles([ { x: 39177, y: 10594, z: 18, metatile: 8, force: true, state: 100 }, { x: 39176, y: 10594, z: 18, metatile: 8, force: true, state: 100 }, { x: 39176, y: 10595, z: 18, metatile: 8, force: true, state: 100 }, @@ -986,17 +1012,20 @@ describe('tiles', function () { const jobId = await boss.send('tiles-requests-test-requests', request); - await waitForJobToBeResolved(boss, jobId as string); + await waitForJobToBeResolved(boss, 'tiles-requests-test-requests', jobId as string); provider.stopQueue(); await expect(consumeAndPopulatePromise).resolves.not.toThrow(); - const result = await boss.fetch('tiles-test-requests', 14); + const result = await boss.fetch('tiles-test-requests', { batchSize: 14 }); expect(result).not.toBeNull(); - await boss.complete(result!.map((r) => r.id)); - expect(result!.map((r) => r.data)).toContainSameTiles([ + await boss.complete( + 'tiles-test-requests', + result.map((r) => r.id) + ); + expect(result.map((r) => r.data)).toContainSameTiles([ { x: 39177, y: 10594, z: 18, metatile: 8, force: true }, { x: 39176, y: 10594, z: 18, metatile: 8, force: true }, { x: 39176, y: 10595, z: 18, metatile: 8, force: true }, diff --git a/tests/unit/tiles/jobQueueProvider/queuesNameFactory.spec.ts b/tests/unit/tiles/jobQueueProvider/queuesNameFactory.spec.ts new file mode 100644 index 0000000..95b83f9 --- /dev/null +++ b/tests/unit/tiles/jobQueueProvider/queuesNameFactory.spec.ts @@ -0,0 +1,22 @@ +import { container } from 'tsyringe'; +import { ConfigType } from '@src/common/config'; +import { SERVICES } from '@src/common/constants'; +import { queuesNameFactory } from '@src/tiles/jobQueueProvider/queuesNameFactory'; + +describe('queuesNameFactory', () => { + it('should create queue names from project name', () => { + const childContainer = container.createChildContainer(); + const configMock = { + get: jest.fn().mockReturnValue({ projectName: 'demo' }), + } as unknown as ConfigType; + + childContainer.registerInstance(SERVICES.CONFIG, configMock); + + const names = queuesNameFactory(childContainer); + + expect(names).toEqual({ + requestQueue: 'tiles-requests-demo', + tilesQueue: 'tiles-demo', + }); + }); +}); diff --git a/tests/unit/tiles/models/pgBossJobQueue.spec.ts b/tests/unit/tiles/models/pgBossJobQueue.spec.ts index 622ca42..2aab509 100644 --- a/tests/unit/tiles/models/pgBossJobQueue.spec.ts +++ b/tests/unit/tiles/models/pgBossJobQueue.spec.ts @@ -1,7 +1,8 @@ import { setTimeout as setTimeoutPromise } from 'node:timers/promises'; import jsLogger from '@map-colonies/js-logger'; -import PgBoss from 'pg-boss'; +import { type PgBoss } from 'pg-boss'; import { ConfigType } from '@src/common/config'; +import { queueNames } from '@tests/helpers/constants'; import { PgBossJobQueueProvider } from '../../../../src/tiles/jobQueueProvider/pgBossJobQueue'; describe('PgBossJobQueueProvider', () => { @@ -9,9 +10,10 @@ describe('PgBossJobQueueProvider', () => { let configMock: ConfigType; let pgbossMock: { on: jest.Mock; + createQueue: jest.Mock; start: jest.Mock; stop: jest.Mock; - getQueueSize: jest.Mock; + getQueueStats: jest.Mock; complete: jest.Mock; fail: jest.Mock; fetch: jest.Mock; @@ -20,9 +22,10 @@ describe('PgBossJobQueueProvider', () => { beforeAll(() => { pgbossMock = { on: jest.fn(), + createQueue: jest.fn(), start: jest.fn(), stop: jest.fn(), - getQueueSize: jest.fn(), + getQueueStats: jest.fn(), complete: jest.fn(), fail: jest.fn(), fetch: jest.fn(), @@ -35,7 +38,10 @@ describe('PgBossJobQueueProvider', () => { return { projectName: 'queue-name', requestQueueCheckIntervalSec: 0.1, - consumeCondition: 0.2, + consumeCondition: { + enabled: true, + conditionCheckIntervalSec: 0.2, + }, }; default: break; @@ -49,7 +55,7 @@ describe('PgBossJobQueueProvider', () => { }); beforeEach(function () { - provider = new PgBossJobQueueProvider(pgbossMock as unknown as PgBoss, configMock, jsLogger({ enabled: false })); + provider = new PgBossJobQueueProvider(pgbossMock as unknown as PgBoss, configMock, jsLogger({ enabled: false }), queueNames); }); afterEach(function () { @@ -58,7 +64,7 @@ describe('PgBossJobQueueProvider', () => { describe('#activeQueueName', () => { it('should return the queue name', () => { - expect(provider.activeQueueName).toBe('tiles-requests-queue-name'); + expect(provider.activeQueueName).toBe(queueNames.requestQueue); }); }); @@ -68,7 +74,7 @@ describe('PgBossJobQueueProvider', () => { const job2 = [{ id: 'id2', data: { key: 'value' } }]; const fnMock = jest.fn(); - pgbossMock.fetch.mockResolvedValueOnce(job1).mockResolvedValueOnce(job2).mockResolvedValue(null); + pgbossMock.fetch.mockResolvedValueOnce(job1).mockResolvedValueOnce(job2).mockResolvedValue([]); provider.startQueue(); const queuePromise = provider.consumeQueue(fnMock); await setTimeoutPromise(1000); @@ -87,7 +93,7 @@ describe('PgBossJobQueueProvider', () => { const job3 = [{ id: 'id3', data: { key: 'value' } }]; const fnMock = jest.fn(); - pgbossMock.fetch.mockResolvedValueOnce(job1).mockResolvedValueOnce(job2).mockResolvedValueOnce(job3).mockResolvedValueOnce(null); + pgbossMock.fetch.mockResolvedValueOnce(job1).mockResolvedValueOnce(job2).mockResolvedValueOnce(job3).mockResolvedValueOnce([]); const conditionFnMock = jest.fn(); conditionFnMock.mockReturnValueOnce(true).mockReturnValueOnce(true).mockReturnValueOnce(false); @@ -120,7 +126,7 @@ describe('PgBossJobQueueProvider', () => { await expect(queuePromise).resolves.not.toThrow(); expect(pgbossMock.complete).not.toHaveBeenCalled(); - expect(pgbossMock.fail).toHaveBeenCalledWith(id, fetchError); + expect(pgbossMock.fail).toHaveBeenCalledWith(queueNames.requestQueue, id, fetchError); }); }); }); diff --git a/tests/unit/tiles/models/tilesManager.spec.ts b/tests/unit/tiles/models/tilesManager.spec.ts index b2a746d..bd29b60 100644 --- a/tests/unit/tiles/models/tilesManager.spec.ts +++ b/tests/unit/tiles/models/tilesManager.spec.ts @@ -1,12 +1,13 @@ import jsLogger from '@map-colonies/js-logger'; import { faker } from '@faker-js/faker'; import { Tile } from '@map-colonies/tile-calc'; -import PgBoss from 'pg-boss'; +import { type JobInsert, type JobWithMetadata, type PgBoss } from 'pg-boss'; import client from 'prom-client'; import { bbox } from '@turf/turf'; import { FeatureCollection } from 'geojson'; import { API_STATE } from '@map-colonies/detiler-common'; import { ConfigType, getConfig } from '@src/common/config'; +import { queueNames } from '@tests/helpers/constants'; import { RequestAlreadyInQueueError } from '../../../../src/tiles/models/errors'; import { TileRequestQueuePayload, TilesByAreaRequest } from '../../../../src/tiles/models/tiles'; import { TilesManager } from '../../../../src/tiles/models/tilesManager'; @@ -15,7 +16,7 @@ import { boundingBoxToPolygon } from '../../../../src/tiles/models/util'; import { hashValue } from '../../../../src/common/util'; const logger = jsLogger({ enabled: false }); -const queueConfig = { retryDelay: 1 }; +const queueConfig = { retryDelay: 1, expireInSeconds: 60 }; describe('tilesManager', () => { let config: ConfigType; @@ -58,8 +59,8 @@ describe('tilesManager', () => { describe('#addBboxTilesRequestToQueue', () => { it('resolve without error if request is a valid bbox', async function () { - const sendOnceMock = jest.fn().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ sendOnce: sendOnceMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const sendMock = jest.fn().mockResolvedValue('ok'); + const tilesManager = new TilesManager({ send: sendMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const request: TilesByAreaRequest[] = [{ area: [90, 90, -90, -90], minZoom: 0, maxZoom: 0 }]; const expectedPayload: TileRequestQueuePayload = { items: [{ area: { west: 90, south: 90, east: -90, north: -90 }, minZoom: 0, maxZoom: 0 }], @@ -70,24 +71,32 @@ describe('tilesManager', () => { const resource = tilesManager.addArealTilesRequestToQueue(request); await expect(resource).resolves.not.toThrow(); - expect(sendOnceMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, queueConfig, hashValue(expectedPayload)); + expect(sendMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, { + ...queueConfig, + singletonKey: hashValue(expectedPayload), + singletonSeconds: queueConfig.expireInSeconds, + }); }); it('resolve without error if request is a valid geojson', async function () { - const sendOnceMock = jest.fn().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ sendOnce: sendOnceMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const sendMock = jest.fn().mockResolvedValue('ok'); + const tilesManager = new TilesManager({ send: sendMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const request: TilesByAreaRequest[] = [{ area: GOOD_FEATURE, minZoom: 0, maxZoom: 0 }]; const expectedPayload: TileRequestQueuePayload = { items: [{ area: GOOD_FEATURE, minZoom: 0, maxZoom: 0 }], source: 'api', state: API_STATE }; const resource = tilesManager.addArealTilesRequestToQueue(request); await expect(resource).resolves.not.toThrow(); - expect(sendOnceMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, queueConfig, hashValue(expectedPayload)); + expect(sendMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, { + ...queueConfig, + singletonKey: hashValue(expectedPayload), + singletonSeconds: queueConfig.expireInSeconds, + }); }); it('resolve without error if request is a mix of valid bbox and geojson', async function () { - const sendOnceMock = jest.fn().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ sendOnce: sendOnceMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const sendMock = jest.fn().mockResolvedValue('ok'); + const tilesManager = new TilesManager({ send: sendMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const request: TilesByAreaRequest[] = [ { area: GOOD_FEATURE, minZoom: 0, maxZoom: 0 }, { area: [90, 90, -90, -90], minZoom: 0, maxZoom: 0 }, @@ -104,12 +113,16 @@ describe('tilesManager', () => { const resource = tilesManager.addArealTilesRequestToQueue(request); await expect(resource).resolves.not.toThrow(); - expect(sendOnceMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, queueConfig, hashValue(expectedPayload)); + expect(sendMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, { + ...queueConfig, + singletonKey: hashValue(expectedPayload), + singletonSeconds: queueConfig.expireInSeconds, + }); }); it('resolve without error if request is a mix of valid bbox and feature geojson, bbox and featureCollection geojson', async function () { - const sendOnceMock = jest.fn().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ sendOnce: sendOnceMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const sendMock = jest.fn().mockResolvedValue('ok'); + const tilesManager = new TilesManager({ send: sendMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const featureCollection: FeatureCollection = { type: 'FeatureCollection', features: [GOOD_FEATURE, GOOD_FEATURE], @@ -133,12 +146,16 @@ describe('tilesManager', () => { const resource = tilesManager.addArealTilesRequestToQueue(request); await expect(resource).resolves.not.toThrow(); - expect(sendOnceMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, queueConfig, hashValue(expectedPayload)); + expect(sendMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, { + ...queueConfig, + singletonKey: hashValue(expectedPayload), + singletonSeconds: queueConfig.expireInSeconds, + }); }); it('resolve without error if request is a mix with force attibute', async function () { - const sendOnceMock = jest.fn().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ sendOnce: sendOnceMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const sendMock = jest.fn().mockResolvedValue('ok'); + const tilesManager = new TilesManager({ send: sendMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const featureCollection: FeatureCollection = { type: 'FeatureCollection', features: [GOOD_FEATURE, GOOD_FEATURE], @@ -163,7 +180,11 @@ describe('tilesManager', () => { const resource = tilesManager.addArealTilesRequestToQueue(request, true); await expect(resource).resolves.not.toThrow(); - expect(sendOnceMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, queueConfig, hashValue(expectedPayload)); + expect(sendMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, { + ...queueConfig, + singletonKey: hashValue(expectedPayload), + singletonSeconds: queueConfig.expireInSeconds, + }); }); it('resolve without error if request is a mix with force attribute if app is configured so', async function () { @@ -190,8 +211,8 @@ describe('tilesManager', () => { }), }; - const sendOnceMock = jest.fn().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ sendOnce: sendOnceMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const sendMock = jest.fn().mockResolvedValue('ok'); + const tilesManager = new TilesManager({ send: sendMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const featureCollection: FeatureCollection = { type: 'FeatureCollection', features: [GOOD_FEATURE, GOOD_FEATURE], @@ -216,12 +237,16 @@ describe('tilesManager', () => { const resource = tilesManager.addArealTilesRequestToQueue(request); await expect(resource).resolves.not.toThrow(); - expect(sendOnceMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, queueConfig, hashValue(expectedPayload)); + expect(sendMock).toHaveBeenCalledWith('tiles-requests-test', expectedPayload, { + ...queueConfig, + singletonKey: hashValue(expectedPayload), + singletonSeconds: queueConfig.expireInSeconds, + }); }); it('should throw RequestAlreadyInQueueError if the request is the same', async function () { - const sendOnceMock = jest.fn().mockResolvedValue(null); - const tilesManager = new TilesManager({ sendOnce: sendOnceMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const sendMock = jest.fn().mockResolvedValue(null); + const tilesManager = new TilesManager({ send: sendMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const request: TilesByAreaRequest[] = [{ area: [90, 90, -90, -90], minZoom: 0, maxZoom: 0 }]; const resource = tilesManager.addArealTilesRequestToQueue(request); @@ -230,8 +255,8 @@ describe('tilesManager', () => { }); it('should throw the error thrown by pg-boss', async function () { - const sendOnceMock = jest.fn().mockRejectedValue(new Error('test')); - const tilesManager = new TilesManager({ sendOnce: sendOnceMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const sendMock = jest.fn().mockRejectedValue(new Error('test')); + const tilesManager = new TilesManager({ send: sendMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const request: TilesByAreaRequest[] = [{ area: [90, 90, -90, -90], minZoom: 0, maxZoom: 0 }]; const resource = tilesManager.addArealTilesRequestToQueue(request); @@ -242,8 +267,14 @@ describe('tilesManager', () => { describe('#isAlive', () => { it('should resolve without error', async function () { - const getQueueSizeMock = jest.fn().mockResolvedValue(0); - const tilesManager = new TilesManager({ getQueueSize: getQueueSizeMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const getQueueStatsMock = jest.fn().mockResolvedValue({ totalCount: 0 }); + const tilesManager = new TilesManager( + { getQueueStats: getQueueStatsMock } as unknown as PgBoss, + configMock, + logger, + queueNames, + new client.Registry() + ); const resource = tilesManager.isAlive(); @@ -251,8 +282,14 @@ describe('tilesManager', () => { }); it('should throw the error thrown by pg-boss', async function () { - const getQueueSizeMock = jest.fn().mockRejectedValue(new Error('test')); - const tilesManager = new TilesManager({ getQueueSize: getQueueSizeMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const getQueueStatsMock = jest.fn().mockRejectedValue(new Error('test')); + const tilesManager = new TilesManager( + { getQueueStats: getQueueStatsMock } as unknown as PgBoss, + configMock, + logger, + queueNames, + new client.Registry() + ); const isAlivePromise = tilesManager.isAlive(); @@ -262,19 +299,19 @@ describe('tilesManager', () => { describe('#addTilesToQueue', () => { it('should insert the tiles into the queue', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.addTilesToQueue([ { x: 9794, y: 2650, z: 16, metatile: 8 }, { x: 39176, y: 10600, z: 18, metatile: 8 }, { x: 19588, y: 5300, z: 17, metatile: 8 }, - ]) as unknown as PgBoss.JobWithDoneCallback; + ]); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; expect(args.map((job) => job.data)).toContainSameTiles([ { x: 9794, y: 2650, z: 16, metatile: 8 }, { x: 39176, y: 10600, z: 18, metatile: 8 }, @@ -283,8 +320,8 @@ describe('tilesManager', () => { }); it('should insert the tiles into the queue with force attribute if requst is configured so', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.addTilesToQueue( [ @@ -293,12 +330,12 @@ describe('tilesManager', () => { { x: 19588, y: 5300, z: 17, metatile: 8 }, ], true - ) as unknown as PgBoss.JobWithDoneCallback; + ); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; expect(args.map((job) => job.data)).toContainSameTiles([ { x: 9794, y: 2650, z: 16, metatile: 8, force: true }, { x: 39176, y: 10600, z: 18, metatile: 8, force: true }, @@ -330,19 +367,19 @@ describe('tilesManager', () => { }), }; - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.addTilesToQueue([ { x: 9794, y: 2650, z: 16, metatile: 8 }, { x: 39176, y: 10600, z: 18, metatile: 8 }, { x: 19588, y: 5300, z: 17, metatile: 8 }, - ]) as unknown as PgBoss.JobWithDoneCallback; + ]); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; expect(args.map((job) => job.data)).toContainSameTiles([ { x: 9794, y: 2650, z: 16, metatile: 8, force: true }, { x: 39176, y: 10600, z: 18, metatile: 8, force: true }, @@ -351,18 +388,18 @@ describe('tilesManager', () => { }); it('should add the same parent id to all the tiles added', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.addTilesToQueue([ { x: 9794, y: 2650, z: 16, metatile: 8 }, { x: 39176, y: 10600, z: 18, metatile: 8 }, - ]) as unknown as PgBoss.JobWithDoneCallback; + ]); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; expect(args[0].data?.parent).toBeDefined(); expect(args[0].data?.parent).toBe(args[1].data?.parent); @@ -372,8 +409,8 @@ describe('tilesManager', () => { describe('#handleTileRequest', () => { describe('api requests', () => { it('should insert the tile generated by a bbox request to the queue', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -386,24 +423,25 @@ describe('tilesManager', () => { ], source: 'api', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8 }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: undefined, state: undefined, force: undefined }, }, ]); }); it('should insert the tile generated by a bbox request to the queue with force attribute if request is configured so', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -417,17 +455,18 @@ describe('tilesManager', () => { source: 'api', force: true, }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8, force: true }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, force: true, parent: undefined, state: undefined }, }, ]); }); @@ -456,8 +495,8 @@ describe('tilesManager', () => { }), }; - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -470,24 +509,25 @@ describe('tilesManager', () => { ], source: 'api', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8, force: true }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, force: true, parent: undefined, state: undefined }, }, ]); }); it('should insert the tile generated by bbox and geojson request to the queue', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -505,13 +545,13 @@ describe('tilesManager', () => { ], source: 'api', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(2); - const firstCallArgs = insertMock.mock.calls[0][0]; - const secondCallArgs = insertMock.mock.calls[1][0]; + const firstCallArgs = insertMock.mock.calls[0][1] as JobInsert[]; + const secondCallArgs = insertMock.mock.calls[1][1] as JobInsert[]; expect(firstCallArgs).toHaveLength(10); expect(secondCallArgs).toHaveLength(5); @@ -535,8 +575,8 @@ describe('tilesManager', () => { }); it('should insert the tile generated by a geojson request to the queue', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -549,24 +589,25 @@ describe('tilesManager', () => { ], source: 'api', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8 }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: undefined, state: undefined, force: undefined }, }, ]); }); it('should insert tiles generated by bbox request in multiple zoom levels', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -579,12 +620,12 @@ describe('tilesManager', () => { ], source: 'api', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; expect(args.map((job) => job.data)).toContainSameTiles([ { x: 9794, y: 2650, z: 16, metatile: 8 }, { x: 39176, y: 10600, z: 18, metatile: 8 }, @@ -593,8 +634,8 @@ describe('tilesManager', () => { }); it('should insert tiles generated by geojson request in multiple zoom levels', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -607,12 +648,12 @@ describe('tilesManager', () => { ], source: 'api', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; expect(args.map((job) => job.data)).toContainSameTiles([ { x: 9794, y: 2650, z: 16, metatile: 8 }, { x: 39176, y: 10600, z: 18, metatile: 8 }, @@ -621,8 +662,8 @@ describe('tilesManager', () => { }); it('should insert tiles generated by bbox into the queue in multiple batches', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -635,13 +676,13 @@ describe('tilesManager', () => { ], source: 'api', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(2); - const firstCallArgs = insertMock.mock.calls[0][0]; - const secondCallArgs = insertMock.mock.calls[1][0]; + const firstCallArgs = insertMock.mock.calls[0][1] as JobInsert[]; + const secondCallArgs = insertMock.mock.calls[1][1] as JobInsert[]; expect(firstCallArgs).toHaveLength(10); expect([...firstCallArgs, ...secondCallArgs].map((job) => job.data)).toContainSameTiles([ @@ -663,8 +704,8 @@ describe('tilesManager', () => { }); it('should insert tiles generated by geojson into the queue in multiple batches', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -677,13 +718,13 @@ describe('tilesManager', () => { ], source: 'api', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(2); - const firstCallArgs = insertMock.mock.calls[0][0]; - const secondCallArgs = insertMock.mock.calls[1][0]; + const firstCallArgs = insertMock.mock.calls[0][1] as JobInsert[]; + const secondCallArgs = insertMock.mock.calls[1][1] as JobInsert[]; expect(firstCallArgs).toHaveLength(10); expect([...firstCallArgs, ...secondCallArgs].map((job) => job.data)).toContainSameTiles([ @@ -705,8 +746,8 @@ describe('tilesManager', () => { }); it('should add the id of the parent tile request to the tiles generated by bbox request', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const id = faker.string.uuid(); const promise = tilesManager.handleTileRequest({ @@ -721,24 +762,25 @@ describe('tilesManager', () => { source: 'api', }, id, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id, state: undefined, force: undefined }, }, ]); }); it('should add the id of the parent tile request to the tiles generated by geojson request', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const id = faker.string.uuid(); const promise = tilesManager.handleTileRequest({ @@ -753,24 +795,25 @@ describe('tilesManager', () => { source: 'api', }, id, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id, state: undefined, force: undefined }, }, ]); }); it('should filter out non intersected tiles for geojson request', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const [west, south, east, north] = bbox(GOOD_LARGE_FEATURE); const boundingBox = { west, south, east, north }; @@ -786,7 +829,7 @@ describe('tilesManager', () => { ], source: 'api', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(geojsonPromise).resolves.not.toThrow(); const geojsonInsertions = insertMock.mock.calls.length; @@ -802,7 +845,7 @@ describe('tilesManager', () => { ], source: 'api', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(bboxPromise).resolves.not.toThrow(); @@ -814,8 +857,8 @@ describe('tilesManager', () => { describe('expired tiles requests', () => { it('should insert the tile to the queue', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const id = faker.string.uuid(); const promise = tilesManager.handleTileRequest({ @@ -830,24 +873,25 @@ describe('tilesManager', () => { ], source: 'expiredTiles', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id, state: undefined, force: undefined }, }, ]); }); it('should insert the tile to the queue with force attribute if request configured so', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const id = faker.string.uuid(); const promise = tilesManager.handleTileRequest({ @@ -863,17 +907,18 @@ describe('tilesManager', () => { source: 'expiredTiles', force: true, }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id, force: true }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id, force: true, state: undefined }, }, ]); }); @@ -902,8 +947,8 @@ describe('tilesManager', () => { }), }; - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const id = faker.string.uuid(); const promise = tilesManager.handleTileRequest({ @@ -918,24 +963,25 @@ describe('tilesManager', () => { ], source: 'expiredTiles', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id, force: true }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id, force: true, state: undefined }, }, ]); }); it('should insert tiles in multiple zoom levels', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -948,12 +994,12 @@ describe('tilesManager', () => { ], source: 'expiredTiles', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; expect(args.map((job) => job.data)).toContainSameTiles([ { x: 9794, y: 2650, z: 16, metatile: 8 }, { x: 39176, y: 10600, z: 18, metatile: 8 }, @@ -962,8 +1008,8 @@ describe('tilesManager', () => { }); it('should insert tiles with the parent state', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -977,12 +1023,12 @@ describe('tilesManager', () => { source: 'expiredTiles', state: 666, }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; expect(args.map((job) => job.data)).toContainSameTiles([ { x: 9794, y: 2650, z: 16, metatile: 8, state: 666 }, { x: 39176, y: 10600, z: 18, metatile: 8, state: 666 }, @@ -991,8 +1037,8 @@ describe('tilesManager', () => { }); it('should insert tiles into the queue in multiple batches', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert[]]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const promise = tilesManager.handleTileRequest({ data: { @@ -1005,13 +1051,13 @@ describe('tilesManager', () => { ], source: 'expiredTiles', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(2); - const firstCallArgs = insertMock.mock.calls[0][0]; - const secondCallArgs = insertMock.mock.calls[1][0]; + const firstCallArgs = insertMock.mock.calls[0][1] as JobInsert[]; + const secondCallArgs = insertMock.mock.calls[1][1] as JobInsert[]; expect(firstCallArgs).toHaveLength(10); expect([...firstCallArgs, ...secondCallArgs].map((job) => job.data)).toContainSameTiles([ @@ -1033,8 +1079,8 @@ describe('tilesManager', () => { }); it('should not insert the same tile twice', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const id = faker.string.uuid(); const promise = tilesManager.handleTileRequest({ @@ -1054,24 +1100,25 @@ describe('tilesManager', () => { ], source: 'expiredTiles', }, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id, state: undefined, force: undefined }, }, ]); }); it('should add the id of the tile request to the tiles', async function () { - const insertMock = jest.fn, [PgBoss.JobInsert]>().mockResolvedValue('ok'); - const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, new client.Registry()); + const insertMock = jest.fn().mockResolvedValue(['ok']); + const tilesManager = new TilesManager({ insert: insertMock } as unknown as PgBoss, configMock, logger, queueNames, new client.Registry()); const id = faker.string.uuid(); const promise = tilesManager.handleTileRequest({ @@ -1086,17 +1133,18 @@ describe('tilesManager', () => { source: 'expiredTiles', }, id, - } as unknown as PgBoss.JobWithMetadataDoneCallback); + } as unknown as JobWithMetadata); await expect(promise).resolves.not.toThrow(); expect(insertMock).toHaveBeenCalledTimes(1); - const args = insertMock.mock.calls[0][0]; + const queueName = insertMock.mock.calls[0][0]; + const args = insertMock.mock.calls[0][1] as JobInsert[]; + expect(queueName).toBe('tiles-test'); expect(args).toEqual([ { - name: 'tiles-test', - data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id }, - retryDelay: 1, + ...queueConfig, + data: { x: 39176, y: 10600, z: 18, metatile: 8, parent: id, state: undefined, force: undefined }, }, ]); });