Skip to content
Snippets Groups Projects
message.py 14.64 KiB
"""
For processing Ceilometer CT25K Messages
"""
import logging
from calendar import timegm
from datetime import datetime
#from urllib2 import urlopen
from metobs.util.gen_open import open_ascii as urlopen

from numpy import array

LOG = logging.getLogger(__name__)

#: Value to use for NetCDF missing_value attribute
missing_value = -9999


#: Function to determine if a value is a missing_value or not
def is_missing(val):
    """Return True when ``val`` compares equal to ``missing_value``."""
    return val == missing_value


#: number of allowable missing values before backscatter is considered invalid
#: (a quarter of the 256 backscatter samples)
BK_THRESH = 256 * 0.25

# ASCII control characters referenced when handling the message header
START_OF_HEADING = '\x01'
START_OF_TEXT = '\x02'
END_OF_TEXT = '\x03'

# Bit masks applied to the status string after it is parsed as a hexadecimal
# number (see Message2.check_status). Written as 8-digit hex so each mask
# lines up with the 8-character status string.

# -- informational bits
MANUAL_BLOWER_CONTROL = 0x00000002
HIGH_BACKGROUND_RADIANCE = 0x00000004
TILT_ANGLE_45_PLUS = 0x00000008
MANUAL_SETTINGS_EFF = 0x00000010
SINGLE_SEQ_ON = 0x00000020
WORKING_FROM_BATT = 0x00000040
POLLING_ON = 0x00000080
UNITS_IN_METERS = 0x00000100
INTERNAL_HEATER_ON = 0x00000200
BLOWER_HEATER_ON = 0x00000400
BLOWER_ON = 0x00000800

# -- warning bits
BLOWER_SUSPECT = 0x00008000
RCV_OPTICAL_XTLK_POOR = 0x00010000
REL_HUMIDITY_HIGH = 0x00020000
VOLTAGE_ABNORMAL = 0x00040000
INT_TEMP_HIGH = 0x00080000
LASER_TEMP_HIGH = 0x00100000
LASER_PWR_LOW = 0x00200000
BATTERY_LOW = 0x00400000
WINDOW_CONTAMINATED = 0x00800000

# -- alarm bits
VOLTAGE_FAILURE = 0x10000000
RECEIVER_FAILURE = 0x20000000
LASER_FAILURE = 0x40000000
LASER_TEMP_SHUTOFF = 0x80000000

# Each *_FLAGS list is index-aligned with the *_MSGS list that follows it:
# entry N of the message list describes entry N of the flag list.
WARNING_FLAGS = [
    BLOWER_SUSPECT,
    RCV_OPTICAL_XTLK_POOR,
    REL_HUMIDITY_HIGH,
    VOLTAGE_ABNORMAL,
    INT_TEMP_HIGH,
    LASER_TEMP_HIGH,
    LASER_PWR_LOW,
    BATTERY_LOW,
    WINDOW_CONTAMINATED,
]

WARNING_MSGS = [
    "Blower suspect",
    "Receiver optical cross-talk poor",
    "Relative humidity is greater than 85%",
    "Voltage is low or high",
    "Internal temperature is high or low",
    "Laser temperature is high",
    "Laser power is low",
    "Battery is low",
    "Window is contaminated",
]

INFO_FLAGS = [
    MANUAL_BLOWER_CONTROL,
    HIGH_BACKGROUND_RADIANCE,
    TILT_ANGLE_45_PLUS,
    MANUAL_SETTINGS_EFF,
    SINGLE_SEQ_ON,
    WORKING_FROM_BATT,
    POLLING_ON,
    UNITS_IN_METERS,
    INTERNAL_HEATER_ON,
    BLOWER_HEATER_ON,
    BLOWER_ON,
]

INFO_MSGS = [
    "Using manual blower control",
    "Background radiance is high",
    "Tilt angle is greater than 45 degrees",
    "Manual settings are effective",
    "Single sequence mode is ON",
    "Working from the battery",
    "Polling mode is ON",
    "Units are in Meters",
    "Internal heater is ON",
    "Blower heater is ON",
    "Blower is ON",
]

ALARM_FLAGS = [
    VOLTAGE_FAILURE,
    RECEIVER_FAILURE,
    LASER_FAILURE,
    LASER_TEMP_SHUTOFF,
]

ALARM_MSGS = [
    "VOLTAGE FAILURE",
    "RECEIVER FAILURE",
    "LASER FAILURE",
    "LASER TEMPERATURE SHUT_OFF",
]

try:
    # Python 2: keep the historical base class so that existing
    # ``except StandardError`` handlers continue to catch MessageError.
    _MessageErrorBase = StandardError
except NameError:
    # Python 3 removed StandardError; Exception is the closest equivalent.
    _MessageErrorBase = Exception


class MessageError(_MessageErrorBase):
    """General message error.

    Raised when a ceilometer message cannot be parsed into a message object.
    """


class Message2(object):
    """A parsed CT25K "message 2" data message.

    Line layout, as consumed by ``__init__``:

        - line 0: header; the message number is the 6th printable character
        - line 1: detection status, warning/alarm flag, heights, status word
        - line 2: whitespace-separated measurement parameters (10 fields)
        - lines 3..18: backscatter profile, 16 lines of 16 hex samples

    Instances order by their ``stamp``.
    """

    #: number of lines that make up one message
    NUM_LINES = 20

    def __init__(self, lines, stamp):
        """Initialize this message, dissecting the various message components
        according to the CT25K Message 2 format.

        The SOH and STX control characters are stripped from the header line
        before the message number is read from it.

        The backscatter is initially filled with L{missing_value} so if a
        line is the incorrect length all of its values will be missing_value.
        Likewise, if a value cannot be parsed its value will be missing_value
        (or the empty string for text fields).

        @param lines: Exactly NUM_LINES lines that comprise a message
        @type lines: list
        @param stamp: The message time. A naive datetime is interpreted as
            UTC when computing ``epoch``; an aware datetime is converted
            using its own tzinfo.
        @type stamp: datetime

        @raises AssertionError: If ``lines`` does not hold NUM_LINES lines.
        @raises MessageError: If this instance cannot be created due to an
            error parsing.
        """
        assert len(lines) == self.NUM_LINES, \
            "A Message2 must contain %s lines" % self.NUM_LINES

        self._epoch = timegm(stamp.utctimetuple())

        self._lines = lines
        self._stamp = stamp

        # strip non-printables
        header = lines[0].replace(START_OF_HEADING, '')
        self._header = header.replace(START_OF_TEXT, '')

        try:
            self._msg_num = int(self._header[5])
        except (IndexError, ValueError):
            # short or non-numeric header: report it as a parsing failure
            raise MessageError("Invalid message header", self._header)
        if self._msg_num != 2:
            raise MessageError("Invalid message number", self.header)

        status_line = lines[1]
        if len(status_line) < 2:
            # the detection status and warning/alarm flag are both required
            raise MessageError("Invalid status line", status_line)

        self._status_string = status_line[21:29]
        self._status_flag = status_line[1]
        self._detection_status = _int(status_line[0])

        self._first_cbh = missing_value
        self._second_cbh = missing_value
        self._third_cbh = missing_value
        # A detection status of 1, 2 or 3 is the number of cloud bases
        # reported, so status N carries bases 1..N (not only the Nth one).
        if self.detection_status in (1, 2, 3):
            self._first_cbh = _int(status_line[3:8])
        if self.detection_status in (2, 3):
            self._second_cbh = _int(status_line[9:14])
        if self.detection_status == 3:
            self._third_cbh = _int(status_line[15:20])

        # With status 4 the first two height fields are reinterpreted as
        # vertical visibility and altitude of the highest signal.
        if self.detection_status == 4:
            self._vertical_visibility = _int(status_line[3:8])
            self._alt_highest_signal = _int(status_line[9:14])
        else:
            self._vertical_visibility = missing_value
            self._alt_highest_signal = missing_value

        meas_params = lines[2].split()
        if len(meas_params) < 10:
            LOG.warning("Invalid measurement parameters for message with time %s", self.epoch)
            # Pad with empty strings so the absent fields fall back to their
            # missing defaults instead of raising IndexError below.
            meas_params = meas_params + [''] * (10 - len(meas_params))
        self._scale = _int(meas_params[0])
        self._measurement_mode = _str(meas_params[1])
        self._laser_pulse_energy = _float(meas_params[2])
        self._laser_temperature = _float(meas_params[3])
        self._receiver_sensitivity = _float(meas_params[4])
        self._window_contamination = _float(meas_params[5])
        self._tilt_angle = _float(meas_params[6])
        self._background_light = _float(meas_params[7])
        self._measurement_parameters = _str(meas_params[8])
        self._sum_backscatter = _float(meas_params[9])

        self._backscatter = parse_backscatter(lines[3:])

        # backscatter can contain no more than BK_THRESH missing values
        num_missing = sum(1 for b in self._backscatter if is_missing(b))
        if num_missing >= BK_THRESH:
            raise MessageError("Backscatter errors exceeds threshold")

    @property
    def epoch(self):
        return self._epoch

    @property
    def lines(self):
        return self._lines

    @property
    def stamp(self):
        return self._stamp

    @property
    def header(self):
        return self._header

    @property
    def msg_num(self):
        return self._msg_num

    @property
    def status_string(self):
        return self._status_string

    @property
    def status_flag(self):
        return self._status_flag

    @property
    def detection_status(self):
        return self._detection_status

    @property
    def cbh(self):
        return (self._first_cbh, self._second_cbh, self._third_cbh)

    @property
    def first_cbh(self):
        return self._first_cbh

    @property
    def second_cbh(self):
        return self._second_cbh

    @property
    def third_cbh(self):
        return self._third_cbh

    @property
    def vertical_visibility(self):
        return self._vertical_visibility

    @property
    def alt_highest_signal(self):
        return self._alt_highest_signal

    @property
    def scale(self):
        return self._scale

    @property
    def measurement_mode(self):
        return self._measurement_mode

    @property
    def laser_pulse_energy(self):
        return self._laser_pulse_energy

    @property
    def laser_temperature(self):
        return self._laser_temperature

    @property
    def receiver_sensitivity(self):
        return self._receiver_sensitivity

    @property
    def window_contamination(self):
        return self._window_contamination

    @property
    def tilt_angle(self):
        return self._tilt_angle

    @property
    def background_light(self):
        return self._background_light

    @property
    def measurement_parameters(self):
        return self._measurement_parameters

    @property
    def sum_backscatter(self):
        return self._sum_backscatter

    @property
    def backscatter(self):
        return self._backscatter

    def warning(self):
        """
        @return: true if the warning/alarm flag (second character of the
            status line) indicates a warning.
        """
        # detection_status is an integer and can never equal 'W'; the
        # warning indicator lives in status_flag.
        return self.status_flag in ('w', 'W')

    def alarm(self):
        """
        @return: true if the warning/alarm flag (second character of the
            status line) indicates an alarm.
        """
        return self.status_flag in ('a', 'A')

    def check_status(self, flag):
        """
        Check the status of a particular flag.

        @param flag: One of the flag constants
        @type flag: number

        @return: 0 if the flag is not set, otherwise the value of the flag.
        @raises ValueError: If the status string is not valid hexadecimal.
        """
        return int(self.status_string, 16) & flag

    def get_status_messages(self):
        """
        Get all status messages indicated in the device status string.

        @return: dict with 'alarms', 'warnings', and 'informational' keys,
            each holding the list of messages whose flag bit is set.
        @raises ValueError: If the status string is not valid hexadecimal.
        """
        status = int(self.status_string, 16)  # parse the status word once

        def active(flags, msgs):
            return [msg for flg, msg in zip(flags, msgs) if status & flg]

        return {
            'alarms': active(ALARM_FLAGS, ALARM_MSGS),
            'warnings': active(WARNING_FLAGS, WARNING_MSGS),
            'informational': active(INFO_FLAGS, INFO_MSGS),
        }

    def __cmp__(self, that):
        """
        Comparisons are done on message timestamps.
        """
        if self.stamp > that.stamp:
            return 1
        if self.stamp == that.stamp:
            return 0
        if self.stamp < that.stamp:
            return -1

    def __lt__(self, that):
        """
        Order by timestamp. __cmp__ is ignored on Python 3, where sorting
        needs __lt__.
        """
        return self.stamp < that.stamp

    def __str__(self):
        # list() so that a tuple of lines can be concatenated as well
        return '\n'.join([str(self.epoch)] + list(self.lines))

class Message7(Message2):
    """Message 2 layout plus one extra line carrying the sky condition.

    The sky condition is parsed from the final line of the message.

    NOTE(review): Message2.__init__ raises MessageError for any header whose
    message number is not 2 -- confirm that message 7 headers satisfy that
    check.
    """

    #: one line more than Message2, for the sky condition
    NUM_LINES = 21

    def __init__(self, lines, stamp):
        """Parse the common message fields, then the sky condition line."""
        super(Message7, self).__init__(lines, stamp)
        sky_line = self.lines[-1]
        self._sky_cond = parse_sky_condition(sky_line)

    @property
    def sky_condition(self):
        """Result of parse_sky_condition for the last message line."""
        return self._sky_cond


def hex_to_int(hval):
    """Convert a hex string to an int, read as 16-bit two's complement.

    @param hval: hexadecimal digits, e.g. 'FFFF'
    @return: the parsed value; anything above 32767 has 65536 subtracted
    @raises ValueError: if hval is not valid hexadecimal
    """
    raw = int(hval, 16)
    if raw <= 0x7FFF:
        return raw
    # sign bit set: wrap around to the negative range
    return raw - 0x10000

def _get(fcn, v, d):
    """Apply ``fcn`` to ``v``, falling back to ``d`` when the call fails.

    @param fcn: single-argument callable used to convert the value
    @param v: value to convert
    @param d: default returned when ``fcn(v)`` raises

    @return: ``fcn(v)``, or ``d`` if that raised an Exception
    """
    try:
        return fcn(v)
    except Exception:
        # Conversion failures map to the default. KeyboardInterrupt and
        # SystemExit are not Exception subclasses and still propagate.
        return d
def _float(v):
    """Convert ``v`` with float(); missing_value is the fallback."""
    converted = _get(float, v, missing_value)
    return converted
def _int(v):
    """Convert ``v`` with int(); missing_value is the fallback."""
    converted = _get(int, v, missing_value)
    return converted
def _str(v):
    """Convert ``v`` with str(); the empty string is the fallback."""
    converted = _get(str, v, '')
    return converted

def parse_sky_condition(line, feet=False):
    """Parse a sky condition line into (amount, height) pairs.

    The line is split on whitespace and read as four (amount, height)
    pairs. Per pair:

        - amount 1..8: (amount, height * 100) when ``feet`` is true,
          otherwise (amount, height * 10)
        - amount 0: (0, missing_value), i.e. no clouds
        - amount 9: vertical visibility; nothing is appended
        - any other amount: (missing_value, missing_value), i.e. no data

    @param line: the sky condition line
    @type line: str
    @param feet: selects the x100 height multiplier instead of x10
    @type feet: bool

    @return: list of (amount, height) tuples; shorter than four entries when
        an amount of 9 was skipped
    @raises ValueError: if an amount or a needed height is not an integer
    @raises IndexError: if the line has too few fields
    """
    fields = line.strip().split()
    multiplier = 100 if feet else 10

    layers = []
    for pos in (0, 2, 4, 6):
        amount = int(fields[pos])
        if amount == 9:
            # FIXME: ignoring this because we already have it
            # from line 2, right?
            continue
        if 1 <= amount <= 8:
            # the height field is only read for a real cloud layer
            layers.append((amount, int(fields[pos + 1]) * multiplier))
        elif amount == 0:
            layers.append((0, missing_value))
        else:
            layers.append((missing_value, missing_value))

    return layers

def parse_backscatter(data):
    """Parse the backscatter profile lines into 256 integer samples.

    Only the first 16 lines are read. Each line contributes 16 samples,
    taken as 4-character hex fields from columns 3 through 66 and decoded
    with hex_to_int. A line shorter than 67 characters leaves its 16 samples
    at missing_value, as does any single field that is not valid hex.

    @param data: the backscatter lines, or one string with the lines
        separated by newlines
    @type data: list or str

    @return: list of 256 values
    @raises MessageError: if fewer than 16 lines are provided
    """
    try:
        string_types = basestring  # Python 2: covers both str and unicode
    except NameError:
        string_types = str  # Python 3: unicode/basestring no longer exist

    if isinstance(data, string_types):
        lines = data.split('\n')
    else:
        lines = data

    if len(lines) < 16:
        raise MessageError("Expected 16 lines for backscatter")

    backscatter = [missing_value] * 256
    for row, line in enumerate(lines[:16]):
        if len(line) < 67:  # 3 chars for height, 64 for data
            continue
        base = row * 16  # every line owns a fixed slice of 16 samples
        for col in range(16):
            start = 3 + col * 4
            try:
                backscatter[base + col] = hex_to_int(line[start:start + 4])
            except (TypeError, ValueError):
                # unparsable field: leave the sample as missing_value
                pass

    return backscatter

def get_message_times(messages):
    """Collect the epoch time of each message into an array.

    @param messages: objects exposing a ``stamp`` datetime
    @type messages: list

    @return: array of type 'l' holding timegm(stamp.utctimetuple()) for
        every message, in the order given
    """
    count = len(messages)
    times = array([missing_value] * count, 'l')
    for idx, msg in enumerate(messages):
        times[idx] = timegm(msg.stamp.utctimetuple())
    return times

def get_message_met_data(messages):
    """Compile message data into a single data dictionary.

    Every array is pre-filled with missing_value and then overwritten with
    the values read from each message's properties.

    @param messages: messages exposing the Message2 properties
        ``alt_highest_signal``, ``vertical_visibility``, ``cbh`` and
        ``backscatter``
    @type messages: list

    @return: Ceilometer data where keys are variable names from the netcdf
        file: 'vertical_visibility', 'alt_highest_signal', 'first_cbh',
        'second_cbh', 'third_cbh' and 'backscatter' (256 values per message).
    """
    count = len(messages)
    data = dict(
        vertical_visibility=array([missing_value] * count),
        alt_highest_signal=array([missing_value] * count),
        first_cbh=array([missing_value] * count),
        second_cbh=array([missing_value] * count),
        third_cbh=array([missing_value] * count),
        backscatter=array([[missing_value] * 256 for _ in range(count)])
    )
    for i, msg in enumerate(messages):
        # These are the names Message2 defines; the previous names
        # (highest_signal, vertical_vis, bases) do not exist on Message2.
        data['alt_highest_signal'][i] = msg.alt_highest_signal
        data['vertical_visibility'][i] = msg.vertical_visibility
        first, second, third = msg.cbh
        data['first_cbh'][i] = first
        data['second_cbh'][i] = second
        data['third_cbh'][i] = third
        data['backscatter'][i][:] = array(msg.backscatter)

    return data
    
def get_message_hk_data(messages):
    """Placeholder: ``messages`` is ignored and an empty dict is returned.

    @param messages: messages (currently unused)

    @return: a new empty dict
    """
    return dict()

def load_messages(url, on_error=None):
    """Load messages from a message data file.  All lines in the data file
    are skipped until a proper start of message is encountered.

    Each message is one line holding its epoch time (digits only) followed
    by Message2.NUM_LINES message lines. Lines between messages that are not
    an epoch time are discarded, and a trailing incomplete message is
    dropped.

    @param url: URL to a message data file as generated by the Ceilometer
       ingest software
    @type url: str
    @param on_error: Optional callable invoked as ``on_error(lines, error)``
        for each message that fails to parse, where ``lines`` is the raw
        epoch line plus message lines. Failures are skipped silently when it
        is not callable.

    @returns: Sequence of Messages sorted from oldest to newest.
    """
    fp = urlopen(url)
    messages = []
    try:
        line = fp.readline()

        # message must start with epoch time, so disregard partial
        # messages until we reach one
        while line and not line.strip().isdigit():
            line = fp.readline()

        # readline returns '' at end of file, which ends the loop
        while line:
            body = [fp.readline() for _ in range(Message2.NUM_LINES)]
            if '' in body:
                # hit end of file part-way through a message
                break

            # The stamp must be a naive UTC datetime: Message2 turns it back
            # into an epoch with timegm(), which reads naive values as UTC.
            # fromtimestamp() would shift the epoch by the local UTC offset.
            stamp = datetime.utcfromtimestamp(int(line))
            try:
                messages.append(Message2([l.strip() for l in body], stamp))
            except Exception as e:
                if hasattr(on_error, '__call__'):
                    on_error([line] + body, e)

            # disregard possible garbage between messages
            line = fp.readline()
            while line and not line.strip().isdigit():
                line = fp.readline()
    finally:
        # release the handle even if reading fails part-way through
        if hasattr(fp, 'close'):
            fp.close()

    # explicit key so the sort does not depend on Message2.__cmp__
    messages.sort(key=lambda msg: msg.stamp)

    return tuple(messages)