diff --git a/circus.ini b/circus.ini index f9fb8b6913852c3870a2ef6706e3fb3894330617..429821922155623bb359c5d60bb915bfd9f54622 100644 --- a/circus.ini +++ b/circus.ini @@ -3,11 +3,33 @@ ; watcher logs are limited to 268435456 bytes = 256MB [circus] endpoint = ipc:///var/circus/endpoint -pubsub_endpoint = ipc:///var/circus/pubsub -stats_endpoint = ipc:///var/circus/stats +pubsub_endpoint = tcp://127.0.0.1:5556 +stats_endpoint = tcp://127.0.0.1:5557 +#pubsub_endpoint = ipc:///var/circus/pubsub +#stats_endpoint = ipc:///var/circus/stats endpoint_owner = sparc logoutput = /var/log/circus.log +[watcher:control_center] +#cmd = /home/sparc/repos/git/sparchive/scripts/start_control_center.sh +cmd = /home/sparc/envs/sparchive_control/bin/gunicorn --paste /home/sparc/repos/git/sparchive/sparchive/sparchive/control_center/production.ini +send_hup = True +uid = sparc +gid = users +numprocesses = 1 +singleton = True +autostart = True + +stdout_stream.class = FileStream +stdout_stream.filename = /var/log/sparchive/control_center_out.log +stdout_stream.max_bytes = 268435456 +stdout_stream.backup_count = 3 + +stderr_stream.class = FileStream +stderr_stream.filename = /var/log/sparchive/control_center_err.log +stderr_stream.max_bytes = 268435456 +stderr_stream.backup_count = 3 + [watcher:archive_aeri] cmd = /home/sparc/repos/git/sparchive/scripts/minicron.sh 600 /home/sparc/repos/git/sparchive/scripts/archive_aeri.sh uid = sparc diff --git a/sparchive/sparchive/control_center/app.py b/sparchive/sparchive/control_center/app.py index 713cf7bf9613aa21dac22ab2af00cf2e41ff3473..49918776636144d3ab456b37af8481c901b332b1 100644 --- a/sparchive/sparchive/control_center/app.py +++ b/sparchive/sparchive/control_center/app.py @@ -12,13 +12,14 @@ __author__ = "David Hoese, davidh" from wsgiref.simple_server import make_server from pyramid.config import Configurator from pyramid.view import view_config -from aiopyramid.websocket.config import WebsocketMapper +from aiopyramid.websocket.config.gunicorn import WebsocketMapper from aiopyramid.websocket.view import WebsocketConnectionView from circus.client import CircusClient from circus.py3compat import string_types, s, b from circus.exc import CallError import zmq from zmq import ssh +import aiozmq import os import json @@ -30,7 +31,8 @@ import logging LOG = logging.getLogger("sparchive_control_center") # Don't try to communicate with circus when true -DEBUG = True +DEBUG = False +#DEBUG = True # List of tasks that we actually want to notify the web interface about ENABLED_TASKS = [] @@ -43,6 +45,7 @@ def create_json_error(msg_dict, error_msg=None): response_dict = msg_dict.copy() response_dict["status"] = "ERROR" response_dict["comment"] = error_msg or "" + #return json.dumps(response_dict).encode("ascii") return json.dumps(response_dict) @@ -71,6 +74,7 @@ def _handle_start(circus_ctl, msg_dict, command="start", name=None): "status": "ERROR", "comment": "Could not call command on circus client", } + #return json.dumps(response_dict).encode("ascii") return json.dumps(response_dict) response_dict = { @@ -79,6 +83,7 @@ def _handle_start(circus_ctl, msg_dict, command="start", name=None): "status": "SUCCESS", "comment": "", } + #return json.dumps(response_dict).encode("ascii") return json.dumps(response_dict) @@ -88,6 +93,12 @@ def _handle_stop(circus_ctl, msg_dict): return response_dict +def prepare_status_dict(status_dict): + for k in list(status_dict["statuses"].keys()): + if k not in ENABLED_TASKS: + del status_dict["statuses"][k] + + @asyncio.coroutine def _handle_status(circus_ctl, msg_dict, command="status", name=""): try: @@ -99,25 +110,26 @@ def _handle_status(circus_ctl, msg_dict, command="status", name=""): props["name"] = name status_dict = yield from circus_ctl.send_message(command, **props) - # TODO: How does this command actually work? + prepare_status_dict(status_dict) + from pprint import pprint + pprint(status_dict) except Exception: LOG.exception("Could not call command on circus client") response_dict = { - "task_names": status_dict["watchers"], - "task_statuses": status_dict["statuses"], "command": command, "status": "ERROR", "comment": "Could not call command on circus client", } + #return json.dumps(response_dict).encode("ascii") return json.dumps(response_dict) response_dict = { - "task_names": status_dict["watchers"], - "task_statuses": status_dict["statuses"], + "statuses": status_dict["statuses"], "command": command, - "status": "SUCCESS", + "status": "SUCCESS" if status_dict["status"] == "ok" else "ERROR", "comment": "", } + #return json.dumps(response_dict).encode("ascii") return json.dumps(response_dict) @@ -139,13 +151,12 @@ def receive_circus_status(future, ws, circus_status): LOG.debug("Received circus status event: {}, {}, {}".format(watcher, action, msg)) - if action == "spawn": + if action == "start": # something was started ws_msg = json.dumps({ "command": "status", "status": "SUCCESS", - "task_names": [watcher], - "task_statuses": ["active"] + "statuses": {watcher: "active"}, }) yield from ws.send(ws_msg) elif action == "stop": @@ -153,8 +164,7 @@ def receive_circus_status(future, ws, circus_status): ws_msg = json.dumps({ "command": "status", "status": "SUCCESS", - "task_names": [watcher], - "task_statuses": ["stopped"] + "statuses": {watcher: "stopped"}, }) yield from ws.send(ws_msg) else: @@ -172,25 +182,26 @@ def current_experiment(incoming_root): class CircusEvents(object): - def __init__(self, pubsub_endpoint, topic=b'watcher.'): + def __init__(self, pubsub_endpoint, topic=b'watcher.', callback=receive_circus_status): self.pubsub_endpoint = pubsub_endpoint self.topic = topic + self.callback = callback - self.ctx = zmq.Context() - self.sub_socket = self.ctx.socket(zmq.SUB) - self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.topic) - self.sub_socket.connect(self.pubsub_endpoint) + @asyncio.coroutine + def connect(self): + self.sub_socket = yield from aiozmq.create_zmq_stream(zmq.SUB, connect=self.pubsub_endpoint) + self.sub_socket._transport.subscribe(self.topic) @asyncio.coroutine def get_message(self): - topic, msg = self.sub_socket.recv_string() + topic, msg = yield from self.sub_socket.read() topic = s(topic) watcher = topic.split('.')[1:-1][0] action = topic.split('.')[-1] - msg = json.loads(msg) + msg = json.loads(msg.decode("utf8")) # actions could be reap, kill, spawn, stop - yield watcher, action, msg + return watcher, action, msg class FakeCircusEvents(object): @@ -220,6 +231,7 @@ def make_json(command, **props): return json.dumps(make_message(command, **props)) +# TODO: Rewrite to use aiozmq. For now the messages are short so this is OK class MyCircusClient(object): """Custom circus command client for this application because we may have to resort to asyncio-compatible zmq libraries. @@ -281,7 +293,7 @@ class MyCircusClient(object): raise CallError(str(e)) try: - self.socket.send(cmd) + self.socket.send_string(cmd) except zmq.ZMQError as e: raise CallError(str(e)) @@ -299,7 +311,7 @@ class MyCircusClient(object): raise CallError("Timed out.") for socket in events: - msg = socket.recv() + msg = socket.recv_string() try: res = json.loads(msg) if res.get('id') != call_id: @@ -321,12 +333,12 @@ class FakeCircusClient(object): yield from asyncio.sleep(2) print(datetime.utcnow().isoformat(" ")) if command == "status": - # TODO: How does this actually work if name: # return pids + # TODO: How does this actually work return [1234] else: - return {"watchers": ["archive_aeri"], "statuses": ["active"]} + return {"status": "ok", "statuses": {"archive_aeri": "active"}} else: # start, stop return [1234] @@ -353,6 +365,8 @@ class CircusProxyView(WebsocketConnectionView): circus_status = FakeCircusEvents() else: circus_status = CircusEvents(self.circus_pubsub_endpoint) + #circus_status = CircusEvents(self.circus_stats_endpoint) + yield from circus_status.connect() self.circus_ctl = circus_ctl self.circus_status = circus_status @@ -380,6 +394,7 @@ class CircusProxyView(WebsocketConnectionView): return msg = yield from commands[command](self.circus_ctl, msg_dict) + print(msg) yield from self.send(msg) except Exception: LOG.exception("Could not complete command: {}".format(command)) diff --git a/sparchive/sparchive/control_center/development.ini b/sparchive/sparchive/control_center/development.ini index 2053158392053d4fd1d763cfc5d0cd27c74d81bc..286cb0e8ca7fe764df0b677dbbe068e588ae7fc1 100644 --- a/sparchive/sparchive/control_center/development.ini +++ b/sparchive/sparchive/control_center/development.ini @@ -6,10 +6,11 @@ [app:main] use = egg:sparchive -incoming_root = /no_backup/tmp/sparc-test/incoming/sparc +incoming_root = /data/sparc +#incoming_root = /no_backup/tmp/sparc-test/incoming/sparc circus_endpoint = ipc:///var/circus/endpoint -circus_pubsub_endpoint = ipc:///var/circus/pubsub -circus_stats_endpoint = ipc:///var/circus/stats +circus_pubsub_endpoint = tcp://127.0.0.1:5556 +circus_stats_endpoint = tcp://127.0.0.1:5557 pyramid.reload_templates = true pyramid.debug_authorization = false diff --git a/sparchive/sparchive/control_center/home.pt b/sparchive/sparchive/control_center/home.pt index 2e27016f67a085a300fe2c49c754082595b343dc..a80beb24eadcc4f0e31146910c1664dc7f465d46 100644 --- a/sparchive/sparchive/control_center/home.pt +++ b/sparchive/sparchive/control_center/home.pt @@ -82,25 +82,29 @@ function onMessage(evt) { console.info("Received message: " + evt.data); + //var msg_obj = evt.data; var msg_obj = JSON.parse(evt.data); if (msg_obj.command == "start" && msg_obj.status == "SUCCESS") { - taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "on"); + // Classes are only changed by status updates in production + //taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "on"); } else if (msg_obj.command == "stop" && msg_obj.status == "SUCCESS") { - taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "off"); + //taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "off"); } else if (msg_obj.command == "status") { if (msg_obj.status == "SUCCESS") { var button_elem; var task_status; - for (var i = 0; i < msg_obj.task_names.length; i++) { - button_elem = document.getElementById("button_" + msg_obj.task_names[i]); - task_status = msg_obj.task_statuses[i]; - if (task_status == "stopped") { - taskButtonChangeClass(button_elem, "off"); - } else if (task_status == "active") { - taskButtonChangeClass(button_elem, "on"); - } else { - taskButtonChangeClass(button_elem, "error"); + for (var key in msg_obj.statuses) { + if (msg_obj.statuses.hasOwnProperty(key)) { + task_status = msg_obj.statuses[key]; + button_elem = document.getElementById("button_" + key); + if (task_status == "stopped") { + taskButtonChangeClass(button_elem, "off"); + } else if (task_status == "active") { + taskButtonChangeClass(button_elem, "on"); + } else { + taskButtonChangeClass(button_elem, "error"); + } } } } diff --git a/sparchive/sparchive/control_center/production.ini b/sparchive/sparchive/control_center/production.ini index 3733f85314cfdb6f3774d85c48906b552434ed25..4fccfdfde5dceac8f6c1df96d056541bd1eda19f 100644 --- a/sparchive/sparchive/control_center/production.ini +++ b/sparchive/sparchive/control_center/production.ini @@ -8,8 +8,8 @@ use = egg:sparchive incoming_root = /data/sparc circus_endpoint = ipc:///var/circus/endpoint -circus_pubsub_endpoint = ipc:///var/circus/pubsub -circus_stats_endpoint = ipc:///var/circus/stats +circus_pubsub_endpoint = tcp://127.0.0.1:5556 +circus_stats_endpoint = tcp://127.0.0.1:5557 pyramid.reload_templates = false pyramid.debug_authorization = false @@ -20,10 +20,20 @@ pyramid.includes = aiopyramid pyramid_tm +logging.config = %(here)s/development.ini + +[uwsgi] +http-socket = 0.0.0.0:6543 +workers = 1 +plugins = + asyncio = 50 ; number of workers + greenlet + [server:main] -use = egg:waitress#main +use = egg:gunicorn#main host = 0.0.0.0 port = 6543 +worker_class = aiopyramid.gunicorn.worker.AsyncGunicornWorker ### # logging configuration @@ -31,7 +41,7 @@ port = 6543 ### [loggers] -keys = root, sparchive_control_center, sqlalchemy +keys = root, asyncio, sparchive_control_center, gunicorn [handlers] keys = console @@ -43,18 +53,20 @@ keys = generic level = WARN handlers = console -[logger_sparchive_control_center] +[logger_asyncio] level = WARN handlers = -qualname = sparchive_control_center +qualname = asyncio -[logger_sqlalchemy] +[logger_gunicorn] +level = INFO +handlers = +qualname = gunicorn + +[logger_sparchive_control_center] level = WARN handlers = -qualname = sqlalchemy.engine -# "level = INFO" logs SQL queries. -# "level = DEBUG" logs SQL queries and results. -# "level = WARN" logs neither. (Recommended for production systems.) +qualname = sparchive_control_center [handler_console] class = StreamHandler