Skip to content
Snippets Groups Projects
Commit e47ed5a3 authored by David Hoese's avatar David Hoese
Browse files

Added simple zmq circus communications to control center.

parent 8012eb43
No related branches found
No related tags found
No related merge requests found
......@@ -11,15 +11,27 @@ __author__ = "David Hoese, davidh"
from wsgiref.simple_server import make_server
from pyramid.config import Configurator
from pyramid.response import Response
from pyramid.view import view_config
from aiopyramid.websocket.config import WebsocketMapper
import pyramid
from aiopyramid.websocket.view import WebsocketConnectionView
from circus.client import CircusClient
from circus.py3compat import string_types, s, b
from circus.exc import CallError
import zmq
from zmq import ssh
import os
import json
import uuid
import errno
import asyncio
import logging
LOG = logging.getLogger("sparchive_control_center")
# Don't try to communicate with circus when true
DEBUG = True
# List of tasks that we actually want to notify the web interface about
ENABLED_TASKS = []
for task_prefix in ["archive_", "push_"]:
......@@ -34,32 +46,82 @@ def create_json_error(msg_dict, error_msg=None):
return json.dumps(response_dict)
def _handle_start(msg_dict):
response_dict = msg_dict.copy()
if msg_dict["task_name"] == "push_aeri":
raise ValueError("Testing that push aeri doesn't work")
response_dict["status"] = "SUCCESS"
response_dict["comment"] = "ok"
return json.dumps(response_dict)
@asyncio.coroutine
def send_zmq_request(circus_ctl):
circus_ctl.send_message()
pass
def _handle_stop(msg_dict):
response_dict = msg_dict.copy()
response_dict["status"] = "SUCCESS"
response_dict["comment"] = "ok"
@asyncio.coroutine
def _handle_start(circus_ctl, msg_dict, command="start", name=None):
try:
if name is None:
name = msg_dict["task_name"]
props = {}
if name:
props["name"] = name
new_pids = yield from circus_ctl.send_message(command, **props)
except Exception:
LOG.exception("Could not call command on circus client")
response_dict = {
"task_name": msg_dict["task_name"],
"command": command,
"status": "ERROR",
"comment": "Could not call command on circus client",
}
return json.dumps(response_dict)
response_dict = {
"task_name": msg_dict["task_name"],
"command": command,
"status": "SUCCESS",
"comment": "",
}
return json.dumps(response_dict)
def _handle_status(msg_dict):
# TODO: Get status
response_dict = msg_dict.copy()
response_dict["status"] = "SUCCESS"
response_dict["comment"] = "ok"
response_dict["task_names"] = ["archive_aeri"]
response_dict["task_statuses"] = ["active"]
@asyncio.coroutine
def _handle_stop(circus_ctl, msg_dict):
response_dict = yield from _handle_start(circus_ctl, msg_dict, command="stop")
return response_dict
@asyncio.coroutine
def _handle_status(circus_ctl, msg_dict, command="status", name=""):
try:
if name is None:
name = msg_dict["task_name"]
props = {}
if name:
props["name"] = name
status_dict = yield from circus_ctl.send_message(command, **props)
# TODO: How does this command actually work?
except Exception:
LOG.exception("Could not call command on circus client")
response_dict = {
"task_names": status_dict["watchers"],
"task_statuses": status_dict["statuses"],
"command": command,
"status": "ERROR",
"comment": "Could not call command on circus client",
}
return json.dumps(response_dict)
response_dict = {
"task_names": status_dict["watchers"],
"task_statuses": status_dict["statuses"],
"command": command,
"status": "SUCCESS",
"comment": "",
}
return json.dumps(response_dict)
# XXX: Consider making these coroutines
commands = {
"start": _handle_start,
"stop": _handle_stop,
......@@ -67,6 +129,40 @@ commands = {
}
@asyncio.coroutine
def receive_circus_status(future, ws, circus_status):
print("Running zmq status coroutine")
while True:
watcher, action, msg = yield from circus_status.get_message()
if watcher not in ENABLED_TASKS:
continue
LOG.debug("Received circus status event: {}, {}, {}".format(watcher, action, msg))
if action == "spawn":
# something was started
ws_msg = json.dumps({
"command": "status",
"status": "SUCCESS",
"task_names": [watcher],
"task_statuses": ["active"]
})
yield from ws.send(ws_msg)
elif action == "stop":
# something was stopped
ws_msg = json.dumps({
"command": "status",
"status": "SUCCESS",
"task_names": [watcher],
"task_statuses": ["stopped"]
})
yield from ws.send(ws_msg)
else:
LOG.debug("Ignoring status message: {}, {}, {}".format(watcher, action, msg))
future.set_result("Done")
def available_experiments(incoming_root):
return [p for p in os.listdir(incoming_root) if p != "current"]
......@@ -75,45 +171,231 @@ def current_experiment(incoming_root):
return os.readlink(os.path.join(incoming_root, "current"))
@view_config(route_name='home', renderer='home.pt')
def my_view(request):
incoming_root = request.registry.settings["incoming_root"]
return {
'title': 'SPARC Control Center',
'project': 'sparchive.control_center',
'experiments': available_experiments(incoming_root),
'current_experiment': current_experiment(incoming_root),
}
class CircusEvents(object):
def __init__(self, pubsub_endpoint, topic=b'watcher.'):
self.pubsub_endpoint = pubsub_endpoint
self.topic = topic
self.ctx = zmq.Context()
self.sub_socket = self.ctx.socket(zmq.SUB)
self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.topic)
self.sub_socket.connect(self.pubsub_endpoint)
@asyncio.coroutine
def get_message(self):
topic, msg = self.sub_socket.recv_string()
topic = s(topic)
watcher = topic.split('.')[1:-1][0]
action = topic.split('.')[-1]
msg = json.loads(msg)
# actions could be reap, kill, spawn, stop
yield watcher, action, msg
class FakeCircusEvents(object):
def __init__(self, *args, **kwargs):
self.idx = 0
self.fake_events = list(zip(["archive_hsrl", "push_hsrl", "archive_aeri"],
["spawn", "spawn", "stop"], [{}, {}, {}]))
@asyncio.coroutine
def get_message(self):
yield from asyncio.sleep(5)
data = self.fake_events[self.idx]
self.idx += 1
self.idx = self.idx % len(self.fake_events)
return data
def make_message(command, **props):
return {"command": command, "properties": props or {}}
def cast_message(command, **props):
return {"command": command, "msg_type": "cast", "properties": props or {}}
def make_json(command, **props):
return json.dumps(make_message(command, **props))
class MyCircusClient(object):
"""Custom circus command client for this application because we may have to
resort to asyncio-compatible zmq libraries.
"""
def __init__(self, context=None, endpoint=None,
timeout=5.0, ssh_server=None, ssh_keyfile=None):
self._init_context(context)
self.endpoint = endpoint
self._id = b(uuid.uuid4().hex)
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self._id)
self.socket.setsockopt(zmq.LINGER, 0)
self.get_connection(self.socket, endpoint, ssh_server, ssh_keyfile)
self._init_poller()
self._timeout = timeout
self.timeout = timeout * 1000
def get_connection(self, socket, endpoint, ssh_server=None, ssh_keyfile=None):
if ssh_server is None:
socket.connect(endpoint)
else:
try:
try:
ssh.tunnel_connection(socket, endpoint, ssh_server,
keyfile=ssh_keyfile)
except ImportError:
ssh.tunnel_connection(socket, endpoint, ssh_server,
keyfile=ssh_keyfile, paramiko=True)
except ImportError:
raise ImportError("pexpect was not found, and failed to use "
"Paramiko. You need to install Paramiko")
def _init_context(self, context):
self.context = context or zmq.Context.instance()
def _init_poller(self):
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
def stop(self):
# only supported by libzmq >= 3
if hasattr(self.socket, 'disconnect'):
self.socket.disconnect(self.endpoint)
self.socket.close()
@asyncio.coroutine
def send_message(self, command, **props):
return self.call(make_message(command, **props))
def call(self, cmd):
if isinstance(cmd, string_types):
raise DeprecationWarning('call() takes a mapping')
call_id = uuid.uuid4().hex
cmd['id'] = call_id
try:
cmd = json.dumps(cmd)
except ValueError as e:
raise CallError(str(e))
try:
self.socket.send(cmd)
except zmq.ZMQError as e:
raise CallError(str(e))
@view_config(route_name='circus_proxy', mapper=WebsocketMapper)
def echo(ws):
# TODO: Listen for messages from circus
# FIXME: How can I loop over and still check for ZMQ messages
while True:
message = yield from ws.recv()
if message is None:
break
try:
events = dict(self.poller.poll(self.timeout))
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
continue
else:
print(str(e))
raise CallError(str(e))
if len(events) == 0:
raise CallError("Timed out.")
for socket in events:
msg = socket.recv()
try:
res = json.loads(msg)
if res.get('id') != call_id:
# we got the wrong message
continue
return res
except ValueError as e:
raise CallError(str(e))
class FakeCircusClient(object):
def __init__(self, *args, **kwargs):
pass
@asyncio.coroutine
def send_message(self, command, name=None):
from datetime import datetime
print(datetime.utcnow().isoformat(" "))
yield from asyncio.sleep(2)
print(datetime.utcnow().isoformat(" "))
if command == "status":
# TODO: How does this actually work
if name:
# return pids
return [1234]
else:
return {"watchers": ["archive_aeri"], "statuses": ["active"]}
else:
# start, stop
return [1234]
@view_config(route_name='circus_proxy', mapper=WebsocketMapper)
class CircusProxyView(WebsocketConnectionView):
def __init__(self, context, request):
super().__init__(context, request)
self.circus_endpoint = self.request.registry.settings["circus_endpoint"]
self.circus_pubsub_endpoint = self.request.registry.settings["circus_pubsub_endpoint"]
self.circus_stats_endpoint = self.request.registry.settings["circus_stats_endpoint"]
@asyncio.coroutine
def on_open(self):
# Create zmq socket for req/rep to send commands to circus
if DEBUG:
circus_ctl = FakeCircusClient()
else:
circus_ctl = MyCircusClient(endpoint=self.circus_endpoint)
# Create zmq socket for pub/sub to receive status updates from circus
if DEBUG:
circus_status = FakeCircusEvents()
else:
circus_status = CircusEvents(self.circus_pubsub_endpoint)
self.circus_ctl = circus_ctl
self.circus_status = circus_status
self.status_future = asyncio.Future()
asyncio.async(receive_circus_status(self.status_future, self.ws, self.circus_status))
@asyncio.coroutine
def on_message(self, message):
try:
msg_dict = json.loads(message)
LOG.debug("Received WS command: {}".format(msg_dict["command"]))
except Exception:
yield from ws.send(create_json_error({}, "Could not parse JSON message"))
LOG.exception("Could not parse JSON message")
continue
msg = create_json_error({}, "Could not parse JSON message")
yield from self.send(msg)
return
try:
command = msg_dict.get("command", None)
if command is None or command not in commands:
yield from ws.send(create_json_error(msg_dict, "Unknown command: {}".format(command)))
LOG.exception("Unknown command: {}".format(command))
continue
msg = create_json_error(msg_dict, "Unknown command: {}".format(command))
yield from self.send(msg)
return
yield from ws.send(commands[command](msg_dict))
msg = yield from commands[command](self.circus_ctl, msg_dict)
yield from self.send(msg)
except Exception:
yield from ws.send(create_json_error(msg_dict, "Could not complete command: {}".format(command)))
LOG.exception("Could not complete command: {}".format(command))
continue
msg = create_json_error(msg_dict, "Could not complete command: {}".format(command))
yield from self.send(msg)
@view_config(route_name='home', renderer='home.pt')
def my_view(request):
incoming_root = request.registry.settings["incoming_root"]
return {
'title': 'SPARC Control Center',
'project': 'sparchive.control_center',
'experiments': available_experiments(incoming_root),
'current_experiment': current_experiment(incoming_root),
}
def main(global_config, **settings):
......
......@@ -7,6 +7,9 @@
use = egg:sparchive
incoming_root = /no_backup/tmp/sparc-test/incoming/sparc
circus_endpoint = ipc:///var/circus/endpoint
circus_pubsub_endpoint = ipc:///var/circus/pubsub
circus_stats_endpoint = ipc:///var/circus/stats
pyramid.reload_templates = true
pyramid.debug_authorization = false
......
......@@ -7,6 +7,9 @@
use = egg:sparchive
incoming_root = /data/sparc
circus_endpoint = ipc:///var/circus/endpoint
circus_pubsub_endpoint = ipc:///var/circus/pubsub
circus_stats_endpoint = ipc:///var/circus/stats
pyramid.reload_templates = false
pyramid.debug_authorization = false
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment