# data_api.py
# Author: David Hoese
import logging
from datetime import datetime, timedelta
from xml.dom.minidom import Document
import numpy as np
import pandas as pd
from flask import render_template, jsonify, Response
from flask_json import as_json_p
from metobsapi.util import data_responses
from metobsapi.util.error_handlers import ERROR_HANDLERS
from metobsapi.util.query_influx import build_queries, query
LOG = logging.getLogger(__name__)
# Decimal places to keep per symbol; symbols not listed here round to 1 place.
ROUNDING = {
    'rel_hum': 0,
    'wind_direction': 0,
}


def round_value(value, symbol):
    """Round *value* to the precision configured for *symbol* in ``ROUNDING``."""
    decimals = ROUNDING.get(symbol, 1)
    return np.round(value, decimals)
def handle_date(date):
    """Parse a date string to a ``datetime``.

    A 10-character string is treated as ``YYYY-MM-DD``; anything else must be
    ``YYYY-MM-DDTHH:MM:SS``.  Logs and raises ``ValueError`` on a mismatch.
    """
    try:
        fmt = "%Y-%m-%d" if len(date) == 10 else "%Y-%m-%dT%H:%M:%S"
        return datetime.strptime(date, fmt)
    except ValueError:
        LOG.warning("Malformed date string '%s'", date)
        raise ValueError("Malformed date string '%s'" % (date,))
def handle_time_string(date_string):
    """Interpret *date_string* as a relative offset or an absolute time.

    A leading ``-`` marks a relative ``H:M:S`` offset and yields a
    ``timedelta``; any other string is delegated to :func:`handle_date`.
    """
    if date_string[0] != "-":
        return handle_date(date_string)
    parts = [float(piece) for piece in date_string[1:].split(':')]
    return timedelta(hours=parts[0], minutes=parts[1], seconds=parts[2])
def handle_symbols(symbols):
    """Group fully-qualified ``site.inst.symbol`` strings by station.

    Returns a dict mapping ``(site, inst)`` to a pair of parallel lists:
    the requested API symbols and the matching influx field names.  Stations
    that requested ``wind_direction`` get ``wind_east``/``wind_north``
    appended (with ``None`` placeholders on the API side) so the direction
    can be computed from the components later.

    Raises ``ValueError`` for malformed or unknown symbols.
    """
    grouped = {}
    needs_wind_components = set()
    for symbol in symbols:
        try:
            site, inst, short_name = symbol.split('.')
        except ValueError:
            raise ValueError("Symbols must have 3 period-separated parts: {}".format(symbol))
        station = (site, inst)
        trans = data_responses.SYMBOL_TRANSLATIONS.get(station)
        if not trans:
            raise ValueError("Unknown site/instrument: {},{}".format(site, inst))
        influx_name = trans.get(short_name)
        if short_name == "wind_direction":
            needs_wind_components.add(station)
        elif influx_name is None:
            raise ValueError("Unknown symbol: {}".format(symbol))
        # NOTE: if wind speed/dir is specified InfluxDB should provide it with
        # all fill values so we can fill it in later
        api_syms, influx_syms = grouped.setdefault(station, ([], []))
        api_syms.append(symbol)
        influx_syms.append(influx_name or short_name)
    # Append the component fields needed to compute wind_direction.
    for station in needs_wind_components:
        grouped[station][0].extend((None, None))
        grouped[station][1].extend(('wind_east', 'wind_north'))
    return grouped
def handle_influxdb_result(result, symbols, interval):
    """Convert a raw InfluxDB query result into a single pandas DataFrame.

    Parameters
    ----------
    result : list or ResultSet-like or None
        A list of result sets (one per select statement) or a single result
        set when only one statement was issued; a falsy value yields an
        empty frame with the expected columns.
    symbols : dict
        Mapping of ``(site, inst)`` to ``(requested_symbols, influx_symbols)``
        as produced by ``handle_symbols``.
    interval : str
        Interval name; the measurement queried is ``'metobs_' + interval``.

    Returns a time-indexed frame whose columns are the requested symbols,
    rounded per ``ROUNDING`` (default 1 decimal place).
    """
    frames = []
    for idx, (si, (req_syms, influx_symbs)) in enumerate(symbols.items()):
        if isinstance(result, list):
            # multiple query select statements results in a list
            res = result[idx]
        else:
            # single query statement results in a single ResultSet
            res = result
        columns = ['time'] + influx_symbs
        if not res:
            # no data returned (or no query run): empty frame, expected columns
            frame = pd.DataFrame(columns=columns)
        else:
            data_points = res.get_points('metobs_' + interval, tags={'site': si[0], 'inst': si[1]})
            frame = pd.DataFrame(data_points, columns=['time'] + influx_symbs)
        frame.set_index('time', inplace=True)
        frame.fillna(value=np.nan, inplace=True)
        # remove wind components
        if influx_symbs[-1] == 'wind_north' and 'wind_direction' in frame.columns:
            # Derive direction in degrees from the east/north components.
            frame['wind_direction'] = np.rad2deg(np.arctan2(frame['wind_east'], frame['wind_north']))
            # Shift arctan2's (-180, 180] range into (0, 360].
            # NOTE(review): exactly 0 becomes 360 here; if a 0-359 convention
            # is intended the condition should be ``>= 0`` — confirm.
            frame['wind_direction'] = frame['wind_direction'].where(frame['wind_direction'] > 0, frame['wind_direction'] + 360.)
            # Drop the trailing wind_east/wind_north helper columns.
            frame = frame.iloc[:, :-2]
            frame.columns = req_syms[:-2]
        else:
            frame.columns = req_syms
        frame = frame.round({s: ROUNDING.get(s, 1) for s in frame.columns})
        frames.append(frame)
    # One station per sub-frame; concatenate side by side on the time index.
    frame = pd.concat(frames, axis=1, copy=False)
    return frame
def calc_num_records(begin, end, interval):
    """Estimate the number of records a request would span.

    ``begin``/``end`` may each be a ``datetime``, a ``timedelta`` offset back
    from "now" (UTC), or ``None`` meaning "now".  Returns the span in seconds
    divided by the interval length from ``data_responses.INTERVALS``.
    """
    now = datetime.utcnow()

    def _resolve(bound):
        # None -> now; timedelta -> offset back from now; datetime -> as-is
        if bound is None:
            return now
        if isinstance(bound, timedelta):
            return now - bound
        return bound

    span_seconds = (_resolve(end) - _resolve(begin)).total_seconds()
    return span_seconds / data_responses.INTERVALS[interval]
def handle_csv(frame, epoch, sep=',',
               message='', code=200, status='success', **kwargs):
    """Render *frame* as a CSV ``Response`` with a commented metadata header.

    *epoch* selects the timestamp-format note in the header.  Extra keyword
    arguments are accepted (and ignored) so all format handlers share one
    call signature.  Returns ``(flask.Response, code)``.
    """
    header_template = """# status: {status}
# code: {code:d}
# message: {message}
# num_results: {num_results:d}
# fields: {epoch_str}{sep}{symbol_list}
{symbol_data}
"""
    # One CSV row per timestamp: time first, then every symbol's value.
    data_lines = [
        format(stamp) + sep + sep.join(str(value) for value in row.values)
        for stamp, row in frame.iterrows()
    ]
    if epoch:
        epoch_str = data_responses.epoch_translation[epoch] + ' since epoch (1970-01-01 00:00:00)'
    else:
        epoch_str = '%Y-%m-%dT%H:%M:%SZ'
    body = header_template.format(
        sep=sep,
        status=status,
        code=code,
        message=message,
        num_results=frame.shape[0],
        epoch_str=epoch_str,
        symbol_list=sep.join(frame.columns),
        symbol_data="\n".join(data_lines),
    )
    return Response(body, mimetype='text/csv'), code
@as_json_p(optional=True)
def handle_json(frame, epoch, order='column',
                message='', code=200, status='success', **kwargs):
    """Build the JSON response payload for *frame*.

    Parameters
    ----------
    frame : pd.DataFrame
        Time-indexed data to serialize.
    epoch : str or None
        When truthy, timestamps are converted to plain floats.
    order : str
        ``'column'`` returns ``data`` as a mapping of symbol -> values;
        anything else returns a ``symbols`` list plus row-major ``data``.
        BUG FIX: the default used to be ``'columns'``, which never matches
        the ``order == 'column'`` check below (and is rejected by
        ``modify_data``'s validation), so the default silently produced
        row order; it is now ``'column'``.

    Returns ``(payload_dict, code)``; the decorator handles JSON(P) encoding.
    """
    # force conversion to float types so they can be json'd
    for column, data_type in zip(frame.columns, frame.dtypes.values):
        if issubclass(data_type.type, np.integer):
            frame[column] = frame[column].astype(float)
    # replace NaNs with None
    frame = frame.where(pd.notnull(frame), None)
    output = {}
    output['status'] = status
    output['message'] = message
    output['code'] = code
    output['num_results'] = frame.shape[0]
    package = {
        'timestamps': frame.index.values,
    }
    if epoch:
        # epoch-based timestamps are numeric; emit them as plain floats
        package['timestamps'] = [float(stamp) for stamp in package['timestamps']]
    if order == 'column':
        package['data'] = dict(frame)
    else:
        package['symbols'] = frame.columns
        package['data'] = [frame.iloc[i].values for i in range(frame.shape[0])]
    output['results'] = package
    return output, code
def handle_xml(frame, epoch, sep=',',
               message='', code=200, status='success', **kwargs):
    """Render *frame* as an XML ``Response``.

    The root element carries the request status attributes, a ``symbols``
    element describes the time format and each column, and a ``data``
    element holds one ``row`` per timestamp with values joined by *sep*.
    Returns ``(flask.Response, code)``.
    """
    doc = Document()
    root = doc.createElement('metobs')
    root.setAttribute('status', status)
    root.setAttribute('code', str(code))
    root.setAttribute('message', message)
    root.setAttribute('num_results', str(frame.shape[0]))
    # NOTE(review): 'seperator' is misspelled but preserved as-is — existing
    # clients may already parse this attribute name.
    root.setAttribute('seperator', sep)
    doc.appendChild(root)

    symbols_elem = doc.createElement('symbols')
    time_elem = doc.createElement('symbol')
    time_elem.setAttribute('name', 'time')
    time_elem.setAttribute('short_name', 'time')
    if epoch:
        time_format = data_responses.epoch_translation[epoch] + ' since epoch (1970-01-01 00:00:00)'
    else:
        time_format = '%Y-%m-%dT%H:%M:%SZ'
    time_elem.setAttribute('format', time_format)
    symbols_elem.appendChild(time_elem)
    for column in frame.columns:
        col_elem = doc.createElement('symbol')
        col_elem.setAttribute('name', column)
        # assumes columns are 'site.inst.symbol' — short (no-dot) column
        # names would fail here; confirm against callers
        parts = column.split('.')
        col_elem.setAttribute('short_name', parts[2])
        col_elem.setAttribute('site', parts[0])
        col_elem.setAttribute('inst', parts[1])
        symbols_elem.appendChild(col_elem)
    root.appendChild(symbols_elem)

    data_elem = doc.createElement('data')
    for row_id, (stamp, row) in enumerate(frame.iterrows()):
        row_elem = doc.createElement('row')
        row_elem.setAttribute('id', str(row_id))
        row_elem.appendChild(doc.createTextNode(str(stamp)))
        for value in row:
            row_elem.appendChild(doc.createTextNode(sep))
            row_elem.appendChild(doc.createTextNode(str(value)))
        data_elem.appendChild(row_elem)
    root.appendChild(data_elem)

    # txt = doc.toprettyxml(indent=" ", encoding="utf-8")
    txt = doc.toxml(encoding="utf-8")
    return Response(txt, mimetype='text/xml'), code
def handle_error(fmt, error_str):
    """Produce an error response in the requested format.

    Looks up the (HTTP code, message) pair for *error_str* — defaulting to a
    400 with the raw string — and delegates to the per-format error handler.
    Returns ``(response, http_code)``.
    """
    err_code, err_msg = data_responses.ERROR_MESSAGES.get(error_str, (400, error_str))
    res = ERROR_HANDLERS[fmt](err_code, err_msg)
    if fmt == 'json':
        # the JSON error handler returns a dict that still needs jsonify-ing
        res = jsonify(**res)
    return res, err_code
# Dispatch table: response format name -> handler building that response.
RESPONSE_HANDLERS = {
    'csv': handle_csv,
    'xml': handle_xml,
    'json': handle_json,
}
def modify_data(fmt, begin, end, site, inst, symbols, interval,
                sep=',', order='column', epoch=None):
    """Validate a data API request, query InfluxDB, and build the response.

    Parameters
    ----------
    fmt : str
        Response format; must be a key of ``RESPONSE_HANDLERS``.
    begin, end : str or None
        Absolute timestamps or negative ``-H:M:S`` relative offsets.
    site, inst : str or None
        When both are given, *symbols* holds short names for that single
        site/instrument; when both are empty, each symbol must be fully
        qualified as ``site.inst.symbol``.  Providing only one is an error.
    symbols : str
        ':'-separated list of symbols (see above).
    interval : str or None
        Averaging interval key; defaults to ``'1m'``.
    sep : str
        Field separator for text formats.
    order : str
        ``'column'`` or ``'row'``.  BUG FIX: the default used to be
        ``'columns'``, which fails the validation below, so any call that
        relied on the default returned a bad_order error; it is now
        ``'column'``.
    epoch : str or None
        Timestamp epoch unit; must be in ``data_responses.epoch_translation``
        when provided.

    Returns a ``(response, http_code)`` tuple.
    """
    if fmt not in RESPONSE_HANDLERS:
        return render_template('400.html', format=fmt), 400
    try:
        # these will be either datetime or timedelta objects
        begin = handle_time_string(begin) if begin else None
        end = handle_time_string(end) if end else None
    except (TypeError, ValueError):
        return handle_error(fmt, 'malformed_timestamp')
    if order not in ('column', 'row'):
        return handle_error(fmt, 'bad_order')
    if epoch and epoch not in data_responses.epoch_translation:
        return handle_error(fmt, 'bad_epoch')
    if not symbols:
        return handle_error(fmt, 'missing_symbols')
    if not interval:
        interval = '1m'
    elif interval not in data_responses.INTERVALS:
        return handle_error(fmt, 'bad_interval')
    if site and inst:
        # shorthand for symbols that all use the same site and inst
        short_symbols = symbols.split(':')
        symbols = ["{}.{}.{}".format(site, inst, s) for s in short_symbols]
    elif not site and not inst:
        # each symbol is fully qualified with site.inst.symbol
        short_symbols = None
        symbols = symbols.split(':')
    else:
        # only one of site/inst was provided
        return handle_error(fmt, 'missing_site_inst')
    try:
        influx_symbols = handle_symbols(symbols)
    except ValueError as e:
        return handle_error(fmt, str(e))
    if calc_num_records(begin, end, interval) > data_responses.RESPONSES_LIMIT:
        # too large for the streaming API; still return an (empty) frame
        message = "Request will return too many values, please use files API"
        code = 413
        status = 'fail'
        result = None
    else:
        message = ""
        code = 200
        status = 'success'
        queries = build_queries(influx_symbols, begin, end, interval)
        result = query(queries, epoch)
    frame = handle_influxdb_result(result, influx_symbols, interval)
    # order the resulting symbols the way the user requested
    # assume time is the first column
    frame = frame[symbols]
    if site:
        # restore the short names the caller asked with
        frame.columns = short_symbols
    handler = RESPONSE_HANDLERS[fmt]
    return handler(frame, epoch,
                   sep=sep, order=order,
                   status=status, code=code, message=message)