Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions lib/Consumer/LibCurl.php
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,37 @@ private function performHttpRequest($payload, $sampleMessage)
$payload = gzencode($payload);
}

$userAgent = sprintf('%s/%s',
$sampleMessage['library'] ?? 'PostHog-PHP',
$sampleMessage['library_version'] ?? 'Unknown'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get the version from a version.php file somehow?

);

$response = $this->httpClient->sendRequest(
'/batch/',
$payload,
[
"User-Agent: {$sampleMessage['library']}/{$sampleMessage['library_version']}",
"User-Agent: {$userAgent}",
],
[
'shouldVerify' => $this->options['verify_batch_events_request'] ?? true,
]
);

// Return boolean based on whether we got a response
return !empty($response->getResponse());
$responseCode = $response->getResponseCode();
$success = $responseCode >= 200 && $responseCode < 300;

if (!$success) {
$this->handleError(
'batch_delivery_failed',
sprintf(
'Batch delivery failed with HTTP %d. Payload size: %d bytes. Will retry if attempts remain.',
$responseCode,
strlen($payload)
)
);
}

return $success;
}

}
218 changes: 208 additions & 10 deletions lib/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ abstract class QueueConsumer extends Consumer
protected $type = "QueueConsumer";

protected $queue;
protected $failed_queue = array();
protected $max_queue_size = 1000;
protected $batch_size = 100;
protected $maximum_backoff_duration = 10000; // Set maximum waiting limit to 10s
protected $max_retry_attempts = 3;
protected $max_failed_queue_size = 1000;
protected $initial_retry_delay = 60; // Initial retry delay in seconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very high, retrying in a couple seconds usually makes sense if you make them exponentially grow

protected $host = "app.posthog.com";
protected $compress_request = false;

Expand All @@ -34,6 +38,18 @@ public function __construct($apiKey, $options = array())
$this->maximum_backoff_duration = (int) $options["maximum_backoff_duration"];
}

if (isset($options["max_retry_attempts"])) {
$this->max_retry_attempts = (int) $options["max_retry_attempts"];
}

if (isset($options["max_failed_queue_size"])) {
$this->max_failed_queue_size = (int) $options["max_failed_queue_size"];
}

if (isset($options["initial_retry_delay"])) {
$this->initial_retry_delay = (int) $options["initial_retry_delay"];
}

if (isset($options["host"])) {
$this->host = $options["host"];

Expand All @@ -48,6 +64,7 @@ public function __construct($apiKey, $options = array())
}

$this->queue = array();
$this->failed_queue = array();
}

public function __destruct()
Expand Down Expand Up @@ -92,27 +109,168 @@ public function alias(array $message)
/**
* Flushes our queue of messages by batching them to the server
*/
public function flush()
public function flush(): bool
{
$count = count($this->queue);
$overallSuccess = true;
// First, try to retry any failed batches
$this->retryFailedBatches();

// If no new messages, we're done
if (empty($this->queue)) {
return true;
}

while ($count > 0) {
$batch = array_splice($this->queue, 0, min($this->batch_size, $count));
$batchSuccess = $this->flushBatch($batch);
// Process messages batch by batch, maintaining transactional behavior
$overallSuccess = true;
$initialQueueSize = count($this->queue);

// Track overall success but continue processing remaining batches
// This ensures we attempt to send all queued events even if some batches fail
if (!$batchSuccess) {
while (!empty($this->queue)) {
$queueSizeBefore = count($this->queue);
$batchSize = min($this->batch_size, $queueSizeBefore);
$batch = array_slice($this->queue, 0, $batchSize);

if ($this->flushBatchWithRetry($batch)) {
// Success: remove these messages from queue
$this->queue = array_slice($this->queue, $batchSize);
} else {
// Failed: move to failed queue and remove from main queue
$this->addToFailedQueue($batch);
$this->queue = array_slice($this->queue, $batchSize);
$overallSuccess = false;
}

$count = count($this->queue);
// Safety check: ensure queue size is actually decreasing
$queueSizeAfter = count($this->queue);
if ($queueSizeAfter >= $queueSizeBefore) {
// This should never happen, but prevents infinite loops
$this->handleError('flush_safety_break',
sprintf('Queue size not decreasing: before=%d, after=%d. Breaking to prevent infinite loop.',
$queueSizeBefore, $queueSizeAfter));
break;
}
}

return $overallSuccess;
}

/**
* Flush a batch with immediate retry logic
*/
protected function flushBatchWithRetry(array $batch): bool
{
$backoff = 100; // Start with 100ms

for ($attempt = 0; $attempt < $this->max_retry_attempts; $attempt++) {
if ($attempt > 0) {
usleep($backoff * 1000); // Wait with exponential backoff
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we really sleep? Is PHP multithreaded or will this block the whole server? I dont like this, we should always yield back control when possible

$backoff = min($backoff * 2, $this->maximum_backoff_duration);
}

if ($this->flushBatch($batch)) {
return true;
}
}

return false;
}

/**
* Add batch to failed queue for later retry
*/
protected function addToFailedQueue(array $batch): void
{
// Prevent memory issues by limiting failed queue size
if (count($this->failed_queue) >= $this->max_failed_queue_size) {
array_shift($this->failed_queue); // Remove oldest
$this->handleError('failed_queue_overflow',
'Failed queue size limit reached. Dropping oldest failed batch.');
}

$this->failed_queue[] = [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an append? PHP is crazy lol

'messages' => $batch,
'attempts' => 0,
'next_retry' => time() + $this->initial_retry_delay,
'created_at' => time()
];
}

/**
* Retry failed batches that are ready for retry
*/
protected function retryFailedBatches(): void
{
if (empty($this->failed_queue)) {
return;
}

$currentTime = time();
$remainingFailed = [];

foreach ($this->failed_queue as $failedBatch) {
if (!$this->isReadyForRetry($failedBatch, $currentTime)) {
$remainingFailed[] = $failedBatch;
continue;
}

if ($this->retryFailedBatch($failedBatch)) {
// Success - don't add back to queue
continue;
}

// Still failed - update for next retry or mark as permanent failure
$updatedBatch = $this->updateFailedBatch($failedBatch, $currentTime);
if ($updatedBatch !== null) {
$remainingFailed[] = $updatedBatch;
}
}

$this->failed_queue = $remainingFailed;
}

/**
* Check if a failed batch is ready for retry
*/
private function isReadyForRetry(array $failedBatch, int $currentTime): bool
{
return $failedBatch['next_retry'] <= $currentTime &&
$failedBatch['attempts'] < $this->max_retry_attempts;
}

/**
* Attempt to retry a single failed batch
*/
private function retryFailedBatch(array $failedBatch): bool
{
if ($this->flushBatch($failedBatch['messages'])) {
$this->handleError('batch_retry_success',
sprintf('Successfully retried batch after %d failed attempts', $failedBatch['attempts']));
return true;
}
return false;
}

/**
* Update failed batch for next retry or mark as permanently failed
* @return array|null Updated batch or null if permanently failed
*/
private function updateFailedBatch(array $failedBatch, int $currentTime): ?array
{
$failedBatch['attempts']++;

if ($failedBatch['attempts'] >= $this->max_retry_attempts) {
// Permanently failed
$this->handleError('batch_permanently_failed',
sprintf('Batch permanently failed after %d attempts, %d messages lost',
$this->max_retry_attempts, count($failedBatch['messages'])));
return null;
}

// Calculate next retry time with exponential backoff (capped at 1 hour)
$backoffMinutes = min(pow(2, $failedBatch['attempts']), 60);
$failedBatch['next_retry'] = $currentTime + ($backoffMinutes * 60);

return $failedBatch;
}

/**
* Adds an item to our queue.
* @param mixed $item
Expand Down Expand Up @@ -149,4 +307,44 @@ protected function payload($batch)
"api_key" => $this->apiKey,
);
}

/**
* Get statistics about failed queue for observability
*/
public function getFailedQueueStats(): array
{
$totalMessages = 0;
$oldestRetry = null;
$attemptCounts = [];

foreach ($this->failed_queue as $failedBatch) {
$totalMessages += count($failedBatch['messages']);

if ($oldestRetry === null || $failedBatch['next_retry'] < $oldestRetry) {
$oldestRetry = $failedBatch['next_retry'];
}

$attempts = $failedBatch['attempts'];
$attemptCounts[$attempts] = ($attemptCounts[$attempts] ?? 0) + 1;
}

return [
'failed_batches' => count($this->failed_queue),
'total_failed_messages' => $totalMessages,
'oldest_retry_time' => $oldestRetry,
'attempt_distribution' => $attemptCounts,
'current_queue_size' => count($this->queue),
'max_failed_queue_size' => $this->max_failed_queue_size,
];
}

/**
* Clear all failed queues (useful for testing or manual recovery)
*/
public function clearFailedQueue(): int
{
$clearedCount = count($this->failed_queue);
$this->failed_queue = [];
return $clearedCount;
}
}
5 changes: 5 additions & 0 deletions test/MockErrorHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public function getErrors()
{
return $this->errors;
}

public function clearErrors()
{
$this->errors = [];
}
}
40 changes: 40 additions & 0 deletions test/MockedHttpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class MockedHttpClient extends \PostHog\HttpClient

private $flagEndpointResponse;
private $flagsEndpointResponse;
private $batchResponse;
private $batchResponses = [];

public function __construct(
string $host,
Expand Down Expand Up @@ -53,9 +55,47 @@ public function sendRequest(string $path, ?string $payload, array $extraHeaders
}

if (str_starts_with($path, "/batch/")) {
// Use configured response if available
if (!empty($this->batchResponses)) {
$response = array_shift($this->batchResponses);
return new HttpResponse($response[1], $response[0]);
}

if ($this->batchResponse !== null) {
return new HttpResponse($this->batchResponse[1], $this->batchResponse[0]);
}

return new HttpResponse('{"status":"Ok"}', 200);
}

return parent::sendRequest($path, $payload, $extraHeaders, $requestOptions);
}

/**
* Set a single response for batch requests
* @param int $statusCode
* @param string $body
*/
public function setResponse(int $statusCode, string $body): void
{
$this->batchResponse = [$statusCode, $body];
}

/**
* Set multiple responses for batch requests (used in sequence)
* @param array $responses Array of [statusCode, body] pairs
*/
public function setResponses(array $responses): void
{
$this->batchResponses = $responses;
}

/**
* Reset all configured responses
*/
public function resetResponses(): void
{
$this->batchResponse = null;
$this->batchResponses = [];
}
}
Loading