-
Notifications
You must be signed in to change notification settings - Fork 25
feat: Add transactional queue system to prevent data loss #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: fix/handle-flushing-events-with-32kb-limit
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,9 +7,13 @@ abstract class QueueConsumer extends Consumer | |
| protected $type = "QueueConsumer"; | ||
|
|
||
| protected $queue; | ||
| protected $failed_queue = array(); | ||
| protected $max_queue_size = 1000; | ||
| protected $batch_size = 100; | ||
| protected $maximum_backoff_duration = 10000; // Set maximum waiting limit to 10s | ||
| protected $max_retry_attempts = 3; | ||
| protected $max_failed_queue_size = 1000; | ||
| protected $initial_retry_delay = 60; // Initial retry delay in seconds | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is very high; retrying after a couple of seconds usually makes sense if the delays grow exponentially. |
||
| protected $host = "app.posthog.com"; | ||
| protected $compress_request = false; | ||
|
|
||
|
|
@@ -34,6 +38,18 @@ public function __construct($apiKey, $options = array()) | |
| $this->maximum_backoff_duration = (int) $options["maximum_backoff_duration"]; | ||
| } | ||
|
|
||
| if (isset($options["max_retry_attempts"])) { | ||
| $this->max_retry_attempts = (int) $options["max_retry_attempts"]; | ||
| } | ||
|
|
||
| if (isset($options["max_failed_queue_size"])) { | ||
| $this->max_failed_queue_size = (int) $options["max_failed_queue_size"]; | ||
| } | ||
|
|
||
| if (isset($options["initial_retry_delay"])) { | ||
| $this->initial_retry_delay = (int) $options["initial_retry_delay"]; | ||
| } | ||
|
|
||
| if (isset($options["host"])) { | ||
| $this->host = $options["host"]; | ||
|
|
||
|
|
@@ -48,6 +64,7 @@ public function __construct($apiKey, $options = array()) | |
| } | ||
|
|
||
| $this->queue = array(); | ||
| $this->failed_queue = array(); | ||
| } | ||
|
|
||
| public function __destruct() | ||
|
|
@@ -92,27 +109,168 @@ public function alias(array $message) | |
/**
 * Flushes our queue of messages by batching them to the server.
 *
 * Batches that failed on an earlier flush are retried first. The main queue
 * is then drained in slices of at most $this->batch_size messages. A slice is
 * only removed from the queue after it has either been sent or been handed to
 * the failed queue for a later retry, so a failing batch is never silently
 * dropped here.
 *
 * @return bool true if every batch taken from the main queue was sent (or the
 *              queue was empty); false if at least one batch was moved to the
 *              failed queue or the loop had to be aborted with messages left.
 */
public function flush(): bool
{
    // First, give previously failed batches another chance.
    $this->retryFailedBatches();

    $overallSuccess = true;

    // An empty queue skips the loop entirely and reports success.
    while (!empty($this->queue)) {
        $queueSizeBefore = count($this->queue);
        $batchSize = min($this->batch_size, $queueSizeBefore);
        $batch = array_slice($this->queue, 0, $batchSize);

        if (!$this->flushBatchWithRetry($batch)) {
            // Keep the messages around for a delayed retry instead of losing them.
            $this->addToFailedQueue($batch);
            $overallSuccess = false;
        }

        // Sent or parked in the failed queue: either way the slice leaves the main queue.
        $this->queue = array_slice($this->queue, $batchSize);

        // Safety check: the queue must shrink on every pass. It can only stall
        // when the computed batch size is not positive (e.g. batch_size <= 0).
        $queueSizeAfter = count($this->queue);
        if ($queueSizeAfter >= $queueSizeBefore) {
            $this->handleError(
                'flush_safety_break',
                sprintf(
                    'Queue size not decreasing: before=%d, after=%d. Breaking to prevent infinite loop.',
                    $queueSizeBefore,
                    $queueSizeAfter
                )
            );
            // Messages are still queued, so this flush must not be reported as successful.
            $overallSuccess = false;
            break;
        }
    }

    return $overallSuccess;
}
|
|
||
/**
 * Send one batch, retrying immediately with exponential backoff on failure.
 *
 * Up to $this->max_retry_attempts calls to flushBatch() are made, and always
 * at least one: a limit of zero or less means "no retries", not "never send".
 * The pause before the second attempt is 100 ms and doubles before each
 * further attempt, capped at $this->maximum_backoff_duration (milliseconds).
 *
 * NOTE(review): usleep() blocks the calling PHP process for the whole pause.
 * A reviewer of this change asked whether control could be yielded instead;
 * confirm that blocking is acceptable for callers.
 *
 * @param array $batch Messages to send in a single request.
 * @return bool true as soon as one attempt succeeds, false once all attempts failed.
 */
protected function flushBatchWithRetry(array $batch): bool
{
    // Guarantee the initial send even when retries are disabled via configuration.
    $attemptsAllowed = max(1, $this->max_retry_attempts);
    $backoff = 100; // Start with 100ms

    for ($attempt = 0; $attempt < $attemptsAllowed; $attempt++) {
        if ($attempt > 0) {
            // usleep() takes microseconds; $backoff is kept in milliseconds.
            usleep($backoff * 1000);
            $backoff = min($backoff * 2, $this->maximum_backoff_duration);
        }

        if ($this->flushBatch($batch)) {
            return true;
        }
    }

    return false;
}
|
|
||
/**
 * Add a batch to the failed queue for a later, delayed retry.
 *
 * When the failed queue already holds $this->max_failed_queue_size entries,
 * the oldest entry is discarded (and reported through handleError()) to bound
 * memory use. The new entry starts with zero recorded attempts and becomes
 * eligible for retry $this->initial_retry_delay seconds from now.
 *
 * @param array $batch Messages that could not be delivered.
 */
protected function addToFailedQueue(array $batch): void
{
    // Prevent memory issues by limiting failed queue size
    if (count($this->failed_queue) >= $this->max_failed_queue_size) {
        array_shift($this->failed_queue); // Remove oldest
        $this->handleError(
            'failed_queue_overflow',
            'Failed queue size limit reached. Dropping oldest failed batch.'
        );
    }

    // Read the clock once so created_at and next_retry are derived from the same instant.
    $now = time();

    // `$array[] = $value` appends $value at the end of the array.
    $this->failed_queue[] = [
        'messages' => $batch,
        'attempts' => 0,
        'next_retry' => $now + $this->initial_retry_delay,
        'created_at' => $now
    ];
}
|
|
||
/**
 * Walk the failed queue once and re-send every batch that is due.
 *
 * Entries that are not yet due are kept untouched. Entries that are re-sent
 * successfully are removed. Entries that fail again are either rescheduled or,
 * when updateFailedBatch() returns null, dropped for good.
 */
protected function retryFailedBatches(): void
{
    if (!$this->failed_queue) {
        return;
    }

    // One timestamp for the whole pass so all entries are judged against the same "now".
    $now = time();
    $stillPending = [];

    foreach ($this->failed_queue as $entry) {
        if ($this->isReadyForRetry($entry, $now)) {
            if (!$this->retryFailedBatch($entry)) {
                // Failed again: reschedule, unless the retry budget is exhausted.
                $rescheduled = $this->updateFailedBatch($entry, $now);
                if ($rescheduled !== null) {
                    $stillPending[] = $rescheduled;
                }
            }
            // On success the entry is simply not carried over.
        } else {
            $stillPending[] = $entry;
        }
    }

    $this->failed_queue = $stillPending;
}
|
|
||
/**
 * Tell whether a failed-queue entry may be re-sent now.
 *
 * @param array $failedBatch Failed-queue entry; 'next_retry' and 'attempts' are read.
 * @param int   $currentTime Unix timestamp to compare 'next_retry' against.
 * @return bool true when the retry time has been reached and the entry has
 *              used fewer than $this->max_retry_attempts attempts.
 */
private function isReadyForRetry(array $failedBatch, int $currentTime): bool
{
    // Not due yet: the attempt counter does not matter.
    if ($failedBatch['next_retry'] > $currentTime) {
        return false;
    }

    return $failedBatch['attempts'] < $this->max_retry_attempts;
}
|
|
||
/**
 * Re-send the messages of a single failed-queue entry.
 *
 * A successful retry is reported through handleError() with the code
 * 'batch_retry_success'.
 *
 * @param array $failedBatch Failed-queue entry; 'messages' and 'attempts' are read.
 * @return bool true if flushBatch() accepted the messages, false otherwise.
 */
private function retryFailedBatch(array $failedBatch): bool
{
    $sent = $this->flushBatch($failedBatch['messages']);
    if (!$sent) {
        return false;
    }

    $this->handleError(
        'batch_retry_success',
        sprintf('Successfully retried batch after %d failed attempts', $failedBatch['attempts'])
    );

    return true;
}
|
|
||
/**
 * Record one more failed attempt on an entry and decide its fate.
 *
 * While the entry is below $this->max_retry_attempts it is rescheduled with an
 * exponential delay of 2^attempts minutes, capped at 60 minutes. Once the
 * limit is reached the loss is reported through handleError() and the entry is
 * given up.
 *
 * @param array $failedBatch Failed-queue entry to update (the caller's copy is not modified).
 * @param int   $currentTime Unix timestamp the next retry time is based on.
 * @return array|null Updated entry, or null if it has permanently failed.
 */
private function updateFailedBatch(array $failedBatch, int $currentTime): ?array
{
    ++$failedBatch['attempts'];
    $attemptsSoFar = $failedBatch['attempts'];

    if ($attemptsSoFar < $this->max_retry_attempts) {
        // Exponential backoff in minutes, capped at one hour.
        $delayMinutes = min(2 ** $attemptsSoFar, 60);
        $failedBatch['next_retry'] = $currentTime + $delayMinutes * 60;

        return $failedBatch;
    }

    // Retry budget exhausted: report how many messages are being given up on.
    $this->handleError(
        'batch_permanently_failed',
        sprintf(
            'Batch permanently failed after %d attempts, %d messages lost',
            $this->max_retry_attempts,
            count($failedBatch['messages'])
        )
    );

    return null;
}
|
|
||
| /** | ||
| * Adds an item to our queue. | ||
| * @param mixed $item | ||
|
|
@@ -149,4 +307,44 @@ protected function payload($batch) | |
| "api_key" => $this->apiKey, | ||
| ); | ||
| } | ||
|
|
||
/**
 * Summarise the failed queue for observability.
 *
 * @return array Map with the keys:
 *               'failed_batches'        number of entries in the failed queue,
 *               'total_failed_messages' messages summed over all entries,
 *               'oldest_retry_time'     smallest 'next_retry' timestamp, or null when the failed queue is empty,
 *               'attempt_distribution'  attempt count => number of entries with that count,
 *               'current_queue_size'    messages waiting in the main queue,
 *               'max_failed_queue_size' configured failed-queue limit.
 */
public function getFailedQueueStats(): array
{
    $messageTotal = 0;
    $earliestRetry = null;
    $perAttempt = [];

    foreach ($this->failed_queue as $entry) {
        $messageTotal += count($entry['messages']);

        $retryAt = $entry['next_retry'];
        $earliestRetry = $earliestRetry === null ? $retryAt : min($earliestRetry, $retryAt);

        $tries = $entry['attempts'];
        if (isset($perAttempt[$tries])) {
            $perAttempt[$tries]++;
        } else {
            $perAttempt[$tries] = 1;
        }
    }

    $stats = [];
    $stats['failed_batches'] = count($this->failed_queue);
    $stats['total_failed_messages'] = $messageTotal;
    $stats['oldest_retry_time'] = $earliestRetry;
    $stats['attempt_distribution'] = $perAttempt;
    $stats['current_queue_size'] = count($this->queue);
    $stats['max_failed_queue_size'] = $this->max_failed_queue_size;

    return $stats;
}
|
|
||
/**
 * Discard every entry in the failed queue (useful for testing or manual recovery).
 *
 * @return int Number of failed batches (not individual messages) that were discarded.
 */
public function clearFailedQueue(): int
{
    $discarded = $this->failed_queue;
    $this->failed_queue = [];

    return count($discarded);
}
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,4 +28,9 @@ public function getErrors() | |
| { | ||
| return $this->errors; | ||
| } | ||
|
|
||
/**
 * Forget all recorded errors by resetting the error list to an empty array.
 */
public function clearErrors()
{
    $this->errors = array();
}
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we get the version from a version.php file somehow?