diff --git a/oni/.gitignore b/oni/.gitignore
new file mode 100644
index 0000000..7e99e36
--- /dev/null
+++ b/oni/.gitignore
@@ -0,0 +1 @@
+*.pyc
\ No newline at end of file
diff --git a/oni/kafka_client.py b/oni/kafka_client.py
index e43739e..5fb50b9 100755
--- a/oni/kafka_client.py
+++ b/oni/kafka_client.py
@@ -7,6 +7,15 @@
 from kafka.partitioner.roundrobin import RoundRobinPartitioner
 from kafka.common import TopicPartition
 
+import os
+import sys
+import socket
+# confluent-kafka (librdkafka) client used by the new producer/consumer below
+from confluent_kafka import Producer as confluent_kafka_Producer
+from confluent_kafka import Consumer as confluent_kafka_Consumer
+from confluent_kafka import KafkaError, KafkaException
+
+# librdkafka kerberos configs
+krb_conf_options = {'sasl.mechanisms': 'GSSAPI',  # librdkafka matches mechanism names case-sensitively
+                    'security.protocol': 'sasl_plaintext',
+                    'sasl.kerberos.service.name': 'kafka',
+                    'sasl.kerberos.kinit.cmd': 'kinit -k -t "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}',
+                    'sasl.kerberos.principal': os.getenv('KRB_USER'),
+                    'sasl.kerberos.keytab': os.getenv('KEYTABPATH'),
+                    'sasl.kerberos.min.time.before.relogin': 60000}
+
 
 class KafkaTopic(object):
@@ -28,6 +37,7 @@ def _initialize_members(self,topic,server,port,zk_server,zk_port,partitions):
         self._num_of_partitions = partitions
         self._partitions = []
         self._partitioner = None
+        self._librdkafka_debug = {'debug': 'all'}
 
         # create topic with partitions
         self._create_topic()
@@ -37,10 +47,10 @@ def _create_topic(self):
 
         self._logger.info("Creating topic: {0} with {1} parititions".format(self._topic,self._num_of_partitions))
 
         # Create partitions for the workers.
-        self._partitions = [ TopicPartition(self._topic,p) for p in range(int(self._num_of_partitions))]
+        #self._partitions = [ TopicPartition(self._topic,p) for p in range(int(self._num_of_partitions))]
 
         # create partitioner
-        self._partitioner = RoundRobinPartitioner(self._partitions)
+        #self._partitioner = RoundRobinPartitioner(self._partitions)
 
         # get script path
         zk_conf = "{0}:{1}".format(self._zk_server,self._zk_port)
@@ -53,10 +63,33 @@ def send_message(self,message,topic_partition):
 
         self._logger.info("Sending message to: Topic: {0} Partition:{1}".format(self._topic,topic_partition))
         kafka_brokers = '{0}:{1}'.format(self._server,self._port)
-        producer = KafkaProducer(bootstrap_servers=[kafka_brokers],api_version_auto_timeout_ms=3600000)
-        future = producer.send(self._topic,message,partition=topic_partition)
+        self._producer_conf = {'bootstrap.servers': kafka_brokers,
+                               'session.timeout.ms': 6000,
+                               'api.version.request': 'false',
+                               'internal.termination.signal': 0,
+                               'broker.version.fallback': '0.9.0.0',
+                               'log.connection.close': 'false',
+                               'socket.keepalive.enable': 'false',
+                               'default.topic.config': {'request.required.acks': 'all'}}
+
+        if os.getenv('ingest_kafka_debug'):
+            self._logger.info("librdkafka debug: all")
+            self._producer_conf.update(self._librdkafka_debug)
+
+        if os.getenv('KRB_AUTH'):
+            self._logger.info("Updating Producer Configuration with Kerberos options")
+            self._producer_conf.update(krb_conf_options)
+
+        def delivery_callback(err, msg):
+            if err:
+                self._logger.info('Message failed delivery: {0}'.format(err))
+            else:
+                self._logger.info('Message delivered to topic {0} on partition {1}'.format(msg.topic(), msg.partition()))
+
+        # Producer takes the config dict as a single argument; produce() returns None
+        producer = confluent_kafka_Producer(self._producer_conf)
+        producer.produce(self._topic, message.encode('utf-8'), callback=delivery_callback)
+        producer.poll(0)
         producer.flush()
-        producer.close()
 
     @property
     def Topic(self):
@@ -64,8 +97,8 @@ def Topic(self):
         return self._topic
 
     @property
     def Partition(self):
-        return self._partitioner.partition(self._topic).partition
-
+        #return self._partitioner.partition(self._topic).partition
+        return 0
 
 class KafkaConsumer(object):
@@ -76,21 +109,90 @@ def __init__(self,topic,server,port,zk_server,zk_port,partition):
     def _initialize_members(self,topic,server,port,zk_server,zk_port,partition):
 
+        self._logger = Util.get_logger("ONI.INGEST.KAFKA")
         self._topic = topic
         self._server = server
         self._port = port
         self._zk_server = zk_server
         self._zk_port = zk_port
         self._id = partition
+        self._librdkafka_debug = {'debug': 'all'}
+        self._running = False
 
     def start(self):
 
         kafka_brokers = '{0}:{1}'.format(self._server,self._port)
-        consumer = KC(bootstrap_servers=[kafka_brokers],group_id=self._topic)
-        partition = [TopicPartition(self._topic,int(self._id))]
-        consumer.assign(partitions=partition)
-        consumer.poll()
-        return consumer
+        self._consumer_conf = {'bootstrap.servers': kafka_brokers,
+                               'group.id': self._id,
+                               'internal.termination.signal': 0,
+                               'client.id': socket.gethostname(),  # was a hardcoded dev hostname
+                               'socket.timeout.ms': 30000,
+                               'socket.keepalive.enable': 'true',
+                               'reconnect.backoff.jitter.ms': '6000',
+                               'api.version.request': 'false',
+                               'broker.version.fallback': '0.9.0.0',
+                               'log.connection.close': 'false',
+                               'default.topic.config': {'auto.commit.enable': 'true',
+                                                        'auto.commit.interval.ms': '60000',
+                                                        'auto.offset.reset': 'smallest'}}
+
+        if os.getenv('ingest_kafka_debug'):
+            self._logger.info("librdkafka debug: all")
+            self._consumer_conf.update(self._librdkafka_debug)
+
+        if os.getenv('KRB_AUTH'):
+            self._logger.info("Updating Consumer Configuration with Kerberos options")
+            self._consumer_conf.update(krb_conf_options)
+
+        consumer = confluent_kafka_Consumer(self._consumer_conf)
+
+        def on_assign(consumer, partitions):
+            self._logger.info('Assigned: {0}, {1}'.format(len(partitions), partitions))
+            for p in partitions:
+                self._logger.info(' {0} [{1}] @ {2}'.format(p.topic, p.partition, p.offset))
+                p.offset = -1  # OFFSET_END: consume only new messages
+            consumer.assign(partitions)
+
+        def on_revoke(consumer, partitions):
+            self._logger.info('Revoked: {0} {1}'.format(len(partitions), partitions))
+            for p in partitions:
+                self._logger.info(' {0} [{1}] @ {2}'.format(p.topic, p.partition, p.offset))
+            consumer.unassign()
+
+        self._running = True
+
+        try:
+            consumer.subscribe([self._topic], on_assign=on_assign, on_revoke=on_revoke)
+            self._logger.info('subscribing to ' + self._topic)
+
+            while self._running:
+                msg = consumer.poll(timeout=1.0)
+                if msg is None:
+                    continue
+                if msg.error():
+                    if msg.error().code() == KafkaError._PARTITION_EOF:
+                        self._logger.info('{0} {1} reached end at offset {2}'.format(msg.topic(), msg.partition(), msg.offset()))
+                        continue
+                    raise KafkaException(msg.error())
+                return msg
+
+        except KeyboardInterrupt:
+            self._logger.info('User interrupted')
+            raise SystemExit
+        except Exception:
+            self._logger.info('Unexpected error: {0}'.format(sys.exc_info()[0]))
+            raise SystemExit
+        finally:
+            self._logger.info('closing down consumer')
+            consumer.close()
+
+    def stop(self):
+        self._running = False
 
     @property
     def Topic(self):
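
A note on the new producer path in kafka_client.py: below is a minimal standalone sketch of how send_message() now drives librdkafka through confluent-kafka-python — build the config dict, overlay the Kerberos options when KRB_AUTH is set, then produce/poll/flush. The broker address and topic name ('localhost:9092', 'oni_test_topic') are placeholders for illustration, not values from this patch.

import os
from confluent_kafka import Producer

conf = {'bootstrap.servers': 'localhost:9092',      # placeholder broker
        'broker.version.fallback': '0.9.0.0',
        'api.version.request': 'false',
        'default.topic.config': {'request.required.acks': 'all'}}

if os.getenv('KRB_AUTH'):
    # same overlay the patch builds in krb_conf_options
    conf.update({'sasl.mechanisms': 'GSSAPI',
                 'security.protocol': 'sasl_plaintext',
                 'sasl.kerberos.service.name': 'kafka',
                 'sasl.kerberos.principal': os.getenv('KRB_USER'),
                 'sasl.kerberos.keytab': os.getenv('KEYTABPATH')})

def delivery_callback(err, msg):
    # invoked from poll()/flush() once the broker acks or the send fails
    print(err or 'delivered to {0} [{1}]'.format(msg.topic(), msg.partition()))

producer = Producer(conf)
producer.produce('oni_test_topic', 'hello'.encode('utf-8'), callback=delivery_callback)
producer.poll(0)   # serve delivery callbacks
producer.flush()   # block until the queue drains
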
diff --git a/oni/kerberos.py b/oni/kerberos.py
index 54a4c52..ea1c2a6 100755
--- a/oni/kerberos.py
+++ b/oni/kerberos.py
@@ -20,9 +20,9 @@ def __init__(self):
 
         self._kinit_args = [self._kinit,self._kinitopts,self._keytab,self._krb_user]
 
     def authenticate(self):
 
-        kinit = subprocess.Popen(self._kinit_args, stderr = subprocess.PIPE)
+        # shell=True needs a single command string; passing the list unchanged
+        # would execute only its first element
+        kinit = subprocess.Popen(' '.join(self._kinit_args), shell=True, stderr=subprocess.PIPE)
         output,error = kinit.communicate()
         if not kinit.returncode == 0:
             if error:
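
Why authenticate() now joins the arguments: with shell=True, Popen hands the shell a single string; given a list, only the first element runs as the command and the rest become positional shell parameters. A small sketch of the pitfall and the fix — the keytab path and principal below are hypothetical, not values from this patch.

import subprocess

# hypothetical kinit arguments; the patch builds these from /etc/duxbay.conf
args = ['kinit', '-kt', '/path/to/user.keytab', 'user@EXAMPLE.COM']

# Broken: the shell receives only 'kinit'; '-kt' and friends are dropped.
# subprocess.Popen(args, shell=True, stderr=subprocess.PIPE)

# Fixed: hand the shell one command string.
kinit = subprocess.Popen(' '.join(args), shell=True, stderr=subprocess.PIPE)
output, error = kinit.communicate()
if kinit.returncode != 0:
    print(error)
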
diff --git a/pipelines/dns/worker.py b/pipelines/dns/worker.py
index 229fb07..61ee941 100755
--- a/pipelines/dns/worker.py
+++ b/pipelines/dns/worker.py
@@ -36,8 +36,8 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
 
     def start(self):
 
         self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
-        for message in self.kafka_consumer.start():
-            self._new_file(message.value)
+        for message in [self.kafka_consumer.start()]:
+            self._new_file(message.value())
 
     def _new_file(self,file):
diff --git a/pipelines/flow/worker.py b/pipelines/flow/worker.py
index 9f2991c..8263d34 100755
--- a/pipelines/flow/worker.py
+++ b/pipelines/flow/worker.py
@@ -36,8 +36,8 @@ def _initialize_members(self,db_name,hdfs_app_path,kafka_consumer,conf_type):
 
     def start(self):
 
         self._logger.info("Listening topic:{0}".format(self.kafka_consumer.Topic))
-        for message in self.kafka_consumer.start():
-            self._new_file(message.value)
+        for message in [self.kafka_consumer.start()]:
+            self._new_file(message.value())
 
     def _new_file(self,file):
diff --git a/start_ingest_standalone.sh b/start_ingest_standalone.sh
index 4a14cf6..f75ef33 100755
--- a/start_ingest_standalone.sh
+++ b/start_ingest_standalone.sh
@@ -1,5 +1,13 @@
 #!/bin/bash
 
+source /etc/duxbay.conf
+
+export KRB_AUTH
+export KINITPATH
+export KINITOPTS
+export KEYTABPATH
+export KRB_USER
+
 #-----------------------------------------------------------------------------------
 # Validate parameters.
 #-----------------------------------------------------------------------------------
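
For reference, a sketch of the single-message consume pattern the reworked workers depend on: KafkaConsumer.start() polls until one valid message arrives and returns it, which is why the workers iterate over a one-element list. The broker, topic, and group id below are placeholders; note that confluent-kafka's Message.value() is a method, unlike kafka-python's .value attribute.

from confluent_kafka import Consumer, KafkaError, KafkaException

consumer = Consumer({'bootstrap.servers': 'localhost:9092',   # placeholder
                     'group.id': 'oni-ingest'})               # placeholder
consumer.subscribe(['oni_test_topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue                      # poll timed out, keep waiting
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue                  # end of partition is informational
            raise KafkaException(msg.error())
        print(msg.value())                # value() is a method in confluent-kafka
        break                             # one message per call, as in start()
finally:
    consumer.close()
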