Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions app/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
"minimum-stability": "dev",
"prefer-stable": true,
"require": {
"temporal/sdk": "^2.12.0",
"spiral/tokenizer": "^3.7",
"spiral/tokenizer": "^3.14.5",
"temporal/sdk": "^2.13",
"temporal/open-telemetry-interceptors": "dev-master",
"open-telemetry/exporter-otlp": "^1.1",
"open-telemetry/transport-grpc": "^1.1",
"symfony/console": "^5.4 || ^6.0 || ^7.0"
},
"require-dev": {
"buggregator/trap": "^1.3",
"buggregator/trap": "^1.13",
"internal/dload": "^1.0",
"phpunit/phpunit": "^10.5"
},
Expand Down
743 changes: 333 additions & 410 deletions app/composer.lock

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions app/src/SafeMessageHandlers/DTO/AssignNodesToJobInput.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers\DTO;

final class AssignNodesToJobInput
{
/**
* @param string[] $nodes
*/
public function __construct(
public array $nodes,
public string $jobName,
) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers\DTO;

/**
* Be in the habit of storing message inputs and outputs in serializable structures.
* This makes it easier to add more over time in a backward-compatible way.
*/
final class ClusterManagerAssignNodesToJobInput
{
public function __construct(
public int $totalNumNodes,
public string $jobName,
) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers\DTO;

final class ClusterManagerAssignNodesToJobResult
{
/**
* @param string[] $nodesAssigned
*/
public function __construct(
public array $nodesAssigned,
) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers\DTO;

final class ClusterManagerDeleteJobInput
{
public function __construct(public string $jobName) {}
}
12 changes: 12 additions & 0 deletions app/src/SafeMessageHandlers/DTO/ClusterManagerInput.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers\DTO;

final class ClusterManagerInput {
public function __construct(
public ?ClusterManagerState $state = null,
public bool $testContinueAsNew = false,
) {}
}
13 changes: 13 additions & 0 deletions app/src/SafeMessageHandlers/DTO/ClusterManagerResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers\DTO;

final class ClusterManagerResult
{
public function __construct(
public int $count,
public int $count1,
) {}
}
15 changes: 15 additions & 0 deletions app/src/SafeMessageHandlers/DTO/ClusterManagerState.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers\DTO;

final class ClusterManagerState
{
public bool $clusterStarted = false;
public bool $clusterShutdown = false;
/** @var array<string, string|null> A [Node => Job Name] array*/
public array $nodes = [];
/** @var array<string> */
public array $jobsAssigned = [];
}
15 changes: 15 additions & 0 deletions app/src/SafeMessageHandlers/DTO/FindBadNodesInput.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers\DTO;

final class FindBadNodesInput
{
/**
* @param array $nodesToCheck
*/
public function __construct(
public array $nodesToCheck,
) {}
}
16 changes: 16 additions & 0 deletions app/src/SafeMessageHandlers/DTO/UnassignNodesForJobInput.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers\DTO;

final class UnassignNodesForJobInput
{
/**
* @param string[] $nodes
*/
public function __construct(
public array $nodes,
public string $jobName,
) {}
}
95 changes: 95 additions & 0 deletions app/src/SafeMessageHandlers/ExecuteCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers;

use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Temporal\Client\WorkflowOptions;
use Temporal\Common\IdReusePolicy;
use Temporal\Internal\Client\WorkflowProxy;
use Temporal\Samples\SafeMessageHandlers\DTO\ClusterManagerAssignNodesToJobInput;
use Temporal\Samples\SafeMessageHandlers\DTO\ClusterManagerDeleteJobInput;
use Temporal\Samples\SafeMessageHandlers\DTO\ClusterManagerInput;
use Temporal\SampleUtils\Command;

class ExecuteCommand extends Command
{
protected const NAME = 'safe-message-handlers';
protected const DESCRIPTION = 'Execute Safe Message Handlers Workflow';
protected const WORKFLOW_ID = 'safe-message-handlers';

protected const ARGUMENTS = [
['jobs', InputArgument::OPTIONAL, 'Jobs count', 6],
['continue', InputArgument::OPTIONAL, 'Test continue as new', false],
];

private InputInterface $input;
private OutputInterface $output;

public function execute(InputInterface $input, OutputInterface $output): int
{
$this->input = $input;
$this->output = $output;

$jobs = max(1, (int) $input->getArgument('jobs'));
$testContinueAsNew = (bool) $input->getArgument('continue');
$delay = 0.5;

$workflow = $this->workflowClient->newWorkflowStub(
MessageHandlerWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId(self::WORKFLOW_ID)
->withWorkflowIdReusePolicy(IdReusePolicy::TerminateIfRunning),
);

$output->writeln("Starting <comment>MessageHandlerWorkflow</comment>... ");

$this->workflowClient->start($workflow, new ClusterManagerInput(testContinueAsNew: $testContinueAsNew));
$this->doClusterLifecycle($workflow, $delay, $jobs);

return self::SUCCESS;
}

private function doClusterLifecycle(
MessageHandlerWorkflowInterface|WorkflowProxy $workflow,
float $delay,
int $jobs = 6,
) {
$untyped = $workflow->__getUntypedStub();
$workflow->startCluster();

$this->output->writeln('Assign jobs to nodes...');
$handle = [];
for ($i = 0; $i < $jobs; $i++) {
$handle[] = $untyped->startUpdate(
'assign_nodes_to_job',
new ClusterManagerAssignNodesToJobInput(2, "task-$i"),
);
}
// await
foreach ($handle as $h) {
$h->getResult();
}

$this->output->writeln("Sleeping for $delay second(s)");
$delay > 0 and \usleep((int) ($delay * 1000000));

$this->output->writeln('Deleting jobs...');
$handle = [];
for ($i = 0; $i < $jobs; $i++) {
$handle[] = $untyped->startUpdate(
'delete_job',
new ClusterManagerDeleteJobInput("task-$i"),
);
}
// await
foreach ($handle as $h) {
$h->getResult();
}

$workflow->shutdownCluster();
}
}
51 changes: 51 additions & 0 deletions app/src/SafeMessageHandlers/MessageHandlerActivity.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers;

use Temporal\Activity\ActivityInterface;
use Temporal\Activity\ActivityMethod;
use Temporal\Samples\SafeMessageHandlers\DTO\AssignNodesToJobInput;
use Temporal\Samples\SafeMessageHandlers\DTO\FindBadNodesInput;
use Temporal\Samples\SafeMessageHandlers\DTO\UnassignNodesForJobInput;

#[ActivityInterface]
class MessageHandlerActivity implements MessageHandlerActivityInterface
{
#[ActivityMethod(name: 'assign_nodes_to_job')]
public function assignNodesToJob(AssignNodesToJobInput $input): void
{
\trap('Assigning nodes ' . \implode(', ', $input->nodes) . ' to job ' . $input->jobName);
\sleep(1);
}

#[ActivityMethod(name: 'unassign_nodes_for_job')]
public function unassign_nodes_for_job(UnassignNodesForJobInput $input): void
{
\trap('Deallocating nodes ' . \implode(', ', $input->nodes) . ' from job ' . $input->jobName);
\sleep(1);
}

/**
* @return string[]
*/
#[ActivityMethod(name: 'find_bad_nodes')]
public function find_bad_nodes(FindBadNodesInput $input): array
{
\sleep(1);
$badNodes = [];
foreach ($input->nodesToCheck as $node) {
if ((int) $node % 5 === 0) {
$badNodes[] = $node;
}
}

\trap(\count($badNodes) > 0
? 'Found bad nodes: ' . \implode(', ', $badNodes)
: 'No new bad nodes found.',
);

return $badNodes;
}
}
27 changes: 27 additions & 0 deletions app/src/SafeMessageHandlers/MessageHandlerActivityInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Temporal\Samples\SafeMessageHandlers;

use Temporal\Activity\ActivityInterface;
use Temporal\Activity\ActivityMethod;
use Temporal\Samples\SafeMessageHandlers\DTO\AssignNodesToJobInput;
use Temporal\Samples\SafeMessageHandlers\DTO\FindBadNodesInput;
use Temporal\Samples\SafeMessageHandlers\DTO\UnassignNodesForJobInput;

#[ActivityInterface]
interface MessageHandlerActivityInterface
{
#[ActivityMethod(name: 'assign_nodes_to_job')]
public function assignNodesToJob(AssignNodesToJobInput $input): void;

#[ActivityMethod(name: 'unassign_nodes_for_job')]
public function unassign_nodes_for_job(UnassignNodesForJobInput $input): void;

/**
* @return string[]
*/
#[ActivityMethod(name: 'find_bad_nodes')]
public function find_bad_nodes(FindBadNodesInput $input): array;
}
Loading