From 848cf0b6c5b6d11fa977cd439d5040d9b3a067dc Mon Sep 17 00:00:00 2001 From: Ray Date: Thu, 27 Jan 2011 15:05:02 -0600 Subject: [PATCH 1/5] Allow messages to get in even after we request a cancel due to timing issues. A possible alternative is to reject the messages so they can be re-delivered --- amqp.inc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/amqp.inc b/amqp.inc index 39ec609..eda9c63 100644 --- a/amqp.inc +++ b/amqp.inc @@ -1233,7 +1233,8 @@ class AMQPChannel extends AbstractChannel $args->write_bit($nowait); $this->send_method_frame(array(60, 30), $args); return $this->wait(array( - "60,31" // Channel.basic_cancel_ok + "60,60", // Channel.basic_deliver - timing issue + "60,31" // Channel.basic_cancel_ok )); } From 12f156e5aded49b25e7e49bf8fddfb416ee9a13c Mon Sep 17 00:00:00 2001 From: Ray Date: Mon, 31 Jan 2011 11:31:34 -0600 Subject: [PATCH 2/5] Adjusted basic_cancel to allow contining processing of incoming messages during wait for basic_cancel_ok message. Added options to allow special handling during cancelling. Added an option for special handling of unexpected basic_deliver messages --- amqp.inc | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 82 insertions(+), 7 deletions(-) diff --git a/amqp.inc b/amqp.inc index eda9c63..491dfe0 100644 --- a/amqp.inc +++ b/amqp.inc @@ -816,6 +816,8 @@ class AMQPChannel extends AbstractChannel "90,31" => "tx_rollback_ok" ); + protected $defaultCallback = null; + public function __construct($connection, $channel_id=NULL, $auto_decode=true) @@ -837,6 +839,10 @@ class AMQPChannel extends AbstractChannel $this->alerts = array(); $this->callbacks = array(); $this->auto_decode = $auto_decode; + $this->defaultCallback = array('AMQPBehavior', 'ignoreMessage'); + if ($this->debug) { + $this->defaultCallback = array('AMQPBehavior', 'debugMessage'); + } $this->x_open(); } @@ -1225,17 +1231,28 @@ class AMQPChannel extends AbstractChannel /** * end a queue consumer + * + * @param string $consumer_tag + * @param boolean $nowait + * @param callback $cancellingBehavior - defaults to use the consumer callback + * @return null */ - public function basic_cancel($consumer_tag, $nowait=false) + public function basic_cancel($consumer_tag, $nowait=false, $cancellingBehavior = null) { $args = new AMQPWriter(); $args->write_shortstr($consumer_tag); $args->write_bit($nowait); $this->send_method_frame(array(60, 30), $args); - return $this->wait(array( - "60,60", // Channel.basic_deliver - timing issue - "60,31" // Channel.basic_cancel_ok - )); + if ($cancellingBehavior !== null) { + $this->callbacks[$consumer_tag] = $cancellingBehavior; + } + if (!$nowait) { + return $this->wait(array( + "60,60", // Channel.basic_deliver + "60,31" // Channel.basic_cancel_ok + )); + } + return null; } /** @@ -1307,7 +1324,7 @@ class AMQPChannel extends AbstractChannel if(array_key_exists($consumer_tag, $this->callbacks)) $func = $this->callbacks[$consumer_tag]; else - $func = NULL; + $func = $this->defaultCallback; if($func!=NULL) call_user_func($func, $msg); @@ -1494,7 +1511,16 @@ class AMQPChannel extends AbstractChannel protected function tx_select_ok($args) { } - + + /** + * Set the default callback to use, if there is no matching consumer + * + * @param callback $defaultCallBack + */ + public function setDefaultCallback($defaultCallBack) + { + $this->defaultCallback = $defaultCallBack; + } } /** @@ -1527,4 +1553,53 @@ class AMQPMessage extends GenericContent } } +/** + * A static class to use with the default callbacks of a channel + * + */ +class AMQPBehavior { + /** + * Reject a message + * @param AMQPMessage $msg + * @param boolean $requeue defaults to true + */ + public static function rejectMessage($msg, $requeue = true) + { + $deliveryTag = $msg->delivery_info['delivery_tag']; + $channel = $msg->delivery_info['channel']; + $channel->basic_reject($deliveryTag, $requeue); + } + + /** + * Drop and Reject a message + * @param AMQPMessage $msg + */ + public static function dropMessage($msg) + { + self::rejectMessage($msg, false); + } + + /** + * Ignore a message + * @param AMQPMessage $msg + */ + public static function ignoreMessage($msg) + { + // no-op + } + + /** + * send message to debugging messaging + * @param AMQPMessage $msg + */ + public static function debugMessage($msg) + { + $deliveryTag = $msg->delivery_info['delivery_tag']; + $consumerTag = $msg->delivery_info['consumer_tag']; + $channel = $msg->delivery_info['channel']; + $channel_id = $channel->getChannelId(); + + debug_msg("Message {$deliveryTag} on channel_id {$channel_id} with consumer-tag {$consumerTag}"); + } +} ?> From 2fe295979a6e95b081b4102ffce69c0e86de2dfc Mon Sep 17 00:00:00 2001 From: wizard Date: Thu, 7 Apr 2011 16:23:31 +0300 Subject: [PATCH 3/5] Added support for handling signals while reading from a socket --- amqp_wire.inc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/amqp_wire.inc b/amqp_wire.inc index 5e489e3..dcf4a15 100644 --- a/amqp_wire.inc +++ b/amqp_wire.inc @@ -302,6 +302,10 @@ class AMQPReader while($read < $n && !feof($this->sock->real_sock()) && (false !== ($buf = fread($this->sock->real_sock(), $n - $read)))) { + if (function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + if ($buf == '') { usleep(100); From 6d4524714fb367fec31fc5919ba4ffa42b2e560f Mon Sep 17 00:00:00 2001 From: Ray Date: Thu, 7 Apr 2011 09:41:21 -0500 Subject: [PATCH 4/5] Adjusted wait loop in basic_cancel to process remaining delivers, and still wait for the actual cancel --- amqp.inc | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/amqp.inc b/amqp.inc index 0df229b..fbfc15a 100644 --- a/amqp.inc +++ b/amqp.inc @@ -1282,10 +1282,18 @@ class AMQPChannel extends AbstractChannel $this->callbacks[$consumer_tag] = $cancellingBehavior; } if (!$nowait) { - return $this->wait(array( - "60,60", // Channel.basic_deliver - "60,31" // Channel.basic_cancel_ok - )); + // Unsure we have a callback set + if (!isset($this->callbacks[$consumer_tag])) { + $this->callbacks[$consumer_tag] = $this->defaultCallback; + } + + // Wait for a cancel message before actually breaking out of the waiting loop. + while (array_key_exists($consumer_tag, $this->callbacks)) { + $this->wait(array( + "60,60", // Channel.basic_deliver + "60,31" // Channel.basic_cancel_ok + )); + } } return null; } From 7fbb865fac80d6c9a18c3822658efadb5633d89c Mon Sep 17 00:00:00 2001 From: Ray Date: Thu, 7 Apr 2011 09:45:32 -0500 Subject: [PATCH 5/5] Spelling correction --- amqp.inc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amqp.inc b/amqp.inc index fbfc15a..3f55aca 100644 --- a/amqp.inc +++ b/amqp.inc @@ -1282,7 +1282,7 @@ class AMQPChannel extends AbstractChannel $this->callbacks[$consumer_tag] = $cancellingBehavior; } if (!$nowait) { - // Unsure we have a callback set + // ensure we have a callback set if (!isset($this->callbacks[$consumer_tag])) { $this->callbacks[$consumer_tag] = $this->defaultCallback; }