Commit 605e315f authored by Alan De Smet's avatar Alan De Smet

Factor out bracketed forecast downloads from GFS example

Reimplement GFS in terms of the new BoundedForecastDownloader
parent ba430dc0
@@ -36,6 +36,7 @@ import errno
from csppfetch.exclusivelockfile import AtomicCreateIfMissing
import csppfetch.daterange
from csppfetch.humanbytes import human_bytes
from csppfetch.roundtozero import roundtozero
ENV_URL_SUFFIX = "URL"
ENV_TIMEOUT_SUFFIX = "TIMEOUT"
@@ -1224,4 +1225,132 @@ class Downloader:
#delete_old_files(dst, oldest_valid)
class BoundedForecastDownloader (Downloader):
""" Fetch a pair of forecasts bounding the processing time
Does not support {priority}.
Currently only supports forecasts specified in integer hours;
it should be extensible (See url_to_file_for_time).
Used by cspp-geo-aitf for retrieving GFS and RAP inputs.
"""
def __init__(self,
name,
package_env_id,
url_base,
url_relative,
local,
period,
forecast_step,
first_forecast,
shortest_valid_forecast,
longest_valid_forecast,
epoch_start = datetime.datetime(2000,1,1,0,0,0),
priorities = None,
oldest_usable = timedelta(days=7),
oldest_cache = timedelta(days=7),
expected_newest = None,
env_overrides = {},
):
super().__init__(
name=name,
package_env_id=package_env_id,
url_base=url_base,
url_relative=url_relative,
local=local,
period=period,
epoch_start=epoch_start,
priorities=priorities,
oldest_usable=oldest_usable,
oldest_cache=oldest_cache,
expected_newest=expected_newest,
env_overrides=env_overrides)
self.forecast_step = forecast_step
self.first_forecast = first_forecast
self.shortest_valid_forecast = shortest_valid_forecast
self.longest_valid_forecast = longest_valid_forecast
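        # Sanity check (illustrative arithmetic): the shortest usable
        # forecast must land on the forecast grid. With first_forecast=0h
        # and forecast_step=3h, shortest_valid_forecast=3h gives
        # (3-0)/3 = 1.0 (OK), while 4h gives ~1.33 and raises below.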
valid_multiple = (self.shortest_valid_forecast-self.first_forecast)/self.forecast_step
if not valid_multiple.is_integer():
raise RuntimeError("shortest_valid_forecast ({self.shortest_valid_forecast})-self.first_forecast ({self.first_forecast}) is not an integer multiple of forecast step ({self.forecast_step})")
def url_to_file_for_time(self, time, forecast_offset):
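        # Returns a one-entry {url: local} dict for the forecast file at
        # the given offset. The ForecastOffset instance exists so templates
        # can reference the offset as {f.hours:02d}; e.g. with the
        # GFS-style templates in the example file below, a 3 hour offset
        # fills in "03".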
class ForecastOffset:
hours = 0
offset = ForecastOffset()
offset.hours = forecast_offset//timedelta(hours=1)
url = self.url_relative.format(f=offset)
url = self._expand(url, time, priority="")
local = self.local.format(f=offset)
local = self._expand(local, time, priority="")
return {url:local}
def get_cache_filesets(self, start = None, end = None):
# A copy of Downloader's implementation, but we don't use
# self._daterange; we call daterange.daterange directly so we can pass
# in a step of self.forecast_step instead of self.period. We do this
        # because although our period (how often new forecast runs are
        # generated) is every N hours, the set of files someone might want
        # changes every forecast_step hours.
#
# For example, for GFS (updated every 6 hours, forecasts are every 3
# hours): to process 5Z data, we'd use the 0Z GFS data's 3 and 6 hour
# forecasts, but for 7Z data, we'd use the 0Z GFS data's 6 and 9 hour
# forecasts.
if self.oldest_cache < timedelta(days=0):
raise ValueError(f"oldest_cache should be positive; it is {self.oldest_cache}")
if end is None: end = datetime.datetime.now()
if start is None: start = end - self.oldest_cache
if start > end:
raise ValueError(f"start ({start}) should be before end ({end})")
fileset_list_list = []
for time in csppfetch.daterange.daterange(end, start, -self.forecast_step, inclusive=True):
fileset_list = self.get_filesets_for_time(time)
fileset_list_list.append(fileset_list)
return fileset_list_list
def get_filesets_for_time(self, scan_time):
generated_time = self._nearest_preceeding_time(scan_time)
time_since_gen = scan_time - generated_time
if time_since_gen < timedelta(0):
raise RuntimeError(f"self._nearest_preceeding_time({scan_time}) returned {generated_time}, but that's AFTER the start; should be BEFORE or SAME.")
forecast_start = roundtozero(time_since_gen, self.forecast_step, self.first_forecast)
# There is a non-looping way to do this,
# but my brain is fried at the moment.
while forecast_start < self.shortest_valid_forecast:
forecast_start += self.period
generated_time -= self.period
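        # e.g. for GFS at 7Z: the nearest run is 6Z and roundtozero(1h,
        # 3h, 0h) is 0h, below the 3h minimum, so we step back to the 0Z
        # run, where the same scan time is 7 hours out and forecast_start
        # becomes 6h (bracket [6h, 9h], matching the comment above).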
filesets = []
while (forecast_start+self.forecast_step) <= self.longest_valid_forecast:
end = forecast_start + self.forecast_step
urls_to_files = {}
urls_to_files.update(self.url_to_file_for_time(generated_time, forecast_start))
urls_to_files.update(self.url_to_file_for_time(generated_time, end))
expected = self.is_expected(generated_time)
startstr = str(int(forecast_start.total_seconds()/(60*60)))
endstr = str(int( end.total_seconds()/(60*60)))
description = f"{self.name} for {generated_time} forecast {startstr} and {endstr} hours into the future"
fs = csppfetch.FileSet(urls_to_files, expected, description)
filesets.append(fs)
forecast_start += self.period
generated_time -= self.period
return filesets
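
# Usage sketch (hypothetical values; "example.invalid" and the template
# strings are placeholders for illustration):
#
#   gfs_like = BoundedForecastDownloader(
#       name="Example model",
#       package_env_id="EXAMPLE",
#       url_base="https://example.invalid/",
#       url_relative="model.t%Hz.f{f.hours:02d}",
#       local="model/%Y/%m/%d/model.t%Hz.f{f.hours:02d}",
#       period=datetime.timedelta(hours=6),
#       forecast_step=datetime.timedelta(hours=3),
#       first_forecast=datetime.timedelta(hours=0),
#       shortest_valid_forecast=datetime.timedelta(hours=3),
#       longest_valid_forecast=datetime.timedelta(hours=12))
#   for fs in gfs_like.get_filesets_for_time(datetime.datetime(2021, 7, 2, 5)):
#       print(fs.description)
#   # Prints the 00Z run's 3/6 hour bracket first, then the previous day's
#   # 18Z run's 9/12 hour bracket as a fallback.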
@@ -84,125 +84,24 @@ SST = csppfetch.Downloader(
# We can NOT use
# gfs.t06z.190527.pgrb2f15 + gfs.t06z.190527.pgrb2f18
# as we won't use 18 hour and later forecasts.
class GFSDownloader(csppfetch.Downloader):
def __init__(self):
remote_filename = "gfs.t%Hz.%y%m%d.pgrb2f{priority}"
local_filename = "gfs.t%Hz.pgrbf{priority}"
localdir = "gfs_grib2_0.5deg/%Y/%m/%d/"
# Why 5 hours?
#
# Examination of geodb in 2019 covering 2019-03-01 through 2019-06-12
        # and 2021 covering 2021-07-02 through 2021-09-01, suggests the data
        # is usually available within about 65 minutes, and is almost always
# available within 4 hours and 30 minutes. Round up to 5 arbitrarily.
#
# As we don't really have use for the data until 3 hours after
# nominal generation and don't _need_ it until 6 hours, this seems
# a reasonable number.
expected_newest = dt.timedelta(hours=5)
super().__init__(
name="Global Forecast System",
package_env_id = PACKAGE_ENV_ID,
url_base = CSPP_GEO_AITF_URL_BASE,
url_relative = CSPP_GEO_AITF_URL_DIR_PART+remote_filename,
local = localdir+local_filename,
period = dt.timedelta(hours=6),
epoch_start = dt.datetime(2010,1,1,0,0,0),
expected_newest = expected_newest,
env_overrides = ENV_OVERRIDES,
)
# Fields we're not using:
# oldest_usable - Not meaningful; the limit is how far into the future
# of a forecast we're willing to use, specified as
# "{priority}" in the "filename" above.
# Forecasts are available this often
self.forecast_step = dt.timedelta(hours=3)
# and the first forecast is (starting point for forecast_steps)
self.first_forecast = dt.timedelta(hours=0)
        # Given the above, valid forecasts are 0, 3, 6, ... hours
# We can use forecasts from this far into the future
self.shortest_valid_forecast = dt.timedelta(hours=3)
# ...through this far into the future.
self.longest_valid_forecast = dt.timedelta(hours=12)
# Given that, valid forecasts are 3, 6, 9, and 12.
valid_multiple = (self.shortest_valid_forecast-self.first_forecast)/self.forecast_step
if not valid_multiple.is_integer():
raise RuntimeError("shortest_valid_forecast ({self.shortest_valid_forecast})-self.first_forecast ({self.first_forecast}) is not an integer multiple of forecast step ({self.forecast_step})")
def url_to_file_for_time(self, time, forecast_hours):
hours = int(forecast_hours / dt.timedelta(hours=1))
hours_str = f"{hours:02d}"
url = self._expand(self.url_relative, time, hours_str)
local = self._expand(self.local, time, hours_str)
return {url:local}
def get_cache_filesets(self, start = None, end = None):
# A copy of Downloader's implementation, but we don't use
# self._daterange; we call daterange.daterange directly so we can pass
# in a step of self.forecast_step instead of self.period. We do this
# because although our period is only every 6 hours, we change which
        # files someone might want every _3_. That is, to process 5Z data, we'd
# use the 0Z GFS data's 3 and 6 hour forecasts, but for 7Z data, we'd
# use the 0Z GFS data's 6 and 9 hour forecasts.
#
        # Perhaps the ability to override the period here should be in the
# parent class? Or maybe this case is too specialized to bother.
if self.oldest_cache < dt.timedelta(days=0):
raise ValueError(f"oldest_cache should be positive; it is {self.oldest_cache}")
if end is None: end = dt.datetime.now()
if start is None: start = end - self.oldest_cache
if start > end:
raise ValueError(f"start ({start}) should be before end ({end})")
fileset_list_list = []
for time in csppfetch.daterange.daterange(end, start, -self.forecast_step, inclusive=True):
fileset_list = self.get_filesets_for_time(time)
fileset_list_list.append(fileset_list)
return fileset_list_list
def get_filesets_for_time(self, scan_time):
generated_time = self._nearest_preceeding_time(scan_time)
time_since_gen = scan_time - generated_time
if time_since_gen < dt.timedelta(0):
raise RuntimeError(f"self._nearest_preceeding_time({scan_time}) returned {generated_time}, but that's AFTER the start; should be BEFORE or SAME.")
forecast_start = roundtozero(time_since_gen, self.forecast_step, self.first_forecast)
# There is a non-looping way to do this,
# but my brain is fried at the moment.
while forecast_start < self.shortest_valid_forecast:
forecast_start += self.period
generated_time -= self.period
filesets = []
while (forecast_start+self.forecast_step) <= self.longest_valid_forecast:
end = forecast_start + self.forecast_step
urls_to_files = {}
urls_to_files.update(self.url_to_file_for_time(generated_time, forecast_start))
urls_to_files.update(self.url_to_file_for_time(generated_time, end))
expected = self.is_expected(generated_time)
startstr = str(int(forecast_start.total_seconds()/(60*60)))
endstr = str(int( end.total_seconds()/(60*60)))
description = f"{self.name} for {generated_time} forecast {startstr} and {endstr} hours into the future"
fs = csppfetch.FileSet(urls_to_files, expected, description)
filesets.append(fs)
forecast_start += self.period
generated_time -= self.period
return filesets
GFS = GFSDownloader()
GFS_REMOTE_FILENAME = "gfs.t%Hz.%y%m%d.pgrb2f{f.hours:02d}"
GFS_LOCAL_FILENAME = "gfs.t%Hz.pgrbf{f.hours:02d}"
GFS_LOCALDIR = "gfs_grib2_0.5deg/%Y/%m/%d/"
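# e.g. for the 2021-07-02 06Z run's 3 hour forecast, these templates expand
# to remote "gfs.t06z.210702.pgrb2f03" and local
# "gfs_grib2_0.5deg/2021/07/02/gfs.t06z.pgrbf03".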
GFS = csppfetch.BoundedForecastDownloader(
name="Global Forecast System",
package_env_id = PACKAGE_ENV_ID,
url_base = CSPP_GEO_AITF_URL_BASE,
url_relative = CSPP_GEO_AITF_URL_DIR_PART+GFS_REMOTE_FILENAME,
local = GFS_LOCALDIR+GFS_LOCAL_FILENAME,
period = dt.timedelta(hours=6),
epoch_start = dt.datetime(2010,1,1,0,0,0),
expected_newest = dt.timedelta(hours=5),
env_overrides = ENV_OVERRIDES,
forecast_step = dt.timedelta(hours=3),
first_forecast = dt.timedelta(hours=0),
shortest_valid_forecast = dt.timedelta(hours=3),
longest_valid_forecast = dt.timedelta(hours=12),
)
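
# A quick check of the bracketing (hypothetical call): for a 5Z scan time,
#   GFS.get_filesets_for_time(dt.datetime(2021, 7, 2, 5))
# yields the 00Z run's 3 and 6 hour forecasts first, then the previous
# day's 18Z run's 9 and 12 hour forecasts as a fallback, matching the
# comments above.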