Commit 00de7b71 authored by Bruce Flynn's avatar Bruce Flynn

Changes to how station_availability works.

Rather than keep max/min for stations instead keep a record of which
stations are available for which days.
parent 1f694c14
Pipeline #1446 failed with stage
in 1 minute and 25 seconds
......@@ -21,6 +21,14 @@ symbol_names = ('air_temp', 'vtempdiff', 'pressure', 'rh',
'wind_spd', 'wind_dir', 'wind_dir_east', 'wind_dir_north')
def get_form(schema, request):
    """Validate ``request.params`` against a formencode schema.

    :param schema: formencode ``Schema`` subclass (the class, not an
        instance) used to validate and convert the query parameters.
    :param request: Pyramid request whose ``params`` are validated.
    :returns: dict of validated/converted parameters.
    :raises HTTPBadRequest: if validation fails, with the formencode
        error message included in the response body.
    """
    try:
        return schema().to_python(request.params)
    except Invalid as err:
        # BUGFIX: '{:s}' raised TypeError here -- Invalid is not a str
        # subclass, and object.__format__ rejects any non-empty format
        # spec, turning a bad request into a 500. Plain '{}' falls back
        # to str(err) as intended.
        raise HTTPBadRequest('Invalid request parameters: {}'
                             .format(err))
class DataForm(Schema):
allow_extra_fields = True
filter_extra_fields = False
......@@ -33,16 +41,15 @@ class DataForm(Schema):
avg = Int(min=10, max=86400, if_missing=None, if_empty=None)
class StationsAvailableForm(Schema):
class StationsForm(Schema):
allow_extra_fields = True
filter_extra_fields = False
type = OneOf(['rdr', 'q1h'], if_missing='q1h', if_empty='q1h')
start = Regex(r'^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$', not_empty=True)
end = Regex(r'^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$', not_empty=True)
start = Regex(r'^\d\d\d\d-\d\d-\d\d$', if_empty=None, if_missing=None)
end = Regex(r'^\d\d\d\d-\d\d-\d\d$', if_empty=None, if_missing=None)
def stamp_to_dt(val):
    """Parse an ISO-8601 Zulu timestamp ('YYYY-MM-DDTHH:MM:SSZ') string
    into a naive :class:`datetime.datetime`.
    """
    iso_zulu_fmt = '%Y-%m-%dT%H:%M:%SZ'
    return datetime.strptime(val, iso_zulu_fmt)
def stamp_to_dt(val, fmt='%Y-%m-%dT%H:%M:%SZ'):
    """Parse timestamp string *val* using strptime format *fmt*.

    *fmt* defaults to the ISO-8601 Zulu format used by the API query
    parameters; callers with date-only values pass '%Y-%m-%d'.
    """
    parsed = datetime.strptime(val, fmt)
    return parsed
def validate_stations(form):
......@@ -72,10 +79,7 @@ def validate_symbols(form):
@view_config(route_name='data_form_csv')
def data_csv_view(request):
# query parameter parsing
try:
form = DataForm.to_python(request.params)
except Invalid as err:
raise HTTPBadRequest('Invalid request params: %s' % str(err))
form = get_form(DataForm, request)
stations = validate_stations(form)
symbols = validate_symbols(form)
......@@ -106,10 +110,7 @@ class RemovingFileIter(FileIter):
@view_config(route_name='data_form_netcdf')
def data_netcdf_view(request):
# query parameter parsing
try:
form = DataForm.to_python(request.params)
except Invalid as err:
raise HTTPBadRequest('Invalid request params: %s' % str(err))
form = get_form(DataForm, request)
# more form validation stuff that was easier to do outside the context of
# formencode.
......@@ -150,27 +151,10 @@ def data_netcdf_view(request):
@view_config(route_name='stations', renderer='json')
def stations_view(request):
db = request.registry.settings['db']
stations = db.get_stations()
for name, dat in stations.items():
for type_, dat2 in dat.items():
min_ = dat2.pop('min')
max_ = dat2.pop('max')
dat2['min_available'] = util.unixtime(min_) if min_ else None
dat2['max_available'] = util.unixtime(max_) if max_ else None
return stations
@view_config(route_name='stations_available', renderer='json')
def stations_available(request):
db = request.registry.settings['db']
# query parameter parsing
try:
form = StationsAvailableForm.to_python(request.params)
except Invalid as err:
raise HTTPBadRequest('Invalid request params: %s' % str(err))
return db.stations_available(form['type'], form['start'], form['end'])
form = get_form(StationsForm, request)
start = stamp_to_dt(form['start'] or '1958-01-01', '%Y-%m-%d')
end = stamp_to_dt(form['end'] or datetime.utcnow().strftime('%Y-%m-%d'), '%Y-%m-%d')
return db.get_stations(start, end)
def csv_response(stations, sensors, form, rows):
......
......@@ -55,8 +55,5 @@ def main():
except (db.IntegrityError, exc.ParseError, ValueError) as err:
LOG.warning('File insert failed(%s): %s', filepath, err.__class__.__name__)
LOG.info('refreshing station dates')
db.refresh_station_dates(type_)
if __name__ == '__main__':
main()
import logging
from datetime import datetime
from collections import defaultdict
from datetime import timedelta
from contextlib import contextmanager
......@@ -86,36 +87,16 @@ def sanitize_name(name):
return stations.cononical_name(name)
def get_stations():
with connection() as conn:
conn.execute("""
SELECT d.data_type, x.station, d.max, d.min FROM (
SELECT unnest AS station FROM unnest(ARRAY[%s])) AS x
LEFT JOIN station_dates d ON d.station = x.station
""" % ','.join("'%s'" % x for x in get_station_names()))
dat = [dict(r) for r in conn.execute('SELECT * FROM station_dates').fetchall()]
zult = {}
for row in dat:
if row['station'] not in zult:
zult[row['station']] = {}
zult[row['station']][row['data_type']] = {'max': row['max'], 'min': row['min']}
return zult
def stations_available(type_, start, end):
def get_stations(start=None, end=None):
start = start or datetime(1958, 1, 1)
end = end or datetime.utcnow()
with connection() as conn:
query = sql.text('''
SELECT count(*), station
FROM %s
WHERE stamp >= :start
AND stamp < :end
GROUP BY station
HAVING count(*) > 0
SELECT distinct(station) FROM station_dates
WHERE stamp >= :start AND stamp <= :end
ORDER BY station
''' % _data_table_name(type_))
return [{'name': r['station'], 'count': r['count']}
for r in conn.execute(query, start=start, end=end).fetchall()]
''')
return [r[0] for r in conn.execute(query, start=start, end=end)]
def get_station_names():
......@@ -236,39 +217,23 @@ def _get_slice(type_, stations, symbols, start, end, avg):
return dat[:, colidxs]
def refresh_station_dates(type_):
def refresh_station_dates():
"""
Refresh data date's available for each station, i.e., `station_dates`
table. Dates are based only on the RDR data.
"""
with connection() as conn:
for station in stations.names():
dates = _table('station_dates')
conn.execute(sql.text(
'DELETE FROM station_dates WHERE station=:station AND data_type=:type'
), station=station, type=type_)
conn.execute(dates.delete()
.where(dates.c.station == station))
result = conn.execute(sql.text("""
SELECT station, min(stamp), max(stamp)
FROM %s
WHERE station = :name
GROUP BY station
""" % _data_table_name(type_)), name=station).fetchall()
if result:
conn.execute(sql.text("""
INSERT INTO station_dates (data_type, station, min, max)
VALUES (:type, :name, :min, :max)
"""), type=type_, name=station, min=result[0].min, max=result[0].max)
"""
SELECT
stations.name, stamp.*
FROM
(
SELECT unnest AS name FROM unnest(ARRAY['Byrd', 'Manning'])
) AS stations
LEFT OUTER JOIN (
SELECT * FROM generate_series('1985-01-01', '1985-02-01', '10 minutes'::INTERVAL)
) AS stamp
"""
conn.execute('DELETE FROM station_dates')
conn.execute("""
INSERT INTO station_dates
SELECT station, stamp
FROM (
SELECT date_trunc('day', stamp) AS stamp, station, count(*)
FROM data
GROUP BY stamp, station
HAVING count(*) > 0) AS x
GROUP BY stamp, station;
""")
def create_schema():
......@@ -302,9 +267,7 @@ def create_schema():
station_dates_t = Table(
'station_dates', metadata,
Column('station', Text, primary_key=True),
Column('data_type', Text, primary_key=True),
Column('min', DateTime),
Column('max', DateTime))
Column('stamp', DateTime, primary_key=True))
station_dates_t.create()
......
......@@ -38,5 +38,6 @@ setup(
[console_scripts]
file_insert = amrc_aws.cli.file_insert:main
batch_insert = amrc_aws.cli.batch_insert:main
refresh_stations = amrc_aws.cli.refresh_stations:main
""",
)
......@@ -36,8 +36,8 @@ def mock_database(monkeypatch):
return Db.stations
@staticmethod
def get_stations():
return {n: {'rdr': {'max': None, 'min': None}} for n in Db.stations}
def get_stations(start=None, end=None):
return Db.stations
@staticmethod
def get_rdr_slice(stations, symbols, start, end, **kwargs):
......@@ -92,12 +92,9 @@ class Test(object):
def test_get_stations(self, pyramid_app):
pyramid_app.get('/stations.json', status=200)
def test_stations_available(self, pyramid_app):
pyramid_app.get('/stations_available.json', status=400)
pyramid_app.get((
'/stations_available.json?'
'type=rdr&'
'start=2016-01-01T00:00:00Z&'
'end=2017-01-01T00:00:00Z'
'/stations.json?'
'start=2016-01-01&'
'end=2017-01-01'
), status=200)
......@@ -104,3 +104,12 @@ def test_batch_insert(database, rdr_file, tmpdir):
with db.connection() as conn:
count = conn.execute('select count(*) from data').fetchone().count
assert count == 6
def test_refresh_stations(database, rdr_file, tmpdir):
script = 'python -m amrc_aws.cli.refresh_stations -v'
check_call(script, shell=True)
with db.connection() as conn:
count = conn.execute('select count(*) from station_dates').fetchone().count
assert count == 0
import os
from datetime import datetime
from datetime import datetime, timedelta
import pytest
import sqlalchemy as sa
......@@ -27,7 +27,7 @@ def database():
db.create_schema()
with db.connection() as conn:
conn.execute("INSERT INTO station_dates(data_type, station, max, min) VALUES ('rdr', 'Byrd', now(), now())")
conn.execute("INSERT INTO station_dates(station, stamp) VALUES ('Byrd', now())")
return sa.create_engine(os.environ['AWSAPI_DB_URL'])
......@@ -36,10 +36,15 @@ def test_get_stations(database):
stations = db.get_stations()
assert len(stations) == 1
assert 'Byrd' in stations
assert 'rdr' in stations['Byrd']
assert 'max' in stations['Byrd']['rdr']
assert 'min' in stations['Byrd']['rdr']
now = datetime.utcnow()
oneday = timedelta(days=1)
stations = db.get_stations(now - oneday, now + oneday)
assert len(stations) == 1
tendays = timedelta(days=10)
stations = db.get_stations(now + tendays, now + tendays)
assert len(stations) == 0
def test_get_rdr_slice(database):
with db.connection() as conn:
......@@ -90,25 +95,3 @@ def test_get_q1h_slice(database):
# data should return 12 rows becasue there are 12 5min intervals in 1 hour
assert len(data) == 6
def test_stations_available(database):
with db.connection() as conn:
station = db.get_station_names()[0]
conn.execute(sa.text("""
INSERT INTO data VALUES
('2016-01-01 00:00:00', :station_id, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0),
('2016-01-01 00:10:00', :station_id, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0),
('2016-01-01 00:20:00', :station_id, 2.0, 2.0, 2.0, 2.0, 0.0, 0.0),
('2016-01-01 00:30:00', :station_id, 3.0, 3.0, 3.0, 3.0, 0.0, 0.0)
"""), station_id=station)
dat = db.stations_available('rdr', datetime(2016, 1, 1), datetime(2017, 1, 1))
assert len(dat) == 1
assert dat[0]['name']
assert dat[0]['count'] == 4
dat = db.stations_available('rdr', datetime(2015, 1, 1), datetime(2016, 1, 1))
assert len(dat) == 0
dat = db.stations_available('q1h', datetime(2016, 1, 1), datetime(2017, 1, 1))
assert len(dat) == 0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment