ingest.py 6.34 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#!/usr/bin/env python
"""Ingestor for Vaisala CT25K Ceilometer.

Reads messages from serial port injecting an epoch timestamp before the header
of each message. No validation of message data is done. 

The output should match the legacy output written by the older Java software.

In general, to configure ingest:

    1. Install python >= 2.7
    2. Install metobs.ceilo
    3. Install rc_userboot.py 
        https://groups.ssec.wisc.edu/employee-info/for-programmers/programming-environment/rc-userboot
    4. Create a config file, see example in SVN
    5. Add ~/.bootrc to start ingest in screen detached session:
        #!/usr/bin/env bash
        screen -d -m $HOME/env/production/bin/ct25k_ingest $HOME/ceilo.cfg
    6. Create cronjob script to push data:
        #!/usr/bin/env bash
        export TZ="UTC-00:01:00"
        SRC="$HOME/data/rig_ceilo-$(date +%Y-%m-%d).ascii"
        if [ -e $SRC ]; then
        rsync -aux $SRC rsync://tahiti.ssec.wisc.edu/incoming/Instrument_Data/METOBS/RIG/Ceilo/raw
        fi
    7. Create cronjob for push
        */5 * * * * $HOME/data/rsync_data.sh &> /dev/null

"""
import logging
import time
import re
import os
import signal
from datetime import datetime, timedelta

import serial

# Register the custom TRACE level (9, one below logging.DEBUG) through the
# public logging API instead of writing to the private ``logging._levelNames``
# table, which is an implementation detail and is absent on Python 3.
# addLevelName records both the number->name and name->number mappings.
logging.addLevelName(9, 'TRACE')

LOG = logging.getLogger(__name__)

def epoch_secs(dt):
    """Return `dt` as seconds since the epoch (a float).

    The UTC time tuple of `dt` is passed to ``time.mktime``, which interprets
    the tuple's fields in the process-local timezone.

    NOTE(review): ``utctimetuple()`` always sets ``tm_isdst`` to 0, so for a
    naive local datetime taken while DST is in effect the result may be
    skewed by the DST offset -- confirm the ingest hosts run without DST.
    """
    utc_fields = dt.utctimetuple()
    seconds = time.mktime(utc_fields)
    return seconds

# Header layout accepted below: SOH, the literal "CT", one character from
# [A-Z0-9], two digits, the literal "2", any single character, STX, CR LF.
_HEADER_RE = re.compile(r'^\x01CT[A-Z0-9][0-9]{2}[2].\x02\r\n$')

def is_header(line):
    """Tell whether `line` is a message 2 header line.

    Returns the regex match object when `line` has the header layout,
    otherwise None, so the result is meant to be used for its truthiness.
    """
    return _HEADER_RE.match(line)

def process_lines(in_lines, ref_dt):
    """Inject an epoch timestamp line before every message header.

    Each line for which ``is_header`` is true gets a ``'<epoch secs>\\r\\n'``
    line inserted directly in front of it. All input lines are passed through
    unchanged (nothing is stripped) and in their original order.

    `ref_dt` is taken as the time of the most recent header in `in_lines`.
    Lines arrive oldest-first, so when several headers are present each
    earlier header is stamped 15 seconds before the one that follows it.
    Timestamps therefore increase through the output instead of running
    backwards.

    :param in_lines: lines read from the serial port, oldest first.
    :param ref_dt: datetime used as the timestamp of the last header.
    :returns: tuple ``(num_hdrs, out_lines)`` with the number of headers
        found and the list of output lines.
    """
    # Materialize once: two passes are needed (count, then stamp).
    in_lines = list(in_lines)
    header_flags = [bool(is_header(line)) for line in in_lines]
    num_hdrs = sum(header_flags)

    out_lines = []
    seen = 0
    for line, flagged in zip(in_lines, header_flags):
        if flagged:
            # Number of 15 second message intervals between this header
            # and the newest header in the batch.
            age = num_hdrs - 1 - seen
            secs = epoch_secs(ref_dt - timedelta(seconds=15) * age)
            out_lines.append('%d\r\n' % secs)
            seen += 1
        out_lines.append(line)
    return num_hdrs, out_lines

def _open_ceilo_port(portdev, timeout):
    """Open `portdev` at 2400 baud, 7 data bits, even parity, 1 stop bit,
    with the given read timeout in seconds.
    """
    return serial.Serial(port=portdev,
                         baudrate=2400,
                         bytesize=7,
                         parity='E',
                         stopbits=1,
                         timeout=timeout)


def init_ceilo(portdev):
    """Initialize ceilometer by sending default configuration values to
    instrument. When this completes the instrument should be in autosend mode
    and generating messages.

    The commands are sent over a connection with a 1 second read timeout.
    Whatever the instrument answers after each command is only logged at
    TRACE level (9); replies are not checked. That connection is always
    closed, even if a write or read fails, before a new one is opened.

    :param portdev: serial device name handed to ``serial.Serial``.
    :returns: a freshly opened ``serial.Serial`` on `portdev` with a
        7.5 second read timeout, for use by the ingest loop.
    """
    init_commands = ("\r\r\r",
                     "OPEN\r\n",
                     "SET MESSAGE MODE AUTOSEND\r\n",
                     "SET MESSAGE PORT DATA\r\n",
                     "SET MESSAGE TYPE MSG2\r\n",
                     "SET OPER_MODE CONTINUOUS\r\n",
                     "CLOSE\r\n")

    port = _open_ceilo_port(portdev, timeout=1)
    try:
        for line in init_commands:
            LOG.log(9, "SEND: %s", line.strip())
            port.write(line)
            port.flush()
            # readlines() returns once the 1 second timeout expires.
            for reply in port.readlines():
                LOG.log(9, "RECV: %s", reply.strip())
    finally:
        # Do not leak the device handle if a command fails part-way.
        port.close()

    return _open_ceilo_port(portdev, timeout=7.5)

def read_cfg(cfgfile):
    """Return the options of the ``ct25k`` section of an INI file as a dict.

    Values are returned as strings with the parser's default interpolation
    applied, so a literal ``%`` in a value (e.g. in a strftime pattern) has
    to be written as ``%%`` in the file.

    :param cfgfile: path of the INI style config file.
    :returns: dict mapping option name to value for section ``ct25k``.
    :raises NoSectionError: if the file has no ``ct25k`` section, which is
        also what happens when the file cannot be read, because
        ``parser.read`` skips unreadable files silently.
    """
    try:
        # Python 2 location; kept first so existing behavior is unchanged.
        from ConfigParser import SafeConfigParser
    except ImportError:
        # Python 3: ConfigParser is what SafeConfigParser used to be.
        from configparser import ConfigParser as SafeConfigParser

    parser = SafeConfigParser()
    parser.read(cfgfile)

    return dict(parser.items('ct25k'))

def main():
    """Command-line entry point: initialize the ceilometer and ingest forever.

    Settings come from the command line (``-v``, ``-o``, ``-f``, ``-p``) or,
    when ``-c`` is given, from the ``ct25k`` section of that INI file (keys
    ``port``, ``filefmt`` and ``outdir``). The same file is then also used to
    configure logging via ``logging.config.fileConfig``. If the serial
    device, file name format or output directory ends up empty, usage is
    printed and the program exits with status 1.

    The loop reads lines from the serial port, injects timestamps with
    ``process_lines`` and appends the result to a per-day output file. It
    ends when a read is interrupted by SIGTERM/SIGINT (errno EINTR); any
    other error propagates. The serial port and the open data file are
    closed on the way out in both cases.
    """
    import errno
    from argparse import ArgumentParser
    parser = ArgumentParser()

    levels = {'trace': 9, 'debug': logging.DEBUG, 'info': logging.INFO,
              'warn': logging.WARN, 'error': logging.ERROR}
    parser.add_argument('-v', dest="loglvl", choices=levels.keys(),
        default='info')
    parser.add_argument('-o', dest='outdir', default='.')
    parser.add_argument('-f', dest='fmt', default='rig_ceilo-%Y-%m-%d.ascii',
        help="output filename (supports date formatting)")
    parser.add_argument('-p', dest='port', help="serial device")
    parser.add_argument('-c', dest='cfgfile',
        help="INI style config. If provided all other options are ignored")

    args = parser.parse_args()

    if args.cfgfile:
        from logging.config import fileConfig
        fileConfig(args.cfgfile)
        config = read_cfg(args.cfgfile)
        portdev = config.get('port')
        filefmt = config.get('filefmt')
        outdir = config.get('outdir')
    else:
        outdir = args.outdir
        portdev = args.port
        filefmt = args.fmt
        loglvl = levels[args.loglvl]
        logging.basicConfig(level=loglvl)

    # All three settings are required, whichever source they came from.
    if not (portdev and filefmt and outdir):
        parser.print_usage()
        parser.exit(1)

    def datalog():
        """Return the open output file for today, rolling over to a new
        file (opened in append mode) when the local date has advanced.
        """
        now = datetime.now()
        if not datalog.fptr or now.date() > datalog.date:
            datalog.date = now.date()
            fn = datalog.date.strftime(filefmt)
            fpth = os.path.join(outdir, fn)
            if datalog.fptr:
                LOG.info("closing %s", datalog.fptr.name)
                datalog.fptr.close()
            datalog.fptr = open(fpth, 'a')
            LOG.info("opened %s", datalog.fptr.name)
        return datalog.fptr
    # Current file handle is kept as an attribute on the function itself.
    datalog.fptr = None

    def handle_signal(*args, **kwargs):
        # Only log here; the blocking read in the loop below is what gets
        # interrupted and ends the loop.
        LOG.warning("received TERM or INT")
    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    LOG.info("initializing ceilometer...")
    port = init_ceilo(portdev)

    LOG.info("starting ingest")

    try:
        while True:

            fptr = datalog()
            LOG.log(9, "got log %s", fptr.name)

            try:
                in_lines = port.readlines()
                LOG.debug("read %s lines", len(in_lines))

                num_hdrs, out_lines = process_lines(in_lines, datetime.now())
                LOG.debug("found %s potential messages", num_hdrs)
                LOG.log(9, ''.join(out_lines))

                LOG.debug("writing %s lines", len(out_lines))
                fptr.write(''.join(out_lines))
                fptr.flush()
            except Exception as err:
                # Interrupted system call: a signal arrived during the
                # read, treat it as a request to shut down.
                # NOTE(review): Python >= 3.5 retries interrupted system
                # calls itself (PEP 475), so this path presumably only
                # triggers on Python 2 -- confirm before porting.
                if err.args and err.args[0] == errno.EINTR:
                    break
                raise
    finally:
        # Best-effort cleanup; a failing close must not mask the real error.
        try:
            port.close()
        except Exception:
            LOG.debug("could not close serial port", exc_info=True)
        if datalog.fptr:
            datalog.fptr.close()

# Start the ingest only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':

    main()