From 0eea3ba6cb6dc14850b555bdae1d6cabef3ee004 Mon Sep 17 00:00:00 2001 From: Luke Kuzmish <42181698+cosmastech@users.noreply.github.com> Date: Mon, 11 Aug 2025 21:37:16 -0400 Subject: [PATCH 1/6] socketFailureHandler --- src/DogStatsd.php | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/src/DogStatsd.php b/src/DogStatsd.php index 2abb59b..0c218ee 100644 --- a/src/DogStatsd.php +++ b/src/DogStatsd.php @@ -58,6 +58,10 @@ class DogStatsd * @var string The container ID field, used for origin detection */ private $containerID; + /** + * @var (callable(\Throwable))|null The closure which is executed when there is a failure flushing metrics. + */ + private $socketFailureHandler = null; // Telemetry private $disable_telemetry; @@ -98,7 +102,8 @@ class DogStatsd * metric_prefix?: string, * disable_telemetry?: bool, * container_id?: string, - * origin_detection?: bool + * origin_detection?: bool, + * socket_failure_handler?: callable(\Throwable) * } $config */ public function __construct(array $config = array()) @@ -180,6 +185,8 @@ public function __construct(array $config = array()) $containerID = isset($config["container_id"]) ? $config["container_id"] : ""; $this->containerID = $originDetection->getContainerID($containerID, $originDetectionEnabled); + + $this->socketFailureHandler = isset($config['socket_failure_handler']) ? $config['socket_failure_handler'] : null; } /** @@ -647,20 +654,28 @@ public function flush($message) { $message .= $this->flushTelemetry(); - // Non - Blocking UDP I/O - Use IP Addresses! - if (!is_null($this->socketPath)) { - $socket = socket_create(AF_UNIX, SOCK_DGRAM, 0); - } elseif (filter_var($this->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) { - $socket = socket_create(AF_INET6, SOCK_DGRAM, SOL_UDP); - } else { - $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); - } - socket_set_nonblock($socket); - - if (!is_null($this->socketPath)) { - $res = socket_sendto($socket, $message, strlen($message), 0, $this->socketPath); - } else { - $res = socket_sendto($socket, $message, strlen($message), 0, $this->host, $this->port); + try { + // Non - Blocking UDP I/O - Use IP Addresses! + if (!is_null($this->socketPath)) { + $socket = socket_create(AF_UNIX, SOCK_DGRAM, 0); + } elseif (filter_var($this->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) { + $socket = socket_create(AF_INET6, SOCK_DGRAM, SOL_UDP); + } else { + $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); + } + socket_set_nonblock($socket); + + if (!is_null($this->socketPath)) { + $res = socket_sendto($socket, $message, strlen($message), 0, $this->socketPath); + } else { + $res = socket_sendto($socket, $message, strlen($message), 0, $this->host, $this->port); + } + } catch (\Throwable $e) { + if ($this->socketFailureHandler === null) { + throw $e; + } + call_user_func($this->socketFailureHandler, $e); + $res = false; } if ($res !== false) { From b4ed267b9b836102a34bb30a53f7d7558f666fbf Mon Sep 17 00:00:00 2001 From: Luke Kuzmish Date: Tue, 12 Aug 2025 08:57:22 -0400 Subject: [PATCH 2/6] updates --- src/DogStatsd.php | 16 +++++++++++----- tests/TestHelpers/SocketSpy.php | 9 +++++++++ tests/UnitTests/DogStatsd/SocketsTest.php | 21 +++++++++++++++++++++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/src/DogStatsd.php b/src/DogStatsd.php index 0c218ee..c66b100 100644 --- a/src/DogStatsd.php +++ b/src/DogStatsd.php @@ -3,6 +3,7 @@ namespace DataDog; use DataDog\OriginDetection; +use Throwable; /** * Datadog implementation of StatsD @@ -89,7 +90,8 @@ class DogStatsd * metric_prefix, * disable_telemetry, * container_id, - * origin_detecion + * origin_detection + * socket_failure_handler * * @param array{ * host?: string, @@ -103,7 +105,7 @@ class DogStatsd * disable_telemetry?: bool, * container_id?: string, * origin_detection?: bool, - * socket_failure_handler?: callable(\Throwable) + * socket_failure_handler?: callable * } $config */ public function __construct(array $config = array()) @@ -186,7 +188,9 @@ public function __construct(array $config = array()) $containerID = isset($config["container_id"]) ? $config["container_id"] : ""; $this->containerID = $originDetection->getContainerID($containerID, $originDetectionEnabled); - $this->socketFailureHandler = isset($config['socket_failure_handler']) ? $config['socket_failure_handler'] : null; + $this->socketFailureHandler = isset($config['socket_failure_handler']) + ? $config['socket_failure_handler'] + : null; } /** @@ -664,7 +668,7 @@ public function flush($message) $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP); } socket_set_nonblock($socket); - + if (!is_null($this->socketPath)) { $res = socket_sendto($socket, $message, strlen($message), 0, $this->socketPath); } else { @@ -687,7 +691,9 @@ public function flush($message) $this->packets_dropped += 1; } - socket_close($socket); + if (isset($socket)) { + socket_close($socket); + } } /** diff --git a/tests/TestHelpers/SocketSpy.php b/tests/TestHelpers/SocketSpy.php index a3f1c33..18bbeb0 100644 --- a/tests/TestHelpers/SocketSpy.php +++ b/tests/TestHelpers/SocketSpy.php @@ -42,6 +42,11 @@ class SocketSpy */ public $returnErrorOnSend = false; + /** + * @var null|callable + */ + public $errorThrownOnSend = null; + /** * @param int $domain * @param int $type @@ -88,6 +93,10 @@ public function socketSendtoWasCalledWithArgs( $addr, $port ) { + if ($this->errorThrownOnSend !== null) { + call_user_func($this->errorThrownOnSend, $socket, $buf, $len, $flags); + } + if ($this->returnErrorOnSend === true) { return false; } diff --git a/tests/UnitTests/DogStatsd/SocketsTest.php b/tests/UnitTests/DogStatsd/SocketsTest.php index 79ce558..5760b68 100644 --- a/tests/UnitTests/DogStatsd/SocketsTest.php +++ b/tests/UnitTests/DogStatsd/SocketsTest.php @@ -3,6 +3,7 @@ namespace DataDog\UnitTests\DogStatsd; use DateTime; +use ErrorException; use ReflectionProperty; use DataDog\DogStatsd; use DataDog\TestHelpers\SocketSpyTestCase; @@ -42,6 +43,7 @@ public function set_up() $this->oldOriginDetectionEnabled = getenv("DD_ORIGIN_DETECTION_ENABLED"); putenv("DD_EXTERNAL_ENV"); + $this->getSocketSpy()->errorThrownOnSend = null; } protected function tear_down() { @@ -1500,6 +1502,25 @@ public function testTelemetryNetworkError() $this->assertSameWithTelemetry('', $this->getSocketSpy()->argsFromSocketSendtoCalls[1][1], "", array("bytes_sent" => 677, "packets_sent" => 1, "metrics" => 0)); } + public function testCustomSocketFailureHandler() + { + $this->disableOriginDetectionLinux(); + + $errorStore = null; + $dog = new DogStatsd(array("disable_telemetry" => false, "socket_failure_handler" => function ($err) use (&$errorStore) { + $errorStore = $err; + })); + + $this->getSocketSpy()->errorThrownOnSend = function () { + throw new ErrorException('ErrorException: socket_sendto(): Unable to write to socket [111]: Connection refused'); + }; + $dog->increment('test'); + $this->assertNotNull($errorStore); + $this->getSocketSpy()->errorThrownOnSend = null; + $dog->flush(''); + $this->assertSameWithTelemetry('', $this->getSocketSpy()->argsFromSocketSendtoCalls[0][1], "", array("bytes_dropped" => 673, "packets_sent" => 0, "metrics" => 1, 'packets_dropped' => 1)); + } + public function testDecimalNormalization() { $this->disableOriginDetectionLinux(); From c3ea175caebdbe6bd066a2476949fe6e97bff06a Mon Sep 17 00:00:00 2001 From: Luke Kuzmish Date: Wed, 20 Aug 2025 17:26:26 -0400 Subject: [PATCH 3/6] more elegant? --- src/DogStatsd.php | 82 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 60 insertions(+), 22 deletions(-) diff --git a/src/DogStatsd.php b/src/DogStatsd.php index c66b100..b7fa00f 100644 --- a/src/DogStatsd.php +++ b/src/DogStatsd.php @@ -3,7 +3,7 @@ namespace DataDog; use DataDog\OriginDetection; -use Throwable; +use Exception; /** * Datadog implementation of StatsD @@ -60,9 +60,13 @@ class DogStatsd */ private $containerID; /** - * @var (callable(\Throwable))|null The closure which is executed when there is a failure flushing metrics. + * @var (callable(\Throwable, string))|null The closure which is executed when there is a failure flushing metrics. */ - private $socketFailureHandler = null; + private $flushFailureHandler = null; + /** + * @var bool Whether the PHP Version of the app has the Throwable interface (>7.0)) + */ + private $phpVersionHasThrowableInterface = false; // Telemetry private $disable_telemetry; @@ -188,9 +192,11 @@ public function __construct(array $config = array()) $containerID = isset($config["container_id"]) ? $config["container_id"] : ""; $this->containerID = $originDetection->getContainerID($containerID, $originDetectionEnabled); - $this->socketFailureHandler = isset($config['socket_failure_handler']) + $this->flushFailureHandler = isset($config['socket_failure_handler']) ? $config['socket_failure_handler'] : null; + + $this->phpVersionHasThrowableInterface = version_compare(PHP_VERSION, '7.0.0', '>='); } /** @@ -654,10 +660,54 @@ public function report($message) $this->flush($message); } + /** + * @throws \Exception|\Throwable + */ public function flush($message) { $message .= $this->flushTelemetry(); + if ($this->phpVersionHasThrowableInterface) { + try { + $res = $this->writeToSocket($message); + } catch (\Throwable $e) { + $res = false; + if ($this->flushFailureHandler !== null) { + call_user_func($this->flushFailureHandler, $e, $message); + } else { + throw $e; + } + } + } else { + try { + $res = $this->writeToSocket($message); + } catch (Exception $e) { + $res = false; + if ($this->flushFailureHandler !== null) { + call_user_func($this->flushFailureHandler, $e, $message); + } else { + throw $e; + } + } + } + + if ($res !== false) { + $this->resetTelemetry(); + $this->bytes_sent += strlen($message); + $this->packets_sent += 1; + } else { + $this->bytes_dropped += strlen($message); + $this->packets_dropped += 1; + } + } + + /** + * @param string $message + * @return false|int + * @throws \Exception|\Throwable + */ + protected function writeToSocket($message) + { try { // Non - Blocking UDP I/O - Use IP Addresses! if (!is_null($this->socketPath)) { @@ -674,28 +724,16 @@ public function flush($message) } else { $res = socket_sendto($socket, $message, strlen($message), 0, $this->host, $this->port); } - } catch (\Throwable $e) { - if ($this->socketFailureHandler === null) { - throw $e; - } - call_user_func($this->socketFailureHandler, $e); - $res = false; - } - - if ($res !== false) { - $this->resetTelemetry(); - $this->bytes_sent += strlen($message); - $this->packets_sent += 1; - } else { - $this->bytes_dropped += strlen($message); - $this->packets_dropped += 1; - } - if (isset($socket)) { - socket_close($socket); + return $res; + } finally { + if (isset($socket)) { + socket_close($socket); + } } } + /** * Formats $vals array into event for submission to Datadog via UDP * From 6ac918a4d71d49ad5f27e2bb33cb12913f2881b8 Mon Sep 17 00:00:00 2001 From: Luke Kuzmish Date: Wed, 20 Aug 2025 17:29:55 -0400 Subject: [PATCH 4/6] tests --- src/DogStatsd.php | 24 +++++++++++------------ tests/UnitTests/DogStatsd/SocketsTest.php | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/DogStatsd.php b/src/DogStatsd.php index b7fa00f..3ccf5c7 100644 --- a/src/DogStatsd.php +++ b/src/DogStatsd.php @@ -95,7 +95,7 @@ class DogStatsd * disable_telemetry, * container_id, * origin_detection - * socket_failure_handler + * flush_failure_handler * * @param array{ * host?: string, @@ -109,7 +109,7 @@ class DogStatsd * disable_telemetry?: bool, * container_id?: string, * origin_detection?: bool, - * socket_failure_handler?: callable + * flush_failure_handler?: callable * } $config */ public function __construct(array $config = array()) @@ -192,8 +192,8 @@ public function __construct(array $config = array()) $containerID = isset($config["container_id"]) ? $config["container_id"] : ""; $this->containerID = $originDetection->getContainerID($containerID, $originDetectionEnabled); - $this->flushFailureHandler = isset($config['socket_failure_handler']) - ? $config['socket_failure_handler'] + $this->flushFailureHandler = isset($config['flush_failure_handler']) + ? $config['flush_failure_handler'] : null; $this->phpVersionHasThrowableInterface = version_compare(PHP_VERSION, '7.0.0', '>='); @@ -671,22 +671,22 @@ public function flush($message) try { $res = $this->writeToSocket($message); } catch (\Throwable $e) { - $res = false; - if ($this->flushFailureHandler !== null) { - call_user_func($this->flushFailureHandler, $e, $message); - } else { + if ($this->flushFailureHandler === null) { throw $e; + } else { + call_user_func($this->flushFailureHandler, $e, $message); + $res = false; } } } else { try { $res = $this->writeToSocket($message); } catch (Exception $e) { - $res = false; - if ($this->flushFailureHandler !== null) { - call_user_func($this->flushFailureHandler, $e, $message); - } else { + if ($this->flushFailureHandler === null) { throw $e; + } else { + call_user_func($this->flushFailureHandler, $e, $message); + $res = false; } } } diff --git a/tests/UnitTests/DogStatsd/SocketsTest.php b/tests/UnitTests/DogStatsd/SocketsTest.php index 5760b68..a676104 100644 --- a/tests/UnitTests/DogStatsd/SocketsTest.php +++ b/tests/UnitTests/DogStatsd/SocketsTest.php @@ -1507,7 +1507,7 @@ public function testCustomSocketFailureHandler() $this->disableOriginDetectionLinux(); $errorStore = null; - $dog = new DogStatsd(array("disable_telemetry" => false, "socket_failure_handler" => function ($err) use (&$errorStore) { + $dog = new DogStatsd(array("disable_telemetry" => false, "flush_failure_handler" => function ($err) use (&$errorStore) { $errorStore = $err; })); From ac7411774a5b8dd340e9c55cd12755eccac8e918 Mon Sep 17 00:00:00 2001 From: Luke Kuzmish Date: Wed, 20 Aug 2025 17:43:13 -0400 Subject: [PATCH 5/6] simplify --- src/DogStatsd.php | 40 ++++++++++++++-------------------------- 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/src/DogStatsd.php b/src/DogStatsd.php index 3ccf5c7..92d87da 100644 --- a/src/DogStatsd.php +++ b/src/DogStatsd.php @@ -63,10 +63,6 @@ class DogStatsd * @var (callable(\Throwable, string))|null The closure which is executed when there is a failure flushing metrics. */ private $flushFailureHandler = null; - /** - * @var bool Whether the PHP Version of the app has the Throwable interface (>7.0)) - */ - private $phpVersionHasThrowableInterface = false; // Telemetry private $disable_telemetry; @@ -195,8 +191,6 @@ public function __construct(array $config = array()) $this->flushFailureHandler = isset($config['flush_failure_handler']) ? $config['flush_failure_handler'] : null; - - $this->phpVersionHasThrowableInterface = version_compare(PHP_VERSION, '7.0.0', '>='); } /** @@ -667,27 +661,21 @@ public function flush($message) { $message .= $this->flushTelemetry(); - if ($this->phpVersionHasThrowableInterface) { - try { - $res = $this->writeToSocket($message); - } catch (\Throwable $e) { - if ($this->flushFailureHandler === null) { - throw $e; - } else { - call_user_func($this->flushFailureHandler, $e, $message); - $res = false; - } + try { + $res = $this->writeToSocket($message); + } catch (\Throwable $e) { + if ($this->flushFailureHandler === null) { + throw $e; + } else { + call_user_func($this->flushFailureHandler, $e, $message); + $res = false; } - } else { - try { - $res = $this->writeToSocket($message); - } catch (Exception $e) { - if ($this->flushFailureHandler === null) { - throw $e; - } else { - call_user_func($this->flushFailureHandler, $e, $message); - $res = false; - } + } catch (Exception $e) { + if ($this->flushFailureHandler === null) { + throw $e; + } else { + call_user_func($this->flushFailureHandler, $e, $message); + $res = false; } } From 0ca1687133fe412b9fdbeb55229b9982d0e45fa7 Mon Sep 17 00:00:00 2001 From: Luke Kuzmish Date: Thu, 21 Aug 2025 06:59:51 -0400 Subject: [PATCH 6/6] PR feedback --- tests/UnitTests/DogStatsd/SocketsTest.php | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/UnitTests/DogStatsd/SocketsTest.php b/tests/UnitTests/DogStatsd/SocketsTest.php index a676104..2580882 100644 --- a/tests/UnitTests/DogStatsd/SocketsTest.php +++ b/tests/UnitTests/DogStatsd/SocketsTest.php @@ -351,7 +351,7 @@ public function testMicrotiming() public function testGauge() { $this->disableOriginDetectionLinux(); - + $stat = 'some.gauge_metric'; $value = 5; $sampleRate = 1.0; @@ -1512,7 +1512,10 @@ public function testCustomSocketFailureHandler() })); $this->getSocketSpy()->errorThrownOnSend = function () { - throw new ErrorException('ErrorException: socket_sendto(): Unable to write to socket [111]: Connection refused'); + trigger_error( + 'ErrorException: socket_sendto(): Unable to write to socket [111]: Connection refused', + E_USER_WARNING + ); }; $dog->increment('test'); $this->assertNotNull($errorStore);