From 0c98583b976180dc95c2a635cd726f1fa3d4b2ba Mon Sep 17 00:00:00 2001 From: Thierry Feuzeu Date: Thu, 3 Apr 2025 02:22:52 +0200 Subject: [PATCH 1/7] Add a simple batch processing sample. --- app/src/SimpleBatch/ExecuteCommand.php | 68 ++++++++++ app/src/SimpleBatch/README.md | 17 +++ app/src/SimpleBatch/SimpleBatchActivity.php | 81 ++++++++++++ .../SimpleBatchActivityInterface.php | 49 ++++++++ app/src/SimpleBatch/SimpleBatchWorkflow.php | 119 ++++++++++++++++++ .../SimpleBatchWorkflowInterface.php | 31 +++++ app/src/SimpleBatch/StatusCommand.php | 59 +++++++++ 7 files changed, 424 insertions(+) create mode 100755 app/src/SimpleBatch/ExecuteCommand.php create mode 100644 app/src/SimpleBatch/README.md create mode 100755 app/src/SimpleBatch/SimpleBatchActivity.php create mode 100755 app/src/SimpleBatch/SimpleBatchActivityInterface.php create mode 100755 app/src/SimpleBatch/SimpleBatchWorkflow.php create mode 100755 app/src/SimpleBatch/SimpleBatchWorkflowInterface.php create mode 100755 app/src/SimpleBatch/StatusCommand.php diff --git a/app/src/SimpleBatch/ExecuteCommand.php b/app/src/SimpleBatch/ExecuteCommand.php new file mode 100755 index 0000000..6414fac --- /dev/null +++ b/app/src/SimpleBatch/ExecuteCommand.php @@ -0,0 +1,68 @@ +getArgument('batchId')); + $minItemCount = intval($input->getOption('min')); + $maxItemCount = intval($input->getOption('max')); + + $workflow = $this->workflowClient->newWorkflowStub( + SimpleBatchWorkflowInterface::class, + WorkflowOptions::new() + ->withWorkflowId(SimpleBatchWorkflowInterface::WORKFLOW_ID . ':' . $batchId) + ->withWorkflowExecutionTimeout(CarbonInterval::week()) + ); + + $output->writeln("Starting SimpleBatchWorkflow... "); + + try { + $run = $this->workflowClient->start($workflow, $batchId, $minItemCount, $maxItemCount); + $output->writeln( + sprintf( + 'Started: WorkflowID=%s, RunID=%s', + $run->getExecution()->getID(), + $run->getExecution()->getRunID(), + ) + ); + } catch (WorkflowExecutionAlreadyStartedException $e) { + $output->writeln('Still running'); + } + + return self::SUCCESS; + } +} diff --git a/app/src/SimpleBatch/README.md b/app/src/SimpleBatch/README.md new file mode 100644 index 0000000..8403b83 --- /dev/null +++ b/app/src/SimpleBatch/README.md @@ -0,0 +1,17 @@ +# SimpleActivity sample + +This sample demonstrates a simple batch processing. + +Run the following command to start a batch with a number of items randomly chosen between given min and max values: + +```bash +php ./app/app.php simple-batch:start [--min ] [--max ] +``` + +The minimum and maximum item count resp. default to 20 and 50. + +Run the following command to show the batch status: + +```bash +php ./app/app.php simple-batch:status +``` diff --git a/app/src/SimpleBatch/SimpleBatchActivity.php b/app/src/SimpleBatch/SimpleBatchActivity.php new file mode 100755 index 0000000..58d0c40 --- /dev/null +++ b/app/src/SimpleBatch/SimpleBatchActivity.php @@ -0,0 +1,81 @@ +logger = new Logger(); + } + + /** + * @inheritDoc + */ + public function getBatchItemIds(int $batchId, int $minItemCount, int $maxItemCount): array + { + return array_map(fn(int $itemId) => ($batchId % 100) * 1000 + $itemId, + range(101, random_int(100 + $minItemCount, 100 + $maxItemCount))); + } + + /** + * @inheritDoc + */ + public function itemProcessingStarted(int $itemId, int $batchId): bool + { + $this->log("Started processing of item %d of batch %d.", $itemId, $batchId); + return true; + } + + /** + * @inheritDoc + */ + public function processItem(int $itemId, int $batchId): int + { + $this->log("Processing item %d of batch %d.", $itemId, $batchId); + + $random = random_int(0, 90); + // Wait for max 1 second. + usleep($random % 10000); + + // Randomly throw an error. + if($random > 30) + { + throw new Exception(sprintf("Error while processing of item %d of batch %d.", $itemId, $batchId)); + } + return $random; + } + + /** + * @inheritDoc + */ + public function itemProcessingEnded(int $itemId, int $batchId): bool + { + $this->log("Ended processing of item %d of batch %d.", $itemId, $batchId); + return true; + } + + /** + * @param string $message + * @param mixed ...$arg + */ + private function log(string $message, ...$arg) + { + $this->logger->debug(sprintf($message, ...$arg)); + } +} diff --git a/app/src/SimpleBatch/SimpleBatchActivityInterface.php b/app/src/SimpleBatch/SimpleBatchActivityInterface.php new file mode 100755 index 0000000..ccff198 --- /dev/null +++ b/app/src/SimpleBatch/SimpleBatchActivityInterface.php @@ -0,0 +1,49 @@ +batchActivity = Workflow::newActivityStub( + SimpleBatchActivityInterface::class, + ActivityOptions::new() + ->withStartToCloseTimeout(CarbonInterval::hours(6)) + ->withScheduleToStartTimeout(CarbonInterval::hours(4)) + ->withScheduleToCloseTimeout(CarbonInterval::hours(6)) + ->withRetryOptions( + RetryOptions::new() + ->withMaximumAttempts(100) + ->withInitialInterval(CarbonInterval::second(10)) + ->withMaximumInterval(CarbonInterval::seconds(100)) + ) + ); + } + + /** + * @inheritDoc + */ + public function start(int $batchId, int $minItemCount, int $maxItemCount) + { + $itemIds = yield $this->batchActivity + ->getBatchItemIds($batchId, $minItemCount, $maxItemCount); + + $promises = []; + foreach($itemIds as $itemId) + { + // Set the batch item as pending. + $this->pending[$itemId] = true; + // Process the batch item. + $promises[$itemId] = Workflow::async( + function() use($itemId, $batchId) { + // Set the item processing as started. + yield $this->batchActivity->itemProcessingStarted($itemId, $batchId); + + // This activity randomly throws an exception. + $output = yield $this->batchActivity->processItem($itemId, $batchId); + + // Set the item processing as ended. + yield $this->batchActivity->itemProcessingEnded($itemId, $batchId); + + return $output; + } + ) + ->then( + fn($output) => $this->results[$itemId] = [ + 'success' => true, + 'output' => $output, + ], + fn(Throwable $e) => $this->results[$itemId] = [ + 'success' => false, + 'message' => $e->getMessage(), + ] + ) + // We are calling always() instead of finally() because the Temporal PHP SDK depends on + // react/promise 2.9. Will need to change to finally() when upgrading to react/promise 3.x. + ->always(fn() => $this->pending[$itemId] = false); + } + + // Wait for all the async calls to terminate. + yield Promise::all($promises); + + return $this->results; + } + + /** + * @inheritDoc + */ + public function getAvailableResults(): array + { + return $this->results; + } + + /** + * @inheritDoc + */ + public function getPendingTasks(): array + { + return array_keys(array_filter($this->pending, fn($pending) => $pending)); + } +} diff --git a/app/src/SimpleBatch/SimpleBatchWorkflowInterface.php b/app/src/SimpleBatch/SimpleBatchWorkflowInterface.php new file mode 100755 index 0000000..26ef3c6 --- /dev/null +++ b/app/src/SimpleBatch/SimpleBatchWorkflowInterface.php @@ -0,0 +1,31 @@ +getArgument('batchId')); + + /** @var SimpleBatchWorkflowInterface */ + $workflow = $this->workflowClient->newRunningWorkflowStub( + SimpleBatchWorkflowInterface::class, + SimpleBatchWorkflowInterface::WORKFLOW_ID . ':' . $batchId + ); + + $results = $workflow->getAvailableResults(); + $pending = $workflow->getPendingTasks(); + $failedCount = count(array_filter($results, fn(array $result) => !$result['success'])); + + $output->writeln("SimpleBatchWorkflow (id $batchId) status"); + $output->writeln(json_encode([ + 'count' => [ + 'pending' => count($pending), + 'results' => count($results), + 'succeeded' => count($results) - $failedCount, + 'failed' => $failedCount, + ], + 'tasks' => [ + 'pending' => $pending, + 'results' => $results, + ], + ], JSON_PRETTY_PRINT)); + + return self::SUCCESS; + } +} From dc4dac04c840084e7c88c4007e58950fbe943ad8 Mon Sep 17 00:00:00 2001 From: Thierry Feuzeu Date: Thu, 3 Apr 2025 21:48:20 +0200 Subject: [PATCH 2/7] Change the workflow and activity parameters format. --- app/src/SimpleBatch/ExecuteCommand.php | 8 ++++-- app/src/SimpleBatch/README.md | 2 +- app/src/SimpleBatch/SimpleBatchActivity.php | 28 +++++++++++-------- .../SimpleBatchActivityInterface.php | 13 +++++---- app/src/SimpleBatch/SimpleBatchWorkflow.php | 9 +++--- .../SimpleBatchWorkflowInterface.php | 2 +- 6 files changed, 34 insertions(+), 28 deletions(-) diff --git a/app/src/SimpleBatch/ExecuteCommand.php b/app/src/SimpleBatch/ExecuteCommand.php index 6414fac..3f7bfc6 100755 --- a/app/src/SimpleBatch/ExecuteCommand.php +++ b/app/src/SimpleBatch/ExecuteCommand.php @@ -38,8 +38,10 @@ class ExecuteCommand extends Command public function execute(InputInterface $input, OutputInterface $output): int { $batchId = intval($input->getArgument('batchId')); - $minItemCount = intval($input->getOption('min')); - $maxItemCount = intval($input->getOption('max')); + $options = [ + 'min' => intval($input->getOption('min')), + 'max' => intval($input->getOption('max')), + ]; $workflow = $this->workflowClient->newWorkflowStub( SimpleBatchWorkflowInterface::class, @@ -51,7 +53,7 @@ public function execute(InputInterface $input, OutputInterface $output): int $output->writeln("Starting SimpleBatchWorkflow... "); try { - $run = $this->workflowClient->start($workflow, $batchId, $minItemCount, $maxItemCount); + $run = $this->workflowClient->start($workflow, $batchId, $options); $output->writeln( sprintf( 'Started: WorkflowID=%s, RunID=%s', diff --git a/app/src/SimpleBatch/README.md b/app/src/SimpleBatch/README.md index 8403b83..8d3dada 100644 --- a/app/src/SimpleBatch/README.md +++ b/app/src/SimpleBatch/README.md @@ -8,7 +8,7 @@ Run the following command to start a batch with a number of items randomly chose php ./app/app.php simple-batch:start [--min ] [--max ] ``` -The minimum and maximum item count resp. default to 20 and 50. +The minimum and maximum item count resp. default to 10 and 20. Run the following command to show the batch status: diff --git a/app/src/SimpleBatch/SimpleBatchActivity.php b/app/src/SimpleBatch/SimpleBatchActivity.php index 58d0c40..5c2d34f 100755 --- a/app/src/SimpleBatch/SimpleBatchActivity.php +++ b/app/src/SimpleBatch/SimpleBatchActivity.php @@ -17,6 +17,9 @@ class SimpleBatchActivity implements SimpleBatchActivityInterface { + /** + * @var LoggerInterface + */ private LoggerInterface $logger; public function __construct() @@ -27,21 +30,15 @@ public function __construct() /** * @inheritDoc */ - public function getBatchItemIds(int $batchId, int $minItemCount, int $maxItemCount): array + public function getBatchItemIds(int $batchId, array $options): array { + $minItemCount = $options['min'] ?? 10; + $maxItemCount = $options['max'] ?? 20; + // Return an array with between $minItemCount and $maxItemCount entries. return array_map(fn(int $itemId) => ($batchId % 100) * 1000 + $itemId, range(101, random_int(100 + $minItemCount, 100 + $maxItemCount))); } - /** - * @inheritDoc - */ - public function itemProcessingStarted(int $itemId, int $batchId): bool - { - $this->log("Started processing of item %d of batch %d.", $itemId, $batchId); - return true; - } - /** * @inheritDoc */ @@ -64,10 +61,17 @@ public function processItem(int $itemId, int $batchId): int /** * @inheritDoc */ - public function itemProcessingEnded(int $itemId, int $batchId): bool + public function itemProcessingStarted(int $itemId, int $batchId): void + { + $this->log("Started processing of item %d of batch %d.", $itemId, $batchId); + } + + /** + * @inheritDoc + */ + public function itemProcessingEnded(int $itemId, int $batchId): void { $this->log("Ended processing of item %d of batch %d.", $itemId, $batchId); - return true; } /** diff --git a/app/src/SimpleBatch/SimpleBatchActivityInterface.php b/app/src/SimpleBatch/SimpleBatchActivityInterface.php index ccff198..aa75aeb 100755 --- a/app/src/SimpleBatch/SimpleBatchActivityInterface.php +++ b/app/src/SimpleBatch/SimpleBatchActivityInterface.php @@ -18,26 +18,27 @@ interface SimpleBatchActivityInterface { /** * @param int $batchId + * @param array $options * * @return array */ - public function getBatchItemIds(int $batchId, int $minItemCount, int $maxItemCount): array; + public function getBatchItemIds(int $batchId, array $options): array; /** * @param int $itemId * @param int $batchId * - * @return void + * @return int */ - public function itemProcessingStarted(int $itemId, int $batchId): bool; + public function processItem(int $itemId, int $batchId): int; /** * @param int $itemId * @param int $batchId * - * @return int + * @return void */ - public function processItem(int $itemId, int $batchId): int; + public function itemProcessingStarted(int $itemId, int $batchId): void; /** * @param int $itemId @@ -45,5 +46,5 @@ public function processItem(int $itemId, int $batchId): int; * * @return void */ - public function itemProcessingEnded(int $itemId, int $batchId): bool; + public function itemProcessingEnded(int $itemId, int $batchId): void; } diff --git a/app/src/SimpleBatch/SimpleBatchWorkflow.php b/app/src/SimpleBatch/SimpleBatchWorkflow.php index 8cbe1aa..811ce46 100755 --- a/app/src/SimpleBatch/SimpleBatchWorkflow.php +++ b/app/src/SimpleBatch/SimpleBatchWorkflow.php @@ -55,10 +55,9 @@ public function __construct() /** * @inheritDoc */ - public function start(int $batchId, int $minItemCount, int $maxItemCount) + public function start(int $batchId, array $options) { - $itemIds = yield $this->batchActivity - ->getBatchItemIds($batchId, $minItemCount, $maxItemCount); + $itemIds = yield $this->batchActivity->getBatchItemIds($batchId, $options); $promises = []; foreach($itemIds as $itemId) @@ -68,13 +67,13 @@ public function start(int $batchId, int $minItemCount, int $maxItemCount) // Process the batch item. $promises[$itemId] = Workflow::async( function() use($itemId, $batchId) { - // Set the item processing as started. + // Notify the item processing start. yield $this->batchActivity->itemProcessingStarted($itemId, $batchId); // This activity randomly throws an exception. $output = yield $this->batchActivity->processItem($itemId, $batchId); - // Set the item processing as ended. + // Notify the item processing end. yield $this->batchActivity->itemProcessingEnded($itemId, $batchId); return $output; diff --git a/app/src/SimpleBatch/SimpleBatchWorkflowInterface.php b/app/src/SimpleBatch/SimpleBatchWorkflowInterface.php index 26ef3c6..1cba166 100755 --- a/app/src/SimpleBatch/SimpleBatchWorkflowInterface.php +++ b/app/src/SimpleBatch/SimpleBatchWorkflowInterface.php @@ -21,7 +21,7 @@ interface SimpleBatchWorkflowInterface public const WORKFLOW_ID = 'simple-batch-workflow'; #[WorkflowMethod(name: "SimpleBatch")] - public function start(int $batchId, int $minItemCount, int $maxItemCount); + public function start(int $batchId, array $options); #[QueryMethod] public function getAvailableResults(): array; From 33d85bc55ea2f9245bf34d951ae115fe861f8be4 Mon Sep 17 00:00:00 2001 From: Thierry Feuzeu Date: Thu, 3 Apr 2025 21:48:45 +0200 Subject: [PATCH 3/7] Add activity functions. --- app/src/SimpleBatch/SimpleBatchActivity.php | 24 +++++++++++++++++++ .../SimpleBatchActivityInterface.php | 16 +++++++++++++ app/src/SimpleBatch/SimpleBatchWorkflow.php | 6 +++++ 3 files changed, 46 insertions(+) diff --git a/app/src/SimpleBatch/SimpleBatchActivity.php b/app/src/SimpleBatch/SimpleBatchActivity.php index 5c2d34f..1ae0005 100755 --- a/app/src/SimpleBatch/SimpleBatchActivity.php +++ b/app/src/SimpleBatch/SimpleBatchActivity.php @@ -58,6 +58,30 @@ public function processItem(int $itemId, int $batchId): int return $random; } + /** + * @param int $batchId + * @param array $options + * + * @return void + */ + public function batchProcessingStarted(int $batchId, array $options): void + { + $this->log("Started processing of batch %d.", $batchId); + $this->log("Batch options: %s.", json_encode($options, JSON_PRETTY_PRINT)); + } + + /** + * @param int $batchId + * @param array $results + * + * @return void + */ + public function batchProcessingEnded(int $batchId, array $results): void + { + $this->log("Ended processing of batch %d.", $batchId); + $this->log("Batch results: %s.", json_encode($results, JSON_PRETTY_PRINT)); + } + /** * @inheritDoc */ diff --git a/app/src/SimpleBatch/SimpleBatchActivityInterface.php b/app/src/SimpleBatch/SimpleBatchActivityInterface.php index aa75aeb..faf43cd 100755 --- a/app/src/SimpleBatch/SimpleBatchActivityInterface.php +++ b/app/src/SimpleBatch/SimpleBatchActivityInterface.php @@ -32,6 +32,22 @@ public function getBatchItemIds(int $batchId, array $options): array; */ public function processItem(int $itemId, int $batchId): int; + /** + * @param int $batchId + * @param array $options + * + * @return void + */ + public function batchProcessingStarted(int $batchId, array $options): void; + + /** + * @param int $batchId + * @param array $results + * + * @return void + */ + public function batchProcessingEnded(int $batchId, array $results): void; + /** * @param int $itemId * @param int $batchId diff --git a/app/src/SimpleBatch/SimpleBatchWorkflow.php b/app/src/SimpleBatch/SimpleBatchWorkflow.php index 811ce46..2ce1b0f 100755 --- a/app/src/SimpleBatch/SimpleBatchWorkflow.php +++ b/app/src/SimpleBatch/SimpleBatchWorkflow.php @@ -57,6 +57,9 @@ public function __construct() */ public function start(int $batchId, array $options) { + // Notify the batch processing start. + yield $this->batchActivity->batchProcessingStarted($batchId, $options); + $itemIds = yield $this->batchActivity->getBatchItemIds($batchId, $options); $promises = []; @@ -97,6 +100,9 @@ function() use($itemId, $batchId) { // Wait for all the async calls to terminate. yield Promise::all($promises); + // Notify the batch processing end. + yield $this->batchActivity->batchProcessingEnded($batchId, $this->results); + return $this->results; } From e03f399713cb7eff9934315e96d821fd80e4e9ba Mon Sep 17 00:00:00 2001 From: Thierry Feuzeu Date: Tue, 15 Apr 2025 13:33:21 +0200 Subject: [PATCH 4/7] Change the activity timeout in the simple batch example. --- app/src/SimpleBatch/README.md | 2 +- app/src/SimpleBatch/SimpleBatchWorkflow.php | 6 +++--- app/src/SimpleBatch/StatusCommand.php | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/app/src/SimpleBatch/README.md b/app/src/SimpleBatch/README.md index 8d3dada..d08a8da 100644 --- a/app/src/SimpleBatch/README.md +++ b/app/src/SimpleBatch/README.md @@ -1,4 +1,4 @@ -# SimpleActivity sample +# Simple batch sample This sample demonstrates a simple batch processing. diff --git a/app/src/SimpleBatch/SimpleBatchWorkflow.php b/app/src/SimpleBatch/SimpleBatchWorkflow.php index 2ce1b0f..1a4fb79 100755 --- a/app/src/SimpleBatch/SimpleBatchWorkflow.php +++ b/app/src/SimpleBatch/SimpleBatchWorkflow.php @@ -40,9 +40,9 @@ public function __construct() $this->batchActivity = Workflow::newActivityStub( SimpleBatchActivityInterface::class, ActivityOptions::new() - ->withStartToCloseTimeout(CarbonInterval::hours(6)) - ->withScheduleToStartTimeout(CarbonInterval::hours(4)) - ->withScheduleToCloseTimeout(CarbonInterval::hours(6)) + ->withStartToCloseTimeout(CarbonInterval::seconds(10)) + ->withScheduleToStartTimeout(CarbonInterval::seconds(10)) + ->withScheduleToCloseTimeout(CarbonInterval::minutes(30)) ->withRetryOptions( RetryOptions::new() ->withMaximumAttempts(100) diff --git a/app/src/SimpleBatch/StatusCommand.php b/app/src/SimpleBatch/StatusCommand.php index a03d9b1..2e4d157 100755 --- a/app/src/SimpleBatch/StatusCommand.php +++ b/app/src/SimpleBatch/StatusCommand.php @@ -44,7 +44,7 @@ public function execute(InputInterface $input, OutputInterface $output): int $output->writeln(json_encode([ 'count' => [ 'pending' => count($pending), - 'results' => count($results), + 'ended' => count($results), 'succeeded' => count($results) - $failedCount, 'failed' => $failedCount, ], From cba15e4d4d6f58ece820d0d6119377b0eb8bf19a Mon Sep 17 00:00:00 2001 From: Thierry Feuzeu Date: Tue, 15 Apr 2025 14:43:21 +0200 Subject: [PATCH 5/7] New sample: simple batch with child workflows. --- app/src/SimpleBatchChild/ExecuteCommand.php | 70 +++++++++++ app/src/SimpleBatchChild/README.md | 19 +++ .../SimpleBatchChild/SimpleBatchActivity.php | 109 +++++++++++++++++ .../SimpleBatchActivityInterface.php | 66 +++++++++++ .../SimpleBatchChildWorkflow.php | 59 +++++++++ .../SimpleBatchChildWorkflowInterface.php | 28 +++++ .../SimpleBatchChild/SimpleBatchWorkflow.php | 112 ++++++++++++++++++ .../SimpleBatchWorkflowInterface.php | 31 +++++ app/src/SimpleBatchChild/StatusCommand.php | 59 +++++++++ 9 files changed, 553 insertions(+) create mode 100755 app/src/SimpleBatchChild/ExecuteCommand.php create mode 100644 app/src/SimpleBatchChild/README.md create mode 100755 app/src/SimpleBatchChild/SimpleBatchActivity.php create mode 100755 app/src/SimpleBatchChild/SimpleBatchActivityInterface.php create mode 100644 app/src/SimpleBatchChild/SimpleBatchChildWorkflow.php create mode 100644 app/src/SimpleBatchChild/SimpleBatchChildWorkflowInterface.php create mode 100755 app/src/SimpleBatchChild/SimpleBatchWorkflow.php create mode 100755 app/src/SimpleBatchChild/SimpleBatchWorkflowInterface.php create mode 100755 app/src/SimpleBatchChild/StatusCommand.php diff --git a/app/src/SimpleBatchChild/ExecuteCommand.php b/app/src/SimpleBatchChild/ExecuteCommand.php new file mode 100755 index 0000000..b839f11 --- /dev/null +++ b/app/src/SimpleBatchChild/ExecuteCommand.php @@ -0,0 +1,70 @@ +getArgument('batchId')); + $options = [ + 'min' => intval($input->getOption('min')), + 'max' => intval($input->getOption('max')), + ]; + + $workflow = $this->workflowClient->newWorkflowStub( + SimpleBatchWorkflowInterface::class, + WorkflowOptions::new() + ->withWorkflowId(SimpleBatchWorkflowInterface::WORKFLOW_ID . ':' . $batchId) + ->withWorkflowExecutionTimeout(CarbonInterval::week()) + ); + + $output->writeln("Starting SimpleBatchWorkflow... "); + + try { + $run = $this->workflowClient->start($workflow, $batchId, $options); + $output->writeln( + sprintf( + 'Started: WorkflowID=%s, RunID=%s', + $run->getExecution()->getID(), + $run->getExecution()->getRunID(), + ) + ); + } catch (WorkflowExecutionAlreadyStartedException $e) { + $output->writeln('Still running'); + } + + return self::SUCCESS; + } +} diff --git a/app/src/SimpleBatchChild/README.md b/app/src/SimpleBatchChild/README.md new file mode 100644 index 0000000..1f3ccf5 --- /dev/null +++ b/app/src/SimpleBatchChild/README.md @@ -0,0 +1,19 @@ +# Simple batch with child workflows sample + +This sample demonstrates a simple batch processing, with child workflows. + +Unlike the `SimpleBatch` sample, using child workflows will prevent from exceeding [the workflow history limits](https://docs.temporal.io/self-hosted-guide/defaults). + +Run the following command to start a batch with a number of items randomly chosen between given min and max values: + +```bash +php ./app/app.php simple-batch-child:start [--min ] [--max ] +``` + +The minimum and maximum item count resp. default to 10 and 20. + +Run the following command to show the batch status: + +```bash +php ./app/app.php simple-batch-child:status +``` diff --git a/app/src/SimpleBatchChild/SimpleBatchActivity.php b/app/src/SimpleBatchChild/SimpleBatchActivity.php new file mode 100755 index 0000000..0debd41 --- /dev/null +++ b/app/src/SimpleBatchChild/SimpleBatchActivity.php @@ -0,0 +1,109 @@ +logger = new Logger(); + } + + /** + * @inheritDoc + */ + public function getBatchItemIds(int $batchId, array $options): array + { + $minItemCount = $options['min'] ?? 10; + $maxItemCount = $options['max'] ?? 20; + // Return an array with between $minItemCount and $maxItemCount entries. + return array_map(fn(int $itemId) => ($batchId % 100) * 1000 + $itemId, + range(101, random_int(100 + $minItemCount, 100 + $maxItemCount))); + } + + /** + * @inheritDoc + */ + public function processItem(int $itemId, int $batchId): int + { + $this->log("Processing item %d of batch %d.", $itemId, $batchId); + + $random = random_int(0, 90); + // Wait for max 1 second. + usleep($random % 10000); + + // Randomly throw an error. + if($random > 30) + { + throw new Exception(sprintf("Error while processing of item %d of batch %d.", $itemId, $batchId)); + } + return $random; + } + + /** + * @param int $batchId + * @param array $options + * + * @return void + */ + public function batchProcessingStarted(int $batchId, array $options): void + { + $this->log("Started processing of batch %d.", $batchId); + $this->log("Batch options: %s.", json_encode($options, JSON_PRETTY_PRINT)); + } + + /** + * @param int $batchId + * @param array $results + * + * @return void + */ + public function batchProcessingEnded(int $batchId, array $results): void + { + $this->log("Ended processing of batch %d.", $batchId); + $this->log("Batch results: %s.", json_encode($results, JSON_PRETTY_PRINT)); + } + + /** + * @inheritDoc + */ + public function itemProcessingStarted(int $itemId, int $batchId): void + { + $this->log("Started processing of item %d of batch %d.", $itemId, $batchId); + } + + /** + * @inheritDoc + */ + public function itemProcessingEnded(int $itemId, int $batchId): void + { + $this->log("Ended processing of item %d of batch %d.", $itemId, $batchId); + } + + /** + * @param string $message + * @param mixed ...$arg + */ + private function log(string $message, ...$arg) + { + $this->logger->debug(sprintf($message, ...$arg)); + } +} diff --git a/app/src/SimpleBatchChild/SimpleBatchActivityInterface.php b/app/src/SimpleBatchChild/SimpleBatchActivityInterface.php new file mode 100755 index 0000000..aa04c6b --- /dev/null +++ b/app/src/SimpleBatchChild/SimpleBatchActivityInterface.php @@ -0,0 +1,66 @@ +batchActivity = Workflow::newActivityStub( + SimpleBatchActivityInterface::class, + ActivityOptions::new() + ->withStartToCloseTimeout(CarbonInterval::seconds(10)) + ->withScheduleToStartTimeout(CarbonInterval::seconds(10)) + ->withScheduleToCloseTimeout(CarbonInterval::minutes(30)) + ->withRetryOptions( + RetryOptions::new() + ->withMaximumAttempts(100) + ->withInitialInterval(CarbonInterval::second(10)) + ->withMaximumInterval(CarbonInterval::seconds(100)) + ) + ); + } + + /** + * @inheritDoc + */ + public function processItem(int $itemId, int $batchId) + { + // Set the item processing as started. + yield $this->batchActivity->itemProcessingStarted($itemId, $batchId); + + // This activity randomly throws an exception. + $output = yield $this->batchActivity->processItem($itemId, $batchId); + + // Set the item processing as ended. + yield $this->batchActivity->itemProcessingEnded($itemId, $batchId); + + return $output; + } +} diff --git a/app/src/SimpleBatchChild/SimpleBatchChildWorkflowInterface.php b/app/src/SimpleBatchChild/SimpleBatchChildWorkflowInterface.php new file mode 100644 index 0000000..b87e533 --- /dev/null +++ b/app/src/SimpleBatchChild/SimpleBatchChildWorkflowInterface.php @@ -0,0 +1,28 @@ +batchActivity = Workflow::newActivityStub( + SimpleBatchActivityInterface::class, + ActivityOptions::new() + ->withStartToCloseTimeout(CarbonInterval::seconds(10)) + ->withScheduleToStartTimeout(CarbonInterval::seconds(10)) + ->withScheduleToCloseTimeout(CarbonInterval::minutes(30)) + ->withRetryOptions( + RetryOptions::new() + ->withMaximumAttempts(100) + ->withInitialInterval(CarbonInterval::second(10)) + ->withMaximumInterval(CarbonInterval::seconds(100)) + ) + ); + } + + /** + * @inheritDoc + */ + public function start(int $batchId, array $options) + { + // Notify the batch processing start. + yield $this->batchActivity->batchProcessingStarted($batchId, $options); + + $itemIds = yield $this->batchActivity->getBatchItemIds($batchId, $options); + + $promises = []; + foreach($itemIds as $itemId) + { + // Set the batch item as pending. + $this->pending[$itemId] = true; + + $promises[$itemId] = Workflow::newChildWorkflowStub(SimpleBatchChildWorkflowInterface::class) + ->processItem($itemId, $batchId) + ->then( + fn($output) => $this->results[$itemId] = [ + 'success' => true, + 'output' => $output, + ], + fn(Throwable $e) => $this->results[$itemId] = [ + 'success' => false, + 'message' => $e->getMessage(), + ] + ) + // We are calling always() instead of finally() because the Temporal PHP SDK depends on + // react/promise 2.9. Will need to change to finally() when upgrading to react/promise 3.x. + ->always(fn() => $this->pending[$itemId] = false); + } + + // Wait for all the async calls to terminate. + yield Promise::all($promises); + + // Notify the batch processing end. + yield $this->batchActivity->batchProcessingEnded($batchId, $this->results); + + return $this->results; + } + + /** + * @inheritDoc + */ + public function getAvailableResults(): array + { + return $this->results; + } + + /** + * @inheritDoc + */ + public function getPendingTasks(): array + { + return array_keys(array_filter($this->pending, fn($pending) => $pending)); + } +} diff --git a/app/src/SimpleBatchChild/SimpleBatchWorkflowInterface.php b/app/src/SimpleBatchChild/SimpleBatchWorkflowInterface.php new file mode 100755 index 0000000..8db1af4 --- /dev/null +++ b/app/src/SimpleBatchChild/SimpleBatchWorkflowInterface.php @@ -0,0 +1,31 @@ +getArgument('batchId')); + + /** @var SimpleBatchWorkflowInterface */ + $workflow = $this->workflowClient->newRunningWorkflowStub( + SimpleBatchWorkflowInterface::class, + SimpleBatchWorkflowInterface::WORKFLOW_ID . ':' . $batchId + ); + + $results = $workflow->getAvailableResults(); + $pending = $workflow->getPendingTasks(); + $failedCount = count(array_filter($results, fn(array $result) => !$result['success'])); + + $output->writeln("SimpleBatchWorkflow (id $batchId) status"); + $output->writeln(json_encode([ + 'count' => [ + 'pending' => count($pending), + 'ended' => count($results), + 'succeeded' => count($results) - $failedCount, + 'failed' => $failedCount, + ], + 'tasks' => [ + 'pending' => $pending, + 'results' => $results, + ], + ], JSON_PRETTY_PRINT)); + + return self::SUCCESS; + } +} From 6d33acef2687cc061e65cc3d1fe304059224c9e0 Mon Sep 17 00:00:00 2001 From: Thierry Feuzeu Date: Tue, 15 Apr 2025 15:55:38 +0200 Subject: [PATCH 6/7] Change workflow name. --- app/src/SimpleBatchChild/SimpleBatchWorkflowInterface.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/src/SimpleBatchChild/SimpleBatchWorkflowInterface.php b/app/src/SimpleBatchChild/SimpleBatchWorkflowInterface.php index 8db1af4..5c8daed 100755 --- a/app/src/SimpleBatchChild/SimpleBatchWorkflowInterface.php +++ b/app/src/SimpleBatchChild/SimpleBatchWorkflowInterface.php @@ -20,7 +20,7 @@ interface SimpleBatchWorkflowInterface { public const WORKFLOW_ID = 'simple-batch-child-workflow'; - #[WorkflowMethod(name: "SimpleBatch")] + #[WorkflowMethod(name: "SimpleBatchChild")] public function start(int $batchId, array $options); #[QueryMethod] From 3d2a832bd9aef055fdd0e302d0bebd6635e27f6e Mon Sep 17 00:00:00 2001 From: Thierry Feuzeu Date: Thu, 28 Aug 2025 03:46:19 +0200 Subject: [PATCH 7/7] Change activity timeouts. --- app/src/SimpleBatch/SimpleBatchActivity.php | 2 +- app/src/SimpleBatch/SimpleBatchWorkflow.php | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/app/src/SimpleBatch/SimpleBatchActivity.php b/app/src/SimpleBatch/SimpleBatchActivity.php index 1ae0005..5b39701 100755 --- a/app/src/SimpleBatch/SimpleBatchActivity.php +++ b/app/src/SimpleBatch/SimpleBatchActivity.php @@ -48,7 +48,7 @@ public function processItem(int $itemId, int $batchId): int $random = random_int(0, 90); // Wait for max 1 second. - usleep($random % 10000); + usleep($random * 10000); // Randomly throw an error. if($random > 30) diff --git a/app/src/SimpleBatch/SimpleBatchWorkflow.php b/app/src/SimpleBatch/SimpleBatchWorkflow.php index 2ce1b0f..10b1b2c 100755 --- a/app/src/SimpleBatch/SimpleBatchWorkflow.php +++ b/app/src/SimpleBatch/SimpleBatchWorkflow.php @@ -40,9 +40,9 @@ public function __construct() $this->batchActivity = Workflow::newActivityStub( SimpleBatchActivityInterface::class, ActivityOptions::new() - ->withStartToCloseTimeout(CarbonInterval::hours(6)) - ->withScheduleToStartTimeout(CarbonInterval::hours(4)) - ->withScheduleToCloseTimeout(CarbonInterval::hours(6)) + ->withStartToCloseTimeout(CarbonInterval::seconds(10)) + ->withScheduleToStartTimeout(CarbonInterval::seconds(10)) + ->withScheduleToCloseTimeout(CarbonInterval::hours(4)) ->withRetryOptions( RetryOptions::new() ->withMaximumAttempts(100) @@ -93,7 +93,7 @@ function() use($itemId, $batchId) { ] ) // We are calling always() instead of finally() because the Temporal PHP SDK depends on - // react/promise 2.9. Will need to change to finally() when upgrading to react/promise 3.x. + // react/promise 2.9. Need to be changed to finally() after upgrade to react/promise 3.x. ->always(fn() => $this->pending[$itemId] = false); }