Skip to content
Snippets Groups Projects
data_api.py 10.63 KiB
import logging
from datetime import datetime, timedelta
from xml.dom.minidom import Document

import numpy as np
import pandas as pd
from flask import render_template, jsonify, Response
from flask_json import as_json_p

from metobsapi.util import data_responses
from metobsapi.util.error_handlers import ERROR_HANDLERS
from metobsapi.util.query_influx import build_queries, query

LOG = logging.getLogger(__name__)


ROUNDING = {
    'rel_hum': 0,
    'wind_direction': 0,
}


def round_value(value, symbol):
    return np.round(value, ROUNDING.get(symbol, 1))


def handle_date(date):
    try:
        date_len = len(date)
        if date_len == 10:
            return datetime.strptime(date, "%Y-%m-%d")
        else:
            return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S")
    except ValueError:
        LOG.warning("Malformed date string '%s'", date)
        raise ValueError("Malformed date string '%s'" % (date,))


def handle_time_string(date_string):
    if date_string[0] == "-":
        times = [float(x) for x in date_string[1:].split(':')]
        diff = timedelta(
            hours=times[0],
            minutes=times[1],
            seconds=times[2])
        return diff
    return handle_date(date_string)


def handle_symbols(symbols):
    ret = {}

    add_winds = set()
    for symbol in symbols:
        try:
            site, inst, s = symbol.split('.')
            si = (site, inst)
        except ValueError:
            raise ValueError("Symbols must have 3 period-separated parts: {}".format(symbol))

        trans = data_responses.SYMBOL_TRANSLATIONS.get((site, inst))
        if not trans:
            raise ValueError("Unknown site/instrument: {},{}".format(site, inst))
        influx_name = trans.get(s)
        if s == "wind_direction":
            add_winds.add((site, inst))
        elif influx_name is None:
            raise ValueError("Unknown symbol: {}".format(symbol))

        # NOTE: if wind speed/dir is specified InfluxDB should provide it with
        #       all fill values so we can fill it in later
        ret.setdefault(si, ([], []))
        ret[si][0].append(symbol)
        ret[si][1].append(influx_name or s)

    # Add the symbols needed to compute the wind_speed and wind_direction
    for si in add_winds:
        ret[si][0].extend((None, None))
        ret[si][1].extend(('wind_east', 'wind_north'))

    return ret


def handle_influxdb_result(result, symbols, interval):
    frames = []
    for idx, (si, (req_syms, influx_symbs)) in enumerate(symbols.items()):
        if isinstance(result, list):
            # multiple query select statements results in a list
            res = result[idx]
        else:
            # single query statement results in a single ResultSet
            res = result

        columns = ['time'] + influx_symbs
        if not res:
            frame = pd.DataFrame(columns=columns)
        else:
            data_points = res.get_points('metobs_' + interval, tags={'site': si[0], 'inst': si[1]})
            frame = pd.DataFrame(data_points, columns=['time'] + influx_symbs)
        frame.set_index('time', inplace=True)
        frame.fillna(value=np.nan, inplace=True)
        # remove wind components
        if influx_symbs[-1] == 'wind_north' and 'wind_direction' in frame.columns:
            frame['wind_direction'] = np.rad2deg(np.arctan2(frame['wind_east'], frame['wind_north']))
            frame['wind_direction'] = frame['wind_direction'].where(frame['wind_direction'] > 0, frame['wind_direction'] + 360.)
            frame = frame.iloc[:, :-2]
            frame.columns = req_syms[:-2]
        else:
            frame.columns = req_syms
        frame = frame.round({s: ROUNDING.get(s, 1) for s in frame.columns})
        frames.append(frame)
    frame = pd.concat(frames, axis=1, copy=False)
    return frame


def calc_num_records(begin, end, interval):
    now = datetime.utcnow()
    if begin is None:
        begin = now
    elif isinstance(begin, timedelta):
        begin = now - begin
    if end is None:
        end = now
    elif isinstance(end, timedelta):
        end = now - end

    diff = (end - begin).total_seconds()
    return diff / data_responses.INTERVALS[interval]


def handle_csv(frame, epoch, sep=',',
               message='', code=200, status='success', **kwargs):
    output = """# status: {status}
# code: {code:d}
# message: {message}
# num_results: {num_results:d}
# fields: {epoch_str}{sep}{symbol_list}
{symbol_data}
"""

    data_lines = []
    line_format = sep.join(["{time}", "{symbols}"])
    for t, row in frame.iterrows():
        line = line_format.format(
            time=t,
            symbols=sep.join(str(x) for x in row.values))
        data_lines.append(line)

    if not epoch:
        epoch_str = '%Y-%m-%dT%H:%M:%SZ'
    else:
        epoch_str = data_responses.epoch_translation[epoch] + ' since epoch (1970-01-01 00:00:00)'

    output = output.format(
        sep=sep,
        status=status,
        code=code,
        message=message,
        num_results=frame.shape[0],
        epoch_str=epoch_str,
        symbol_list=sep.join(frame.columns),
        symbol_data="\n".join(data_lines),
    )

    return Response(output, mimetype='text/csv'), code


@as_json_p(optional=True)
def handle_json(frame, epoch, order='columns',
                message='', code=200, status='success', **kwargs):
    # force conversion to float types so they can be json'd
    for column, data_type in zip(frame.columns, frame.dtypes.values):
        if issubclass(data_type.type, np.integer):
            frame[column] = frame[column].astype(float)
    # replace NaNs with None
    frame = frame.where(pd.notnull(frame), None)
    output = {}
    output['status'] = status
    output['message'] = message
    output['code'] = code
    output['num_results'] = frame.shape[0]

    package = {
        'timestamps': frame.index.values,
    }

    if epoch:
        newStamps = []
        for stamp in package['timestamps']:
            newStamps.append(float(stamp))
        package['timestamps'] = newStamps

    if order == 'column':
        package['data'] = dict(frame)
    else:
        package['symbols'] = frame.columns
        package['data'] = [frame.iloc[i].values for i in range(frame.shape[0])]
        # package['data'] = frame.values
    output['results'] = package
    return output, code


def handle_xml(frame, epoch, sep=',',
               message='', code=200, status='success', **kwargs):
    doc = Document()
    header = 'metobs'

    head = doc.createElement(header)
    head.setAttribute('status', status)
    head.setAttribute('code', str(code))
    head.setAttribute('message', message)
    head.setAttribute('num_results', str(frame.shape[0]))
    head.setAttribute('seperator', sep)

    doc.appendChild(head)
    columns_elem = doc.createElement('symbols')

    time_elem = doc.createElement('symbol')
    time_elem.setAttribute('name', 'time')
    time_elem.setAttribute('short_name', 'time')
    if not epoch:
        time_elem.setAttribute('format', '%Y-%m-%dT%H:%M:%SZ')
    else:
        time_elem.setAttribute('format', data_responses.epoch_translation[epoch] + ' since epoch (1970-01-01 00:00:00)')
    columns_elem.appendChild(time_elem)

    for c in frame.columns:
        col_elem = doc.createElement('symbol')
        col_elem.setAttribute('name', c)
        parts = c.split('.')
        col_elem.setAttribute('short_name', parts[2])
        col_elem.setAttribute('site', parts[0])
        col_elem.setAttribute('inst', parts[1])
        columns_elem.appendChild(col_elem)
    head.appendChild(columns_elem)

    data_elem = doc.createElement('data')
    for idx, (t, row) in enumerate(frame.iterrows()):
        row_elem = doc.createElement('row')
        row_elem.setAttribute('id', str(idx))
        row_elem.appendChild(doc.createTextNode(str(t)))
        for point in row:
            row_elem.appendChild(doc.createTextNode(sep))
            row_elem.appendChild(doc.createTextNode(str(point)))
        data_elem.appendChild(row_elem)
    head.appendChild(data_elem)

    # txt = doc.toprettyxml(indent="     ", encoding="utf-8")
    txt = doc.toxml(encoding="utf-8")
    return Response(txt, mimetype='text/xml'), code


def handle_error(fmt, error_str):
    handler = ERROR_HANDLERS[fmt]
    err_code, err_msg = data_responses.ERROR_MESSAGES.get(error_str, (400, error_str))
    res = handler(err_code, err_msg)
    if fmt == 'json':
        res = jsonify(**res)
    return res, err_code


RESPONSE_HANDLERS = {
    'csv': handle_csv,
    'xml': handle_xml,
    'json': handle_json,
}


def modify_data(fmt, begin, end, site, inst, symbols, interval,
                sep=',', order='columns', epoch=None):
    if fmt not in RESPONSE_HANDLERS:
        return render_template('400.html', format=fmt), 400

    try:
        # these will be either datetime or timedelta objects
        begin = handle_time_string(begin) if begin else None
        end = handle_time_string(end) if end else None
    except (TypeError, ValueError):
        return handle_error(fmt, 'malformed_timestamp')

    if order not in ('column', 'row'):
        return handle_error(fmt, 'bad_order')
    if epoch and epoch not in data_responses.epoch_translation:
        return handle_error(fmt, 'bad_epoch')
    if not symbols:
        return handle_error(fmt, 'missing_symbols')
    if not interval:
        interval = '1m'
    elif interval not in data_responses.INTERVALS:
        return handle_error(fmt, 'bad_interval')

    if site and inst:
        # shorthand for symbols that all use the same site and inst
        short_symbols = symbols.split(':')
        symbols = ["{}.{}.{}".format(site, inst, s) for s in short_symbols]
    elif not site and not inst:
        # each symbol is fully qualified with site.inst.symbol
        short_symbols = None
        symbols = symbols.split(':')
    else:
        return handle_error(fmt, 'missing_site_inst')

    try:
        influx_symbols = handle_symbols(symbols)
    except ValueError as e:
        return handle_error(fmt, str(e))

    if calc_num_records(begin, end, interval) > data_responses.RESPONSES_LIMIT:
        message = "Request will return too many values, please use files API"
        code = 413
        status = 'fail'
        result = None
    else:
        message = ""
        code = 200
        status = 'success'
        queries = build_queries(influx_symbols, begin, end, interval)
        result = query(queries, epoch)

    frame = handle_influxdb_result(result, influx_symbols, interval)
    # order the resulting symbols the way the user requested
    # assume time is the first column
    frame = frame[symbols]
    if site:
        frame.columns = short_symbols

    handler = RESPONSE_HANDLERS[fmt]
    return handler(frame, epoch,
                   sep=sep, order=order,
                   status=status, code=code, message=message)