Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
c0731a9
support for postgres search
derdawid May 19, 2020
67725d5
Fix UTF8 encoding
derdawid Apr 26, 2021
4a35dd2
Updates doctrine
gcarlev Aug 25, 2023
eb770e8
Merge pull request #4 from gcarlev/upgrade-doctrine
derdawid Aug 25, 2023
1a3b9aa
Updates doctrine
gcarlev Aug 25, 2023
548c886
Updates doctrine
gcarlev Aug 25, 2023
4993175
Merge pull request #5 from gcarlev/upgrade-doctrine
derdawid Aug 25, 2023
01260c4
Fix Symfony\Component\Config\Definition\Builder\TreeBuilder::root() m…
gcarlev Sep 15, 2023
5c1957d
Fix Symfony\Component\Config\Definition\Builder\TreeBuilder::root() m…
gcarlev Sep 15, 2023
4f8c881
Merge pull request #6 from gcarlev/fix-bundle-configuration
derdawid Sep 15, 2023
6392f0f
Fix deprecations related to JobController Auto-injection and Doctrine…
gcarlev Sep 28, 2023
8d3c2d4
Merge pull request #7 from gcarlev/fix-deprecations
derdawid Sep 28, 2023
a347de7
Fix console.xml
gcarlev Sep 29, 2023
fce3e28
Merge pull request #8 from gcarlev/fix-deprecations
derdawid Sep 29, 2023
903f862
Fix The doctrine-bundle type commenting features were removed
gcarlev Sep 29, 2023
1514000
Merge pull request #9 from gcarlev/fix-doctrine-deprecations
derdawid Sep 29, 2023
d690ea0
Fix Twig deprecations
gcarlev Sep 29, 2023
f440648
Merge pull request #10 from gcarlev/fixtwig-deprecations
derdawid Sep 29, 2023
dd0ce53
Sync Job entity with the jms_jobs table present in prod in order to a…
gcarlev Nov 3, 2023
e77a43c
Merge pull request #11 from gcarlev/sync-jms-job-table-with-prod
derdawid Nov 6, 2023
8bc76c6
rename package
Mar 5, 2024
a9976c6
Merge pull request #12 from levuro/rename_package
Mar 6, 2024
100226b
Upgrade to symfony 5/6
gcarlev Apr 2, 2024
7d76a57
Replaces json_array coulm type with json
gcarlev Apr 3, 2024
6cce646
Merge pull request #13 from gcarlev/symfony-5
gcarlev Apr 3, 2024
13335db
Replaces executeUpdate with executeStatement and fix event dispatcher
gcarlev Apr 9, 2024
05a83d8
Fix listeners
gcarlev Apr 9, 2024
b978034
Fix deprecations in SafeObjectType.php
gcarlev Apr 9, 2024
e9ad5d9
Adds some missing return types
gcarlev Apr 9, 2024
045fde1
Merge pull request #15 from gcarlev/fix-job-cleanup
gcarlev Apr 10, 2024
6ef582e
Fix JobEvent
gcarlev Apr 10, 2024
78d8ccc
JmsJobBundle for symfony 5.4 and code clean up
gcarlev Apr 10, 2024
fbabed0
Merge pull request #16 from gcarlev/fix-clean-up-2
gcarlev Apr 10, 2024
c0d68aa
Fix configuration
gcarlev Apr 10, 2024
39235e9
Merge pull request #17 from gcarlev/fix-clean-up-2
gcarlev Apr 10, 2024
43d22e3
Fix SafeObjectType
gcarlev Apr 10, 2024
03f3170
Merge pull request #18 from gcarlev/fix-clean-up-2
gcarlev Apr 11, 2024
7ca957a
Fix ScheduleCommand
gcarlev Apr 11, 2024
535965e
Merge pull request #19 from gcarlev/fix-clean-up-2
gcarlev Apr 11, 2024
7242973
Drop symfony/debug and requires symfony/error-handler instead
gcarlev Apr 16, 2024
58c644e
Merge pull request #20 from gcarlev/drop-symfony-debug
gcarlev Apr 16, 2024
aa109e1
chore: prepare twig 3
mbreliberty Nov 19, 2024
6ccde0d
chore: remove stats functionality (#1)
robeliberty Oct 14, 2025
ddc7586
Annotations -> Attributes & Fixes
Dec 23, 2025
b3e7a0c
Updated bundle name
Dec 23, 2025
5cdba26
Updated bundle name
Dec 23, 2025
ff0e9ba
Updated bundle name
Dec 23, 2025
5de423f
SF6 support
Dec 23, 2025
56388e0
Fixed SF6 deprecations
Dec 24, 2025
dc347d2
Fixed typing
Dec 29, 2025
99b9fe4
Old to new notation of doctrine entity
Jan 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 65 additions & 39 deletions Command/CleanUpCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

namespace JMS\JobQueueBundle\Command;

use Doctrine\Persistence\ManagerRegistry;
use Doctrine\DBAL\Connection;
use Doctrine\ORM\EntityManager;
use Doctrine\DBAL\Exception;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\Exception\ORMException;
use Doctrine\ORM\NonUniqueResultException;
use Doctrine\ORM\OptimisticLockException;
use JMS\JobQueueBundle\Entity\Job;
use JMS\JobQueueBundle\Entity\Repository\JobManager;
use Symfony\Component\Console\Command\Command;
Expand All @@ -14,20 +16,24 @@

class CleanUpCommand extends Command
{
protected static $defaultName = 'jms-job-queue:clean-up';
public const COMMAND_NAME = 'jms-job-queue:clean-up';

private $jobManager;
private $registry;
private EntityManagerInterface $entityManager;
private JobManager $jobManager;

public function __construct(ManagerRegistry $registry, JobManager $jobManager)
public function __construct(EntityManagerInterface $entityManager, JobManager $jobManager)
{
parent::__construct();

parent::__construct(self::COMMAND_NAME);
$this->entityManager = $entityManager;
$this->jobManager = $jobManager;
$this->registry = $registry;
}

protected function configure()
public static function getDefaultName(): string
{
return self::COMMAND_NAME;
}

protected function configure(): void
{
$this
->setDescription('Cleans up jobs which exceed the maximum retention time.')
Expand All @@ -37,20 +43,29 @@ protected function configure()
;
}

protected function execute(InputInterface $input, OutputInterface $output)
/**
* @throws OptimisticLockException
* @throws \Throwable
* @throws ORMException
* @throws Exception
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var EntityManager $em */
$em = $this->registry->getManagerForClass(Job::class);
$con = $em->getConnection();
$this->cleanUpExpiredJobs($input);
$this->collectStaleJobs();

$this->cleanUpExpiredJobs($em, $con, $input);
$this->collectStaleJobs($em);
return 0;
}

private function collectStaleJobs(EntityManager $em)
/**
* @throws OptimisticLockException
* @throws ORMException
* @throws NonUniqueResultException
* @throws Exception
*/
private function collectStaleJobs(): void
{
foreach ($this->findStaleJobs($em) as $job) {
foreach ($this->findStaleJobs() as $job) {
if ($job->isRetried()) {
continue;
}
Expand All @@ -61,16 +76,17 @@ private function collectStaleJobs(EntityManager $em)

/**
* @return Job[]
* @throws NonUniqueResultException
*/
private function findStaleJobs(EntityManager $em)
private function findStaleJobs(): iterable
{
$excludedIds = array(-1);

do {
$em->clear();
$this->entityManager->clear();

/** @var Job $job */
$job = $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j
$job = $this->entityManager->createQuery("SELECT j FROM JMSJobQueueBundle:Job j
WHERE j.state = :running AND j.workerName IS NOT NULL AND j.checkedAt < :maxAge
AND j.id NOT IN (:excludedIds)")
->setParameter('running', Job::STATE_RUNNING)
Expand All @@ -87,37 +103,47 @@ private function findStaleJobs(EntityManager $em)
} while ($job !== null);
}

private function cleanUpExpiredJobs(EntityManager $em, Connection $con, InputInterface $input)
/**
* @throws Exception
*/
private function cleanUpExpiredJobs(InputInterface $input): void
{
$con = $this->entityManager->getConnection();
$incomingDepsSql = $con->getDatabasePlatform()->modifyLimitQuery("SELECT 1 FROM jms_job_dependencies WHERE dest_job_id = :id", 1);

$count = 0;
foreach ($this->findExpiredJobs($em, $input) as $job) {
foreach ($this->findExpiredJobs($input) as $job) {
/** @var Job $job */

$count++;

$result = $con->executeQuery($incomingDepsSql, array('id' => $job->getId()));
if ($result->fetchOne() !== false) {
$em->transactional(function() use ($em, $job) {
$this->resolveDependencies($em, $job);
$em->remove($job);
$this->entityManager->wrapInTransaction(function() use ($job) {
$this->resolveDependencies($job);
$this->entityManager->remove($job);
});

continue;
}

$em->remove($job);
$this->entityManager->remove($job);

if ($count >= $input->getOption('per-call')) {
break;
}
}

$em->flush();
$this->entityManager->flush();
}

private function resolveDependencies(EntityManager $em, Job $job)
/**
* @param Job $job
* @throws Exception
* @throws ORMException
* @throws OptimisticLockException
*/
private function resolveDependencies(Job $job)
{
// If this job has failed, or has otherwise not succeeded, we need to set the
// incoming dependencies to failed if that has not been done already.
Expand All @@ -136,13 +162,13 @@ private function resolveDependencies(EntityManager $em, Job $job)
}
}

$em->getConnection()->executeStatement("DELETE FROM jms_job_dependencies WHERE dest_job_id = :id", ['id' => $job->getId()]);
$this->entityManager->getConnection()->executeStatement("DELETE FROM jms_job_dependencies WHERE dest_job_id = :id", array('id' => $job->getId()));
}

private function findExpiredJobs(EntityManager $em, InputInterface $input)
private function findExpiredJobs(InputInterface $input): \Generator
{
$succeededJobs = function(array $excludedIds) use ($em, $input) {
return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.state = :succeeded AND j.id NOT IN (:excludedIds)")
$succeededJobs = function(array $excludedIds) use ($input) {
return $this->entityManager->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.state = :succeeded AND j.id NOT IN (:excludedIds)")
->setParameter('maxRetentionTime', new \DateTime('-'.$input->getOption('max-retention-succeeded')))
->setParameter('excludedIds', $excludedIds)
->setParameter('succeeded', Job::STATE_FINISHED)
Expand All @@ -151,17 +177,17 @@ private function findExpiredJobs(EntityManager $em, InputInterface $input)
};
yield from $this->whileResults( $succeededJobs );

$finishedJobs = function(array $excludedIds) use ($em, $input) {
return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
$finishedJobs = function(array $excludedIds) use ($input) {
return $this->entityManager->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
->setParameter('maxRetentionTime', new \DateTime('-'.$input->getOption('max-retention')))
->setParameter('excludedIds', $excludedIds)
->setMaxResults(100)
->getResult();
};
yield from $this->whileResults( $finishedJobs );

$canceledJobs = function(array $excludedIds) use ($em, $input) {
return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state = :canceled AND j.createdAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
$canceledJobs = function(array $excludedIds) use ($input) {
return $this->entityManager->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state = :canceled AND j.createdAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
->setParameter('maxRetentionTime', new \DateTime('-'.$input->getOption('max-retention')))
->setParameter('canceled', Job::STATE_CANCELED)
->setParameter('excludedIds', $excludedIds)
Expand All @@ -171,7 +197,7 @@ private function findExpiredJobs(EntityManager $em, InputInterface $input)
yield from $this->whileResults( $canceledJobs );
}

private function whileResults(callable $resultProducer)
private function whileResults(callable $resultProducer): \Generator
{
$excludedIds = array(-1);

Expand Down
22 changes: 16 additions & 6 deletions Command/MarkJobIncompleteCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace JMS\JobQueueBundle\Command;

use Doctrine\ORM\NonUniqueResultException;
use Doctrine\Persistence\ManagerRegistry;
use Doctrine\ORM\EntityManager;
use JMS\JobQueueBundle\Entity\Job;
Expand All @@ -13,28 +14,37 @@

class MarkJobIncompleteCommand extends Command
{
protected static $defaultName = 'jms-job-queue:mark-incomplete';
public const COMMAND_NAME = 'jms-job-queue:mark-incomplete';

private $registry;
private $jobManager;
private ManagerRegistry $registry;
private JobManager $jobManager;

public function __construct(ManagerRegistry $managerRegistry, JobManager $jobManager)
{
parent::__construct();
parent::__construct(self::COMMAND_NAME);

$this->registry = $managerRegistry;
$this->jobManager = $jobManager;
}

protected function configure()
public static function getDefaultName(): string
{
return self::COMMAND_NAME;
}

protected function configure(): void
{
$this
->setDescription('Internal command (do not use). It marks jobs as incomplete.')
->addArgument('job-id', InputArgument::REQUIRED, 'The ID of the Job.')
;
}

protected function execute(InputInterface $input, OutputInterface $output)
/**
* @throws NonUniqueResultException
* @throws \Exception
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var EntityManager $em */
$em = $this->registry->getManagerForClass(Job::class);
Expand Down
Loading