From e368affdb087657048afc75daf8d0cddde8c0976 Mon Sep 17 00:00:00 2001
From: David Hoese <david.hoese@ssec.wisc.edu>
Date: Tue, 19 May 2015 20:44:01 +0000
Subject: [PATCH] got control center working with aiozmq and added to circus
control
---
circus.ini | 26 +++++++-
sparchive/sparchive/control_center/app.py | 65 ++++++++++++-------
.../sparchive/control_center/development.ini | 7 +-
sparchive/sparchive/control_center/home.pt | 26 ++++----
.../sparchive/control_center/production.ini | 34 ++++++----
5 files changed, 106 insertions(+), 52 deletions(-)
diff --git a/circus.ini b/circus.ini
index f9fb8b6..4298219 100644
--- a/circus.ini
+++ b/circus.ini
@@ -3,11 +3,33 @@
; watcher logs are limited to 268435456 bytes = 256MB
[circus]
endpoint = ipc:///var/circus/endpoint
-pubsub_endpoint = ipc:///var/circus/pubsub
-stats_endpoint = ipc:///var/circus/stats
+pubsub_endpoint = tcp://127.0.0.1:5556
+stats_endpoint = tcp://127.0.0.1:5557
+#pubsub_endpoint = ipc:///var/circus/pubsub
+#stats_endpoint = ipc:///var/circus/stats
endpoint_owner = sparc
logoutput = /var/log/circus.log
+[watcher:control_center]
+#cmd = /home/sparc/repos/git/sparchive/scripts/start_control_center.sh
+cmd = /home/sparc/envs/sparchive_control/bin/gunicorn --paste /home/sparc/repos/git/sparchive/sparchive/sparchive/control_center/production.ini
+send_hup = True
+uid = sparc
+gid = users
+numprocesses = 1
+singleton = True
+autostart = True
+
+stdout_stream.class = FileStream
+stdout_stream.filename = /var/log/sparchive/control_center_out.log
+stdout_stream.max_bytes = 268435456
+stdout_stream.backup_count = 3
+
+stderr_stream.class = FileStream
+stderr_stream.filename = /var/log/sparchive/control_center_err.log
+stderr_stream.max_bytes = 268435456
+stderr_stream.backup_count = 3
+
[watcher:archive_aeri]
cmd = /home/sparc/repos/git/sparchive/scripts/minicron.sh 600 /home/sparc/repos/git/sparchive/scripts/archive_aeri.sh
uid = sparc
diff --git a/sparchive/sparchive/control_center/app.py b/sparchive/sparchive/control_center/app.py
index 713cf7b..4991877 100644
--- a/sparchive/sparchive/control_center/app.py
+++ b/sparchive/sparchive/control_center/app.py
@@ -12,13 +12,14 @@ __author__ = "David Hoese, davidh"
from wsgiref.simple_server import make_server
from pyramid.config import Configurator
from pyramid.view import view_config
-from aiopyramid.websocket.config import WebsocketMapper
+from aiopyramid.websocket.config.gunicorn import WebsocketMapper
from aiopyramid.websocket.view import WebsocketConnectionView
from circus.client import CircusClient
from circus.py3compat import string_types, s, b
from circus.exc import CallError
import zmq
from zmq import ssh
+import aiozmq
import os
import json
@@ -30,7 +31,8 @@ import logging
LOG = logging.getLogger("sparchive_control_center")
# Don't try to communicate with circus when true
-DEBUG = True
+DEBUG = False
+#DEBUG = True
# List of tasks that we actually want to notify the web interface about
ENABLED_TASKS = []
@@ -43,6 +45,7 @@ def create_json_error(msg_dict, error_msg=None):
response_dict = msg_dict.copy()
response_dict["status"] = "ERROR"
response_dict["comment"] = error_msg or ""
+ #return json.dumps(response_dict).encode("ascii")
return json.dumps(response_dict)
@@ -71,6 +74,7 @@ def _handle_start(circus_ctl, msg_dict, command="start", name=None):
"status": "ERROR",
"comment": "Could not call command on circus client",
}
+ #return json.dumps(response_dict).encode("ascii")
return json.dumps(response_dict)
response_dict = {
@@ -79,6 +83,7 @@ def _handle_start(circus_ctl, msg_dict, command="start", name=None):
"status": "SUCCESS",
"comment": "",
}
+ #return json.dumps(response_dict).encode("ascii")
return json.dumps(response_dict)
@@ -88,6 +93,12 @@ def _handle_stop(circus_ctl, msg_dict):
return response_dict
+def prepare_status_dict(status_dict):
+    """Strip watchers that are not in ``ENABLED_TASKS`` from a status reply.
+
+    The ``"statuses"`` mapping inside ``status_dict`` is modified in place;
+    nothing is returned.
+    """
+    statuses = status_dict["statuses"]
+    # Collect the names first so the mapping is not resized while iterating.
+    hidden = [name for name in statuses if name not in ENABLED_TASKS]
+    for name in hidden:
+        del statuses[name]
+
+
@asyncio.coroutine
def _handle_status(circus_ctl, msg_dict, command="status", name=""):
try:
@@ -99,25 +110,26 @@ def _handle_status(circus_ctl, msg_dict, command="status", name=""):
props["name"] = name
status_dict = yield from circus_ctl.send_message(command, **props)
- # TODO: How does this command actually work?
+ prepare_status_dict(status_dict)
+ from pprint import pprint
+ pprint(status_dict)
except Exception:
LOG.exception("Could not call command on circus client")
response_dict = {
- "task_names": status_dict["watchers"],
- "task_statuses": status_dict["statuses"],
"command": command,
"status": "ERROR",
"comment": "Could not call command on circus client",
}
+ #return json.dumps(response_dict).encode("ascii")
return json.dumps(response_dict)
response_dict = {
- "task_names": status_dict["watchers"],
- "task_statuses": status_dict["statuses"],
+ "statuses": status_dict["statuses"],
"command": command,
- "status": "SUCCESS",
+ "status": "SUCCESS" if status_dict["status"] == "ok" else "ERROR",
"comment": "",
}
+ #return json.dumps(response_dict).encode("ascii")
return json.dumps(response_dict)
@@ -139,13 +151,12 @@ def receive_circus_status(future, ws, circus_status):
LOG.debug("Received circus status event: {}, {}, {}".format(watcher, action, msg))
- if action == "spawn":
+ if action == "start":
# something was started
ws_msg = json.dumps({
"command": "status",
"status": "SUCCESS",
- "task_names": [watcher],
- "task_statuses": ["active"]
+ "statuses": {watcher: "active"},
})
yield from ws.send(ws_msg)
elif action == "stop":
@@ -153,8 +164,7 @@ def receive_circus_status(future, ws, circus_status):
ws_msg = json.dumps({
"command": "status",
"status": "SUCCESS",
- "task_names": [watcher],
- "task_statuses": ["stopped"]
+ "statuses": {watcher: "stopped"},
})
yield from ws.send(ws_msg)
else:
@@ -172,25 +182,26 @@ def current_experiment(incoming_root):
class CircusEvents(object):
- def __init__(self, pubsub_endpoint, topic=b'watcher.'):
+ def __init__(self, pubsub_endpoint, topic=b'watcher.', callback=receive_circus_status):
self.pubsub_endpoint = pubsub_endpoint
self.topic = topic
+ self.callback = callback
- self.ctx = zmq.Context()
- self.sub_socket = self.ctx.socket(zmq.SUB)
- self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.topic)
- self.sub_socket.connect(self.pubsub_endpoint)
+    @asyncio.coroutine
+    def connect(self):
+        """Open the SUB stream to the pubsub endpoint and subscribe to the topic.
+
+        ``self.sub_socket`` is only created here, so this coroutine must be
+        yielded from before ``get_message`` is used.
+        """
+        self.sub_socket = yield from aiozmq.create_zmq_stream(zmq.SUB, connect=self.pubsub_endpoint)
+        # Go through the stream's public ``transport`` property instead of the
+        # private ``_transport`` attribute to register the topic filter.
+        self.sub_socket.transport.subscribe(self.topic)
@asyncio.coroutine
def get_message(self):
- topic, msg = self.sub_socket.recv_string()
+ topic, msg = yield from self.sub_socket.read()
topic = s(topic)
watcher = topic.split('.')[1:-1][0]
action = topic.split('.')[-1]
- msg = json.loads(msg)
+ msg = json.loads(msg.decode("utf8"))
# actions could be reap, kill, spawn, stop
- yield watcher, action, msg
+ return watcher, action, msg
class FakeCircusEvents(object):
@@ -220,6 +231,7 @@ def make_json(command, **props):
return json.dumps(make_message(command, **props))
+# TODO: Rewrite to use aiozmq. For now the messages are short so this is OK
class MyCircusClient(object):
"""Custom circus command client for this application because we may have to
resort to asyncio-compatible zmq libraries.
@@ -281,7 +293,7 @@ class MyCircusClient(object):
raise CallError(str(e))
try:
- self.socket.send(cmd)
+ self.socket.send_string(cmd)
except zmq.ZMQError as e:
raise CallError(str(e))
@@ -299,7 +311,7 @@ class MyCircusClient(object):
raise CallError("Timed out.")
for socket in events:
- msg = socket.recv()
+ msg = socket.recv_string()
try:
res = json.loads(msg)
if res.get('id') != call_id:
@@ -321,12 +333,12 @@ class FakeCircusClient(object):
yield from asyncio.sleep(2)
print(datetime.utcnow().isoformat(" "))
if command == "status":
- # TODO: How does this actually work
if name:
# return pids
+ # TODO: How does this actually work
return [1234]
else:
- return {"watchers": ["archive_aeri"], "statuses": ["active"]}
+ return {"status": "ok", "statuses": {"archive_aeri": "active"}}
else:
# start, stop
return [1234]
@@ -353,6 +365,8 @@ class CircusProxyView(WebsocketConnectionView):
circus_status = FakeCircusEvents()
else:
circus_status = CircusEvents(self.circus_pubsub_endpoint)
+ #circus_status = CircusEvents(self.circus_stats_endpoint)
+ yield from circus_status.connect()
self.circus_ctl = circus_ctl
self.circus_status = circus_status
@@ -380,6 +394,7 @@ class CircusProxyView(WebsocketConnectionView):
return
msg = yield from commands[command](self.circus_ctl, msg_dict)
+ print(msg)
yield from self.send(msg)
except Exception:
LOG.exception("Could not complete command: {}".format(command))
diff --git a/sparchive/sparchive/control_center/development.ini b/sparchive/sparchive/control_center/development.ini
index 2053158..286cb0e 100644
--- a/sparchive/sparchive/control_center/development.ini
+++ b/sparchive/sparchive/control_center/development.ini
@@ -6,10 +6,11 @@
[app:main]
use = egg:sparchive
-incoming_root = /no_backup/tmp/sparc-test/incoming/sparc
+incoming_root = /data/sparc
+#incoming_root = /no_backup/tmp/sparc-test/incoming/sparc
circus_endpoint = ipc:///var/circus/endpoint
-circus_pubsub_endpoint = ipc:///var/circus/pubsub
-circus_stats_endpoint = ipc:///var/circus/stats
+circus_pubsub_endpoint = tcp://127.0.0.1:5556
+circus_stats_endpoint = tcp://127.0.0.1:5557
pyramid.reload_templates = true
pyramid.debug_authorization = false
diff --git a/sparchive/sparchive/control_center/home.pt b/sparchive/sparchive/control_center/home.pt
index 2e27016..a80beb2 100644
--- a/sparchive/sparchive/control_center/home.pt
+++ b/sparchive/sparchive/control_center/home.pt
@@ -82,25 +82,29 @@
function onMessage(evt) {
console.info("Received message: " + evt.data);
+ //var msg_obj = evt.data;
var msg_obj = JSON.parse(evt.data);
if (msg_obj.command == "start" && msg_obj.status == "SUCCESS") {
- taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "on");
+ // Classes are only changed by status updates in production
+ //taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "on");
} else if (msg_obj.command == "stop" && msg_obj.status == "SUCCESS") {
- taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "off");
+ //taskButtonChangeClass(document.getElementById("button_" + msg_obj.task_name), "off");
} else if (msg_obj.command == "status") {
if (msg_obj.status == "SUCCESS") {
var button_elem;
var task_status;
- for (var i = 0; i < msg_obj.task_names.length; i++) {
- button_elem = document.getElementById("button_" + msg_obj.task_names[i]);
- task_status = msg_obj.task_statuses[i];
- if (task_status == "stopped") {
- taskButtonChangeClass(button_elem, "off");
- } else if (task_status == "active") {
- taskButtonChangeClass(button_elem, "on");
- } else {
- taskButtonChangeClass(button_elem, "error");
+ for (var key in msg_obj.statuses) {
+ if (msg_obj.statuses.hasOwnProperty(key)) {
+ task_status = msg_obj.statuses[key];
+ button_elem = document.getElementById("button_" + key);
+ if (task_status == "stopped") {
+ taskButtonChangeClass(button_elem, "off");
+ } else if (task_status == "active") {
+ taskButtonChangeClass(button_elem, "on");
+ } else {
+ taskButtonChangeClass(button_elem, "error");
+ }
}
}
}
diff --git a/sparchive/sparchive/control_center/production.ini b/sparchive/sparchive/control_center/production.ini
index 3733f85..4fccfdf 100644
--- a/sparchive/sparchive/control_center/production.ini
+++ b/sparchive/sparchive/control_center/production.ini
@@ -8,8 +8,8 @@ use = egg:sparchive
incoming_root = /data/sparc
circus_endpoint = ipc:///var/circus/endpoint
-circus_pubsub_endpoint = ipc:///var/circus/pubsub
-circus_stats_endpoint = ipc:///var/circus/stats
+circus_pubsub_endpoint = tcp://127.0.0.1:5556
+circus_stats_endpoint = tcp://127.0.0.1:5557
pyramid.reload_templates = false
pyramid.debug_authorization = false
@@ -20,10 +20,20 @@ pyramid.includes =
aiopyramid
pyramid_tm
+logging.config = %(here)s/development.ini
+
+[uwsgi]
+http-socket = 0.0.0.0:6543
+workers = 1
+plugins =
+ asyncio = 50 ; number of async cores (not worker processes)
+ greenlet
+
[server:main]
-use = egg:waitress#main
+use = egg:gunicorn#main
host = 0.0.0.0
port = 6543
+worker_class = aiopyramid.gunicorn.worker.AsyncGunicornWorker
###
# logging configuration
@@ -31,7 +41,7 @@ port = 6543
###
[loggers]
-keys = root, sparchive_control_center, sqlalchemy
+keys = root, asyncio, sparchive_control_center, gunicorn
[handlers]
keys = console
@@ -43,18 +53,20 @@ keys = generic
level = WARN
handlers = console
-[logger_sparchive_control_center]
+[logger_asyncio]
level = WARN
handlers =
-qualname = sparchive_control_center
+qualname = asyncio
-[logger_sqlalchemy]
+[logger_gunicorn]
+level = INFO
+handlers =
+qualname = gunicorn
+
+[logger_sparchive_control_center]
level = WARN
handlers =
-qualname = sqlalchemy.engine
-# "level = INFO" logs SQL queries.
-# "level = DEBUG" logs SQL queries and results.
-# "level = WARN" logs neither. (Recommended for production systems.)
+qualname = sparchive_control_center
[handler_console]
class = StreamHandler
--
GitLab