Skip to content
Snippets Groups Projects
amqpfind.py 5.50 KiB
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
amqpfind.py
~~~~~~~~~~~

PURPOSE
Use an AMQP exchange in simple ways.
Documentation at https://docs.google.com/document/d/1OQV0ewHOyupsFO_MNyu0qBowS3InsprvWmiMunbXbMg

Based on himawari8_delta.py

REFERENCES
http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
https://kombu.readthedocs.org/en/latest/userguide/introduction.html
http://kombu.readthedocs.org/en/latest/userguide/examples.html
http://oriolrius.cat/blog/2013/09/30/hello-world-using-kombu-library-and-python/
https://gist.github.com/thanos/2933599
https://gist.github.com/markmc/5685616

REQUIRES
pika, python 2.6 or newer

USAGE

# receive (consume)
amqpfind -H hwing.ssec.wisc.edu -X himawari -u himawari -p guest -C 'himawari8.delta' -j '{path}' |xargs -n1 -P4 mycode.sh


:authors: K.Hallock <kevin.hallock@ssec.wisc.edu>, R.K.Garcia <rayg@ssec.wisc.edu>
:copyright: 2015 by University of Wisconsin Regents
:license: GPLv3, see LICENSE for more details
"""
__author__ = 'rayg'
__docformat__ = 'reStructuredText'

import os, sys, re
import logging, unittest, optparse, json
from functools import partial
try:
    import pika
except ImportError:
    home,_ = os.path.split(__file__)
    sys.path.append(os.path.join(home, 'pika.egg'))
    import pika

# Module-level logger; main() configures its level from the -v count.
LOG = logging.getLogger(__name__)


# Connection defaults, used for both the AmqpExchange keyword defaults and
# the command-line option defaults in main().
# Alternative server: DEFAULT_SERVER = 'mq1.ssec.wisc.edu'
DEFAULT_SERVER = 'localhost'
DEFAULT_USER = 'himawari'  # FUTURE: this should be guest
DEFAULT_PASSWORD = 'guest'
DEFAULT_EXCHANGE = 'satellite'
DEFAULT_KEY = 'geo.himawari.8.ahi.file.delta.hsf.image.complete'


# RE_KVP = re.compile(r'(\w+)=([^,]+),?\s*')

def default_callback(body):
    """Print one message payload as a line of text on stdout.

    stdout is flushed after every message so that a downstream pipe
    (e.g. ``xargs``) sees each line immediately.

    :param body: message payload, either text or a byte string
    """
    text = body
    # On Python 3, "{0:s}".format(<bytes>) raises TypeError, so a bytes
    # payload has to be decoded first.  On Python 2 ``bytes is str`` and this
    # branch is never taken, leaving the original behavior untouched.
    if isinstance(body, bytes) and not isinstance(body, str):
        text = body.decode('utf-8', 'replace')
    print("{0:s}".format(text))
    sys.stdout.flush()
    # Log the payload exactly as received, not the decoded form.
    LOG.debug(repr(body))


def json_callback(json_txt, format_str=None):
    """Parse a JSON message and print it through a ``str.format`` template.

    stdout is flushed after every message so that a downstream pipe sees
    each line immediately.

    :param json_txt: JSON document whose top level is an object
    :param format_str: ``str.format`` template such as ``'{path}'``; the
        top-level keys of the JSON object are available as named fields.
        When None, the parsed document is printed back as JSON text (keys
        sorted) instead of failing on ``None.format``.
    :raises ValueError: if ``json_txt`` is not valid JSON
    :raises KeyError: if the template names a key missing from the message
    """
    content = json.loads(json_txt)
    if format_str is None:
        # The declared default used to crash with AttributeError; emit the
        # document itself so the default is usable.
        line = json.dumps(content, sort_keys=True)
    else:
        line = format_str.format(**content)
    print(line)
    sys.stdout.flush()


def callback_wrapper(ch, method, properties, body, callback=default_callback):
    """Adapt a one-argument payload handler to the four-argument consumer form.

    Hands ``body`` to ``callback`` and then acknowledges the delivery on
    ``ch``.  The acknowledgement is sent only after ``callback`` returns, so
    an exception raised by ``callback`` propagates with the message unacked.

    :param ch: channel the message arrived on; must provide ``basic_ack``
    :param method: delivery metadata; only ``delivery_tag`` is read
    :param properties: message properties (unused)
    :param body: message payload, passed through to ``callback``
    :param callback: callable taking the payload as its single argument
    """
    callback(body)
    tag = method.delivery_tag
    ch.basic_ack(delivery_tag=tag)


class AmqpExchange(object):
    """Blocking pika connection used to consume messages from an exchange.

    The connection and channel are opened in the constructor.  Call
    :meth:`close` when finished to release the connection.
    """

    def __init__(self, server=DEFAULT_SERVER,
                        user=DEFAULT_USER, password=DEFAULT_PASSWORD,
                        exchange=DEFAULT_EXCHANGE, key=DEFAULT_KEY):
        """Connect to ``server`` and open a channel.

        :param server: host name of the AMQP server
        :param user: user name for plain-credential login
        :param password: password for ``user``
        :param exchange: default exchange for :meth:`consume`
        :param key: default routing key / topic pattern for :meth:`consume`
        """
        credentials = pika.PlainCredentials(user, password)
        conn_params = pika.ConnectionParameters(host=server,
                                                credentials=credentials)
        self.host = server
        self.exchange = exchange
        self.key = key

        self.connection = pika.BlockingConnection(conn_params)
        self.channel = self.connection.channel()

    def close(self):
        """Close the underlying connection if it is still open."""
        if self.connection.is_open:
            self.connection.close()

    def consume(self, callback=None, exchange=None, key=None):
        """Bind a temporary queue and dispatch messages until interrupted.

        Blocks in the channel's consume loop.  A KeyboardInterrupt cancels
        the consumer and returns normally; the connection is left open (see
        :meth:`close`).

        :param callback: callable taking the message body; defaults to
            ``default_callback``
        :param exchange: exchange to bind to; defaults to the one given to
            the constructor
        :param key: routing key / topic pattern; defaults to the one given
            to the constructor
        """
        exchange = exchange or self.exchange
        key = key or self.key

        # Declare a temporary queue: exclusive to this connection and
        # auto-deleted once its consumer goes away.
        result = self.channel.queue_declare(exclusive=True, auto_delete=True)
        queue_name = result.method.queue
        self.channel.queue_bind(exchange=exchange,
                                queue=queue_name,
                                routing_key=key)
        # Ask for one message at a time; callback_wrapper acks each message
        # after the callback has handled it.
        self.channel.basic_qos(prefetch_count=1)
        wrapped_callback = partial(callback_wrapper, callback=callback or default_callback)
        # NOTE(review): the callback-first positional form here and the
        # queue-less queue_declare() above look like the pre-1.0 pika API;
        # confirm against the pika version actually deployed.
        self.channel.basic_consume(wrapped_callback, queue=queue_name)
        try:
            LOG.info("about to consume {0}/{1}/{2}".format(self.host, exchange, key))
            self.channel.start_consuming()
        except KeyboardInterrupt:
            LOG.warning('keyboard interrupt')
            # Cancel the consumer so the broker stops delivering to this
            # channel before control returns to the caller.
            self.channel.stop_consuming()
            return



def main():
    """Command-line entry point: print AMQP message payloads as lines of text.

    Parses the command line, connects to the AMQP server and consumes
    messages until interrupted.  The routing key comes from the positional
    argument when exactly one is given, otherwise from ``-C/--consume``.

    :returns: 0, for use as the process exit status
    """
    # OptionParser's first positional parameter is ``usage``; the summary
    # text belongs in ``description`` so --help shows a real usage line.
    parser = optparse.OptionParser(
        usage="%prog [options] [routing-key]",
        description="print out AMQP message payloads as lines of text")
    parser.add_option('-v', '--verbose', dest='verbosity', action="count", default=0,
                        help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG')
    parser.add_option('-H', '--host', dest='host', default=DEFAULT_SERVER,
                        help='name of AMQP server to connect to')
    parser.add_option('-X', '--exchange', dest='exchange', default=DEFAULT_EXCHANGE,
                        help='name of exchange to connect to')
    parser.add_option('-C', '--consume', dest='consume', default=DEFAULT_KEY,
                        help='topic pattern to listen for and consume')
    parser.add_option('-u', '--user', dest='user', default=DEFAULT_USER,
                        help='user id to talk to AMQP exchange as')
    parser.add_option('-p', '--passwd', dest='passwd', default=DEFAULT_PASSWORD,
                        help='password for user')
    parser.add_option('-j', '--json', dest='json',
                        help='parse content as json and print this expression using python string.format()')
    # parser.add_option('-P', '--produce', dest='produce', default=None,
                        # help='topic to produce')
    opts, args = parser.parse_args()

    # Map the -v count onto a log level, clamping at DEBUG.
    levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
    logging.basicConfig(level=levels[min(3, opts.verbosity)])

    # An empty exchange name (e.g. -X '') hands control to the unittest
    # runner instead of connecting to a server.
    if not opts.exchange:
        unittest.main()
        return 0

    if len(args) > 1:
        # Previously ignored without notice; the flow is unchanged, but say so.
        LOG.warning("ignoring {0} positional arguments; expected at most one "
                    "routing key".format(len(args)))

    session = AmqpExchange(server=opts.host, user=opts.user, password=opts.passwd)

    callback = default_callback
    if opts.json:
        callback = partial(json_callback, format_str=opts.json)

    try:
        if len(args) == 1:
            session.consume(callback, exchange=opts.exchange, key=args[0])
        elif opts.consume:
            session.consume(callback, exchange=opts.exchange, key=opts.consume)
    finally:
        # Release the broker connection on every exit path.
        if session.connection.is_open:
            session.connection.close()

    return 0


# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)