Commit ab84e520 authored by Bruce Flynn

Merge branch 'master' of gitlab.ssec.wisc.edu:brucef/AmrcAws

parents a0d5ed2c d977a497
Pipeline #1371 failed with stage in 1 minute and 20 seconds
@@ -6,5 +6,5 @@ env
build
dist
*.sw?
.coverage
.coverage*
.cache
@@ -30,7 +30,7 @@ class DataForm(Schema):
symbols = Regex(r'^[a-zA-Z_0-9|\.]+$', not_empty=True)
start = Regex(r'^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$', not_empty=True)
end = Regex(r'^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$', not_empty=True)
avg = Int(min=10, max=1440, if_missing=10, if_empty=10)
avg = Int(min=10, max=1440, if_missing=60, if_empty=60)
def stamp_to_dt(val):
@@ -73,7 +73,7 @@ def data_csv_view(request):
symbols = validate_symbols(form)
start = stamp_to_dt(form['start'])
end = stamp_to_dt(form['end'])
avg = int(form['avg']) * 60
avg = int(form['avg']) * 60 if form['avg'] else None
db = request.registry.settings['db']
get_slice = getattr(db, 'get_{}_slice'.format(form['type']))
@@ -109,19 +109,29 @@ def data_netcdf_view(request):
symbols = validate_symbols(form)
start = stamp_to_dt(form['start'])
end = stamp_to_dt(form['end'])
avg = int(form['avg']) * 60
avg = int(form['avg']) * 60 if form['avg'] else None
# fetch data from database
db = request.registry.settings['db']
get_slice = getattr(db, 'get_{}_slice'.format(form['type']))
data = get_slice(stations, symbols, start, end, avg=avg)
if not data:
if not data.any():
raise HTTPNotFound('No data available matching your parameters')
# write rows to NetCDF
dest = mkstemp(suffix='.nc')[1]
nc.write_slice_to_netcdf(stations, symbols, data, dest)
attrs = {
'data_type': form['type'],
'start_time': form['start'],
'end_time': form['end'],
}
if form['avg']:
attrs.update({
'average_interval': form['avg'],
'average_units': 'minutes',
})
nc.write_slice_to_netcdf(stations, symbols, data, dest, attrs=attrs)
response = Response(app_iter=RemovingFileIter(open(dest, 'rb')))
response.content_type = 'application/x-netcdf'
......
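The pattern shared by both views above: a falsy avg is passed through as None so each slice function can apply its own default, and the new global attributes are only attached when averaging was actually requested. A minimal sketch of that flow plus the attribute assembly, assuming a validated form dict as produced by DataForm (the function name here is illustrative only):

def resolve_avg_and_attrs(form):
    # falsy avg -> None, letting the db layer pick its default;
    # otherwise convert minutes (form units) to seconds (db units)
    avg_secs = int(form['avg']) * 60 if form['avg'] else None
    attrs = {
        'data_type': form['type'],
        'start_time': form['start'],
        'end_time': form['end'],
    }
    if form['avg']:
        attrs.update({
            'average_interval': form['avg'],
            'average_units': 'minutes',
        })
    return avg_secs, attrs

# e.g. avg=10 -> (600, {..., 'average_interval': 10, 'average_units': 'minutes'})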
@@ -5,8 +5,8 @@ AWS API Database Refresher
Refreshes database periodic data. This may include averaging of data and
calculation of statistics.
Currently, it's just updating the date range statistics so they do not need to
be recomputed every time, which can be very expensive.
The current implementation updates the date range statistics as well as the list
of available station names.
"""
import argparse
import logging
@@ -15,6 +15,7 @@ from amrc_aws import db
LOG = logging
def main():
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
@@ -30,6 +31,13 @@ def main():
LOG.info('Refreshing station dates')
db.refresh_station_dates()
db.engine.execute("""
INSERT INTO stations(name)
SELECT sitename FROM aws_sites
WHERE NOT EXISTS (
SELECT 1 FROM stations
WHERE stations.name = aws_sites.sitename)
""")
if __name__ == '__main__':
main()
#!/usr/bin/env python
"""
Update the ``stations`` table to include all stations listed in the ``aws_sites``
table maintained by SamB.
"""
import argparse
import logging
from amrc_aws import db
LOG = logging
def main():
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
args = parser.parse_args()
db.init()
db.engine.execute("""
INSERT INTO stations(name)
SELECT sitename FROM aws_sites
WHERE NOT EXISTS (
SELECT 1 FROM stations
WHERE stations.name = aws_sites.sitename)
""")
if __name__ == '__main__':
main()
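The WHERE NOT EXISTS guard makes the insert idempotent: stations already present are skipped, so the script can run repeatedly without creating duplicates. A minimal self-contained demonstration of the pattern (SQLite in-memory, toy data):

import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE stations (name TEXT);
    CREATE TABLE aws_sites (sitename TEXT);
    INSERT INTO stations VALUES ('STATION1');
    INSERT INTO aws_sites VALUES ('STATION1'), ('STATION2');
""")
insert_missing = """
    INSERT INTO stations(name)
    SELECT sitename FROM aws_sites
    WHERE NOT EXISTS (
        SELECT 1 FROM stations
        WHERE stations.name = aws_sites.sitename)
"""
conn.execute(insert_missing)
conn.execute(insert_missing)  # second run inserts nothing
print(conn.execute("SELECT name FROM stations ORDER BY name").fetchall())
# -> [('STATION1',), ('STATION2',)]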
@@ -136,22 +136,21 @@ def insert_frames(frames):
return inserted
DEFAULT_AVG_SECS = 300
def _data_table_name(type_):
return 'data' if type_ == 'rdr' else 'data_{:s}'.format(type_)
def get_rdr_slice(stations, symbols, start, end, avg=DEFAULT_AVG_SECS):
def get_rdr_slice(stations, symbols, start, end, avg=None):
avg = max(avg, 600) if avg else 600
return _get_slice('rdr', stations, symbols, start, end, avg)
def get_q1h_slice(stations, symbols, start, end, avg=DEFAULT_AVG_SECS):
def get_q1h_slice(stations, symbols, start, end, avg=None):
avg = min(avg, 3600) if avg else 3600
return _get_slice('q1h', stations, symbols, start, end, avg)
def _get_slice(type_, stations, symbols, start, end, avg=300):
def _get_slice(type_, stations, symbols, start, end, avg):
"""
Get a slice of data for all station/symbol combinations. Data will be returned
for all combinations whether present or not, filled with NaN where not available.
@@ -162,6 +161,7 @@ def _get_slice(type_, stations, symbols, start, end, avg=300):
'rh', 'pressure', 'wind_dir', 'wind_spd'.
:param start: Inclusive start datetime
:param end: Inclusive end datetime
:param avg: Averaging interval in seconds
"""
table_name = _data_table_name(type_)
select = sql.text("""
......
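The slice functions now treat avg=None as "use this data type's natural resolution" and clamp explicit values: rdr averaging is floored at 600 seconds, q1h averaging is capped at 3600. A minimal sketch of just that clamping logic (standalone, not the real db module):

def clamp_rdr_avg(avg):
    # 10-minute floor: raw rdr data is not served finer than 600s
    return max(avg, 600) if avg else 600

def clamp_q1h_avg(avg):
    # 1-hour cap for the quality-controlled hourly data
    return min(avg, 3600) if avg else 3600

assert clamp_rdr_avg(None) == 600
assert clamp_rdr_avg(300) == 600    # too fine, clamped up
assert clamp_rdr_avg(1800) == 1800
assert clamp_q1h_avg(None) == 3600
assert clamp_q1h_avg(7200) == 3600  # too coarse, clamped down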
import logging
import numpy as np
import netCDF4
LOG = logging.getLogger(__name__)
NAME_STRLEN = 48
@@ -100,14 +103,14 @@ variables = {
def _fill_dataset(stations, symbols, data, dataset):
num_stations = len(stations)
# stride example: data[0:data.shape[0]:2, 2] -> every 2nd row of column 2
for symidx, symbol in enumerate(symbols):
# prefill with the fill value -999.0
arr = np.ones((len(stations), data.shape[0] // len(stations))) * -999.0
num_stations = len(stations)
for staidx, station in enumerate(stations):
# to get the rows for a particular station
# staidx:data.shape[0]:len(stations)
# staidx:data.shape[0]:num_stations
# to skip over the stamp column
# symidx+1
arr[staidx,:] = data[staidx:data.shape[0]:num_stations,symidx+1].astype(float)
@@ -115,15 +118,25 @@ def _fill_dataset(stations, symbols, data, dataset):
# set to fill where currently NaN
arr[np.where(arr != arr)] = var._FillValue
var[:] = arr
# set station names
var = dataset.variables['station_name']
var[:] = np.array(stations)
def write_slice_to_netcdf(stations, symbols, data, dest):
def write_slice_to_netcdf(stations, symbols, data, dest, attrs=None):
attrs = attrs or {}
dataset = netCDF4.Dataset(dest, mode='w') #, diskless=True)
# global attrs
for name, value in schema['globals'].items():
setattr(dataset, name, value)
for name, value in attrs.items():
setattr(dataset, name, value)
LOG.debug("data.shape: %s", data.shape)
LOG.debug(data)
# dimensions
for name, size in schema['dimensions'].items():
if size is not None:
......
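The strided indexing in _fill_dataset relies on the slice rows being interleaved by station (all stations at t0, then all at t1, and so on). A toy illustration of that layout and of pulling one station's values out of it (fabricated numbers, not real query output):

import numpy as np

# 2 stations x 3 stamps; column 0 is the stamp, column 1 the first symbol
data = np.array([
    [0, 10.0],  # t0, station index 0
    [0, 20.0],  # t0, station index 1
    [1, 11.0],  # t1, station index 0
    [1, 21.0],  # t1, station index 1
    [2, 12.0],  # t2, station index 0
    [2, 22.0],  # t2, station index 1
])
num_stations = 2
# staidx:data.shape[0]:num_stations selects one station's rows;
# symidx+1 skips the stamp column
print(data[0:data.shape[0]:num_stations, 1])  # [10. 11. 12.]
print(data[1:data.shape[0]:num_stations, 1])  # [20. 21. 22.]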
@@ -16,4 +16,6 @@ def is_nan(val):
def unixtime(dt):
if dt is None:
return None
return (dt - datetime(1970, 1, 1)).total_seconds()
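For reference, unixtime returns float seconds since the Unix epoch for a naive UTC datetime and passes None through; a runnable check (restating the function above for self-containment):

from datetime import datetime

def unixtime(dt):
    if dt is None:
        return None
    return (dt - datetime(1970, 1, 1)).total_seconds()

assert unixtime(datetime(2016, 1, 1)) == 1451606400.0
assert unixtime(None) is None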
@@ -37,6 +37,5 @@ setup(
[console_scripts]
file_insert = amrc_aws.cli.file_insert:main
batch_insert = amrc_aws.cli.batch_insert:main
update_stations = amrc_aws.cli.update_stations:main
""",
)
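console_scripts entries take the form name = module:callable; after an editable install the commands above are available directly on PATH (illustrative shell session):

pip install -e .
update_stations    # runs amrc_aws.cli.update_stations:main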
@@ -12,9 +12,7 @@ services:
db:
environment:
- POSTGRES_USER=docker
- POSTGRES_DB=docker
- POSTGRES_DB=__test
image: postgres:9.6
ports:
- 5432:5432
volumes:
- ./test_db_init.sh:/docker-entrypoint-initdb.d/test_db_init.sh
- 15432:5432
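With the container port remapped to host port 15432 and the database renamed to __test, the test database can be reached from the host once the stack is up (a hypothetical session, using the credentials above):

docker-compose up -d db
psql -h localhost -p 15432 -U docker __test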
@@ -52,7 +52,7 @@ Lat: 76.15S Lon: 168.40E Elev: 262m
def test_file_insert_q1h(database, q1h_file):
script = 'python -m amrc_aws.cli.file_insert -v {}'.format(q1h_file)
script = './env/bin/python -m amrc_aws.cli.file_insert -v {}'.format(q1h_file)
check_call(script, shell=True)
with db.connection() as conn:
@@ -78,7 +78,7 @@ Lat : 82.32S Long : 75.99E Elev : 4027 M
def test_file_insert_rdr(database, rdr_file):
script = 'python -m amrc_aws.cli.file_insert -v {}'.format(rdr_file)
script = './env/bin/python -m amrc_aws.cli.file_insert -v {}'.format(rdr_file)
check_call(script, shell=True)
with db.connection() as conn:
@@ -87,10 +87,10 @@ def test_file_insert_rdr(database, rdr_file):
def test_file_insert_skips_duplicates(database, rdr_file):
script = 'python -m amrc_aws.cli.file_insert -v {}'.format(rdr_file)
script = './env/bin/python -m amrc_aws.cli.file_insert -v {}'.format(rdr_file)
check_call(script, shell=True)
script = 'python -m amrc_aws.cli.file_insert -v {}'.format(rdr_file)
script = './env/bin/python -m amrc_aws.cli.file_insert -v {}'.format(rdr_file)
check_call(script, shell=True)
with db.connection() as conn:
@@ -99,7 +99,7 @@ def test_file_insert_skips_duplicates(database, rdr_file):
def test_batch_insert(database, rdr_file, tmpdir):
script = 'python -m amrc_aws.cli.batch_insert -v -'
script = './env/bin/python -m amrc_aws.cli.batch_insert -v -'
fptr = tmpdir.join('files.txt')
fptr.write(rdr_file + '\n')
check_call(script, shell=True, stdin=open(fptr.strpath))
......
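These tests shell out to the virtualenv interpreter at ./env/bin/python, and the fixtures below read the database location from AWSAPI_DB_URL; a hypothetical invocation against the docker-compose database above:

AWSAPI_DB_URL=postgresql://docker@localhost:15432/__test ./env/bin/python -m pytest -v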
@@ -21,7 +21,6 @@ def database():
with engine.connect() as conn:
conn.execute("DROP SCHEMA IF EXISTS {:s} CASCADE".format(schema))
conn.execute("CREATE SCHEMA {:s}".format(schema))
conn.execute("COMMIT")
engine.dispose()
db.init(sa.create_engine(os.environ['AWSAPI_DB_URL']))
@@ -29,21 +28,27 @@ def database():
with db.connection() as conn:
conn.execute("INSERT INTO stations(name) VALUES ('STATION1')")
conn.execute("INSERT INTO station_dates(name, max, min) VALUES ('STATION1', now(), now())")
conn.execute("INSERT INTO stations(name) VALUES ('STATION2')")
return sa.create_engine(os.environ['AWSAPI_DB_URL'])
def test_station_names(database):
assert db.get_station_names() == ['STATION1']
assert db.get_station_names() == ['STATION1', 'STATION2']
def test_get_stations(database):
stations = db.get_stations()
assert len(stations) == 1
assert len(stations) == 2
assert isinstance(stations[0], dict)
assert stations[0].get('id')
assert stations[0].get('name') == 'STATION1'
# stations without date-range rows should report None for max/min
assert stations[1].get('max') is None
assert stations[1].get('min') is None
def test_get_rdr_slice(database):
with db.connection() as conn:
@@ -56,12 +61,12 @@ def test_get_rdr_slice(database):
('2016-01-01 00:30:00', :station_id, 3.0, 3.0, 3.0, 3.0, 0.0, 0.0)
"""), station_id=station_id)
avg = 300
avg = None
start = datetime(2016, 1, 1)
end = datetime(2016, 1, 1, 0, 59, 59)
data = db.get_rdr_slice(['STATION1'], ['air_temp'], start, end, avg)
# data should return 6 rows because avg=None is clamped to 600s: 6 ten-minute intervals in 1 hour
assert len(data) == 12
assert len(data) == 6
avg = 600
data = db.get_rdr_slice(['STATION1'], ['air_temp'], start, end, avg)
......
#!/usr/bin/env bash
set -e
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" <<EOSQL
CREATE DATABASE __test;
GRANT ALL PRIVILEGES ON DATABASE __test TO docker;
EOSQL