#!/usr/bin/env python """Ingestor for Viasala CT25K Ceilometer. Reads messages from serial port injecting an epoch timestamp before the header of each message. No validation of message data is done. The output should match the legacy output written by the older Java software. """ import logging import os import re import signal import time from datetime import datetime, timedelta import serial # https://stackoverflow.com/a/13638084/433202 TRACE_LEVEL_NUM = 9 logging.addLevelName(TRACE_LEVEL_NUM, "TRACE") def _debugv(self, message, *args, **kws): # Yes, logger takes its '*args' as 'args'. if self.isEnabledFor(TRACE_LEVEL_NUM): self._log(TRACE_LEVEL_NUM, message, args, **kws) setattr(logging, "TRACE", TRACE_LEVEL_NUM) setattr(logging.getLoggerClass(), "debugv", _debugv) LOG = logging.getLogger(__name__) def epoch_secs(dt): """Datetime to seconds from epoch.""" return time.mktime(dt.utctimetuple()) def is_header(line): """Is the line a valid message 2 header.""" return re.match(r"^\x01CT[A-Z0-9][0-9]{2}[2].\x02\r\n$", line) def process_lines(in_lines, ref_dt): """Process lines from the serial port. Epoch timestamps are injected before the message header. All lines are stripped of white space before being returned, except for a NL before the epoch timestamp. """ out_lines = [] num_hdrs = 0 for line in in_lines: if is_header(line): secs = epoch_secs(ref_dt - timedelta(seconds=15) * num_hdrs) out_lines.append("%d\r\n" % secs) num_hdrs += 1 out_lines.append(line) return num_hdrs, out_lines def init_ceilo(portdev): """Initialize ceilometer by sending default configuration values to instrument. When this completes the instrument should be in autosend mode and generating messages. """ port = serial.Serial(port=portdev, baudrate=2400, bytesize=7, parity="E", stopbits=1, timeout=1) init_commands = ( "\r\r\r", "OPEN\r\n", "SET MESSAGE MODE AUTOSEND\r\n", "SET MESSAGE PORT DATA\r\n", "SET MESSAGE TYPE MSG2\r\n", "SET OPER_MODE CONTINUOUS\r\n", "CLOSE\r\n", ) for line in init_commands: LOG.log(9, "SEND: %s", line.strip()) port.write(line.encode("ascii")) port.flush() lines = port.readlines() for data_line in lines: LOG.log(9, "RECV: %s", data_line.decode().strip()) port.close() return serial.Serial(port=portdev, baudrate=2400, bytesize=7, parity="E", stopbits=1, timeout=7.5) def read_cfg(cfgfile): """Read ceilometer configuration file.""" from configparser import ConfigParser parser = ConfigParser() parser.read(cfgfile) return parser def main(): """Start ingest of realtime instrument data.""" from argparse import ArgumentParser parser = ArgumentParser() levels = { "trace": 9, "debug": logging.DEBUG, "info": logging.INFO, "warn": logging.WARN, "error": logging.ERROR, } parser.add_argument("-v", dest="loglvl", choices=levels.keys(), default="info") parser.add_argument("--log-dir", help="Base directory where log files will be written") parser.add_argument("-o", dest="outdir", default=".") parser.add_argument( "-f", dest="fmt", default="aoss_ceilo.%Y-%m-%d.ascii", help="output filename (supports date formatting)", ) parser.add_argument("-p", dest="port", help="serial device") parser.add_argument( "-c", dest="cfgfile", help="INI style config. If provided all other options are ignored", ) args = parser.parse_args() if args.cfgfile: from logging.config import fileConfig config = read_cfg(args.cfgfile) config.set("DEFAULT", "root_log_dir", args.log_dir) fileConfig(config) ceilo_config = dict(config.items("ct25k")) portdev = ceilo_config.get("port") filefmt = ceilo_config.get("filefmt") outdir = ceilo_config.get("outdir") else: outdir = args.outdir portdev = args.port filefmt = args.fmt loglvl = levels[args.loglvl] logging.basicConfig(level=loglvl) for name in ["portdev", "filefmt", "outdir"]: if not locals().get(name): parser.print_usage() parser.exit(1) def handle_signal(*args, **kwargs): LOG.warning("received TERM or INT") signal.signal(signal.SIGTERM, handle_signal) signal.signal(signal.SIGINT, handle_signal) LOG.info("initializing ceilometer...") port = init_ceilo(portdev) LOG.info("starting ingest") while True: fptr = datalog(filefmt, outdir) LOG.log(9, "got log %s", fptr.name) try: in_lines = [x.decode() for x in port.readlines()] LOG.debug("read %s lines", len(in_lines)) num_hdrs, out_lines = process_lines(in_lines, datetime.now()) LOG.debug("found %s potential messages", num_hdrs) LOG.log(9, "".join(out_lines)) LOG.debug("writing %s lines", len(out_lines)) fptr.write("".join(out_lines)) fptr.flush() except Exception as err: if err.args and err.args[0] == 4: # interrupted syscall break raise try: port.close() except (serial.SerialException, IOError, OSError): pass def datalog(filefmt, outdir): """Handle automatic opening and closing of dated data files.""" now = datetime.now() if not datalog.fptr or now.date() > datalog.date: datalog.date = now.date() fn = datalog.date.strftime(filefmt) fpth = os.path.join(outdir, fn) if datalog.fptr: LOG.info("closing %s", datalog.fptr.name) datalog.fptr.close() datalog.fptr = open(fpth, "a") LOG.info("opened %s", datalog.fptr.name) return datalog.fptr datalog.fptr = None # type: ignore[attr-defined] if __name__ == "__main__": main()