Commit 9e2b9c6f authored by Bruce Flynn

Another round of changes for station config file.

parent 11d7898c
Pipeline #1399 passed in 1 minute and 49 seconds
@@ -30,7 +30,7 @@ class DataForm(Schema):
symbols = Regex(r'^[a-zA-Z_0-9|\.]+$', not_empty=True)
start = Regex(r'^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$', not_empty=True)
end = Regex(r'^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$', not_empty=True)
avg = Int(min=10, max=1440, if_missing=None, if_empty=None)
avg = Int(min=10, max=86400, if_missing=None, if_empty=None)
def stamp_to_dt(val):
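
Since `avg` is expressed in seconds, raising the max from 1440 to 86400 allows averaging windows up to one day. A minimal sketch of the widened bound, assuming the validators are FormEncode's (as the Schema/Regex/Int names suggest):

    from formencode import Invalid
    from formencode.validators import Int

    # Mirrors the new DataForm bound: averaging interval in seconds, up to one day.
    avg = Int(min=10, max=86400, if_missing=None, if_empty=None)

    assert avg.to_python('600') == 600   # 10-minute averaging, accepted
    assert avg.to_python(None) is None   # if_empty/if_missing fall back to None
    try:
        avg.to_python('100000')          # rejected: above the new 86400 max
    except Invalid:
        pass
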
@@ -79,7 +79,7 @@ def data_csv_view(request):
get_slice = getattr(db, 'get_{}_slice'.format(form['type']))
data = get_slice(stations, symbols, start, end, avg=avg)
return Response(app_iter=csv_response(stations, symbols, data),
return Response(app_iter=csv_response(stations, symbols, form, data),
content_type='text/plain')
@@ -142,20 +142,32 @@ def data_netcdf_view(request):
@view_config(route_name='stations', renderer='json')
def stations_view(request):
db = request.registry.settings['db']
return [{'name': r['name'],
'min_available': util.unixtime(r['min']) if r['min'] else None,
'max_available': util.unixtime(r['max']) if r['max'] else None}
for r in db.get_stations()]
stations = db.get_stations()
for name, dat in stations.items():
for type_, dat2 in dat.items():
min_ = dat2.pop('min')
max_ = dat2.pop('max')
dat2['min_available'] = util.unixtime(min_) if min_ else None
dat2['max_available'] = util.unixtime(max_) if max_ else None
def csv_response(stations, sensors, rows):
return stations
def csv_response(stations, sensors, form, rows):
fields = ['%s.%s' % (k, v)
for k in stations
for v in sensors]
LOG.debug('got rows.%s for %s', rows.shape, fields)
LOG.debug('got shape %s for %s', rows.shape, fields)
def yielder():
yield '# AMRC AWS\n'
yield '# Stations: {}\n'.format(','.join(stations))
yield '# Sensors: {}\n'.format(','.join(sensors))
yield '# Start: {}\n'.format(form['start'])
yield '# End: {}\n'.format(form['end'])
yield '# Type: {}\n'.format(form['type'])
yield '# Avg: {}\n'.format(form['avg'])
yield '# Fields: stamp,{}\n'.format(','.join(fields))
# Rows should always be a multiple of the number of stations. Therefore,
# stamp_idx will always point to the first station row for a given
......
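
With `form` now threaded into csv_response, the header comments record the query itself. Hypothetical first lines of a response for a two-station, one-sensor request (values illustrative):

    # AMRC AWS
    # Stations: Byrd,Manning
    # Sensors: air_temp
    # Start: 2016-01-01T00:00:00Z
    # End: 2016-01-02T00:00:00Z
    # Type: rdr
    # Avg: 600
    # Fields: stamp,Byrd.air_temp,Manning.air_temp
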
@@ -9,7 +9,6 @@ the file.
import argparse
import logging
from concurrent import futures
from amrc_aws import parsers, db, exc
LOG = logging
@@ -23,10 +22,14 @@ def main():
'--delete', action='store_true',
help=('Purge rows before inserting. This will effectively be a rebuild '
'of the database data for all file types.'))
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--rdr', action='store_true', help='Input files are RDR type')
group.add_argument('--q1h', action='store_true', help='Input files are Q1H type')
parser.add_argument(
'filelist', type=argparse.FileType('r'),
help=('File containing input files, one per line. This can also be '
'specified as - to read files from stdin.'))
'specified as - to read files from stdin. All files should be '
'of the type specified as an argument.'))
args = parser.parse_args()
@@ -36,48 +39,24 @@ def main():
db.init()
type_ = 'rdr' if args.rdr else 'q1h'
if args.delete:
LOG.info('deleting rdr data')
db.purge_data('rdr')
LOG.info('deleting q1h data')
db.purge_data('q1h')
db.purge_data(type_)
def parse_error(filepath, lineno, line):
LOG.error('Could not parse %s:%s\n%s', filepath, lineno, line)
def worker(filepath):
LOG.debug('working on %s', filepath)
db.init()
parser = parsers.get_parser(filepath)
if not parser:
raise RuntimeError('Cannot find parser for %s' % filepath)
for filepath in (l.strip() for l in args.filelist):
LOG.info(filepath)
parser = getattr(parsers, type_)
try:
frames = parser.frames(filepath, error_callback=parse_error)
db.insert_frames(frames)
except (db.IntegrityError, exc.ParseError) as err:
raise RuntimeError('Insert failed for %s: %s' % (filepath, err.__class__.__name__))
except (db.IntegrityError, exc.ParseError, ValueError) as err:
LOG.warning('File insert failed(%s): %s', filepath, err.__class__.__name__)
with futures.ThreadPoolExecutor(max_workers=5) as pool:
filepaths = [l.strip() for l in args.filelist]
jobs_to_fpath = {}
for fpath in filepaths:
LOG.info('submitting %s', fpath)
job = pool.submit(worker, fpath)
jobs_to_fpath[job] = fpath
for job in futures.as_completed(jobs_to_fpath):
filepath = jobs_to_fpath[job]
try:
job.result()
except RuntimeError:
LOG.warn('%s failed, use file_insert for details', filepath)
except (SystemExit, KeyboardInterrupt):
LOG.warn('Exiting early!!')
break
for job in jobs_to_fpath:
job.cancel()
LOG.info('refreshing station dates')
db.refresh_station_dates(type_)
if __name__ == '__main__':
main()
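
With the new mutually exclusive type flags, invocations look like this (paths illustrative; the module path matches the one used in the tests):

    # Rebuild q1h data from a list file:
    python -m amrc_aws.cli.batch_insert --q1h --delete files.txt

    # Or stream RDR file names on stdin:
    ls /data/rdr/*.dat | python -m amrc_aws.cli.batch_insert --rdr -v -
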
#!/usr/bin/env python
"""
AWS API Database Refresher
Refreshes database periodic data. This may include averaging of data and
calculation of statistics.
The current implementation updates the date range statistics as well as the list
of available station names.
"""
import argparse
import logging
from amrc_aws import db
LOG = logging
def main():
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format='%(asctime)s -- %(message)s')
db.init()
LOG.info('Refreshing station dates')
# refresh_station_dates() now requires a data type; refresh both
for type_ in ('rdr', 'q1h'):
db.refresh_station_dates(type_)
db.engine.execute("""
INSERT INTO stations(name)
SELECT sitename FROM aws_sites
WHERE NOT EXISTS (
SELECT 1 FROM stations
WHERE stations.name = aws_sites.sitename)
""")
if __name__ == '__main__':
main()
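
The INSERT ... WHERE NOT EXISTS is an anti-join: it copies only those aws_sites.sitename values not already present in stations, so the refresher is safe to run repeatedly, e.g. from cron (module path assumed by analogy with batch_insert):

    python -m amrc_aws.cli.db_refresh -v
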
@@ -16,8 +16,10 @@ from sqlalchemy import (
Float,
DateTime,
)
from sqlalchemy.sql import compiler
from sqlalchemy.pool import NullPool
from sqlalchemy.exc import IntegrityError
from psycopg2.extensions import adapt as sqlescape
from metobs.data.calc import wind_vector_degrees
@@ -87,18 +89,17 @@ def sanitize_name(name):
def get_stations():
with connection() as conn:
conn.execute("""
SELECT x.station, d.max, d.min FROM (
SELECT d.data_type, x.station, d.max, d.min FROM (
SELECT unnest AS station FROM unnest(ARRAY[%s])) AS x
LEFT JOIN station_dates d ON d.station = x.station
""" % ','.join("'%s'" % x for x in get_station_names()))
dat = {r['station']: dict(r) for r in conn.execute('SELECT * FROM station_dates').fetchall()}
zult = []
for name in get_station_names():
row = dat.get(name, {'name': name, 'max': None, 'min': None})
if name in dat:
row['max'] = dat['max']
row['min'] = dat['min']
zult.append(row)
dat = [dict(r) for r in conn.execute('SELECT * FROM station_dates').fetchall()]
zult = {}
for row in dat:
if row['station'] not in zult:
zult[row['station']] = {}
zult[row['station']][row['data_type']] = {'max': row['max'], 'min': row['min']}
return zult
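
get_stations() now returns a nested mapping keyed by station name and then data type, instead of a flat list of rows. An illustrative shape (values made up):

    # {
    #     'Byrd': {
    #         'rdr': {'min': datetime(2016, 1, 1), 'max': datetime(2016, 6, 1)},
    #         'q1h': {'min': datetime(2016, 1, 1), 'max': datetime(2016, 6, 1)},
    #     },
    # }
    #
    # stations_view then pops min/max and re-exposes them as
    # min_available/max_available unix times.
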
@@ -108,7 +109,8 @@ def get_station_names():
def purge_data(type_):
with connection() as conn:
conn.execute('DELETE FROM %S', (_data_table_name(type_),))
LOG.info('deleting %s data', type_)
conn.execute('DELETE FROM {:s}'.format(_data_table_name(type_)))
def insert_frames(frames):
@@ -174,20 +176,26 @@ def _get_slice(type_, stations, symbols, start, end, avg):
table_name = _data_table_name(type_)
select = sql.text("""
SELECT
series.stamp, air_temp, vtempdiff, rh, pressure,
_wind_vector_east, _wind_vector_north
series.stamp, air_temp, vtempdiff, rh, pressure, _wind_vector_east, _wind_vector_north
FROM (
SELECT stamp.* FROM
generate_series(:start, :end, :interval) AS stamp
) AS series
SELECT name as station, stamp FROM
(SELECT generate_series(:start, :end, :interval) as stamp) stamps
LEFT OUTER JOIN
(SELECT unnest AS name FROM unnest(ARRAY{stations})) stations
ON True) AS series
LEFT OUTER JOIN (
SELECT * FROM {data_table} d
WHERE stamp >= :start AND stamp <= :end
AND station IN :stations
) AS data
ON series.stamp = data.stamp
AND data.station = series.station
ORDER BY series.stamp, data.station
""".format(data_table=table_name))
""".format(data_table=table_name, stations=[str(s) for s in stations]))
LOG.debug(_compile_query(select.bindparams(
start=start, end=end,
interval=timedelta(seconds=avg),
stations=tuple(stations))))
rows = _fetchall(
select, start=start, end=end,
interval=timedelta(seconds=avg),
@@ -213,22 +221,39 @@ def _get_slice(type_, stations, symbols, start, end, avg):
return dat[:, colidxs]
def refresh_station_dates():
def refresh_station_dates(type_):
with connection() as conn:
for name in stations.names():
for station in stations.names():
dates = _table('station_dates')
conn.execute(dates.delete().where(dates.c.name == station.name))
conn.execute(sql.text(
'DELETE FROM station_dates WHERE station=:station AND data_type=:type'
), station=station, type=type_)
conn.execute(dates.delete()
.where(dates.c.station == station))
result = conn.execute(sql.text("""
SELECT name, min(stamp), max(stamp)
FROM data
WHERE data.station = :name
GROUP BY name
"""), name=name).fetchall()
SELECT station, min(stamp), max(stamp)
FROM %s
WHERE station = :name
GROUP BY station
""" % _data_table_name(type_)), name=station).fetchall()
if result:
conn.execute(sql.text("""
INSERT INTO station_dates
VALUES (:name, :min, :max)
"""), name=name, min=result[0].min, max=result[0].max)
INSERT INTO station_dates (data_type, station, min, max)
VALUES (:type, :name, :min, :max)
"""), type=type_, name=station, min=result[0].min, max=result[0].max)
"""
SELECT
stations.name, stamp.*
FROM
(
SELECT unnest AS name FROM unnest(ARRAY['Byrd', 'Manning'])
) AS stations
LEFT OUTER JOIN (
SELECT * FROM generate_series('1985-01-01', '1985-02-01', '10 minutes'::INTERVAL)
) AS stamp ON true
"""
def create_schema():
@@ -262,6 +287,22 @@ def create_schema():
station_dates_t = Table(
'station_dates', metadata,
Column('station', Text, primary_key=True),
Column('data_type', Text, primary_key=True),
Column('min', DateTime),
Column('max', DateTime))
station_dates_t.create()
def _compile_query(stmt):
"""Compile an expressional statement to an actual SQL query.
"""
dialect = engine.dialect
comp = compiler.SQLCompiler(dialect, stmt)
comp.compile()
enc = dialect.encoding
params = {}
for k, v in comp.params.iteritems():
if isinstance(v, unicode):
v = v.encode(enc)
params[k] = sqlescape(v)
return (comp.string.encode(enc) % params).decode(enc)
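
_compile_query interpolates the bound parameters into the SQL string (via psycopg2's adapt) purely so the LOG.debug call above can show the exact query sent to the server; the iteritems/unicode usage assumes Python 2.
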
@@ -5,9 +5,12 @@ NaN = float('nan')
def printable(x):
"""
Filter any non printable chars from string.
Filter any non printable chars and return str.
"""
x = x.encode('ascii', errors='ignore')
if isinstance(x, str):
x = x.decode('ascii', errors='ignore').encode('ascii')
else:
x = x.encode('ascii', errors='ignore')
return ''.join(c for c in x if 31 < ord(c) < 127)
......
@@ -34,7 +34,7 @@ port = 5010
###
[loggers]
keys = root, app, lib
keys = root, app, lib, sa
[handlers]
keys = console
@@ -46,6 +46,11 @@ keys = generic
level = INFO
handlers = console
[logger_sa]
level = INFO
handlers = console
qualname = sqlalchemy.engine
[logger_app]
level = DEBUG
handlers =
......
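
Pointing a logger at qualname sqlalchemy.engine with level INFO makes SQLAlchemy log every SQL statement it executes to the console handler, complementing the new _compile_query debug output.
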
@@ -23,7 +23,6 @@ setup(
'waitress',
'netCDF4',
'pyyaml',
'futures'
],
extras_require={
'testing': [
......
@@ -37,7 +37,7 @@ def mock_database(monkeypatch):
@staticmethod
def get_stations():
return [{'name': n, 'max': None, 'min': None} for n in Db.stations]
return {n: {'rdr': {'max': None, 'min': None}} for n in Db.stations}
@staticmethod
def get_rdr_slice(stations, symbols, start, end, **kwargs):
......
@@ -96,7 +96,7 @@ def test_file_insert_skips_duplicates(database, rdr_file):
def test_batch_insert(database, rdr_file, tmpdir):
script = 'python -m amrc_aws.cli.batch_insert -v -'
script = 'python -m amrc_aws.cli.batch_insert --rdr -v -'
fptr = tmpdir.join('files.txt')
fptr.write(rdr_file + '\n')
check_call(script, shell=True, stdin=open(fptr.strpath))
......
@@ -27,11 +27,20 @@ def database():
db.create_schema()
with db.connection() as conn:
conn.execute("INSERT INTO station_dates(station, max, min) VALUES ('Byrd', now(), now())")
conn.execute("INSERT INTO station_dates(data_type, station, max, min) VALUES ('rdr', 'Byrd', now(), now())")
return sa.create_engine(os.environ['AWSAPI_DB_URL'])
def test_get_stations(database):
stations = db.get_stations()
assert len(stations) == 1
assert 'Byrd' in stations
assert 'rdr' in stations['Byrd']
assert 'max' in stations['Byrd']['rdr']
assert 'min' in stations['Byrd']['rdr']
def test_get_rdr_slice(database):
with db.connection() as conn:
station = db.get_station_names()[0]
@@ -47,14 +56,16 @@ def test_get_rdr_slice(database):
start = datetime(2016, 1, 1)
end = datetime(2016, 1, 1, 0, 59, 59)
data = db.get_rdr_slice(['STATION1'], ['air_temp'], start, end, avg)
# 6 rows because there are 6 10-minute intervals in 1 hour
assert len(data) == 6
avg = 600
data = db.get_rdr_slice(['STATION1'], ['air_temp'], start, end, avg)
# 6 rows because there are 6 10-minute intervals in 1 hour (avg=600s)
assert len(data) == 6
avg = 1200
data = db.get_rdr_slice(['STATION1'], ['air_temp'], start, end, avg)
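# 3 rows because there are 3 20-minute intervals in 1 hour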
assert len(data) == 3
def test_get_q1h_slice(database):
with db.connection() as conn:
......
@@ -8,6 +8,9 @@ def test_unixtime():
def test_printable():
# accepts unicode input
assert util.printable(u'\u0129') == ''
# strips non-printable ascii
assert util.printable(chr(25)) == ''
assert util.printable('\x0a') == ''
# nominal
assert util.printable('~') == '~'