Skip to content
Snippets Groups Projects
Commit 81b0d1f5 authored by Alan De Smet's avatar Alan De Smet
Browse files

Add csppfetch::DownloaderWithFallbacks

Chains Downloaders, allowing different data sets to fulfill a requirement.
parent fc7d5cfb
Branches
No related tags found
No related merge requests found
......@@ -1425,4 +1425,100 @@ class BoundedForecastDownloader (Downloader):
class DownloaderWithFallbacks (Downloader):
    """ Chains multiple Downloaders, trying each in priority order.

    Presents the same interface as a single Downloader, but merges the
    FileSets of several child Downloaders so that different data sets can
    fulfill a requirement when the preferred one is unavailable.
    """
    def __init__(self, name, downloaders):
        """
        :param str name: Brief human readable description, for logging
        :param downloaders: Downloaders to try, in priority order (most preferred first)
        :type downloaders: list of Downloaders
        """
        assert len(downloaders) > 1, "At least two Downloaders should be passed into DownloaderWithFallbacks"
        # All children must agree on url_base and package_env_id, since we
        # report a single value for each to our base class.
        url_base = downloaders[0].url_base
        package_env_id = downloaders[0].package_env_id
        for downloader in downloaders[1:]:
            assert url_base == downloader.url_base, f"All Downloaders must share a common url_base, but both {url_base} and {downloader.url_base} are present"
            assert package_env_id == downloader.package_env_id, f"All Downloaders must share a common package_env_id, but both {package_env_id} and {downloader.package_env_id} are present"
        super().__init__(
            name = name,
            # While we are a type of Downloader, we don't use any of these;
            # instead we hand responsibility off to our downloaders
            package_env_id = package_env_id,
            url_base = url_base,
            url_relative = None,
            local = None,
            period = None)
        self.downloaders = downloaders

    def _find_latest_start_time(self, orig_start):
        # Use the latest of the children's nearest preceding times, so that
        # every child can supply data for the earliest time we will request.
        return max(downloader._nearest_preceeding_time(orig_start)
                   for downloader in self.downloaders)

    def _collect_unique_times(self, start, end):
        """ Return the union of all children's times in [start, end], newest first. """
        assert start is not None
        assert end is not None
        assert end > start
        unique_times = set()
        for downloader in self.downloaders:
            unique_times.update(downloader._daterange(end, start, inclusive=True))
        return sorted(unique_times, reverse=True)

    def get_cache_filesets(self, start = None, end = None):
        """ Return FileSets to create a local cache between start and end, inclusive

        Like Downloader.get_cache_filesets, but returns FileSets merged from
        the individual self.downloaders.

        :raises ValueError: if any child's oldest_cache is negative, or if
            start is after end.
        """
        oldest_cache = self.downloaders[0].oldest_cache
        # Validate every child, including the first; previously the first
        # downloader's oldest_cache escaped the negativity check.
        for downloader in self.downloaders:
            if downloader.oldest_cache < timedelta(days=0):
                raise ValueError(f"oldest_cache should be positive; it is {downloader.oldest_cache} in {downloader.name}")
            assert oldest_cache == downloader.oldest_cache, "All child downloaders to DownloaderWithFallbacks should have a common oldest_cache, but that is not true."
        if end is None: end = datetime.datetime.now()
        if start is None: start = end - oldest_cache
        if start > end:
            raise ValueError(f"start ({start}) should be before end ({end})")
        # Ensure we get data for the last one
        # (Which is start, as we work backward)
        start = self._find_latest_start_time(start)
        return [self.get_filesets_for_time(time)
                for time in self._collect_unique_times(start, end)]

    def is_expected(self, time):
        """ Return True if any child downloader expects data at time. """
        return any(downloader.is_expected(time) for downloader in self.downloaders)

    def get_filesets_for_time(self, time):
        """ Return list of FileSets suitable for use at time

        Like Downloader.get_filesets_for_time, but returns FileSets merged from
        the individual self.downloaders
        """
        filesets = []
        for downloader in self.downloaders:
            filesets += downloader.get_filesets_for_time(time)
        return filesets
......@@ -532,6 +532,14 @@ class MockFilesMixin(unittest.TestCase):
if actual != expected:
self.fail(f"contents of {path} are {actual} instead of expected {expected}")
def assertNoMockFile(self, path):
    """ Fail the test if a file exists at path.

    Counterpart to assertMockFile: verifies that a download did NOT
    create the given file.
    """
    try:
        # "with" closes the handle if the open unexpectedly succeeds;
        # a bare open() here would leak the file descriptor.
        with open(path):
            pass
    except FileNotFoundError:
        return
    self.fail(f"File {path} should not have been created, but was")
class DownloaderTests(DTestCase, MockFilesMixin):
# The test Downloader is initialized
......@@ -820,6 +828,149 @@ class DownloaderTests(DTestCase, MockFilesMixin):
self.assertMockFile(outdir+"/avhrr-only-v2.20170105.nc")
class DownloaderWithFallbacksTest(DTestCase, MockFilesMixin):
    """ Tests for csppfetch.DownloaderWithFallbacks chaining two Downloaders. """

    def make_test_downloader(self, url_base):
        """ Return a DownloaderWithFallbacks chaining a preferred "first"
        Downloader and a fallback "second" Downloader, both with a daily period. """
        env_id = "EXAMPLE_"
        first = csppfetch.Downloader(
            name = "first attempt downloader",
            package_env_id = env_id,
            url_base = url_base,
            url_relative = "%Y_%m_%d-first.nc",
            local = "%Y_%m_%d-first.nc",
            period = timedelta(days=1)
            )
        second = csppfetch.Downloader(
            name = "second attempt downloader",
            package_env_id = env_id,
            url_base = url_base,
            url_relative = "%Y_%m_%d-second.nc",
            local = "%Y_%m_%d-second.nc",
            period = timedelta(days=1)
            )
        chained = csppfetch.DownloaderWithFallbacks(
            name = "merged downloader",
            downloaders = [first, second])
        return chained

    def test_download_for_time_first(self):
        # download_for_time where the first (preferred) downloader can supply
        # the files; the fallback's file must not be downloaded.
        with TemporaryDirectory() as cachedir:
            self.mock_file(f"{cachedir}/2017_01_01-first.nc")
            self.mock_file(f"{cachedir}/2017_01_01-second.nc")
            d = self.make_test_downloader(f"file://{cachedir}/")
            with TemporaryDirectory() as outdir,\
                    ShadowEnvironment(EXAMPLE_RETRIES="1", EXAMPLE_RETRY_WAIT="0"):
                stats = d.download_for_time(datetime.datetime(2017,1,2,0,0,0), outdir)
                self.assertMockFile(f"{outdir}/2017_01_01-first.nc")
                self.assertNoMockFile(f"{outdir}/2017_01_01-second.nc")

    def test_download_for_time_first_old(self):
        # download_for_time where the first downloader only has an older file;
        # it is still preferred over the fallback's newer file.
        with TemporaryDirectory() as cachedir:
            self.mock_file(f"{cachedir}/2016_12_31-first.nc")
            self.mock_file(f"{cachedir}/2017_01_01-second.nc")
            d = self.make_test_downloader(f"file://{cachedir}/")
            with TemporaryDirectory() as outdir,\
                    ShadowEnvironment(EXAMPLE_RETRIES="1", EXAMPLE_RETRY_WAIT="0"):
                stats = d.download_for_time(datetime.datetime(2017,1,2,0,0,0), outdir)
                self.assertMockFile(f"{outdir}/2016_12_31-first.nc")
                self.assertNoMockFile(f"{outdir}/2017_01_01-second.nc")
        # Same-age files from both downloaders: the first still wins.
        with TemporaryDirectory() as cachedir:
            d = self.make_test_downloader(f"file://{cachedir}/")
            self.mock_file(f"{cachedir}/2017_01_10-first.nc")
            self.mock_file(f"{cachedir}/2017_01_10-second.nc")
            with TemporaryDirectory() as outdir,\
                    ShadowEnvironment(EXAMPLE_RETRIES="1", EXAMPLE_RETRY_WAIT="0"):
                stats = d.download_for_time(datetime.datetime(2017,1,11,0,0,0), outdir)
                self.assertMockFile(f"{outdir}/2017_01_10-first.nc")
                self.assertNoMockFile(f"{outdir}/2017_01_10-second.nc")

    def test_download_for_time_second(self):
        # Only the fallback downloader has a file; it should be used.
        with TemporaryDirectory() as cachedir:
            self.mock_file(f"{cachedir}/2017_01_01-second.nc")
            d = self.make_test_downloader(f"file://{cachedir}/")
            with TemporaryDirectory() as outdir,\
                    ShadowEnvironment(EXAMPLE_RETRIES="1", EXAMPLE_RETRY_WAIT="0"):
                stats = d.download_for_time(datetime.datetime(2017,1,2,0,0,0), outdir)
                self.assertMockFile(f"{outdir}/2017_01_01-second.nc")
                self.assertNoMockFile(f"{outdir}/2017_01_01-first.nc")

    def test_download_for_time_404(self):
        # Neither downloader has anything available: the download should fail
        # with DownloadsFailedException and leave no files behind.
        with TemporaryDirectory() as cachedir:
            d = self.make_test_downloader(f"file://{cachedir}/")
            with TemporaryDirectory() as outdir,\
                    ShadowEnvironment(EXAMPLE_RETRIES="1", EXAMPLE_RETRY_WAIT="0"):
                with self.assertRaises(csppfetch.DownloadsFailedException) as raised:
                    with self.assertLogs(level='WARNING') as logs:
                        stats = d.download_for_time(datetime.datetime(2017,1,2,0,0,0), outdir)
                self.assertNoMockFile(f"{outdir}/2017_01_01-second.nc")
                self.assertNoMockFile(f"{outdir}/2017_01_01-first.nc")

    def test_update_cache(self):
        # Exercises update_cache over a date range with a mix of files
        # available from the preferred and fallback downloaders.
        with TemporaryDirectory() as cachedir:
            d = self.make_test_downloader(f"file://{cachedir}/")
            self.mock_file(f"{cachedir}/2017_01_10-first.nc")
            self.mock_file(f"{cachedir}/2017_01_10-second.nc")
            self.mock_file(f"{cachedir}/2017_01_09-second.nc")
            self.mock_file(f"{cachedir}/2017_01_08-first.nc")
            self.mock_file(f"{cachedir}/2017_01_01-first.nc")
            self.mock_file(f"{cachedir}/2017_01_01-second.nc")
            with TemporaryDirectory() as outdir,\
                    ShadowEnvironment(EXAMPLE_RETRIES="1", EXAMPLE_RETRY_WAIT="0"):
                stats = d.update_cache(outdir, datetime.datetime(2017,1,8,0,0,0), datetime.datetime(2017,1,11,0,0,0))
                self.assertMockFile(f"{outdir}/2017_01_10-first.nc")
                self.assertNoMockFile(f"{outdir}/2017_01_10-second.nc")
                self.assertNoMockFile(f"{outdir}/2017_01_09-second.nc")
                self.assertMockFile(f"{outdir}/2017_01_08-first.nc")
                self.assertNoMockFile(f"{outdir}/2017_01_01-first.nc")
                self.assertNoMockFile(f"{outdir}/2017_01_01-second.nc")
        # Fewer files available: the fallback's 01-09 file is now used.
        with TemporaryDirectory() as cachedir:
            d = self.make_test_downloader(f"file://{cachedir}/")
            self.mock_file(f"{cachedir}/2017_01_10-first.nc")
            self.mock_file(f"{cachedir}/2017_01_10-second.nc")
            self.mock_file(f"{cachedir}/2017_01_09-second.nc")
            with TemporaryDirectory() as outdir,\
                    ShadowEnvironment(EXAMPLE_RETRIES="1", EXAMPLE_RETRY_WAIT="0"):
                stats = d.update_cache(outdir, datetime.datetime(2017,1,8,0,0,0), datetime.datetime(2017,1,11,0,0,0))
                self.assertMockFile(f"{outdir}/2017_01_10-first.nc")
                self.assertNoMockFile(f"{outdir}/2017_01_10-second.nc")
                self.assertMockFile(f"{outdir}/2017_01_09-second.nc")
class DeleteOldFilesTests(DTestCase):
def touch(self, filename, timestamp):
with open(filename, "w") as f:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment