-
EC2 Default User authoredEC2 Default User authored
amqpfind.py 5.50 KiB
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
amqpfind.py
~~~~~~~~~~~
PURPOSE
Use an AMQP exchange in simple ways.
Documentation at https://docs.google.com/document/d/1OQV0ewHOyupsFO_MNyu0qBowS3InsprvWmiMunbXbMg
Based on himawari8_delta.py
REFERENCES
http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
https://kombu.readthedocs.org/en/latest/userguide/introduction.html
http://kombu.readthedocs.org/en/latest/userguide/examples.html
http://oriolrius.cat/blog/2013/09/30/hello-world-using-kombu-library-and-python/
https://gist.github.com/thanos/2933599
https://gist.github.com/markmc/5685616
REQUIRES
pika, python 2.6 or newer
USAGE
# receive (consume)
amqpfind -H hwing.ssec.wisc.edu -X himawari -u himawari -p guest -C 'himawari8.delta' -j '{path}' |xargs -n1 -P4 mycode.sh
:authors: K.Hallock <kevin.hallock@ssec.wisc.edu>, R.K.Garcia <rayg@ssec.wisc.edu>
:copyright: 2015 by University of Wisconsin Regents
:license: GPLv3, see LICENSE for more details
"""
# Module metadata: author handle and the markup language used by docstrings.
__author__ = 'rayg'
__docformat__ = 'reStructuredText'
import os, sys, re
import logging, unittest, optparse, json
from functools import partial
try:
import pika
except ImportError:
home,_ = os.path.split(__file__)
sys.path.append(os.path.join(home, 'pika.egg'))
import pika
# Module-wide logger.
LOG = logging.getLogger(__name__)

# Defaults used when the caller does not supply connection details.
# DEFAULT_SERVER = 'mq1.ssec.wisc.edu'
DEFAULT_SERVER = 'localhost'
DEFAULT_USER = 'himawari'  # FUTURE: this should be guest
DEFAULT_PASSWORD = 'guest'
DEFAULT_EXCHANGE = 'satellite'
# Routing key (topic pattern) bound when none is given.
DEFAULT_KEY = 'geo.himawari.8.ahi.file.delta.hsf.image.complete'

# RE_KVP = re.compile(r'(\w+)=([^,]+),?\s*')
def default_callback(body):
    """Print one message payload as a line of text on stdout.

    :param body: message payload, either text or raw bytes

    Byte payloads are decoded as UTF-8 before printing (undecodable bytes
    are replaced) because ``"{0:s}".format(...)`` raises ``TypeError`` when
    handed a ``bytes`` object on Python 3.  Stdout is flushed after every
    message so that a downstream pipe sees each line immediately.
    """
    # On Python 2 ``bytes`` is ``str``, so this branch only fires on Python 3.
    if isinstance(body, bytes) and not isinstance(body, str):
        text = body.decode('utf-8', 'replace')
    else:
        text = body
    print("{0:s}".format(text))
    sys.stdout.flush()
    LOG.debug("%r", body)
def json_callback(json_txt, format_str=None):
    """Parse a JSON payload and print it through a ``str.format`` template.

    :param json_txt: JSON document as text or UTF-8 encoded bytes; when a
        template is used its top level must be an object
    :param format_str: template such as ``'{path}'`` that is filled with the
        top-level keys of the document.  When omitted, the parsed document
        is printed as compact single-line JSON instead.
    :raises ValueError: if the payload is not valid JSON / not valid UTF-8
    :raises KeyError: if the template names a key missing from the document
    :raises TypeError: if a template is given but the document is not an object
    """
    # Decode explicitly: json.loads() only accepts bytes from Python 3.6 on.
    # (On Python 2 ``bytes`` is ``str`` and this branch is skipped.)
    if isinstance(json_txt, bytes) and not isinstance(json_txt, str):
        json_txt = json_txt.decode('utf-8')
    content = json.loads(json_txt)
    if format_str is None:
        # No template supplied: previously this crashed on ``None.format``.
        line = json.dumps(content)
    else:
        line = format_str.format(**content)
    print(line)
    sys.stdout.flush()
def callback_wrapper(ch, method, properties, body, callback=default_callback):
    """Adapt a one-argument payload handler to a four-argument consumer callback.

    ``body`` is handed to ``callback`` first; afterwards the delivery is
    acknowledged on ``ch`` using the delivery tag carried by ``method``.
    ``properties`` is accepted but not used.
    """
    callback(body)
    tag = method.delivery_tag
    ch.basic_ack(delivery_tag=tag)
class AmqpExchange(object):
    """Blocking connection to one AMQP server, used to consume from an exchange.

    :param server: host name of the AMQP server
    :param user: user name for plain-credential login
    :param password: password for ``user``
    :param exchange: exchange name :meth:`consume` falls back to
    :param key: routing key (topic pattern) :meth:`consume` falls back to

    The connection and one channel are opened in the constructor; call
    :meth:`close` when finished with the session.
    """

    def __init__(self, server=DEFAULT_SERVER,
                 user=DEFAULT_USER, password=DEFAULT_PASSWORD,
                 exchange=DEFAULT_EXCHANGE, key=DEFAULT_KEY):
        credentials = pika.PlainCredentials(user, password)
        conn_params = pika.ConnectionParameters(host=server,
                                                credentials=credentials)
        self.host = server
        self.exchange = exchange
        self.key = key
        self.connection = pika.BlockingConnection(conn_params)
        self.channel = self.connection.channel()

    def close(self):
        """Close the server connection if it is still open."""
        if self.connection.is_open:
            self.connection.close()

    def consume(self, callback=None, exchange=None, key=None):
        """Bind a temporary queue and hand every received payload to ``callback``.

        :param callback: one-argument callable receiving the message body;
            ``default_callback`` is used when omitted
        :param exchange: exchange to bind to; defaults to ``self.exchange``
        :param key: routing key to bind with; defaults to ``self.key``

        Blocks until interrupted from the keyboard, at which point the
        consumer is cancelled and the method returns ``None``.
        """
        # declare temporary queue
        # NOTE(review): queue_declare() without a queue name and
        # basic_consume(callback, queue=...) follow the pre-1.0 pika calling
        # convention; pika 1.x expects the queue name first -- confirm the
        # pika version in use before upgrading.
        result = self.channel.queue_declare(exclusive=True, auto_delete=True)
        queue_name = result.method.queue
        exchange = exchange or self.exchange
        key = key or self.key
        self.channel.queue_bind(exchange=exchange,
                                queue=queue_name,
                                routing_key=key)
        # Take one message at a time; the next is sent only after the ack.
        self.channel.basic_qos(prefetch_count=1)
        wrapped_callback = partial(callback_wrapper,
                                   callback=callback or default_callback)
        self.channel.basic_consume(wrapped_callback, queue=queue_name)
        try:
            LOG.info("about to consume %s/%s/%s", self.host, exchange, key)
            self.channel.start_consuming()
        except KeyboardInterrupt:
            LOG.warning('keyboard interrupt')
            # Cancel the consumer so the channel is left in a clean state
            # rather than abandoned mid-consume.
            self.channel.stop_consuming()
        return
def main():
    """Command-line entry point: print AMQP message payloads as lines of text.

    A single positional argument, when given, is used as the routing key to
    bind; otherwise the ``-C/--consume`` pattern is used.  With ``-j`` each
    payload is parsed as JSON and rendered through the given format string.

    :returns: 0, intended as the process exit status
    """
    # The first positional parameter of OptionParser is the *usage* line, so
    # the summary text is passed as ``description`` to keep --help sensible.
    parser = optparse.OptionParser(
        usage="%prog [options] [routing-key]",
        description="print out AMQP message payloads as lines of text")
    parser.add_option('-v', '--verbose', dest='verbosity', action="count", default=0,
                      help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
    # http://docs.python.org/2.7/library/argparse.html#nargs
    # parser.add_option('--stuff', nargs='5', dest='my_stuff',
    #                   help="one or more random things")
    parser.add_option('-H', '--host', dest='host', default=DEFAULT_SERVER,
                      help='name of AMQP server to connect to')
    parser.add_option('-X', '--exchange', dest='exchange', default=DEFAULT_EXCHANGE,
                      help='name of exchange to connect to')
    parser.add_option('-C', '--consume', dest='consume', default=DEFAULT_KEY,
                      help='topic pattern to listen for and consume')
    parser.add_option('-u', '--user', dest='user', default=DEFAULT_USER,
                      help='user id to talk to AMQP exchange as')
    parser.add_option('-p', '--passwd', dest='passwd', default=DEFAULT_PASSWORD,
                      help='password for user')
    parser.add_option('-j', '--json', dest='json',
                      help='parse content as json and print this expression using python string.format()')
    # parser.add_option('-P', '--produce', dest='produce', default=None,
    #                   help='topic to produce')
    opts, args = parser.parse_args()

    # Clamp so that more than three -v flags still select DEBUG.
    levels = [logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]
    logging.basicConfig(level=levels[min(3, opts.verbosity)])

    if not opts.exchange:
        # An empty exchange name (e.g. -X '') runs the unit tests instead.
        unittest.main()
        return 0

    if len(args) > 1:
        # Only a lone positional argument is honoured as the routing key.
        LOG.warning("expected at most one routing key; ignoring positional arguments %r", args)

    session = AmqpExchange(server=opts.host, user=opts.user, password=opts.passwd)
    callback = default_callback
    if opts.json:
        callback = partial(json_callback, format_str=opts.json)
    try:
        if len(args) == 1:
            session.consume(callback, exchange=opts.exchange, key=args[0])
        elif opts.consume:
            session.consume(callback, exchange=opts.exchange, key=opts.consume)
    finally:
        # Shut the connection down explicitly instead of relying on process exit.
        if session.connection.is_open:
            session.connection.close()
    return 0
# When executed as a script, run main() and use its return value as the
# process exit status.
if __name__ == '__main__':
    raise SystemExit(main())