diff --git a/README.rst b/README.rst index e637785..71b2d01 100644 --- a/README.rst +++ b/README.rst @@ -75,9 +75,9 @@ For example:: When using ``extra`` field make sure you don't use reserved names. From `Python documentation `_. | "The keys in the dictionary passed in extra should not clash with the keys used by the logging system. (See the `Formatter `_ documentation for more information on which keys are used by the logging system.)" -To use the AMQPLogstashHandler you will need to install pika first. +To use the AMQPLogstashHandler you will need to install kombu first. - pip install pika + pip install kombu For example:: diff --git a/logstash/handler_amqp.py b/logstash/handler_amqp.py index 4209964..819ca2b 100644 --- a/logstash/handler_amqp.py +++ b/logstash/handler_amqp.py @@ -1,13 +1,13 @@ -import json +import socket +from logging.handlers import SocketHandler + try: from urllib import urlencode except ImportError: from urllib.parse import urlencode -from logging import Filter -from logging.handlers import SocketHandler +from kombu import Connection, Exchange, producers -import pika from logstash import formatter @@ -48,8 +48,6 @@ def __init__(self, host='localhost', port=5672, username='guest', virtual_host='/', message_type='logstash', tags=None, durable=False, passive=False, version=0, extra_fields=True, fqdn=False, facility=None, exchange_routing_key=''): - - # AMQP parameters self.host = host self.port = port @@ -76,54 +74,67 @@ def __init__(self, host='localhost', port=5672, username='guest', self.facility = facility def makeSocket(self, **kwargs): - - return PikaSocket(self.host, - self.port, - self.username, - self.password, - self.virtual_host, - self.exchange, - self.routing_key, - self.exchange_is_durable, - self.declare_exchange_passively, - self.exchange_type) + socket = KombuSocket(self.host, + self.port, + self.username, + self.password, + self.virtual_host, + self.exchange, + self.routing_key, + self.exchange_is_durable, + self.declare_exchange_passively, + self.exchange_type) + socket.connect() + return socket def makePickle(self, record): return self.formatter.format(record) + def send(self, s): + """ + Behaves exactly like SocketHandler.send() except that it allows + exceptions to bubble up to emit() so we can atleast be aware of + logging failures. + """ + if self.sock is None: + self.createSocket() -class PikaSocket(object): + if self.sock: + try: + self.sock.sendall(s) + except (OSError, socket.error): #pragma: no cover + self.sock.close() + self.sock = None # so we can call createSocket next time + raise - def __init__(self, host, port, username, password, virtual_host, exchange, - routing_key, durable, passive, exchange_type): - # create connection parameters - credentials = pika.PlainCredentials(username, password) - parameters = pika.ConnectionParameters(host, port, virtual_host, - credentials) +class KombuSocket(object): - # create connection & channel - self.connection = pika.BlockingConnection(parameters) - self.channel = self.connection.channel() - - # create an exchange, if needed - self.channel.exchange_declare(exchange=exchange, - exchange_type=exchange_type, - passive=passive, - durable=durable) - - # needed when publishing - self.spec = pika.spec.BasicProperties(delivery_mode=2) + def __init__(self, host, port, username, password, virtual_host, exchange, + routing_key, durable, passive, exchange_type): + # create connection + self.connection = Connection(hostname=host, + port=port, + userid=username, + password=password, + virtual_host=virtual_host) + + # create exchange + self.exchange = Exchange(exchange, type=exchange_type, durable=durable) + self.exchange.passive = passive + + # other publishing params self.routing_key = routing_key - self.exchange = exchange - def sendall(self, data): - - self.channel.basic_publish(self.exchange, - self.routing_key, - data, - properties=self.spec) + with producers[self.connection].acquire(block=True) as producer: + producer.publish(data, + routing_key=self.routing_key, + exchange=self.exchange, + declare=[self.exchange]) + + def connect(self): + self.connection.connect() def close(self): try: