Something went wrong on our end
message.py 14.64 KiB
"""
For processing Ceilometer CT25K Messages
"""
import logging
from calendar import timegm
from datetime import datetime
#from urllib2 import urlopen
from metobs.util.gen_open import open_ascii as urlopen
from numpy import array
LOG = logging.getLogger(__name__)
#: Value to use for NetCDF missing_value attribute
missing_value = -9999
#: Function to determine if a value is a missing_value or not
is_missing = lambda val: val == missing_value
#: number of allowable missing values before backscatter is considered invalid
BK_THRESH = 256 * .25
START_OF_HEADING = '\x01'
START_OF_TEXT = '\x02'
END_OF_TEXT = '\x03'
MANUAL_BLOWER_CONTROL = 2
HIGH_BACKGROUND_RADIANCE = 4
TILT_ANGLE_45_PLUS = 8
MANUAL_SETTINGS_EFF = 16
SINGLE_SEQ_ON = 32
WORKING_FROM_BATT = 64
POLLING_ON = 128
UNITS_IN_METERS = 256
INTERNAL_HEATER_ON = 512
BLOWER_HEATER_ON = 1024
BLOWER_ON = 2048
BLOWER_SUSPECT = 32768
RCV_OPTICAL_XTLK_POOR = 65536
REL_HUMIDITY_HIGH = 131072
VOLTAGE_ABNORMAL = 262144
INT_TEMP_HIGH = 524288
LASER_TEMP_HIGH = 1048576
LASER_PWR_LOW = 2097152
BATTERY_LOW = 4194304
WINDOW_CONTAMINATED = 8388608
VOLTAGE_FAILURE = 268435456
RECEIVER_FAILURE = 536870912
LASER_FAILURE = 1073741824
LASER_TEMP_SHUTOFF = 2147483648
WARNING_FLAGS = [BLOWER_SUSPECT,
RCV_OPTICAL_XTLK_POOR, REL_HUMIDITY_HIGH, VOLTAGE_ABNORMAL,
INT_TEMP_HIGH, LASER_TEMP_HIGH, LASER_PWR_LOW, BATTERY_LOW,
WINDOW_CONTAMINATED ]
WARNING_MSGS = [
"Blower suspect", "Receiver optical cross-talk poor",
"Relative humidity is greater than 85%", "Voltage is low or high",
"Internal temperature is high or low", "Laser temperature is high",
"Laser power is low", "Battery is low", "Window is contaminated" ]
INFO_FLAGS = [
MANUAL_BLOWER_CONTROL, HIGH_BACKGROUND_RADIANCE,
TILT_ANGLE_45_PLUS, MANUAL_SETTINGS_EFF, SINGLE_SEQ_ON,
WORKING_FROM_BATT, POLLING_ON, UNITS_IN_METERS, INTERNAL_HEATER_ON,
BLOWER_HEATER_ON, BLOWER_ON ]
INFO_MSGS = [
"Using manual blower control", "Background radiance is high",
"Tilt angle is greater than 45 degrees",
"Manual settings are effective", "Single sequence mode is ON",
"Working from the battery", "Polling mode is ON",
"Units are in Meters", "Internal heater is ON",
"Blower heater is ON", "Blower is ON" ]
ALARM_FLAGS = [VOLTAGE_FAILURE,
RECEIVER_FAILURE, LASER_FAILURE, LASER_TEMP_SHUTOFF]
ALARM_MSGS = [
"VOLTAGE FAILURE", "RECEIVER FAILURE", "LASER FAILURE",
"LASER TEMPERATURE SHUT_OFF"]
class MessageError(StandardError):
"""General message error."""
class Message2(object):
NUM_LINES = 20
def __init__(self, lines, stamp):
"""Initialize this message, disecting the various message components
according to the CT25K Message 2 format.
The first line in a message is the header and it must start with a
SOH and end with SOT, and the entire message must end with EOT
ASCII chars to be valid. Assertion errors are raised if these
conditions are not met.
The backscatter is initially filled with L{missing_value} so if a
line is the incorrect length all of it's values will be missing_value.
Likewise, if a value cannot be parsed as a HEX string it's value will
be missing_value.
@param lines: Exactly the 20 lines that comprise a valid message
@type lines: list
@param stamp: The message time. If the 'stamp' is a naieve datetime the
tzinfo will be set to metobs.util.time.UTC, otherwise time operations
will proceed with provided tzinfo.
@type stamp: datetime
@raises MessageError: If this instance cannot be created due to an error
parsing.
"""
assert len(lines) == self.NUM_LINES, \
"A Message2 must contain %s lines" % self.NUM_LINES
self._epoch = timegm(stamp.utctimetuple())
self._lines = lines
self._stamp = stamp
self._header = lines[0]
# strip non-printables
self._header = self._header.replace(START_OF_HEADING, '')
self._header = self._header.replace(START_OF_TEXT, '')
self._msg_num = int(self.header[5])
if self._msg_num != 2:
raise MessageError("Invalid message number", self.header)
self._status_string = lines[1][21:29]
self._status_flag = lines[1][1]
self._detection_status = _int(lines[1][0])
self._first_cbh = missing_value
self._second_cbh = missing_value
self._third_cbh = missing_value
if self.detection_status == 3:
self._third_cbh = _int(lines[1][15:20])
if self.detection_status == 2:
self._second_cbh = _int(lines[1][9:14])
if self.detection_status == 1:
self._first_cbh = _int(lines[1][3:8])
if self.detection_status == 4:
self._vertical_visibility = _int(lines[1][3:8])
self._alt_highest_signal = _int(lines[1][9:14])
else:
self._vertical_visibility = missing_value
self._alt_highest_signal = missing_value
meas_params = lines[2].split()
if len(meas_params) < 10:
LOG.warn("Invalid measurement parameters for message with time %s", self.epoch)
self._scale = _int(meas_params[0])
self._measurement_mode = _str(meas_params[1])
self._laser_pulse_energy = _float(meas_params[2])
self._laser_temperature = _float(meas_params[3])
self._receiver_sensitivity = _float(meas_params[4])
self._window_contamination = _float(meas_params[5])
self._tilt_angle = _float(meas_params[6])
self._background_light = _float(meas_params[7])
self._measurement_parameters = _str(meas_params[8])
self._sum_backscatter = _float(meas_params[9])
self._backscatter = parse_backscatter(lines[3:])
# backscatter can contain no more than BK_THRESH missing values
missing = [b for b in self._backscatter if is_missing(b)]
if len(missing) >= BK_THRESH:
raise MessageError("Backscatter errors exceeds threshold")
@property
def epoch(self):
return self._epoch
@property
def lines(self):
return self._lines
@property
def stamp(self):
return self._stamp
@property
def header(self):
return self._header
@property
def msg_num(self):
return self._msg_num
@property
def status_string(self):
return self._status_string
@property
def status_flag(self):
return self._status_flag
@property
def detection_status(self):
return self._detection_status
@property
def cbh(self):
return (self._first_cbh, self._second_cbh, self._third_cbh)
@property
def first_cbh(self):
return self._first_cbh
@property
def second_cbh(self):
return self._second_cbh
@property
def third_cbh(self):
return self._third_cbh
@property
def vertical_visibility(self):
return self._vertical_visibility
@property
def alt_highest_signal(self):
return self._alt_highest_signal
@property
def scale(self):
return self._scale
@property
def measurement_mode(self):
return self._measurement_mode
@property
def laser_pulse_energy(self):
return self._laser_pulse_energy
@property
def laser_temperature(self):
return self._laser_temperature
@property
def receiver_sensitivity(self):
return self._receiver_sensitivity
@property
def window_contamination(self):
return self._window_contamination
@property
def tilt_angle(self):
return self._tilt_angle
@property
def background_light(self):
return self._background_light
@property
def measurement_parameters(self):
return self._measurement_parameters
@property
def sum_backscatter(self):
return self._sum_backscatter
@property
def backscatter(self):
return self._backscatter
def warning(self):
"""
@return: true if there is a warning indicated in the detection status string.
"""
return str(self.detection_status) in 'wW'
def alarm(self):
"""
@return: true if there is a alarm indicated in the detection status string.
"""
return str(self.detection_status) in 'aA'
def check_status(self, flag):
"""
Check the status of a particular flag.
@param flag: One of the flag constants
@type flag: number
@return: 0 if the flag is not set, otherwise the value of the flag.
"""
return long(self.status_string, 16) & flag
def get_status_messages(self):
"""
Get all status messages indicated in the device status string.
@return: 'alarms', 'warnings', and 'informational' messages.
"""
messages = {'alarms':[], 'warnings':[], 'informational':[]}
for idx in range(len(WARNING_FLAGS)):
if long(self.status_string, 16) & WARNING_FLAGS[idx]:
messages['warnings'].append(WARNING_MSGS[idx])
for idx in range(len(ALARM_FLAGS)):
if long(self.status_string, 16) & ALARM_FLAGS[idx]:
messages['alarms'].append(ALARM_MSGS[idx])
for idx in range(len(INFO_FLAGS)):
if long(self.status_string, 16) & INFO_FLAGS[idx]:
messages['informational'].append(INFO_MSGS[idx])
return messages
def __cmp__(self, that):
"""
Comparisons are done on message timestamps.
"""
if self.stamp > that.stamp:
return 1
if self.stamp == that.stamp:
return 0
if self.stamp < that.stamp:
return -1
def __str__(self):
return '\n'.join([str(self.epoch)] + self.lines)
class Message7(Message2):
"""Same as message 2 with additional sky condition line.
"""
NUM_LINES = 21
def __init__(self, lines, stamp):
super(Message7, self).__init__(lines, stamp)
self._sky_cond = parse_sky_condition(self.lines[-1])
@property
def sky_condition(self):
return self._sky_cond
def hex_to_int(hval):
# convert from 2's complement
val = int(hval, 16)
if val > 32767:
val = -65536 + val
return val
def _get(fcn, v, d):
try:
return fcn(v)
except:
pass
return d
def _float(v):
return _get(float, v, missing_value)
def _int(v):
return _get(int, v, missing_value)
def _str(v):
return _get(str, v, '')
def parse_sky_condition(line, feet=False):
parts = line.strip().split()
sky_cond = []
for idx in range(0, 8, 2):
amt = int(parts[idx])
if amt > 0 and amt < 9:
hght = int(parts[idx+1])
if feet: # feet
hght *= 100
else: # meters
hght *= 10
sky_cond.append((amt, hght))
elif amt == 0: # no clouds
sky_cond.append((0, missing_value))
elif amt == 9: # vertical visibility
# FIXME: ignoring this because we already have it
# from line 2, right?
pass
else: # amt in (-1, 99): # no data
sky_cond.append((missing_value, missing_value))
return sky_cond
def parse_backscatter(data):
if isinstance(data, (str, unicode)):
lines = data.split('\n')
else:
lines = data
if len(lines) < 16:
raise MessageError("Expected 16 lines for backscatter")
backscatter = [missing_value] * 256
bidx = 0
for line in lines[:16]:
if len(line) < 67: # 3 chars for height, 64 for data
bidx += 16
continue
for idx in range(3, 67, 4):
hval = line[idx:idx+4]
try:
backscatter[bidx] = hex_to_int(hval)
except:
pass
bidx += 1
return backscatter
def get_message_times(messages):
times = array([missing_value]*len(messages), 'l')
for i in range(len(messages)):
times[i] = timegm(messages[i].stamp.utctimetuple())
return times
def get_message_met_data(messages):
"""Compile message data into a single data dictionary.
@param messages: messages
@type messages: list
@return: Ceilometer data where keys are variable names from the netcdf file.
"""
data = dict(
vertical_visibility = array([missing_value]*len(messages)),
alt_highest_signal = array([missing_value]*len(messages)),
first_cbh = array([missing_value]*len(messages)),
second_cbh = array([missing_value]*len(messages)),
third_cbh = array([missing_value]*len(messages)),
backscatter = array([[missing_value]*256 for idx in range(len(messages))])
)
for i in range(len(messages)):
data['alt_highest_signal'][i] = messages[i].highest_signal
data['vertical_visibility'][i] = messages[i].vertical_vis
data['first_cbh'][i] = messages[i].bases[0]
data['second_cbh'][i] = messages[i].bases[1]
data['third_cbh'][i] = messages[i].bases[2]
data['backscatter'][i][:] = array(messages[i].backscatter)
return data
def get_message_hk_data(messages):
"""
"""
return {}
def load_messages(url, on_error=None):
"""Load messages from a message data file. All lines in the data file
are skipped until a proper start of message is encounterd.
@param url: URL to a message data file as generated by the Ceilometer
ingest software
@type url: str
@returns: Sequence of Messages sorted from oldest to newest.
"""
fp = urlopen(url)
line = fp.readline()
# readline returned '', so there are no lines
if not line: return tuple()
# message must start with epoch time, so disregard partial
# messages until we reach one
while line and not line.strip().isdigit():
line = fp.readline()
# start of first complete message
messages = []
done = not line # if we have a line, we're not done
while not done:
lines = [line] + [fp.readline() for i in range(Message2.NUM_LINES)]
if '' in lines:
done = True
continue
stamp = datetime.fromtimestamp(int(lines[0]))
try:
messages.append(Message2([l.strip() for l in lines[1:]], stamp))
except Exception, e:
if hasattr(on_error, '__call__'):
on_error(lines, e)
# disregard possible garbage between messages
line = fp.readline()
while line and not line.strip().isdigit():
line = fp.readline()
messages.sort()
return tuple(messages)