Skip to content
Snippets Groups Projects
Unverified Commit 8df0e487 authored by David Hoese's avatar David Hoese
Browse files

Copy ceilometer code from old subversion repos

parent af5bc944
No related branches found
No related tags found
No related merge requests found
"""This module holds all directory/path/URL information for metobs.ceilo
packages. This file should be the only place that holds this static information.
This file should not do any file manipulation at all, only return strings or pass
other static information such as what plot number goes with what data.
This module will check for environment variables to create constants of PATHS
and URL bases. It also has functions for getting file locations for any file
for a specified day.
"""
import os
import re
from datetime import datetime, timedelta
from metobs.util import RODict, CONFIG as c
# Directory/URL roots for ceilometer data.  Each may be overridden with an
# environment variable of the same name; the defaults point at /beach.
CEILO_INCOMING_DIR = os.environ.get( 'CEILO_INCOMING_DIR', '/beach/incoming/Instrument_Data/METOBS/RIG/Ceilo/raw')
CEILO_PRAW_DIR = os.environ.get( 'CEILO_PRAW_DIR', '/beach/raw/aoss/ceilo')
CEILO_CACHE_DIR = os.environ.get( 'CEILO_CACHE_DIR', '/beach/cache/aoss/ceilo')
CEILO_LATEST_DIR = os.environ.get( 'CEILO_LATEST_DIR', '/beach/cache/aoss/ceilo')
# strftime() pattern used to build per-date subdirectories
CEILO_DIR_FORMAT = os.environ.get( 'CEILO_DIR_FORMAT', '%Y/%m')
CEILO_ASCII_LOC = os.environ.get( 'CEILO_ASCII_LOC', '/beach/cache/aoss/ceilo')
CEILO_NC_LOC = os.environ.get( 'CEILO_NC_LOC', '/beach/cache/aoss/ceilo')
CEILO_IMG_LOC = os.environ.get( 'CEILO_IMG_LOC', 'http://metobs.ssec.wisc.edu/pub/cache/aoss/ceilo')
# instrument name used in every generated filename
inst = 'ceilo'
RE_DIGITS = re.compile(r'\d+')
def get_incoming_dir():
    """Return the directory where freshly-ingested raw files arrive."""
    # Constant path; no date component.
    return CEILO_INCOMING_DIR
def get_praw_dir(when = None):
    """Return the primary raw-storage directory for the given date
    (defaults to now)."""
    stamp = when or datetime.now()
    subdir = stamp.strftime(CEILO_DIR_FORMAT)
    return os.path.join(CEILO_PRAW_DIR, subdir)
def get_sraw_dir(when = None):
    """Former secondary raw directory lookup; retired in favor of the
    single primary storage location and now always raises."""
    raise NotImplementedError("This function is not used anymore, there should only be one primary storage location")
def get_cache_dir(data_type, when = None):
    """Return the cache directory for *data_type* ('ascii', 'nc', ...) on
    the given date (defaults to now)."""
    stamp = when or datetime.now()
    return os.path.join(CEILO_CACHE_DIR, data_type, stamp.strftime(CEILO_DIR_FORMAT))
def get_latest_dir():
    """Return the directory holding the 'latest' products."""
    return CEILO_LATEST_DIR
def get_ascii_filename(when = None, site="rig", description=""):
    """Return the standard ascii filename for the given date."""
    stamp = when or datetime.now()
    return c.get_filename(site, inst, stamp, ext='ascii', description=description)
def get_ascii_url(when = None, site="rig", description=""):
    """Return the standard URL of the ascii file for the given date."""
    stamp = when or datetime.now()
    fn = get_ascii_filename(stamp, site=site, description=description)
    return os.path.join(CEILO_ASCII_LOC, 'ascii', stamp.strftime(CEILO_DIR_FORMAT), fn)
def get_nc_filename(when = None, site="rig", description=""):
    """Return the standard netCDF filename for the given date."""
    stamp = when or datetime.now()
    return c.get_filename(site, inst, stamp, ext='nc', description=description)
def get_nc_url(when = None, site="rig", description=""):
    """Return the standard URL of the netCDF file for the given date."""
    stamp = when or datetime.now()
    fn = get_nc_filename(stamp, site=site, description=description)
    return os.path.join(CEILO_NC_LOC, 'nc', stamp.strftime(CEILO_DIR_FORMAT), fn)
def get_img_filename(begin, end, ptype=1, tag='', site="rig", description=""):
    """Return the standard image filename spanning *begin* to *end*.

    *ptype* is validated/translated into a plotname by _handle_plot_type;
    *tag* distinguishes variants (e.g. 'tn' for thumbnails)."""
    plotname = _handle_plot_type(ptype)
    return c.get_filename(site, inst, begin, end=end, ext="png",
                          plotname=plotname, description=description, tag=tag)
def get_quicklook_filename(begin, end, ptype=1, site='rig', description=''):
    """Full-size (untagged) image filename spanning *begin* to *end*."""
    return get_img_filename(begin, end, ptype=ptype, tag='',
                            site=site, description=description)
def get_thumbnail_filename(begin, end, ptype=1, site='rig', description=''):
    """Thumbnail ('tn'-tagged) image filename spanning *begin* to *end*."""
    return get_img_filename(begin, end, ptype=ptype, tag='tn',
                            site=site, description=description)
def get_img_url(begin, end, ptype=1, tag='', site="rig", description=""):
    """Return the standard URL of the image spanning *begin* to *end*."""
    fn = get_img_filename(begin, end, ptype, tag=tag,
                          site=site, description=description)
    return os.path.join(CEILO_IMG_LOC, 'img', begin.strftime(CEILO_DIR_FORMAT), fn)
def get_quicklook_url(begin, end, ptype=1, site='rig', description=''):
    """URL of the full-size (untagged) image spanning *begin* to *end*."""
    return get_img_url(begin, end, ptype, tag='',
                       site=site, description=description)
def get_thumbnail_url(begin, end, ptype=1, site='rig', description=''):
    """URL of the thumbnail ('tn'-tagged) image spanning *begin* to *end*."""
    return get_img_url(begin, end, ptype, tag='tn',
                       site=site, description=description)
def rename_incoming(incoming_file, site='rig', description=''):
    """Map an incoming raw filename to its final locations.

    Returns (praw_dir, cache_dir, standard_name, remove) where *remove*
    is True when the file's date is more than 30 days old.
    """
    # NOTE(review): every digit group found in the name is fed to
    # datetime(); names with extra digits (e.g. in a description) would
    # break this -- confirm incoming names are strictly date-stamped.
    file_date = datetime(*tuple( [ int(x) for x in RE_DIGITS.findall(incoming_file) ] ))
    present_date = datetime.now()
    praw = get_praw_dir(when = file_date)
    cache = get_cache_dir('ascii', when = file_date)
    rn = get_ascii_filename(when = file_date, site=site, description=description)
    # older than 30 days => candidate for removal from the incoming dir
    remove = file_date.date() < (present_date - timedelta(days=30)).date()
    return praw,cache,rn,remove
def get_type_name():
    """Return a read-only mapping of plot-type number to display name."""
    names = {1: 'Backscatter',
             2: 'Cloud Base Height',
             3: 'Vertical Visibility'}
    return RODict(names)
def _handle_plot_type(plottype=1):
if plottype == 1:
return ''
elif plottype == 2:
return ''
elif plottype == 3:
return ''
else:
raise ValueError("Plot type must be between 1-3")
File added
# Declare this package as a pkg_resources-style namespace package so
# multiple metobs.* distributions can share the 'metobs' package.
__import__('pkg_resources').declare_namespace(__name__)
# Docstrings in this package use Epytext markup.
__docformat__ = 'Epytext'
import os
from metobs.ceilo import CONFIG as c
instrument_name = 'ceilo'
def get_ascii_name(dt, site='rig'):
    """
    Make a standard filename for a tower ascii file.
    @type dt: datetime
    @param dt: datetime of the url to generate.
    @type site: str
    @param site: Name of an implemented instrument site
    """
    # Fix: forward *site* instead of silently ignoring it; behavior for
    # the default site ('rig') is unchanged.
    return c.get_ascii_filename(dt, site=site)
def get_nc_name(dt, site='rig'):
    """
    Make a standard filename for a ceilometer netcdf file.
    @type dt: datetime
    @param dt: datetime of the url to generate.
    @type site: str
    @param site: Name of an implemented instrument site
    """
    # Fix: forward *site* instead of silently ignoring it; behavior for
    # the default site ('rig') is unchanged.
    return c.get_nc_filename(dt, site=site)
def get_thumbnail_name(begin, end=None, site='rig', plottype=1):
    """Make a standard filename for a ceilometer image thumbnail file.
    """
    # Fix: CONFIG defines no get_image_filename (the old call raised
    # AttributeError); use get_thumbnail_filename, which applies the
    # 'tn' tag, and forward plottype/site.
    return c.get_thumbnail_filename(begin, end, ptype=plottype, site=site)
def get_nc_url(dt, site='rig', host=None):
    """
    Get a URL to a ceilometer NetCDF file.
    @type dt: datetime
    @param dt: datetime of the url to generate.
    @type site: str
    @param site: Name of an implemented instrument site
    @rtype: str
    @return: A full URL suitable for OPeNDAP access
    """
    # Fix: forward *site*; *host* stays unused but is kept for interface
    # compatibility with existing callers.
    return c.get_nc_url(dt, site=site)
def get_ascii_url(dt, site="rig", host=None):
    """Get a URL to a ceilometer ASCII file on the opendap server
    @type dt: datetime
    @param dt: datetime of the url to generate.
    @type site: str
    @param site: Name of an implemented instrument site
    @rtype: str
    @return: A full URL suitable for HTTP access
    """
    # Fix: forward *site*; *host* stays unused but is kept for interface
    # compatibility with existing callers.
    return c.get_ascii_url(dt, site=site)
def get_thumbnail_url(begin, end=None, site="rig", host=None, plottype=1):
    """Return the URL of the thumbnail image spanning *begin* to *end*."""
    # Fix: CONFIG defines no get_image_url (the old call raised
    # AttributeError); use get_thumbnail_url, which applies the 'tn' tag.
    return c.get_thumbnail_url(begin, end, ptype=plottype, site=site)
def get_image_url(begin, end=None, site="rig", host=None, plottype=1):
    """Return the URL of the full-size image spanning *begin* to *end*."""
    # Fix: CONFIG defines no get_image_url (the old call raised
    # AttributeError); use get_img_url with an empty tag.
    return c.get_img_url(begin, end, ptype=plottype, tag='', site=site)
def get_type_name():
    # Delegate to CONFIG's read-only plot-type -> display-name mapping.
    return c.get_type_name()
def _handle_plottype(plottype=1):
    # Delegate plot-type validation / plotname lookup to CONFIG.
    return c._handle_plot_type(plottype)
This diff is collapsed.
#!/usr/bin/env python
"""Ingestor for Vaisala CT25K Ceilometer.
Reads messages from serial port injecting an epoch timestamp before the header
of each message. No validation of message data is done.
The output should match the legacy output written by the older Java software.
In-general, to configure ingest:
1. Install python >= 2.7
2. Install metobs.ceilo
3. Install rc_userboot.py
https://groups.ssec.wisc.edu/employee-info/for-programmers/programming-environment/rc-userboot
4. Create a config file, see example in SVN
5. Add ~/.bootrc to start ingest in screen detached session:
#!/usr/bin/env bash
screen -d -m $HOME/env/production/bin/ct25k_ingest $HOME/ceilo.cfg
6. Create cronjob script to push data:
#!/usr/bin/env bash
export TZ="UTC-00:01:00"
SRC="$HOME/data/rig_ceilo-$(date +%Y-%m-%d).ascii"
if [ -e $SRC ]; then
rsync -aux $SRC rsync://tahiti.ssec.wisc.edu/incoming/Instrument_Data/METOBS/RIG/Ceilo/raw
fi
7. Create cronjob for push
*/5 * * * * $HOME/data/rsync_data.sh &> /dev/null
"""
import logging
import time
import re
import os
import signal
from datetime import datetime, timedelta
import serial
# FIXME: bad-joo-joo
# Register a custom TRACE level (9) by poking logging internals.
# NOTE(review): _levelNames is a Python 2 logging implementation detail;
# Python 3 would need logging.addLevelName(9, 'TRACE') instead.
logging._levelNames[9] = 'TRACE'
logging._levelNames['TRACE'] = 9
LOG = logging.getLogger(__name__)
def epoch_secs(dt):
    """Convert a datetime to seconds since the epoch via the local
    mktime of its UTC time tuple."""
    return time.mktime(dt.utctimetuple())
def is_header(line):
    """Return a truthy match object when *line* is a CT25K message-2
    header (SOH + unit id + '2' + STX + CRLF), else None."""
    pattern = r'^\x01CT[A-Z0-9][0-9]{2}[2].\x02\r\n$'
    return re.match(pattern, line)
def process_lines(in_lines, ref_dt):
    """Inject an epoch-timestamp line ahead of every message header.

    Headers are assumed to arrive 15 seconds apart, counting backwards
    from *ref_dt*.  All lines pass through otherwise unchanged.
    Returns (number_of_headers, processed_lines).
    """
    processed = []
    headers_seen = 0
    for raw in in_lines:
        if is_header(raw):
            stamp = ref_dt - timedelta(seconds=15) * headers_seen
            processed.append('%d\r\n' % epoch_secs(stamp))
            headers_seen += 1
        processed.append(raw)
    return headers_seen, processed
def init_ceilo(portdev):
    """Initialize ceilometer by sending default configuration values to
    instrument. When this completes the instrument should be in autosend mode
    and generating messages.

    @param portdev: serial device path (e.g. /dev/ttyS0)
    @return: a reopened serial.Serial ready for reading data
    """
    # Short timeout for the configuration session so readlines() returns
    # promptly after the instrument stops responding.
    port = serial.Serial(port=portdev,
                         baudrate=2400,
                         bytesize=7,
                         parity='E',
                         stopbits=1,
                         timeout=1)
    init_commands = ("\r\r\r",
                     "OPEN\r\n",
                     "SET MESSAGE MODE AUTOSEND\r\n",
                     "SET MESSAGE PORT DATA\r\n",
                     "SET MESSAGE TYPE MSG2\r\n",
                     "SET OPER_MODE CONTINUOUS\r\n",
                     "CLOSE\r\n")
    for line in init_commands:
        LOG.log(9, "SEND: %s", line.strip())  # 9 == custom TRACE level
        port.write(line)
        port.flush()
    # Drain and log whatever the instrument echoed back.
    lines = port.readlines()
    for l in lines:
        LOG.log(9, "RECV: %s", l.strip())
    port.close()
    # Reopen with a longer timeout for the data-reading session.
    # NOTE(review): 7.5s presumably relates to the message cadence --
    # confirm against the instrument configuration.
    return serial.Serial(port=portdev,
                         baudrate=2400,
                         bytesize=7,
                         parity='E',
                         stopbits=1,
                         timeout=7.5)
def read_cfg(cfgfile):
    """Load the [ct25k] section of an INI-style config file as a dict."""
    from ConfigParser import SafeConfigParser
    cfg = SafeConfigParser()
    cfg.read(cfgfile)
    return dict(cfg.items('ct25k'))
def main():
    """Parse options (or a config file), open the serial port, and append
    timestamped ceilometer messages to a daily output file until
    interrupted."""
    from argparse import ArgumentParser
    parser = ArgumentParser()
    levels = {'trace':9, 'debug':logging.DEBUG, 'info':logging.INFO,
              'warn':logging.WARN, 'error':logging.ERROR}
    parser.add_argument('-v', dest="loglvl", choices=levels.keys(),
                        default='info')
    parser.add_argument('-o', dest='outdir', default='.')
    parser.add_argument('-f', dest='fmt', default='rig_ceilo-%Y-%m-%d.ascii',
                        help="output filename (supports date formatting)")
    parser.add_argument('-p', dest='port', help="serial device")
    parser.add_argument('-c', dest='cfgfile',
                        help="INI style config. If provided all other options are ignored")
    args = parser.parse_args()
    if args.cfgfile:
        # Config-file mode: logging configured by fileConfig, options
        # come from the [ct25k] section.
        from logging.config import fileConfig
        fileConfig(args.cfgfile)
        config = read_cfg(args.cfgfile)
        portdev = config.get('port')
        filefmt = config.get('filefmt')
        outdir = config.get('outdir')
    else:
        outdir = args.outdir
        portdev = args.port
        filefmt = args.fmt
        loglvl = levels[args.loglvl]
        logging.basicConfig(level=loglvl)
    # All three settings are required regardless of source.
    # NOTE(review): locals().get() is a fragile way to check this --
    # works only while these remain plain local names.
    for name in ['portdev', 'filefmt', 'outdir']:
        if not locals().get(name):
            parser.print_usage()
            parser.exit(1)
    def datalog():
        # Return the open file for today's data, rolling over (close old,
        # open new) when the date changes.  State lives on the function
        # object itself (datalog.fptr / datalog.date).
        now = datetime.now()
        if not datalog.fptr or now.date() > datalog.date:
            datalog.date = now.date()
            fn = datalog.date.strftime(filefmt)
            fpth = os.path.join(outdir, fn)
            if datalog.fptr:
                LOG.info("closing %s", datalog.fptr.name)
                datalog.fptr.close()
            datalog.fptr = open(fpth, 'a')
            LOG.info("opened %s", datalog.fptr.name)
        return datalog.fptr
    datalog.fptr = None
    def handle_signal(*args, **kwargs):
        # Only logs: shutdown relies on the signal interrupting the
        # blocking serial read with EINTR (errno 4), handled below.
        LOG.warn("received TERM or INT")
    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)
    LOG.info("initializing ceilometer...")
    port = init_ceilo(portdev)
    LOG.info("starting ingest")
    while True:
        fptr = datalog()
        LOG.log(9, "got log %s", fptr.name)
        try:
            in_lines = port.readlines()
            LOG.debug("read %s lines", len(in_lines))
            num_hdrs, out_lines = process_lines(in_lines, datetime.now())
            LOG.debug("found %s potential messages", num_hdrs)
            LOG.log(9, ''.join(out_lines))
            LOG.debug("writing %s lines", len(out_lines))
            fptr.write(''.join(out_lines))
            fptr.flush()
        except Exception as err:
            if err.args and err.args[0] == 4:
                # interrupted syscall
                break
            raise
    try:
        port.close()
    except:
        pass

if __name__ == '__main__':
    main()
"""
For processing Ceilometer CT25K Messages
"""
import logging
from calendar import timegm
from datetime import datetime
#from urllib2 import urlopen
from metobs.util.gen_open import open_ascii as urlopen
from numpy import array
LOG = logging.getLogger(__name__)
#: Value to use for NetCDF missing_value attribute
missing_value = -9999
#: Function to determine if a value is a missing_value or not
is_missing = lambda val: val == missing_value
#: number of allowable missing values before backscatter is considered invalid
BK_THRESH = 256 * .25
# ASCII framing characters used by the CT25K message protocol
START_OF_HEADING = '\x01'
START_OF_TEXT = '\x02'
END_OF_TEXT = '\x03'
# Bit values of the (hex) instrument status word.  Grouped below into
# INFO_FLAGS / WARNING_FLAGS / ALARM_FLAGS with parallel message lists.
MANUAL_BLOWER_CONTROL = 2
HIGH_BACKGROUND_RADIANCE = 4
TILT_ANGLE_45_PLUS = 8
MANUAL_SETTINGS_EFF = 16
SINGLE_SEQ_ON = 32
WORKING_FROM_BATT = 64
POLLING_ON = 128
UNITS_IN_METERS = 256
INTERNAL_HEATER_ON = 512
BLOWER_HEATER_ON = 1024
BLOWER_ON = 2048
BLOWER_SUSPECT = 32768
RCV_OPTICAL_XTLK_POOR = 65536
REL_HUMIDITY_HIGH = 131072
VOLTAGE_ABNORMAL = 262144
INT_TEMP_HIGH = 524288
LASER_TEMP_HIGH = 1048576
LASER_PWR_LOW = 2097152
BATTERY_LOW = 4194304
WINDOW_CONTAMINATED = 8388608
VOLTAGE_FAILURE = 268435456
RECEIVER_FAILURE = 536870912
LASER_FAILURE = 1073741824
LASER_TEMP_SHUTOFF = 2147483648
# WARNING_FLAGS[i] corresponds to WARNING_MSGS[i]; same for the other pairs.
WARNING_FLAGS = [BLOWER_SUSPECT,
    RCV_OPTICAL_XTLK_POOR, REL_HUMIDITY_HIGH, VOLTAGE_ABNORMAL,
    INT_TEMP_HIGH, LASER_TEMP_HIGH, LASER_PWR_LOW, BATTERY_LOW,
    WINDOW_CONTAMINATED ]
WARNING_MSGS = [
    "Blower suspect", "Receiver optical cross-talk poor",
    "Relative humidity is greater than 85%", "Voltage is low or high",
    "Internal temperature is high or low", "Laser temperature is high",
    "Laser power is low", "Battery is low", "Window is contaminated" ]
INFO_FLAGS = [
    MANUAL_BLOWER_CONTROL, HIGH_BACKGROUND_RADIANCE,
    TILT_ANGLE_45_PLUS, MANUAL_SETTINGS_EFF, SINGLE_SEQ_ON,
    WORKING_FROM_BATT, POLLING_ON, UNITS_IN_METERS, INTERNAL_HEATER_ON,
    BLOWER_HEATER_ON, BLOWER_ON ]
INFO_MSGS = [
    "Using manual blower control", "Background radiance is high",
    "Tilt angle is greater than 45 degrees",
    "Manual settings are effective", "Single sequence mode is ON",
    "Working from the battery", "Polling mode is ON",
    "Units are in Meters", "Internal heater is ON",
    "Blower heater is ON", "Blower is ON" ]
ALARM_FLAGS = [VOLTAGE_FAILURE,
    RECEIVER_FAILURE, LASER_FAILURE, LASER_TEMP_SHUTOFF]
ALARM_MSGS = [
    "VOLTAGE FAILURE", "RECEIVER FAILURE", "LASER FAILURE",
    "LASER TEMPERATURE SHUT_OFF"]
# Py2: StandardError is the pre-3.x base class for built-in exceptions.
class MessageError(StandardError):
    """General message error."""
class Message2(object):
    """A parsed CT25K 'message 2': header line, detection-status /
    cloud-base line, measurement-parameter line, and 16 lines of
    backscatter data (256 samples)."""
    # number of lines comprising a message 2 (excluding the epoch stamp line)
    NUM_LINES = 20
    def __init__(self, lines, stamp):
        """Initialize this message, disecting the various message components
        according to the CT25K Message 2 format.
        The first line in a message is the header and it must start with a
        SOH and end with SOT, and the entire message must end with EOT
        ASCII chars to be valid. Assertion errors are raised if these
        conditions are not met.
        The backscatter is initially filled with L{missing_value} so if a
        line is the incorrect length all of it's values will be missing_value.
        Likewise, if a value cannot be parsed as a HEX string it's value will
        be missing_value.
        @param lines: Exactly the 20 lines that comprise a valid message
        @type lines: list
        @param stamp: The message time. If the 'stamp' is a naieve datetime the
        tzinfo will be set to metobs.util.time.UTC, otherwise time operations
        will proceed with provided tzinfo.
        @type stamp: datetime
        @raises MessageError: If this instance cannot be created due to an error
        parsing.
        """
        assert len(lines) == self.NUM_LINES, \
            "A Message2 must contain %s lines" % self.NUM_LINES
        self._epoch = timegm(stamp.utctimetuple())
        self._lines = lines
        self._stamp = stamp
        self._header = lines[0]
        # strip non-printables
        self._header = self._header.replace(START_OF_HEADING, '')
        self._header = self._header.replace(START_OF_TEXT, '')
        # sixth char of the stripped header is the message number
        self._msg_num = int(self.header[5])
        if self._msg_num != 2:
            raise MessageError("Invalid message number", self.header)
        self._status_string = lines[1][21:29]
        self._status_flag = lines[1][1]
        self._detection_status = _int(lines[1][0])
        self._first_cbh = missing_value
        self._second_cbh = missing_value
        self._third_cbh = missing_value
        # NOTE(review): statuses 1-3 each fill only their own cloud-base
        # slot; confirm against the CT25K manual whether statuses 2 and 3
        # should also parse the lower base heights.
        if self.detection_status == 3:
            self._third_cbh = _int(lines[1][15:20])
        if self.detection_status == 2:
            self._second_cbh = _int(lines[1][9:14])
        if self.detection_status == 1:
            self._first_cbh = _int(lines[1][3:8])
        if self.detection_status == 4:
            # status 4: vertical visibility is reported instead of bases
            self._vertical_visibility = _int(lines[1][3:8])
            self._alt_highest_signal = _int(lines[1][9:14])
        else:
            self._vertical_visibility = missing_value
            self._alt_highest_signal = missing_value
        meas_params = lines[2].split()
        if len(meas_params) < 10:
            # short line: the indexing below will raise IndexError
            LOG.warn("Invalid measurement parameters for message with time %s", self.epoch)
        self._scale = _int(meas_params[0])
        self._measurement_mode = _str(meas_params[1])
        self._laser_pulse_energy = _float(meas_params[2])
        self._laser_temperature = _float(meas_params[3])
        self._receiver_sensitivity = _float(meas_params[4])
        self._window_contamination = _float(meas_params[5])
        self._tilt_angle = _float(meas_params[6])
        self._background_light = _float(meas_params[7])
        self._measurement_parameters = _str(meas_params[8])
        self._sum_backscatter = _float(meas_params[9])
        self._backscatter = parse_backscatter(lines[3:])
        # backscatter can contain no more than BK_THRESH missing values
        missing = [b for b in self._backscatter if is_missing(b)]
        if len(missing) >= BK_THRESH:
            raise MessageError("Backscatter errors exceeds threshold")
    # --- read-only accessors over the parsed fields ---
    @property
    def epoch(self):
        return self._epoch
    @property
    def lines(self):
        return self._lines
    @property
    def stamp(self):
        return self._stamp
    @property
    def header(self):
        return self._header
    @property
    def msg_num(self):
        return self._msg_num
    @property
    def status_string(self):
        return self._status_string
    @property
    def status_flag(self):
        return self._status_flag
    @property
    def detection_status(self):
        return self._detection_status
    @property
    def cbh(self):
        # the three cloud-base heights as one tuple
        return (self._first_cbh, self._second_cbh, self._third_cbh)
    @property
    def first_cbh(self):
        return self._first_cbh
    @property
    def second_cbh(self):
        return self._second_cbh
    @property
    def third_cbh(self):
        return self._third_cbh
    @property
    def vertical_visibility(self):
        return self._vertical_visibility
    @property
    def alt_highest_signal(self):
        return self._alt_highest_signal
    @property
    def scale(self):
        return self._scale
    @property
    def measurement_mode(self):
        return self._measurement_mode
    @property
    def laser_pulse_energy(self):
        return self._laser_pulse_energy
    @property
    def laser_temperature(self):
        return self._laser_temperature
    @property
    def receiver_sensitivity(self):
        return self._receiver_sensitivity
    @property
    def window_contamination(self):
        return self._window_contamination
    @property
    def tilt_angle(self):
        return self._tilt_angle
    @property
    def background_light(self):
        return self._background_light
    @property
    def measurement_parameters(self):
        return self._measurement_parameters
    @property
    def sum_backscatter(self):
        return self._sum_backscatter
    @property
    def backscatter(self):
        return self._backscatter
    def warning(self):
        """
        @return: true if there is a warning indicated in the detection status string.
        """
        # NOTE(review): _detection_status is parsed with _int(), so a
        # letter status ('W') becomes missing_value and this can never
        # be true -- confirm intent.
        return str(self.detection_status) in 'wW'
    def alarm(self):
        """
        @return: true if there is a alarm indicated in the detection status string.
        """
        # NOTE(review): same caveat as warning() above.
        return str(self.detection_status) in 'aA'
    def check_status(self, flag):
        """
        Check the status of a particular flag.
        @param flag: One of the flag constants
        @type flag: number
        @return: 0 if the flag is not set, otherwise the value of the flag.
        """
        # status_string is a hex bitmask (Py2 long)
        return long(self.status_string, 16) & flag
    def get_status_messages(self):
        """
        Get all status messages indicated in the device status string.
        @return: 'alarms', 'warnings', and 'informational' messages.
        """
        messages = {'alarms':[], 'warnings':[], 'informational':[]}
        for idx in range(len(WARNING_FLAGS)):
            if long(self.status_string, 16) & WARNING_FLAGS[idx]:
                messages['warnings'].append(WARNING_MSGS[idx])
        for idx in range(len(ALARM_FLAGS)):
            if long(self.status_string, 16) & ALARM_FLAGS[idx]:
                messages['alarms'].append(ALARM_MSGS[idx])
        for idx in range(len(INFO_FLAGS)):
            if long(self.status_string, 16) & INFO_FLAGS[idx]:
                messages['informational'].append(INFO_MSGS[idx])
        return messages
    def __cmp__(self, that):
        """
        Comparisons are done on message timestamps.
        """
        # Py2-only comparison protocol; drives list.sort() in load_messages.
        if self.stamp > that.stamp:
            return 1
        if self.stamp == that.stamp:
            return 0
        if self.stamp < that.stamp:
            return -1
    def __str__(self):
        return '\n'.join([str(self.epoch)] + self.lines)
class Message7(Message2):
    """Same as message 2 with additional sky condition line.
    """
    # one extra line carrying the sky-condition groups
    NUM_LINES = 21
    def __init__(self, lines, stamp):
        super(Message7, self).__init__(lines, stamp)
        # last line holds the 'amount height' sky-condition groups
        self._sky_cond = parse_sky_condition(self.lines[-1])
    @property
    def sky_condition(self):
        return self._sky_cond
def hex_to_int(hval):
    """Parse a 16-bit two's-complement hex string into a signed int."""
    raw = int(hval, 16)
    # values above 0x7FFF encode negative numbers
    return raw - 65536 if raw > 32767 else raw
def _get(fcn, v, d):
try:
return fcn(v)
except:
pass
return d
def _float(v):
return _get(float, v, missing_value)
def _int(v):
return _get(int, v, missing_value)
def _str(v):
return _get(str, v, '')
def parse_sky_condition(line, feet=False):
    """Parse the four 'amount height' groups of a sky-condition line into
    a list of (okta_amount, height) tuples.

    Heights scale by 100 when *feet* is true, otherwise by 10 (meters).
    """
    groups = line.strip().split()
    result = []
    for i in range(0, 8, 2):
        amount = int(groups[i])
        if 0 < amount < 9:
            height = int(groups[i + 1])
            height *= 100 if feet else 10
            result.append((amount, height))
        elif amount == 0:
            # clear sky: no height to report
            result.append((0, missing_value))
        elif amount == 9:
            # FIXME: ignoring this because we already have it
            # from line 2, right?
            pass
        else:
            # amount in (-1, 99): no data
            result.append((missing_value, missing_value))
    return result
def parse_backscatter(data):
    """Parse 16 backscatter data lines (16 x 16 four-char hex samples)
    into a list of 256 ints; unparsable samples stay missing_value.

    *data* may be a newline-joined string or a sequence of lines.
    @raises MessageError: when fewer than 16 lines are supplied.
    """
    lines = data.split('\n') if isinstance(data, (str, unicode)) else data
    if len(lines) < 16:
        raise MessageError("Expected 16 lines for backscatter")
    values = [missing_value] * 256
    sample = 0
    for row in lines[:16]:
        if len(row) < 67:  # 3 chars for height, 64 for data
            # short row: leave all 16 of its samples missing
            sample += 16
            continue
        for col in range(3, 67, 4):
            try:
                values[sample] = hex_to_int(row[col:col + 4])
            except:
                pass
            sample += 1
    return values
def get_message_times(messages):
    """Return a long array of epoch seconds for each message's UTC stamp."""
    times = array([missing_value] * len(messages), 'l')
    for idx, msg in enumerate(messages):
        times[idx] = timegm(msg.stamp.utctimetuple())
    return times
def get_message_met_data(messages):
    """Compile message data into a single data dictionary.
    @param messages: messages
    @type messages: list
    @return: Ceilometer data where keys are variable names from the netcdf file.
    """
    n = len(messages)
    data = dict(
        vertical_visibility = array([missing_value]*n),
        alt_highest_signal = array([missing_value]*n),
        first_cbh = array([missing_value]*n),
        second_cbh = array([missing_value]*n),
        third_cbh = array([missing_value]*n),
        backscatter = array([[missing_value]*256 for idx in range(n)])
    )
    for i in range(n):
        # Fix: use the property names Message2 actually defines.  The
        # old attributes (highest_signal / vertical_vis / bases) do not
        # exist and raised AttributeError on every call.
        data['alt_highest_signal'][i] = messages[i].alt_highest_signal
        data['vertical_visibility'][i] = messages[i].vertical_visibility
        data['first_cbh'][i] = messages[i].first_cbh
        data['second_cbh'][i] = messages[i].second_cbh
        data['third_cbh'][i] = messages[i].third_cbh
        data['backscatter'][i][:] = array(messages[i].backscatter)
    return data
def get_message_hk_data(messages):
    """Housekeeping-data extraction; not implemented, always returns {}."""
    return {}
def load_messages(url, on_error=None):
    """Load messages from a message data file. All lines in the data file
    are skipped until a proper start of message is encountered.
    @param url: URL to a message data file as generated by the Ceilometer
    ingest software
    @type url: str
    @param on_error: optional callable(lines, exception) invoked when a
    message fails to parse
    @returns: Sequence of Messages sorted from oldest to newest.
    """
    fp = urlopen(url)
    line = fp.readline()
    # readline returned '', so there are no lines
    if not line: return tuple()
    # message must start with epoch time, so disregard partial
    # messages until we reach one
    while line and not line.strip().isdigit():
        line = fp.readline()
    # start of first complete message
    messages = []
    done = not line # if we have a line, we're not done
    while not done:
        # one epoch line (already in `line`) plus NUM_LINES message lines
        lines = [line] + [fp.readline() for i in range(Message2.NUM_LINES)]
        if '' in lines:
            # hit EOF mid-message; discard the partial message
            done = True
            continue
        # NOTE(review): fromtimestamp() is local-time, but the ingest
        # writes UTC epochs -- confirm whether utcfromtimestamp was meant.
        stamp = datetime.fromtimestamp(int(lines[0]))
        try:
            messages.append(Message2([l.strip() for l in lines[1:]], stamp))
        except Exception, e:
            if hasattr(on_error, '__call__'):
                on_error(lines, e)
        # disregard possible garbage between messages
        line = fp.readline()
        while line and not line.strip().isdigit():
            line = fp.readline()
    # Message2.__cmp__ sorts by timestamp
    messages.sort()
    return tuple(messages)
"""
Library for creating and manipulating Vaisala CT25K Ceilometer NetCDF files.
"""
__author__ = 'Bruce Flynn, SSEC'
__version__ = '$Revision: 1.15 $'
#$Source: /cvsroot/TOOLS/dev/metobs/python/instruments/ceilometer/metobs/ceilo/nc.py,v $
__docformat__ = 'Epytext'
from os import path, makedirs
import logging
from time import gmtime
from calendar import timegm
from datetime import datetime, timedelta
from urllib2 import HTTPError#, urlopen
import pkg_resources
from numpy import array, zeros, int32, int8, uint16
from pycdf import NC
from metobs import mytime
from metobs.util.gen_open import open_ascii as urlopen
from metobs.util.nc import create_nc_from_ncml
from metobs.ceilo.CONFIG import get_ascii_url, get_nc_filename, get_cache_dir as _get_destination_dir
from message import Message2, load_messages
#: valid RIG ceilometer variables
# NOTE(review): this list is unreferenced in the code shown here -- it
# appears to document the variables expected in the ncml template.
_ceilo_variables = [
    'laser_pulse_energy',
    'third_cbh',
    'base_time',
    'background_light',
    'tilt_angle',
    'second_cbh',
    'alt',
    'receiver_sensitivity',
    'measurement_parameters',
    'detection_status',
    'lon',
    'laser_temperature',
    'qc_laser_pulse_energy',
    'qc_alt_highest_signal',
    'sum_backscatter',
    'first_cbh',
    'alt_highest_signal',
    'backscatter',
    'qc_time',
    'vertical_visibility',
    'window_contamination',
    'qc_first_cbh',
    'lat',
    'qc_laser_temperature',
    'status_flag',
    'qc_second_cbh',
    'status_string',
    'range',
    'time',
    'qc_third_cbh',
    'qc_tilt_angle',
    'time_offset',
    'qc_vertical_visibility']
_log = logging.getLogger(__name__)
#: geo location of 1225 W. Dayton St, Madison WI 53706 (according to google maps).
ssec_loc = (43.08543, -89.271632)
def _get_value(var, value):
"""Get a variables fill value if present, otherwise use message.missing_value.
"""
if value != value and hasattr(var, '_FillValue'):
return var._FillValue
return value
def _get_message_times(nc, messages):
times = zeros(len(messages), 'l')
for i in range(len(messages)):
times[i] = timegm(messages[i].stamp.utctimetuple())
return times
def _get_message_met_data(nc, messages):
    """Compile per-message met values into arrays keyed by netcdf
    variable name, substituting each variable's fill value for NaNs.
    @param nc: open netcdf file (supplies per-variable fill values)
    @param messages: List of Messages
    @return: dict of numpy arrays keyed by netcdf variable name
    """
    count = len(messages)
    data = dict(
        vertical_visibility = zeros(count),
        alt_highest_signal = zeros(count),
        first_cbh = zeros(count),
        second_cbh = zeros(count),
        third_cbh = zeros(count),
        backscatter = array([zeros(256) for idx in range(count)])
    )
    scalar_names = ('alt_highest_signal', 'vertical_visibility',
                    'first_cbh', 'second_cbh', 'third_cbh')
    for i, msg in enumerate(messages):
        # message property names match the netcdf variable names
        for name in scalar_names:
            data[name][i] = _get_value(nc.var(name), getattr(msg, name))
        data['backscatter'][i][:] = array(msg.backscatter)
    return data
def _get_message_hk_data(nc, messages):
    """Compile housekeeping (instrument status) message data into a
    single data dictionary.
    @param nc: open netcdf file (supplies fill values and dimensions)
    @param messages: List of Messages
    @return: Dictionary of ceilometer data where keys are variable names from the
    netcdf file.
    """
    # NOTE(review): 'tilt_angle' appears twice below; harmless (the same
    # value is just written twice) but one entry may have been meant to
    # be a different variable.
    var_names = (
        'laser_pulse_energy',
        'laser_temperature',
        'receiver_sensitivity',
        'window_contamination',
        'tilt_angle',
        'background_light',
        'sum_backscatter',
        'tilt_angle'
    )
    # FIXME there is a bug with dealing with strings where pycdf complains
    # that it cannot cast to the required type.
    str_names = [
        # 'status_string',
        # 'measurement_parameters'
    ]
    data = {}
    str_len = nc.dim('string_len').inq()[1]
    data['detection_status'] = array([m.detection_status for m in messages], int32)
    data['range'] = array(range(15, 256*30, 30)) # center of range buckets. Buckets are 30m
                                                 # so start at 15 and count 256 buckets.
    # pre-size the output arrays
    for var_name in str_names:
        data[var_name] = array(zeros((len(messages), str_len)))
    for var_name in var_names:
        data[var_name] = zeros(len(messages))
    for i in range(len(messages)):
        for var_name in var_names:
            # message property names match the netcdf variable names
            data[var_name][i] = \
                _get_value(nc.var(var_name), getattr(messages[i], var_name))
        for var_name in str_names:
            # strings are stored as arrays of character ordinals,
            # space-padded to the file's string_len dimension
            s = getattr(messages[i], var_name)
            data[var_name][i][:] = map(ord, list(s.ljust(str_len)))
    return data
def create(filename, lat, lon):
    """Create a new NetCDF file.
    @param filename: Name of the file
    @param lat: Latitude of the instrument
    @param lon: Longitude of the instrument
    @return: Initialized file with all variables added and sync'd to disk. File
    will be in automode.
    """
    #
    # TODO: eliminate the use of deprecated missing_value
    #
    # The variable/dimension layout comes from the packaged ncml template.
    ncml = pkg_resources.resource_string(__name__, "ceilo.ncml")
    nc = create_nc_from_ncml(filename, ncml)
    #
    # Add the scalars
    #
    nc.var('lat').put(lat)
    nc.var('lon').put(lon)
    return nc
def make_ceilo_files(begin, basedir=None, end=None, loc=ssec_loc, site="rig", description=""):
    """
    Make NetCDF files. All times are ignored and only the date
    information is used. One file for each date is written to the
    current directory. If basedir is None then a nc file will be created in
    the destination directory specified by the _get_destination_dir function
    with a '.' at the beginning. This module will not rename that file.
    @type begin: datetime
    @param begin: beginning datetime object
    @type basedir: str
    @param basedir: where files will be created.
    @type end: datetime
    @param end: datetime object
    @type loc: tuple
    @param loc: lat/lon of instrument at the time the data was ingested
    @return: list of paths of the files actually created
    """
    if not end: end = datetime.now(begin.tzinfo)
    assert end > begin, "Ending date must be greater than beginning date"
    days = (end - begin).days + 1
    created_files = []
    _log.debug("Making files for %d days between %s and %s", days, begin, end)
    # iterate newest day first
    for i in range(days):
        dt = end - timedelta(days=i)
        fn = get_nc_filename(dt, site=site, description=description)
        if not basedir:
            # hidden ('.'-prefixed) file in the cache destination dir
            pth = _get_destination_dir('nc', dt)
            fn = '.' + fn
        else:
            pth = path.join(basedir, str(dt.year), '%02d' % dt.month)
        if not path.exists(pth):
            makedirs(pth)
        fpth = path.join(pth, fn)
        _log.debug('Checking that file exists')
        # skip days whose source ascii data cannot be opened
        url = get_ascii_url(dt, site=site, description=description)
        try:
            urlopen(url)
        except StandardError, e:
            _log.warn('%s: %s', e, url)
            continue
        _log.debug('Creating %s', fpth)
        cdf = create(fpth, *loc)
        _log.debug('Filling %s', fpth)
        fill_from_msg_files(cdf, url)
        cdf.sync()
        created_files.append(fpth)
    return created_files
def fill_from_msg_files(nc, url):
    """File a NetCDF file from the message files given.
    It is assumed that the times from the data file are in UTC.
    @param nc: pycdf.CDF returned from L{create} to fill.
    @param url: URL to a ceilometer message data file where each message
    is proceded by an epoch time in UTC.
    """
    messages = load_messages(url)
    if not messages:
        return
    base = mytime.datetime_to_epoch(messages[0].stamp)
    var = nc.var('base_time')
    var.units = 'seconds since 1970-01-01 00:00:00 0:00'
    var.put(base)
    # messages were sorted by stamp in parse_messages
    times = _get_message_times(nc, messages)
    met_data = _get_message_met_data(nc, messages)
    hk_data = _get_message_hk_data(nc, messages)
    # use middle stamp to avoid point from previous day at the beginning (if present)
    # NOTE: len(messages)/2 relies on Py2 integer division
    midnight = mytime.datetime_to_epoch(mytime.day_begin(messages[len(messages)/2].stamp))
    offsets = times - midnight # use scalar array subtraction
    if offsets.dtype != int32:
        offsets = array(offsets, int32)
    var = nc.var('time')
    var[:len(offsets)] = offsets
    var.units = messages[len(messages)/2].stamp.strftime('seconds since %Y-%m-%d 00:00:00 0:00')
    # time offsets from base_time
    # NOTE(review): these offsets are computed from the day start of the
    # first stamp, yet the units string claims 'since <first stamp time>'
    # (i.e. base_time) -- confirm which was intended.
    base_secs = mytime.datetime_to_epoch(mytime.day_begin(messages[0].stamp))
    offsets = times - base_secs
    var = nc.var('time_offset')
    var.units = messages[0].stamp.strftime('seconds since %Y-%m-%d %H:%M:%S 0:00')
    if offsets.dtype != int32:
        offsets = array(offsets, int32)
    var[:len(offsets)] = offsets
    # write every collected variable, 1-D or 2-D
    for val_dict in [met_data, hk_data]:
        for var_name in val_dict.keys():
            arr = val_dict[var_name]
            var = nc.var(var_name)
            if len(arr.shape) == 1:
                var[:arr.shape[0]] = arr
            else:
                var[:arr.shape[0],:arr.shape[1]] = arr
    nc.sync()
#!/usr/bin/env python
# encoding: utf-8
"""
tidy.py
Routines to handle ceilometer files on tahiti on a regular basis
No URL or opendap paths should be used; this module only does local file
manipulation.  However, the external product-creation code that it calls may
itself make remote file calls, such as getting data files from an OpenDAP URL.
Environment variables:
CEILO_INCOMING_DIR
CEILO_RAW_DIR
CEILO_CACHE_DIR
CEILO_LATEST_DIR
CEILO_DIR_FORMAT=%Y/%m/%d
Created by davidh on 2009-10-08.
Copyright (c) 2009 University of Wisconsin SSEC. All rights reserved.
"""
import os, sys, logging, shutil
from subprocess import check_call as sh
from datetime import datetime, timedelta
from metobs.ceilo import quicklook, nc
from metobs.ceilo import CONFIG as c
from metobs.util import raw_manager
rel = raw_manager.relativedir
import re
RE_DIGITS = re.compile(r'\d+')
LOG = logging.getLogger(__name__)
# Default site/description used in product filenames; main() overwrites these
# module globals from the --site/--description command line options.
site = "rig"
description=""
# Mirror the directory configuration from metobs.ceilo CONFIG so the rest of
# this module can refer to them as module-level constants.
CEILO_INCOMING_DIR = c.CEILO_INCOMING_DIR
CEILO_PRAW_DIR = c.CEILO_PRAW_DIR
CEILO_CACHE_DIR = c.CEILO_CACHE_DIR
CEILO_LATEST_DIR = c.CEILO_LATEST_DIR
CEILO_DIR_FORMAT = c.CEILO_DIR_FORMAT
def create_quicklook(when=None):
    """run quicklook.py with the proper arguments to create various quicklooks."""
    dt = when or datetime.now()
    files = quicklook.make_plot(dt=dt, site=site, description=description)
    n = len(files)
    if n != 4:
        LOG.error("Expected four quicklooks to be created, latest files were not linked, %d filenames were returned" % n)
        raise ValueError("Latest quicklooks could not be linked because there were not the correct amount of images")
    # One plot type, paired with the full-size image and its thumbnail.
    ptypes = [c._handle_plot_type(t) for t in range(1, 2)]
    latest_dir = c.get_latest_dir()
    for fname, ptype, tag in zip(files[1::2], ptypes * 2, ["", "tn"]):
        link_latest_quicklooks(latest_dir, fname, ptype, tag=tag)
def link_latest_quicklooks(latest_dir, filename, ptype, tag=""):
    """Atomically (re)point the 'latest_quicklook*' symlink at ``filename``.

    The link is first created under a dotted temporary name and then renamed
    over the real name so readers never see a missing link.
    """
    ext = os.path.splitext(filename)[1]
    tag_part = "_" + tag if tag else ""
    ptype_part = "_" + ptype if ptype else ""
    linkname = "latest_quicklook%s%s%s" % (ptype_part, tag_part, ext)
    tmppath = os.path.join(latest_dir, "." + linkname)
    linkpath = os.path.join(latest_dir, linkname)
    if os.path.islink(tmppath):
        os.unlink(tmppath)
    LOG.debug('symlink %s -> %s' % (linkpath, filename))
    os.symlink(rel(latest_dir, filename), tmppath)
    os.rename(tmppath, linkpath)
def create_nc(when=None):
    """run external nc_gen module with proper arguments to create a new netCDF
    file.
    """
    end = when or datetime.now()
    # Regenerate files covering the last three days.
    begin = end - timedelta(days=2)
    # Temporary nc files are written in the cache location with a leading dot.
    tmp_files = nc.make_ceilo_files(begin=begin, end=end, basedir=None, site=site, description=description)
    if not tmp_files:
        LOG.warning("No new nc files were created")
        return
    # Strip the leading "." to get each final name and replace any existing file.
    final_files = []
    for tmp in tmp_files:
        dirname, dotted = os.path.split(tmp)
        final = os.path.join(dirname, dotted[1:])
        if os.path.exists(final):
            os.remove(final)
        os.rename(tmp, final)
        final_files.append(final)
    # Point the "latest" nc link at the first (newest) created file.
    link_latest_file(c.get_latest_dir(), final_files[0], data_type="nc")
def file_dirs(when=None, data_type='ascii'):
    """Return (incoming_dir, praw_dir, cache_dir, latest_dir) for a given
    datetime object (default: now).

    @param when: datetime used to build date-based directory paths
    @param data_type: cache data type passed to the config, e.g. 'ascii' or 'nc'
    """
    when = when or datetime.now()
    # FIX: warn if *any* configured directory is not absolute.  The original
    # `not (isabs(a) or isabs(b) or ...)` only fired when every path was
    # relative, letting a single bad path slip through silently.
    dirs = (CEILO_INCOMING_DIR, CEILO_PRAW_DIR, CEILO_CACHE_DIR, CEILO_LATEST_DIR)
    if not all(os.path.isabs(d) for d in dirs):
        LOG.warning("Directories should be absolute paths")
    return (c.get_incoming_dir(),
            c.get_praw_dir(when),
            c.get_cache_dir(data_type, when),
            c.get_latest_dir()
            )
def _keyed_filename(f):
return tuple(int(x) for x in RE_DIGITS.findall(f)), f
def pending_incoming_files(incoming_dir):
    "return list of ascii files remaining in a given incoming directory, sorted latest-first"
    keyed = sorted(
        _keyed_filename(name)
        for name in os.listdir(incoming_dir)
        if "ascii" in name
    )
    # Newest (largest date/digit key) first.
    keyed.reverse()
    return [name for _key, name in keyed]
def link_latest_file(latest_dir, filename, data_type='ascii'):
    """link latest data_type files to latest_ascii.ascii, overwriting existing links
    filename must be a absolute or relative path, not just the name of a file.
    """
    ext = os.path.splitext(filename)[1]
    linkname = 'latest_%s%s' % (data_type, ext)
    # Create under a dotted temp name, then rename over the real link so the
    # swap is atomic for readers.
    tmppath = os.path.join(latest_dir, '.' + linkname)
    linkpath = os.path.join(latest_dir, linkname)
    if os.path.islink(tmppath):
        os.unlink(tmppath)
    LOG.debug('symlink %s -> %s' % (linkpath, filename))
    os.symlink(rel(latest_dir, filename), tmppath)
    os.rename(tmppath, linkpath)
def link_latest_directory(source, latest_dir):
    """Point the 'latest_dir' symlink inside ``latest_dir`` at ``source``.

    Uses the dotted-temp-name + rename pattern so the swap is atomic.
    """
    tmpname = os.path.join(latest_dir, ".latest_dir")
    linkname = os.path.join(latest_dir, "latest_dir")
    if os.path.islink(tmpname):
        os.unlink(tmpname)
    LOG.debug('softlinking latest directory %s -> %s' % (linkname, source))
    os.symlink(rel(latest_dir, source), tmpname)
    os.rename(tmpname, linkname)
def unload_incoming(incoming_dir, praw_dir, cache_dir, latest_dir):
    """handle new ceilometer data files in incoming:
        check for/create raw, cache, and latest directories
        copy/move originals into the raw and cache directories
        update the latest directory link and the latest ascii file link

    @param incoming_dir: directory holding newly ingested files
    @param praw_dir: destination for primary raw copies
    @param cache_dir: destination for cached (renamed) copies
    @param latest_dir: directory holding the "latest" symlinks
    @raise IOError: if incoming_dir does not exist
    """
    if not os.path.exists(incoming_dir):
        LOG.warning("No incoming directory at: '%s'" % (incoming_dir))
        raise IOError
    if not os.path.exists(praw_dir):
        LOG.debug("Creating raw directory: '%s'" % praw_dir)
        os.makedirs(praw_dir)
    if not os.path.exists(cache_dir):
        LOG.debug("Creating cache directory: '%s'" % cache_dir)
        os.makedirs(cache_dir)
    if not os.path.exists(latest_dir):
        LOG.debug("Creating latest directory: '%s'" % latest_dir)
        os.makedirs(latest_dir)
    # Files are returned sorted latest-first (see pending_incoming_files).
    new_files = pending_incoming_files(incoming_dir)
    if not new_files:
        LOG.warning("no files found in %s" % incoming_dir)
        return
    # Ask the config module how each incoming file should be handled; by the
    # unpack below each entry is (praw_dir, cache_dir, rename, remove).
    raw_changes = [ c.rename_incoming(fn, site=site, description=description) for fn in new_files ]
    (praw_dirs, cache_dirs, renames, removes) = [ list(x) for x in zip(*raw_changes) ]
    # Copy/move the files into their raw and cache destinations.
    raw_manager.daily_manage_raw(incoming_dir, praw_dirs, None, cache_dirs, new_files, renamed=renames, remove=removes)
    # new_files is latest-first, so renames[0] is the newest file's new name.
    link_latest_file( latest_dir, os.path.join(cache_dir, renames[0]), data_type='ascii' )
    link_latest_directory( cache_dir, latest_dir )
def unload_incoming_all(when=None):
    """Run unload_incoming using the configured directories for ``when``.

    A missing incoming directory (IOError) is deliberately ignored.
    """
    dirs = file_dirs(when)
    try:
        unload_incoming(*dirs)
    except IOError:
        pass
def main():
    """Parse command line options and dispatch to one of the sub-commands
    defined below (incoming, quicklook, nc, help, test).

    @return: process exit status (0 on success, 9 for no/unknown command)
    """
    import optparse
    usage = """
%prog [options]
run "%prog help" to list commands
"""
    parser = optparse.OptionParser(usage)
    parser.add_option('-t', '--test', dest="self_test",
                      action="store_true", default=False, help="run unit tests")
    parser.add_option('-q', '--quiet', dest="quiet",
                      action="store_true", default=False, help="only error output")
    parser.add_option('-v', '--verbose', dest="verbose",
                      action="store_true", default=False, help="enable more informational output")
    parser.add_option('-w', '--debug', dest="debug",
                      action="store_true", default=False, help="enable debug output")
    parser.add_option('--date', dest="date", default=None,
                      help="specify date to use in specified command, MM/DD/YYYY")
    parser.add_option('--site', dest='site', default="rig",
                      help="used in product filename, see metobs.util for details")
    parser.add_option('--description', dest='description', default='',
                      help="used in product filename, see metobs.util for details")
    options, args = parser.parse_args()

    if options.self_test:
        import doctest
        doctest.testmod()
        sys.exit(2)

    # Logging verbosity: the most verbose flag present wins.
    lvl = logging.WARNING
    if options.debug:
        lvl = logging.DEBUG
    elif options.verbose:
        lvl = logging.INFO
    elif options.quiet:
        lvl = logging.ERROR
    logging.basicConfig(level=lvl)

    date = options.date
    # Sub-commands read the module-level site/description globals.
    globals()['site'] = options.site
    globals()['description'] = options.description
    if date:
        date = datetime.strptime(date, "%m/%d/%Y")

    commands = {}
    # Snapshot of local names: anything defined after this point is treated
    # as a dispatchable command (see the commands.update() call below).
    prior = dict(locals())

    def incoming(*args):
        """handle incoming files and directories
        Moves data to raw, copies data to cache, sets up latest links.
        """
        unload_incoming_all(when=date)

    def quicklook(*args):
        """Create set of quicklooks for date specified or today.
        """
        create_quicklook(when=date)

    def nc(*args):
        """Create multiple nc files for (date - 2 days) to date.
        """
        create_nc(date)

    def help(command=None):
        """print command help or list of commands
        e.g. help help
        """
        if command is None:
            # print first line of docstring
            for cmd in commands:
                ds = commands[cmd].__doc__.split('\n')[0]
                # FIX: py2 print statements converted to py3-compatible calls
                print("%-16s %s" % (cmd, ds))
        else:
            print(commands[command].__doc__)

    def test():
        "run tests"
        #test1()
        raise NotImplementedError("No Test")

    # Everything defined since the `prior` snapshot becomes a command.
    commands.update(dict(x for x in locals().items() if x[0] not in prior))

    if (not args) or (args[0] not in commands):
        parser.print_help()
        help()
        return 9
    else:
        # Dispatch to the chosen command function with its extra arguments.
        commands[args[0]](*args[1:])
        return 0
if __name__=='__main__':
sys.exit(main())
......@@ -4,3 +4,6 @@ AOSS Ceilometer
Bash and python scripts for managing the AOSS Ceilometer's data ingest
and higher level product generation.
NOTE: Most of the code in this repository was copied from the old subversion
repository. That code was written for and executed under Python 2.6/2.7, but
it will be run under Python 3.6 from now on, so some scripts may not work out
of the box.
setup.py 0 → 100644
# Install script for the AOSS ceilometer processing package.
from setuptools import setup, find_packages

setup(
    name='AossCeilo',
    version='0.1',
    description='UW AOSS Ceilometer',
    url='http://metobs.ssec.wisc.edu',
    install_requires=[
        'numpy',
        'pyserial',
        'metobscommon',
    ],
    # NOTE(review): dependency_links is deprecated and ignored by modern pip;
    # 'metobscommon' may need to come from an index or requirements file.
    dependency_links=['http://larch.ssec.wisc.edu/cgi-bin/repos.cgi'],
    # NOTE(review): excludes 'aossceilo.tests' -- confirm this matches the
    # package name that find_packages() actually discovers in this repo.
    packages=find_packages(exclude=['aossceilo.tests']),
    include_package_data=True,
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment