Skip to content
Snippets Groups Projects
Commit e368affd authored by David Hoese's avatar David Hoese Committed by sparc
Browse files

got control center working with aiozmq and added to circus control

parent e47ed5a3
No related branches found
No related tags found
No related merge requests found
...@@ -3,11 +3,33 @@ ...@@ -3,11 +3,33 @@
; watcher logs are limited to 268435456 bytes = 256MB ; watcher logs are limited to 268435456 bytes = 256MB
[circus] [circus]
endpoint = ipc:///var/circus/endpoint endpoint = ipc:///var/circus/endpoint
pubsub_endpoint = ipc:///var/circus/pubsub pubsub_endpoint = tcp://127.0.0.1:5556
stats_endpoint = ipc:///var/circus/stats stats_endpoint = tcp://127.0.0.1:5557
#pubsub_endpoint = ipc:///var/circus/pubsub
#stats_endpoint = ipc:///var/circus/stats
endpoint_owner = sparc endpoint_owner = sparc
logoutput = /var/log/circus.log logoutput = /var/log/circus.log
[watcher:control_center]
#cmd = /home/sparc/repos/git/sparchive/scripts/start_control_center.sh
cmd = /home/sparc/envs/sparchive_control/bin/gunicorn --paste /home/sparc/repos/git/sparchive/sparchive/sparchive/control_center/production.ini
send_hup = True
uid = sparc
gid = users
numprocesses = 1
singleton = True
autostart = True
stdout_stream.class = FileStream
stdout_stream.filename = /var/log/sparchive/control_center_out.log
stdout_stream.max_bytes = 268435456
stdout_stream.backup_count = 3
stderr_stream.class = FileStream
stderr_stream.filename = /var/log/sparchive/control_center_err.log
stderr_stream.max_bytes = 268435456
stderr_stream.backup_count = 3
[watcher:archive_aeri] [watcher:archive_aeri]
cmd = /home/sparc/repos/git/sparchive/scripts/minicron.sh 600 /home/sparc/repos/git/sparchive/scripts/archive_aeri.sh cmd = /home/sparc/repos/git/sparchive/scripts/minicron.sh 600 /home/sparc/repos/git/sparchive/scripts/archive_aeri.sh
uid = sparc uid = sparc
......
...@@ -12,13 +12,14 @@ __author__ = "David Hoese, davidh" ...@@ -12,13 +12,14 @@ __author__ = "David Hoese, davidh"
from wsgiref.simple_server import make_server from wsgiref.simple_server import make_server
from pyramid.config import Configurator from pyramid.config import Configurator
from pyramid.view import view_config from pyramid.view import view_config
from aiopyramid.websocket.config import WebsocketMapper from aiopyramid.websocket.config.gunicorn import WebsocketMapper
from aiopyramid.websocket.view import WebsocketConnectionView from aiopyramid.websocket.view import WebsocketConnectionView
from circus.client import CircusClient from circus.client import CircusClient
from circus.py3compat import string_types, s, b from circus.py3compat import string_types, s, b
from circus.exc import CallError from circus.exc import CallError
import zmq import zmq
from zmq import ssh from zmq import ssh
import aiozmq
import os import os
import json import json
...@@ -30,7 +31,8 @@ import logging ...@@ -30,7 +31,8 @@ import logging
LOG = logging.getLogger("sparchive_control_center") LOG = logging.getLogger("sparchive_control_center")
# Don't try to communicate with circus when true # Don't try to communicate with circus when true
DEBUG = True DEBUG = False
#DEBUG = True
# List of tasks that we actually want to notify the web interface about # List of tasks that we actually want to notify the web interface about
ENABLED_TASKS = [] ENABLED_TASKS = []
...@@ -43,6 +45,7 @@ def create_json_error(msg_dict, error_msg=None): ...@@ -43,6 +45,7 @@ def create_json_error(msg_dict, error_msg=None):
response_dict = msg_dict.copy() response_dict = msg_dict.copy()
response_dict["status"] = "ERROR" response_dict["status"] = "ERROR"
response_dict["comment"] = error_msg or "" response_dict["comment"] = error_msg or ""
#return json.dumps(response_dict).encode("ascii")
return json.dumps(response_dict) return json.dumps(response_dict)
...@@ -71,6 +74,7 @@ def _handle_start(circus_ctl, msg_dict, command="start", name=None): ...@@ -71,6 +74,7 @@ def _handle_start(circus_ctl, msg_dict, command="start", name=None):
"status": "ERROR", "status": "ERROR",
"comment": "Could not call command on circus client", "comment": "Could not call command on circus client",
} }
#return json.dumps(response_dict).encode("ascii")
return json.dumps(response_dict) return json.dumps(response_dict)
response_dict = { response_dict = {
...@@ -79,6 +83,7 @@ def _handle_start(circus_ctl, msg_dict, command="start", name=None): ...@@ -79,6 +83,7 @@ def _handle_start(circus_ctl, msg_dict, command="start", name=None):
"status": "SUCCESS", "status": "SUCCESS",
"comment": "", "comment": "",
} }
#return json.dumps(response_dict).encode("ascii")
return json.dumps(response_dict) return json.dumps(response_dict)
...@@ -88,6 +93,12 @@ def _handle_stop(circus_ctl, msg_dict): ...@@ -88,6 +93,12 @@ def _handle_stop(circus_ctl, msg_dict):
return response_dict return response_dict
def prepare_status_dict(status_dict):
for k in list(status_dict["statuses"].keys()):
if k not in ENABLED_TASKS:
del status_dict["statuses"][k]
@asyncio.coroutine @asyncio.coroutine
def _handle_status(circus_ctl, msg_dict, command="status", name=""): def _handle_status(circus_ctl, msg_dict, command="status", name=""):
try: try:
...@@ -99,25 +110,26 @@ def _handle_status(circus_ctl, msg_dict, command="status", name=""): ...@@ -99,25 +110,26 @@ def _handle_status(circus_ctl, msg_dict, command="status", name=""):
props["name"] = name props["name"] = name
status_dict = yield from circus_ctl.send_message(command, **props) status_dict = yield from circus_ctl.send_message(command, **props)
# TODO: How does this command actually work? prepare_status_dict(status_dict)
from pprint import pprint
pprint(status_dict)
except Exception: except Exception:
LOG.exception("Could not call command on circus client") LOG.exception("Could not call command on circus client")
response_dict = { response_dict = {
"task_names": status_dict["watchers"],
"task_statuses": status_dict["statuses"],
"command": command, "command": command,
"status": "ERROR", "status": "ERROR",
"comment": "Could not call command on circus client", "comment": "Could not call command on circus client",
} }
#return json.dumps(response_dict).encode("ascii")
return json.dumps(response_dict) return json.dumps(response_dict)
response_dict = { response_dict = {
"task_names": status_dict["watchers"], "statuses": status_dict["statuses"],
"task_statuses": status_dict["statuses"],
"command": command, "command": command,
"status": "SUCCESS", "status": "SUCCESS" if status_dict["status"] == "ok" else "ERROR",
"comment": "", "comment": "",
} }
#return json.dumps(response_dict).encode("ascii")
return json.dumps(response_dict) return json.dumps(response_dict)
...@@ -139,13 +151,12 @@ def receive_circus_status(future, ws, circus_status): ...@@ -139,13 +151,12 @@ def receive_circus_status(future, ws, circus_status):
LOG.debug("Received circus status event: {}, {}, {}".format(watcher, action, msg)) LOG.debug("Received circus status event: {}, {}, {}".format(watcher, action, msg))
if action == "spawn": if action == "start":
# something was started # something was started
ws_msg = json.dumps({ ws_msg = json.dumps({
"command": "status", "command": "status",
"status": "SUCCESS", "status": "SUCCESS",
"task_names": [watcher], "statuses": {watcher: "active"},
"task_statuses": ["active"]
}) })
yield from ws.send(ws_msg) yield from ws.send(ws_msg)
elif action == "stop": elif action == "stop":
...@@ -153,8 +164,7 @@ def receive_circus_status(future, ws, circus_status): ...@@ -153,8 +164,7 @@ def receive_circus_status(future, ws, circus_status):
ws_msg = json.dumps({ ws_msg = json.dumps({
"command": "status", "command": "status",
"status": "SUCCESS", "status": "SUCCESS",
"task_names": [watcher], "statuses": {watcher: "stopped"},
"task_statuses": ["stopped"]
}) })
yield from ws.send(ws_msg) yield from ws.send(ws_msg)
else: else:
...@@ -172,25 +182,26 @@ def current_experiment(incoming_root): ...@@ -172,25 +182,26 @@ def current_experiment(incoming_root):
class CircusEvents(object): class CircusEvents(object):
def __init__(self, pubsub_endpoint, topic=b'watcher.'): def __init__(self, pubsub_endpoint, topic=b'watcher.', callback=receive_circus_status):
self.pubsub_endpoint = pubsub_endpoint self.pubsub_endpoint = pubsub_endpoint
self.topic = topic self.topic = topic
self.callback = callback
self.ctx = zmq.Context() @asyncio.coroutine
self.sub_socket = self.ctx.socket(zmq.SUB) def connect(self):
self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.topic) self.sub_socket = yield from aiozmq.create_zmq_stream(zmq.SUB, connect=self.pubsub_endpoint)
self.sub_socket.connect(self.pubsub_endpoint) self.sub_socket._transport.subscribe(self.topic)
@asyncio.coroutine @asyncio.coroutine
def get_message(self): def get_message(self):
topic, msg = self.sub_socket.recv_string() topic, msg = yield from self.sub_socket.read()
topic = s(topic) topic = s(topic)
watcher = topic.split('.')[1:-1][0] watcher = topic.split('.')[1:-1][0]
action = topic.split('.')[-1] action = topic.split('.')[-1]
msg = json.loads(msg) msg = json.loads(msg.decode("utf8"))
# actions could be reap, kill, spawn, stop # actions could be reap, kill, spawn, stop
yield watcher, action, msg return watcher, action, msg
class FakeCircusEvents(object): class FakeCircusEvents(object):
...@@ -220,6 +231,7 @@ def make_json(command, **props): ...@@ -220,6 +231,7 @@ def make_json(command, **props):
return json.dumps(make_message(command, **props)) return json.dumps(make_message(command, **props))
# TODO: Rewrite to use aiozmq. For now the messages are short so this is OK
class MyCircusClient(object): class MyCircusClient(object):
"""Custom circus command client for this application because we may have to """Custom circus command client for this application because we may have to
resort to asyncio-compatible zmq libraries. resort to asyncio-compatible zmq libraries.
...@@ -281,7 +293,7 @@ class MyCircusClient(object): ...@@ -281,7 +293,7 @@ class MyCircusClient(object):
raise CallError(str(e)) raise CallError(str(e))
try: try:
self.socket.send(cmd) self.socket.send_string(cmd)
except zmq.ZMQError as e: except zmq.ZMQError as e:
raise CallError(str(e)) raise CallError(str(e))
...@@ -299,7 +311,7 @@ class MyCircusClient(object): ...@@ -299,7 +311,7 @@ class MyCircusClient(object):
raise CallError("Timed out.") raise CallError("Timed out.")
for socket in events: for socket in events:
msg = socket.recv() msg = socket.recv_string()
try: try:
res = json.loads(msg) res = json.loads(msg)
if res.get('id') != call_id: if res.get('id') != call_id:
...@@ -321,12 +333,12 @@ class FakeCircusClient(object): ...@@ -321,12 +333,12 @@ class FakeCircusClient(object):
yield from asyncio.sleep(2) yield from asyncio.sleep(2)
print(datetime.utcnow().isoformat(" ")) print(datetime.utcnow().isoformat(" "))
if command == "status": if command == "status":
# TODO: How does this actually work
if name: if name:
# return pids # return pids
# TODO: How does this actually work
return [1234] return [1234]
else: else:
return {"watchers": ["archive_aeri"], "statuses": ["active"]} return {"status": "ok", "statuses": {"archive_aeri": "active"}}
else: else:
# start, stop # start, stop
return [1234] return [1234]
...@@ -353,6 +365,8 @@ class CircusProxyView(WebsocketConnectionView): ...@@ -353,6 +365,8 @@ class CircusProxyView(WebsocketConnectionView):
circus_status = FakeCircusEvents() circus_status = FakeCircusEvents()
else: else:
circus_status = CircusEvents(self.circus_pubsub_endpoint) circus_status = CircusEvents(self.circus_pubsub_endpoint)
#circus_status = CircusEvents(self.circus_stats_endpoint)
yield from circus_status.connect()
self.circus_ctl = circus_ctl self.circus_ctl = circus_ctl
self.circus_status = circus_status self.circus_status = circus_status
...@@ -380,6 +394,7 @@ class CircusProxyView(WebsocketConnectionView): ...@@ -380,6 +394,7 @@ class CircusProxyView(WebsocketConnectionView):
return return
msg = yield from commands[command](self.circus_ctl, msg_dict) msg = yield from commands[command](self.circus_ctl, msg_dict)
print(msg)
yield from self.send(msg) yield from self.send(msg)
except Exception: except Exception:
LOG.exception("Could not complete command: {}".format(command)) LOG.exception("Could not complete command: {}".format(command))
......
...@@ -6,10 +6,11 @@ ...@@ -6,10 +6,11 @@
[app:main] [app:main]
use = egg:sparchive use = egg:sparchive
incoming_root = /no_backup/tmp/sparc-test/incoming/sparc incoming_root = /data/sparc
#incoming_root = /no_backup/tmp/sparc-test/incoming/sparc
circus_endpoint = ipc:///var/circus/endpoint circus_endpoint = ipc:///var/circus/endpoint
circus_pubsub_endpoint = ipc:///var/circus/pubsub circus_pubsub_endpoint = tcp://127.0.0.1:5556
circus_stats_endpoint = ipc:///var/circus/stats circus_stats_endpoint = tcp://127.0.0.1:5557
pyramid.reload_templates = true pyramid.reload_templates = true
pyramid.debug_authorization = false pyramid.debug_authorization = false
......
...@@ -82,25 +82,29 @@ ...@@ -82,25 +82,29 @@
function onMessage(evt) { function onMessage(evt) {
console.info("Received message: " + evt.data); console.info("Received message: " + evt.data);
//var msg_obj = evt.data;
var msg_obj = JSON.parse(evt.data); var msg_obj = JSON.parse(evt.data);
if (msg_obj.command == "start" && msg_obj.status == "SUCCESS") { if (msg_obj.command == "start" && msg_obj.status == "SUCCESS") {
taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "on"); // Classes are only changed by status updates in production
//taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "on");
} else if (msg_obj.command == "stop" && msg_obj.status == "SUCCESS") { } else if (msg_obj.command == "stop" && msg_obj.status == "SUCCESS") {
taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "off"); //taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "off");
} else if (msg_obj.command == "status") { } else if (msg_obj.command == "status") {
if (msg_obj.status == "SUCCESS") { if (msg_obj.status == "SUCCESS") {
var button_elem; var button_elem;
var task_status; var task_status;
for (var i = 0; i < msg_obj.task_names.length; i++) { for (var key in msg_obj.statuses) {
button_elem = document.getElementById("button_" + msg_obj.task_names[i]); if (msg_obj.statuses.hasOwnProperty(key)) {
task_status = msg_obj.task_statuses[i]; task_status = msg_obj.statuses[key];
if (task_status == "stopped") { button_elem = document.getElementById("button_" + key);
taskButtonChangeClass(button_elem, "off"); if (task_status == "stopped") {
} else if (task_status == "active") { taskButtonChangeClass(button_elem, "off");
taskButtonChangeClass(button_elem, "on"); } else if (task_status == "active") {
} else { taskButtonChangeClass(button_elem, "on");
taskButtonChangeClass(button_elem, "error"); } else {
taskButtonChangeClass(button_elem, "error");
}
} }
} }
} }
......
...@@ -8,8 +8,8 @@ use = egg:sparchive ...@@ -8,8 +8,8 @@ use = egg:sparchive
incoming_root = /data/sparc incoming_root = /data/sparc
circus_endpoint = ipc:///var/circus/endpoint circus_endpoint = ipc:///var/circus/endpoint
circus_pubsub_endpoint = ipc:///var/circus/pubsub circus_pubsub_endpoint = tcp://127.0.0.1:5556
circus_stats_endpoint = ipc:///var/circus/stats circus_stats_endpoint = tcp://127.0.0.1:5557
pyramid.reload_templates = false pyramid.reload_templates = false
pyramid.debug_authorization = false pyramid.debug_authorization = false
...@@ -20,10 +20,20 @@ pyramid.includes = ...@@ -20,10 +20,20 @@ pyramid.includes =
aiopyramid aiopyramid
pyramid_tm pyramid_tm
logging.config = %(here)s/development.ini
[uwsgi]
http-socket = 0.0.0.0:6543
workers = 1
plugins =
asyncio = 50 ; number of workers
greenlet
[server:main] [server:main]
use = egg:waitress#main use = egg:gunicorn#main
host = 0.0.0.0 host = 0.0.0.0
port = 6543 port = 6543
worker_class = aiopyramid.gunicorn.worker.AsyncGunicornWorker
### ###
# logging configuration # logging configuration
...@@ -31,7 +41,7 @@ port = 6543 ...@@ -31,7 +41,7 @@ port = 6543
### ###
[loggers] [loggers]
keys = root, sparchive_control_center, sqlalchemy keys = root, asyncio, sparchive_control_center, gunicorn
[handlers] [handlers]
keys = console keys = console
...@@ -43,18 +53,20 @@ keys = generic ...@@ -43,18 +53,20 @@ keys = generic
level = WARN level = WARN
handlers = console handlers = console
[logger_sparchive_control_center] [logger_asyncio]
level = WARN level = WARN
handlers = handlers =
qualname = sparchive_control_center qualname = asyncio
[logger_sqlalchemy] [logger_gunicorn]
level = INFO
handlers =
qualname = gunicorn
[logger_sparchive_control_center]
level = WARN level = WARN
handlers = handlers =
qualname = sqlalchemy.engine qualname = sparchive_control_center
# "level = INFO" logs SQL queries.
# "level = DEBUG" logs SQL queries and results.
# "level = WARN" logs neither. (Recommended for production systems.)
[handler_console] [handler_console]
class = StreamHandler class = StreamHandler
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment