Commit f0ca2a5e authored by Bruce Flynn

Stations from config file rather than db.

Get station names from a YAML file.
parent 934bf409
*.egg-info
*.pyc
.eggs
.idea
env
build
dist
......
dataSources*
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="TemplatesService">
<option name="TEMPLATE_CONFIGURATION" value="Chameleon" />
<option name="TEMPLATE_FOLDERS">
<list>
<option value="$MODULE_DIR$/env/lib/python2.7/site-packages/pyramid_debugtoolbar/templates" />
</list>
</option>
</component>
<component name="TestRunnerService">
<option name="projectConfiguration" value="py.test" />
<option name="PROJECT_TEST_RUNNER" value="py.test" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="dataSourceStorageLocal">
<data-source name="__test@localhost" uuid="9e3aea9e-c0d4-4732-92c6-8e2667359f33">
<database-info product="PostgreSQL" version="9.6.1" jdbc-version="4.0" driver-name="PostgreSQL Native Driver" driver-version="PostgreSQL 9.4 JDBC4 (build 1201)">
<identifier-quote-string>&quot;</identifier-quote-string>
</database-info>
<case-sensitivity plain-identifiers="lower" quoted-identifiers="exact" />
<secret-storage>master_key</secret-storage>
<user-name>docker</user-name>
<resolve-scope>__test:docker,public</resolve-scope>
</data-source>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DataSourceManagerImpl" format="xml" multifile-model="true">
<data-source source="LOCAL" name="__test@localhost" uuid="9e3aea9e-c0d4-4732-92c6-8e2667359f33">
<driver-ref>postgresql</driver-ref>
<synchronize>true</synchronize>
<jdbc-driver>org.postgresql.Driver</jdbc-driver>
<jdbc-url>jdbc:postgresql://localhost:15432/__test</jdbc-url>
</data-source>
</component>
</project>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="useProjectProfile" value="false" />
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 2.7.13 (~/code/AmrcAws/env/bin/python)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/AmrcAws.iml" filepath="$PROJECT_DIR$/.idea/AmrcAws.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="SqlDialectMappings">
<file url="file://$PROJECT_DIR$" dialect="PostgreSQL" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
@@ -14,7 +14,7 @@ install::
curl -O https://gitlab.ssec.wisc.edu/brucef/AmrcAws/raw/<VERSION>/scripts/install.sh
This will install AWS API in the current direcory. In general I recommend
This will install AWS API in the current directory. In general I recommend
using ``/opt/awsapi`` and will assume this is where you are installing.
You then simply have to run the ``install.sh`` script, which will do the
@@ -29,7 +29,7 @@ following:
#. Install supervisord_ process watcher which is used to keep the processes
running in the case of error.
#. Create a few directories
Starting AWS API
----------------
......
@@ -30,7 +30,7 @@ class DataForm(Schema):
symbols = Regex(r'^[a-zA-Z_0-9|\.]+$', not_empty=True)
start = Regex(r'^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$', not_empty=True)
end = Regex(r'^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$', not_empty=True)
avg = Int(min=10, max=1440, if_missing=60, if_empty=60)
avg = Int(min=10, max=1440, if_missing=None, if_empty=None)
def stamp_to_dt(val):
......
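A note on the ``avg`` change above: with ``if_missing`` and ``if_empty`` set to ``None``, averaging becomes optional rather than defaulting to 60 minutes. A minimal standalone sketch of how the formencode validator behaves (an illustration, not part of the commit)::

    from formencode import Invalid
    from formencode.validators import Int

    avg = Int(min=10, max=1440, if_missing=None, if_empty=None)
    print(avg.to_python('60'))   # 60
    print(avg.to_python(''))     # None: an empty value now means "no averaging"
    try:
        avg.to_python('5')       # out-of-range values still raise
    except Invalid as err:
        print(err)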
@@ -9,6 +9,7 @@ the file.
import argparse
import logging
from concurrent import futures
from amrc_aws import parsers, db, exc
LOG = logging
@@ -18,6 +19,10 @@ def main():
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('-v', '--verbose', action='store_true')
parser.add_argument(
'--delete', action='store_true',
help=('Purge rows before inserting. This will effectively be a rebuild '
'of the database data for all file types.'))
parser.add_argument(
'filelist', type=argparse.FileType('r'),
help=('File containing input files, one per line. This can also be '
@@ -31,22 +36,48 @@ def main():
db.init()
if args.delete:
LOG.info('deleting rdr data')
db.purge_data('rdr')
LOG.info('deleting q1h data')
db.purge_data('q1h')
def parse_error(filepath, lineno, line):
LOG.error('Could not parse %s:%s\n%s', filepath, lineno, line)
for filepath in (l.strip() for l in args.filelist):
LOG.info(filepath)
def worker(filepath):
LOG.debug('working on %s', filepath)
db.init()
parser = parsers.get_parser(filepath)
if not parser:
LOG.warn('Cannot find parser for %s', filepath)
continue
raise RuntimeError('Cannot find parser for %s' % filepath)
try:
frames = parser.frames(filepath, error_callback=parse_error)
db.insert_frames(frames)
except (db.IntegrityError, exc.ParseError) as err:
LOG.error('File insert failed(%s): %s', filepath, err.__class__.__name__)
LOG.debug('%s', filepath, exc_info=1)
raise RuntimeError('Insert failed for %s: %s' % (filepath, err.__class__.__name__))
with futures.ThreadPoolExecutor(max_workers=5) as pool:
filepaths = [l.strip() for l in args.filelist]
jobs_to_fpath = {}
for fpath in filepaths:
LOG.info('submitting %s', fpath)
job = pool.submit(worker, fpath)
jobs_to_fpath[job] = fpath
for job in futures.as_completed(jobs_to_fpath):
filepath = jobs_to_fpath[job]
try:
job.result()
except RuntimeError:
LOG.warn('%s failed, use file_insert for details', filepath)
except (SystemExit, KeyboardInterrupt):
LOG.warn('Exiting early!!')
break
for job in jobs_to_fpath:
job.cancel()
if __name__ == '__main__':
main()
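The worker/``as_completed`` structure above submits every file up front, harvests results as they finish, and cancels still-queued jobs on interrupt. A self-contained sketch of the same pattern, where the sleepy ``work`` function is a stand-in for the real parser::

    import time
    from concurrent import futures

    def work(path):
        time.sleep(0.1)
        if path.endswith('.bad'):
            raise RuntimeError('cannot parse %s' % path)
        return path

    paths = ['a.q1h', 'b.rdr', 'c.bad']
    with futures.ThreadPoolExecutor(max_workers=2) as pool:
        # Map each Future back to its input so failures can be reported.
        jobs = {pool.submit(work, p): p for p in paths}
        for job in futures.as_completed(jobs):
            try:
                print('done: %s' % job.result())
            except RuntimeError as err:
                print('failed: %s (%s)' % (jobs[job], err))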
import os
import logging
LOG = logging.getLogger(__name__)
DB_SCHEMA = os.environ.get('AWSAPI_DB_SCHEMA', 'awsapi')
DB_URL = os.environ.get('AWSAPI_DB_URL', 'postgresql://apiadmin@/aws')
def getenv(name, default=None):
if os.getenv(name) is None and default is None:
raise EnvironmentError('Environment variable {} is required.'.format(name))
elif os.getenv(name) is None:
LOG.info('%s not specified, using default %s', name, default)
return os.environ.get(name, default)
DB_SCHEMA = getenv('AWSAPI_DB_SCHEMA', 'awsapi')
DB_URL = getenv('AWSAPI_DB_URL', 'postgresql://apiadmin@/aws')
STATION_DB = getenv('AWSAPI_STATION_DB', 'stations.yaml')
if not os.path.exists(STATION_DB):
raise EnvironmentError('Station db {:s} does not exist'.format(STATION_DB))
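``getenv`` gives three behaviors: a value set in the environment wins, a supplied default is used (and logged at INFO) when the variable is unset, and an unset variable with no default is a hard error. A quick sketch, where ``AWSAPI_MISSING`` is a hypothetical variable used only for illustration::

    import os
    os.environ['AWSAPI_DB_SCHEMA'] = 'aws_test'
    getenv('AWSAPI_DB_SCHEMA', 'awsapi')          # -> 'aws_test' (environment wins)
    getenv('AWSAPI_STATION_DB', 'stations.yaml')  # -> 'stations.yaml', logged at INFO
    getenv('AWSAPI_MISSING')                      # raises EnvironmentError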
@@ -10,7 +10,7 @@ from sqlalchemy import (
MetaData,
Table,
Column,
ForeignKey,
String,
Text,
Integer,
Float,
@@ -23,6 +23,7 @@ from metobs.data.calc import wind_vector_degrees
from amrc_aws.util import NaN
from amrc_aws import config
from amrc_aws.station_aliases import stations
LOG = logging.getLogger(__name__)
@@ -45,6 +46,9 @@ def init(_engine=None):
engine = create_engine(
config.DB_URL, poolclass=NullPool,
connect_args={'application_name': 'AmrcAwsApi'})
engine.execute("CREATE SCHEMA IF NOT EXISTS {:s}".format(config.DB_SCHEMA))
metadata.bind = engine
metadata.reflect(schema=metadata.schema)
@@ -73,41 +77,45 @@ def _fetchall(*args, **kwargs):
return conn.execute(*args, **kwargs).fetchall()
# Names with known issues that have to be mapped to their canonical name in
# the _code_from_name map
_dirty_names = {
'CapeBird': 'Cape Bird',
'MADISON': 'Madison',
'Manuela (Inex Is)': 'Manuela',
'Martha': 'Martha I',
'Martha 1': 'Martha I',
'Martha 2': 'Martha II',
'MarblePoint': 'Marble Point',
'MinnaBluff': 'Minna Bluff',
'PegasusNorth': 'Pegasus North',
'Siple': 'Siple Station',
'Ski Hi': 'Ski-Hi',
'Sky Blu': 'Sky-Blu',
'WhiteIsland': 'White Island',
'WillieField': 'Willie Field',
'WindlessBight': 'Windless Bight',
'Uranus Gl.': 'Uranus Glacier'
}
def sanitize_name(name):
"""
Map an un-formatted name from the RDR file to the canonical name.
"""
return _dirty_names.get(name, name)
return stations.canonical_name(name)
def get_stations():
with connection() as conn:
conn.execute("""
SELECT x.station, d.max, d.min FROM (
SELECT unnest AS station FROM unnest(ARRAY[%s])) AS x
LEFT JOIN station_dates d ON d.station = x.station
""" % ','.join("'%s'" % x for x in get_station_names()))
dat = {r['station']: dict(r) for r in conn.execute('SELECT * FROM station_dates').fetchall()}
zult = []
for name in get_station_names():
# Rows fetched from station_dates already carry min/max; stations
# with no data yet get an empty date range.
zult.append(dat.get(name, {'station': name, 'max': None, 'min': None}))
return zult
def get_station_names():
return list(stations.names())
def purge_data(type_):
with connection() as conn:
# Table names cannot be bound parameters; interpolate directly.
conn.execute('DELETE FROM %s' % _data_table_name(type_))
def insert_frames(frames):
stations = {s['name']:s for s in get_stations()}
inserts = defaultdict(list)
for frame in frames:
station_id = stations.get(frame['_site'], {}).get('id')
if not station_id:
station_name = stations.canonical_name(frame['_site'])
if not station_name:
LOG.warn('No valid station for site "%s", skipping', frame['_site'])
continue
stamp = frame['_stamp']
@@ -123,7 +131,7 @@ def insert_frames(frames):
if nans == len(fields):
continue
dat.update({'stamp': stamp, 'station_id': station_id})
dat.update({'stamp': stamp, 'station': station_name})
inserts[table].append(dat)
inserted = 0
@@ -169,18 +177,15 @@ def _get_slice(type_, stations, symbols, start, end, avg):
series.stamp, air_temp, vtempdiff, rh, pressure,
_wind_vector_east, _wind_vector_north
FROM (
SELECT stations.id as station_id, stamp.* FROM
SELECT stamp.* FROM
generate_series(:start, :end, :interval) AS stamp
LEFT JOIN stations ON name IN :stations
) AS series
LEFT OUTER JOIN (
SELECT s.name as station, d.* FROM {data_table} d, stations s
WHERE d.station_id = s.id
AND stamp >= :start AND stamp <= :end
AND s.name IN :stations
SELECT * FROM {data_table} d
WHERE stamp >= :start AND stamp <= :end
AND station IN :stations
) AS data
ON series.stamp = data.stamp
AND series.station_id = data.station_id
ORDER BY series.stamp, data.station
""".format(data_table=table_name))
rows = _fetchall(
@@ -208,59 +213,32 @@ def _get_slice(type_, stations, symbols, start, end, avg):
return dat[:, colidxs]
def get_stations():
if get_stations.cache is None:
results = _fetchall(sql.text("""
SELECT s.name, s.id, d.min, d.max FROM stations s
LEFT JOIN station_dates d
ON s.name = d.name
"""))
get_stations.cache = [dict(r) for r in results]
return get_stations.cache
get_stations.cache = None
def get_station_names():
if get_station_names.cache is None:
results = _fetchall(_table('stations').select())
get_station_names.cache = [r.name for r in results]
return get_station_names.cache
get_station_names.cache = None
def refresh_station_dates():
with connection() as conn:
for station in _fetchall(_table('stations').select()):
for name in stations.names():
dates = _table('station_dates')
conn.execute(dates.delete().where(dates.c.station == name))
result = conn.execute(sql.text("""
SELECT station, min(stamp), max(stamp)
FROM stations, data
WHERE stations.id = data.station_id
AND stations.name = :name
GROUP BY stations.name
"""), name=station.name).fetchall()
FROM data
WHERE data.station = :name
GROUP BY station
"""), name=name).fetchall()
if result:
conn.execute(sql.text("""
INSERT INTO station_dates
VALUES (:name, :min, :max)
"""), name=station.name, min=result[0].min, max=result[0].max)
"""), name=name, min=result[0].min, max=result[0].max)
def create_schema():
"""
Create database schema. Must have already called init.
"""
stations_t = Table(
'stations', metadata,
Column('id', Integer, primary_key=True),
Column('name', Text, nullable=False))
stations_t.create()
data_t = Table(
'data', metadata,
Column('stamp', DateTime, primary_key=True),
Column('station_id', Integer, ForeignKey(stations_t.c.id), primary_key=True),
Column('station', String, primary_key=True),
Column('air_temp', Float),
Column('vtempdiff', Float),
Column('rh', Float),
@@ -272,7 +250,7 @@ def create_schema():
data_q1h_t = Table(
'data_q1h', metadata,
Column('stamp', DateTime, primary_key=True),
Column('station_id', Integer, ForeignKey(stations_t.c.id), primary_key=True),
Column('station', String, primary_key=True),
Column('air_temp', Float),
Column('vtempdiff', Float),
Column('rh', Float),
@@ -283,7 +261,7 @@ def create_schema():
station_dates_t = Table(
'station_dates', metadata,
Column('name', Text, primary_key=True),
Column('station', Text, primary_key=True),
Column('min', DateTime),
Column('max', DateTime))
station_dates_t.create()
@@ -134,9 +134,6 @@ def write_slice_to_netcdf(stations, symbols, data, dest, attrs=None):
for name, value in attrs.items():
setattr(dataset, name, value)
LOG.debug("data.shape: %s", data.shape)
LOG.debug(data)
# dimensions
for name, size in schema['dimensions'].items():
if size is not None:
......
@@ -143,17 +143,19 @@ def frames(filepath, frame_width=FRAME_WIDTH, error_callback=None):
lines = [printable(l).strip() for l in open(filepath)]
header = read_header(lines[:2])
aid = header['aid']
for idx, line in enumerate(lines[2:]):
try:
frame = make_frame(line)
except ParseError:
error_callback(filepath, idx - 1, line)
continue
frame.update({
'_type': 'q1h',
'_argosid': aid,
'_site': db.sanitize_name(header['name']),
'_stamp': get_frame_stamp(frame['year'], frame['month'], frame['day'], frame['hhmm'])
})
yield frame
station_name = db.sanitize_name(header['name'])
if station_name:
aid = header['aid']
for idx, line in enumerate(lines[2:]):
try:
frame = make_frame(line)
except ParseError:
error_callback(filepath, idx + 3, line)  # 1-based file line; data starts after the 2-line header
continue
frame.update({
'_type': 'q1h',
'_argosid': aid,
'_site': station_name,
'_stamp': get_frame_stamp(frame['year'], frame['month'], frame['day'], frame['hhmm'])
})
yield frame
@@ -146,16 +146,18 @@ def frames(filepath, frame_width=FRAME_WIDTH, error_callback=None):
lines = [printable(l).strip() for l in open(filepath)]
header = read_header(lines[:2])
aid = header['aid']
for idx, line in enumerate(lines[2:]):
try:
frame = make_frame(line)
except ParseError:
error_callback(filepath, idx - 1, line)
continue
frame.update({
'_type': 'rdr',
'_argosid': aid,
'_site': db.sanitize_name(header['name']),
'_stamp': get_frame_stamp(base_stamp, frame['jday'], frame['obnum'], frame_width)
})
yield frame
station_name = db.sanitize_name(header['name'])
if station_name:
for idx, line in enumerate(lines[2:]):
try:
frame = make_frame(line)
except ParseError:
error_callback(filepath, idx + 3, line)  # 1-based file line; data starts after the 2-line header
continue
frame.update({
'_type': 'rdr',
'_argosid': aid,
'_site': station_name,
'_stamp': get_frame_stamp(base_stamp, frame['jday'], frame['obnum'], frame_width)
})
yield frame
import yaml

from amrc_aws import config


def load_alias_map(dbname):
    """
    Load the station database file, returning a map of aliases to their
    canonical name.
    """
    with open(dbname) as fileobj:
        dat = yaml.safe_load(fileobj)
    alias_map = {}
    for name, meta in dat.items():
        # A canonical name always maps to itself. Stations without aliases
        # appear in the YAML with a null value; otherwise the alias list is
        # nested under an 'aliases' key.
        alias_map[name] = name
        aliases = (meta or {}).get('aliases') or []
        alias_map.update({a: name for a in aliases})
        # Station names in the data files are inconsistently cased, so map
        # the upper-cased form of each alias as well.
        alias_map.update({a.upper(): name for a in aliases})
    return alias_map


class StationDb(object):

    def __init__(self, dbname=None):
        self._aliases = load_alias_map(dbname or config.STATION_DB)

    def canonical_name(self, name):
        """Return the canonical name for ``name``, or None if unknown."""
        return self._aliases.get(name)

    def names(self):
        return set(self._aliases.values())


stations = StationDb()
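For illustration, given a hypothetical ``stations.yaml`` containing only a ``Cape Bird`` entry with a ``CapeBird`` alias and a bare ``Byrd`` entry, lookups behave like this (a sketch, not part of the commit)::

    db = StationDb('stations.yaml')
    db.canonical_name('CapeBird')   # -> 'Cape Bird' (explicit alias)
    db.canonical_name('CAPEBIRD')   # -> 'Cape Bird' (upper-cased alias, added automatically)
    db.canonical_name('Byrd')       # -> 'Byrd' (canonical names map to themselves)
    db.canonical_name('Nonesuch')   # -> None, so callers can skip unknown stations
    sorted(db.names())              # -> ['Byrd', 'Cape Bird']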
@@ -22,6 +22,8 @@ setup(
'formencode',
'waitress',
'netCDF4',
'pyyaml',
'futures'
],
extras_require={
'testing': [
......
_dirty_names = {
'CapeBird': 'Cape Bird',
'MADISON': 'Madison',
'Manuela (Inex Is)': 'Manuela',
'Martha': 'Martha I',
'Martha 1': 'Martha I',
'Martha 2': 'Martha II',
'MarblePoint': 'Marble Point',
'MinnaBluff': 'Minna Bluff',
'PegasusNorth': 'Pegasus North',
'Siple': 'Siple Station',
'Ski Hi': 'Ski-Hi',
'Sky Blu': 'Sky-Blu',
'WhiteIsland': 'White Island',
'WillieField': 'Willie Field',
'WindlessBight': 'Windless Bight',
'Uranus Gl.': 'Uranus Glacier'
}
# AWS Station database
#
# Stations listed here are supported by the API; any station not listed
# will be ignored by the system.
#
# The minimum configuration for a station is just its canonical name followed
# by a colon on a single line, like so:
#
# <station_name>:
#
# Station names are notoriously inconsistent, so each station may also carry
# a list of aliases used to map the names found in the data files to its
# canonical name. Aliases are specified like so:
#
# <station_name>:
#   aliases:
#     - <alias1>
#     - <alias2>
#     ... etc ...
#
# Use aliases for any station whose name in the data files differs from its
# canonical name.
#
A028B:
A22-A:
Adelaide:
AGO-1:
AGO-2:
AGO-3:
AGO-4:
AGO-5:
AGO-6:
AGO-A81:
AGO-A84:
Alessandra:
Alexander Tall Tower!:
Allison:
Amery G3:
Amsler Island:
Arelis:
Asgard:
Atka:
Atka Bay:
Austin:
AWS 1:
AWS 14:
AWS 15:
AWS 17:
AWS 18:
AWS 2:
AWS 3:
B-15K:
Baldrick:
BAS-AGO:
Bear Peninsula:
Berkner Island:
Biesiada Crevasse:
Bisco Islands:
Bonaparte Point:
Bowers:
Bratina Island:
Brianna:
Buckle Island:
Bull Pass:
Butler Island:
Byrd:
C-16:
Camp Maudheimvida:
Cape Adams:
Cape Bird:
  aliases:
    - CapeBird
    - CAPEBIRD
Cape Burks:
Cape Denison:
Cape Hallett:
Cape Poinsett:
Cape Royds:
Cape Spencer:
Cape Webb:
Carolyn:
Casey Skiway:
China:
Cierva Cove:
Clean Air:
Concordia:
Cones:
D-0:
D-1:
D-10:
D-17:
D-17 (FRA):
D-3:
D-47:
D-57:
D-80:
D-85:
Darwin Glacier - Central Valley:
Daughter 1 (B-15A):
Daughter 1 (B-15J):
DC-N:
DC-S:
Dismal Island:
Dolleman Island:
Dome A:
Dome C:
Dome C (FRA):
Dome C II:
Dome F (Fuji):
Dome Fuji:
Dome Fuji (JPN):
Doug:
Druznaya-4:
E-66:
Eagle:
Ekstrom:
Ekstrom Shelf Ice:
Elaine:
Elizabeth:
Emilia:
Emma:
Eneide:
Eric:
Erin:
Evans Knoll:
Evans Piedmont Glacier:
Ferrar:
Ferrell:
Ferrell II:
Filchner:
Fogle:
Ford Rock:
Fossil Bluff:
Fountain:
Garwood Valley:
Gerlache Strait:
Gill:
Giulia:
Granite Harbour:
Halley Bay:
Halley V:
Halvfarryggen:
Harry:
Haupt Nunataks:
Henry:
Herbie Alley:
Hugo Island:
Irene:
Janet:
Jang Bogo:
JASE2007:
J.C.:
Jimmy:
Joinville Is:
Jules:
Jurassic:
K1: