Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions app/src/SimpleBatch/ExecuteCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Samples\SimpleBatch;

use Carbon\CarbonInterval;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Temporal\Client\WorkflowOptions;
use Temporal\Exception\Client\WorkflowExecutionAlreadyStartedException;
use Temporal\SampleUtils\Command;

class ExecuteCommand extends Command
{
protected const NAME = 'simple-batch:start';

protected const DESCRIPTION = 'Start SimpleBatchWorkflow';

protected const ARGUMENTS = [
['batchId', InputArgument::REQUIRED, 'The batch id'],
];

protected const OPTIONS = [
['min', null, InputOption::VALUE_REQUIRED, 'The minimum number of batch items', 10],
['max', null, InputOption::VALUE_REQUIRED, 'The maximum number of batch items', 20],
];

public function execute(InputInterface $input, OutputInterface $output): int
{
$batchId = intval($input->getArgument('batchId'));
$options = [
'min' => intval($input->getOption('min')),
'max' => intval($input->getOption('max')),
];

$workflow = $this->workflowClient->newWorkflowStub(
SimpleBatchWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId(SimpleBatchWorkflowInterface::WORKFLOW_ID . ':' . $batchId)
->withWorkflowExecutionTimeout(CarbonInterval::week())
);

$output->writeln("Starting <comment>SimpleBatchWorkflow</comment>... ");

try {
$run = $this->workflowClient->start($workflow, $batchId, $options);
$output->writeln(
sprintf(
'Started: WorkflowID=<fg=magenta>%s</fg=magenta>, RunID=<fg=magenta>%s</fg=magenta>',
$run->getExecution()->getID(),
$run->getExecution()->getRunID(),
)
);
} catch (WorkflowExecutionAlreadyStartedException $e) {
$output->writeln('<fg=red>Still running</fg=red>');
}

return self::SUCCESS;
}
}
17 changes: 17 additions & 0 deletions app/src/SimpleBatch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Simple batch sample

This sample demonstrates a simple batch processing.

Run the following command to start a batch with a number of items randomly chosen between given min and max values:

```bash
php ./app/app.php simple-batch:start <batchId> [--min <min item count>] [--max <max item count>]
```

The minimum and maximum item count resp. default to 10 and 20.

Run the following command to show the batch status:

```bash
php ./app/app.php simple-batch:status <batchId>
```
109 changes: 109 additions & 0 deletions app/src/SimpleBatch/SimpleBatchActivity.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Samples\SimpleBatch;

use Exception;
use Psr\Log\LoggerInterface;
use Temporal\SampleUtils\Logger;

class SimpleBatchActivity implements SimpleBatchActivityInterface
{
/**
* @var LoggerInterface
*/
private LoggerInterface $logger;

public function __construct()
{
$this->logger = new Logger();
}

/**
* @inheritDoc
*/
public function getBatchItemIds(int $batchId, array $options): array
{
$minItemCount = $options['min'] ?? 10;
$maxItemCount = $options['max'] ?? 20;
// Return an array with between $minItemCount and $maxItemCount entries.
return array_map(fn(int $itemId) => ($batchId % 100) * 1000 + $itemId,
range(101, random_int(100 + $minItemCount, 100 + $maxItemCount)));
}

/**
* @inheritDoc
*/
public function processItem(int $itemId, int $batchId): int
{
$this->log("Processing item %d of batch %d.", $itemId, $batchId);

$random = random_int(0, 90);
// Wait for max 1 second.
usleep($random * 10000);

// Randomly throw an error.
if($random > 30)
{
throw new Exception(sprintf("Error while processing of item %d of batch %d.", $itemId, $batchId));
}
return $random;
}

/**
* @param int $batchId
* @param array $options
*
* @return void
*/
public function batchProcessingStarted(int $batchId, array $options): void
{
$this->log("Started processing of batch %d.", $batchId);
$this->log("Batch options: %s.", json_encode($options, JSON_PRETTY_PRINT));
}

/**
* @param int $batchId
* @param array $results
*
* @return void
*/
public function batchProcessingEnded(int $batchId, array $results): void
{
$this->log("Ended processing of batch %d.", $batchId);
$this->log("Batch results: %s.", json_encode($results, JSON_PRETTY_PRINT));
}

/**
* @inheritDoc
*/
public function itemProcessingStarted(int $itemId, int $batchId): void
{
$this->log("Started processing of item %d of batch %d.", $itemId, $batchId);
}

/**
* @inheritDoc
*/
public function itemProcessingEnded(int $itemId, int $batchId): void
{
$this->log("Ended processing of item %d of batch %d.", $itemId, $batchId);
}

/**
* @param string $message
* @param mixed ...$arg
*/
private function log(string $message, ...$arg)
{
$this->logger->debug(sprintf($message, ...$arg));
}
}
66 changes: 66 additions & 0 deletions app/src/SimpleBatch/SimpleBatchActivityInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Samples\SimpleBatch;

use Temporal\Activity\ActivityInterface;

#[ActivityInterface(prefix: "SimpleBatch.")]
interface SimpleBatchActivityInterface
{
/**
* @param int $batchId
* @param array $options
*
* @return array
*/
public function getBatchItemIds(int $batchId, array $options): array;

/**
* @param int $itemId
* @param int $batchId
*
* @return int
*/
public function processItem(int $itemId, int $batchId): int;

/**
* @param int $batchId
* @param array $options
*
* @return void
*/
public function batchProcessingStarted(int $batchId, array $options): void;

/**
* @param int $batchId
* @param array $results
*
* @return void
*/
public function batchProcessingEnded(int $batchId, array $results): void;

/**
* @param int $itemId
* @param int $batchId
*
* @return void
*/
public function itemProcessingStarted(int $itemId, int $batchId): void;

/**
* @param int $itemId
* @param int $batchId
*
* @return void
*/
public function itemProcessingEnded(int $itemId, int $batchId): void;
}
124 changes: 124 additions & 0 deletions app/src/SimpleBatch/SimpleBatchWorkflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Samples\SimpleBatch;

use Carbon\CarbonInterval;
use Temporal\Activity\ActivityOptions;
use Temporal\Common\RetryOptions;
use Temporal\Promise;
use Temporal\Workflow;
use Throwable;

class SimpleBatchWorkflow implements SimpleBatchWorkflowInterface
{
/**
* @var array
*/
private array $results = [];

/**
* @var array
*/
private array $pending = [];

/**
* @var SimpleBatchActivityInterface
*/
private $batchActivity;

public function __construct()
{
$this->batchActivity = Workflow::newActivityStub(
SimpleBatchActivityInterface::class,
ActivityOptions::new()
->withStartToCloseTimeout(CarbonInterval::seconds(10))
->withScheduleToStartTimeout(CarbonInterval::seconds(10))
->withScheduleToCloseTimeout(CarbonInterval::minutes(30))
->withRetryOptions(
RetryOptions::new()
->withMaximumAttempts(100)
->withInitialInterval(CarbonInterval::second(10))
->withMaximumInterval(CarbonInterval::seconds(100))
)
);
}

/**
* @inheritDoc
*/
public function start(int $batchId, array $options)
{
// Notify the batch processing start.
yield $this->batchActivity->batchProcessingStarted($batchId, $options);

$itemIds = yield $this->batchActivity->getBatchItemIds($batchId, $options);

$promises = [];
foreach($itemIds as $itemId)
{
// Set the batch item as pending.
$this->pending[$itemId] = true;
// Process the batch item.
$promises[$itemId] = Workflow::async(
function() use($itemId, $batchId) {
// Notify the item processing start.
yield $this->batchActivity->itemProcessingStarted($itemId, $batchId);

// This activity randomly throws an exception.
$output = yield $this->batchActivity->processItem($itemId, $batchId);

// Notify the item processing end.
yield $this->batchActivity->itemProcessingEnded($itemId, $batchId);

return $output;
}
)
->then(
fn($output) => $this->results[$itemId] = [
'success' => true,
'output' => $output,
],
fn(Throwable $e) => $this->results[$itemId] = [
'success' => false,
'message' => $e->getMessage(),
]
)
// We are calling always() instead of finally() because the Temporal PHP SDK depends on
// react/promise 2.9. Need to be changed to finally() after upgrade to react/promise 3.x.
->always(fn() => $this->pending[$itemId] = false);
}

// Wait for all the async calls to terminate.
yield Promise::all($promises);

// Notify the batch processing end.
yield $this->batchActivity->batchProcessingEnded($batchId, $this->results);

return $this->results;
}

/**
* @inheritDoc
*/
public function getAvailableResults(): array
{
return $this->results;
}

/**
* @inheritDoc
*/
public function getPendingTasks(): array
{
return array_keys(array_filter($this->pending, fn($pending) => $pending));
}
}
Loading
Loading