#!/usr/bin/env python
"""
Command line tool which will...
Poll Location Cast Kafka queue for messages

Using Python Kafka client: https://github.com/confluentinc/confluent-kafka-python
(you can install librdkafka via brew (on Mac) and confluent_kafka via pip).
"""

import socket
import sys
import threading
from datetime import datetime

from confluent_kafka import Consumer, KafkaError

# change these parameters for other uses, deployment updates, etc.
KafkaServers = ['kafka-0.example.com', 'kafka-1.example.com', 'kafka-2.example.com']
KafkaServerPointer = 0  # an index into the above array
sys.stderr.write("index = %d\n" % (KafkaServerPointer,))
Topic = 'example'
# IP address the currently selected server resolved to; goToWork() refreshes
# it on every entry so it always describes the server being consumed from.
LastIP = socket.gethostbyname(KafkaServers[KafkaServerPointer])

# setup wait timer: a Condition nobody notifies, used purely as a timed wait
timer = threading.Condition()
timer.acquire()


def log(msg):
    """Append *msg*, prefixed with the current timestamp, to /tmp/Kafka.log."""
    with open('/tmp/Kafka.log', 'a') as fp:
        fp.write(str(datetime.now()) + ' ' + msg + '\n')


def _rotate_server():
    """Advance KafkaServerPointer to the next server, wrapping at the end."""
    global KafkaServerPointer
    KafkaServerPointer = (KafkaServerPointer + 1) % len(KafkaServers)
    sys.stderr.write("index = %d\n" % (KafkaServerPointer,))


def goToWork():
    """Consume Topic from the currently selected server, logging each message.

    Returns when the consumer cannot be created, polling fails, a Kafka
    error other than partition-EOF is reported, or the server's hostname
    starts resolving to a different IP address.  Every exit path except the
    Kafka-error one rotates KafkaServerPointer to the next server, so the
    caller can simply call this function again.
    """
    global LastIP

    server = KafkaServers[KafkaServerPointer]

    # Remember what this server resolves to now; a later change of that
    # address is what the loop below treats as "Kafka cycled".
    try:
        LastIP = socket.gethostbyname(server)
    except socket.error:
        log('Failed to resolve ' + server)
        _rotate_server()
        return

    try:
        c = Consumer({'bootstrap.servers': server,
                      'group.id': 'lcmongroup',
                      'default.topic.config': {'auto.offset.reset': 'smallest'}})
    except Exception:
        log('Failed to allocate a Consumer')
        _rotate_server()
        return

    try:
        c.subscribe([Topic])
        running = True
        while running:
            try:
                msg = c.poll()
            except Exception:
                log('Exception while polling')
                _rotate_server()
                break  # exit while loop and function

            if msg is None:
                # NOTE(review): a None result from poll() is treated as the
                # broker having cycled -- confirm this is the intended reading.
                log('--- Kafka cycled ----')
                _rotate_server()
                break  # exit while loop and goToWork function and re-enter it

            err = msg.error()
            if not err:
                payload = msg.value()
                if payload is not None:
                    # On Python 3 the payload is bytes; decode it so it can
                    # be concatenated with str for the log line.
                    if isinstance(payload, bytes) and not isinstance(payload, str):
                        payload = payload.decode('utf-8', 'replace')
                    log(payload + '\n')
            elif err.code() != KafkaError._PARTITION_EOF:
                log(str(err))
                running = False  # leave after the cycle check below

            # see if the Kafka instance cycled
            if LastIP != socket.gethostbyname(server):
                log('--- Kafka cycled ----')
                _rotate_server()
                break  # exit function and re-enter it; finally closes c
    except Exception as exc:
        print("Caught an exception")  # will exit goToWork and then re-enter it
        log('Caught an exception: ' + str(exc))
        _rotate_server()
    finally:
        # closed consumer - necessary cleanup; done exactly once, here.
        try:
            c.close()
        except Exception as exc:
            log('Failed to close Consumer: ' + str(exc))


while True:
    goToWork()  # constantly running it - keeping up if Kafka cycles
    # take a break - kafka is perhaps churning
    timer.wait(60)  # waits 60 seconds