diff --git a/README.rst b/README.rst index 85a95e2..feb567d 100644 --- a/README.rst +++ b/README.rst @@ -6,6 +6,8 @@ http://logstash.net/ Changelog ========= +0.4.6 + - Before publishing, reconnects to rabbitmq if connection is closed 0.4.5 - Allow passing exchange's routing key to AMQP handler 0.4.4 diff --git a/logstash/handler_amqp.py b/logstash/handler_amqp.py index 99d190e..5a621f1 100644 --- a/logstash/handler_amqp.py +++ b/logstash/handler_amqp.py @@ -90,34 +90,55 @@ def makePickle(self, record): class PikaSocket(object): def __init__(self, host, port, username, password, virtual_host, exchange, - routing_key, durable, exchange_type): + routing_key, durable, exchange_type, max_retry_attempts=3): # create connection parameters credentials = pika.PlainCredentials(username, password) - parameters = pika.ConnectionParameters(host, port, virtual_host, - credentials) + self.parameters = pika.ConnectionParameters(host, + port, + virtual_host, + credentials) - # create connection & channel - self.connection = pika.BlockingConnection(parameters) - self.channel = self.connection.channel() - - # create an exchange, if needed - self.channel.exchange_declare(exchange=exchange, - exchange_type=exchange_type, - durable=durable) # needed when publishing self.spec = pika.spec.BasicProperties(delivery_mode=2) self.routing_key = routing_key self.exchange = exchange + self.exchange_type = exchange_type + self.durable = durable + self.max_retry_attempts = max_retry_attempts + # connect for the first time + self.connect() - def sendall(self, data): + def connect(self): + # create connection & channel + self.connection = pika.BlockingConnection(self.parameters) + self.channel = self.connection.channel() - self.channel.basic_publish(self.exchange, - self.routing_key, - data, - properties=self.spec) + # create an exchange, if needed + self.channel.exchange_declare(exchange=self.exchange, + exchange_type=self.exchange_type, + durable=self.durable) + + def sendall(self, data): + attempts_left = self.max_retry_attempts + published = False + + while attempts_left > 0: + try: + self.channel.basic_publish(self.exchange, + self.routing_key, + data, + properties=self.spec) + published = True + break + except: + self.connect() + attempts_left -= 1 + + if not published: + print "Error: Faild to connect to RabbitMQ" def close(self): try: diff --git a/setup.py b/setup.py index e641ca3..bcf4eb8 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='python-logstash', packages=['logstash'], - version='0.4.5', + version='0.4.6', description='Python logging handler for Logstash.', long_description=open('README.rst').read(), author='Volodymyr Klochan',