Skip to content
Snippets Groups Projects
Commit 601cd472 authored by Alan De Smet's avatar Alan De Smet
Browse files

Progress info refactored into callbacks

instead of magical logging.progress level. #6
parent ff5dcd26
No related branches found
No related tags found
No related merge requests found
......@@ -54,17 +54,6 @@ if sys.version_info < MIN_PYTHON_TUPLE:
def logprogress(msg):
    """Log *msg* at the custom ``logging.progress`` level if available,
    falling back to ``logging.info`` otherwise.

    :param str msg: message to log
    """
    try:
        logging.progress(msg)
    except AttributeError:
        # Only catch the "logging.progress does not exist" case; a broad
        # except would swallow real failures inside logging.progress and
        # then log the message a second time via info.
        logging.info(msg)
class FileSet:
......@@ -166,10 +155,30 @@ def human_bytes(total_bytes):
class Progress:
    """Callback interface for receiving download progress updates.

    Subclass and override any of the hooks below; every default
    implementation does nothing, so overriding only what you need
    is safe.
    """

    def setup_done(self, stats):
        """Initialization has finished.

        From this point on ``stats.max_attempts`` can be relied upon.
        """

    def start_attempt(self, stats):
        """A new retry attempt is starting; see ``stats.current_attempt``."""

    def start_fileset(self, stats):
        """Work on a fileset is starting; see
        ``stats.current_fileset_description``."""

    def finished_file(self, stats, local_path, is_cache_hit):
        """One or more files have finished."""


class ProgressDisabled(Progress):
    """No-op progress reporter.

    Exists purely for clarity: pass this when you explicitly do not
    want progress updates.
    """
class DownloadStatistics:
def __init__(self):
def __init__(self, progress = None):
self.downloaded_size = 0
self.downloaded_files = set()
self.cached_size = 0
......@@ -177,6 +186,14 @@ class DownloadStatistics:
self.start = datetime.datetime.now()
self.end = None
self.max_attempts = 0
self.current_attempt = 0
self.current_fileset_description = ""
if progress is None:
progress = ProgressDisabled()
self.progress = progress
def finish(self):
    """ Record when downloading completed; safe to call more than once """
    # Only the first call stamps the end time; later calls are no-ops,
    # so the earliest completion time is preserved.
    if self.end is None:
        self.end = datetime.datetime.now()
......@@ -228,6 +245,7 @@ class DownloadStatistics:
logging.debug(f"{dst} downloaded ({state}).")
self.downloaded_size += size
self.downloaded_files.add(dst)
self.progress.finished_file(self, dst, is_cache_hit=False)
def download_already_present(self, state, dst):
logging.debug("{0} already exists ({1}). Skipping.".format(dst, state))
......@@ -238,6 +256,15 @@ class DownloadStatistics:
if not self.already_have(dst):
self.cached_files.add(dst)
self.cached_size += os.path.getsize(dst)
self.progress.finished_file(self, dst, is_cache_hit=True)
def set_current_attempt(self, attempt_num):
    """ Record which retry attempt is running and notify the
    progress callback via start_attempt """
    self.current_attempt = attempt_num
    self.progress.start_attempt(self)
def set_current_fileset_description(self, description):
    """ Record the description of the fileset being worked on and
    notify the progress callback via start_fileset """
    self.current_fileset_description = description
    self.progress.start_fileset(self)
def download_already_present(state, dst, download_stats, i_own_file):
......@@ -838,13 +865,17 @@ class Downloader:
logfunc(" "+problem)
last_problem = problem
download_stats.max_attempts = retries
for attempt in range(1, retries+1):
if retries > 1:
logprogress(f"Attempt {attempt}")
logging.info(f"Attempt {attempt}")
download_stats.set_current_attempt(attempt)
for fileset in filesets:
description = fileset.description
urls_to_files = fileset.urls_to_files
logprogress(f" Working on {description}")
logging.info(f" Working on {description}")
download_stats.set_current_fileset_description(description)
logging.info(f" files to get: {urls_to_files}")
for base_url in base_urls:
......@@ -887,7 +918,7 @@ class Downloader:
first = ""
raise DownloadsFailedException("Errors occurred during download, including "+first)
def download_for_time(self, time, dst, timeout=30, retries=3, retry_wait=20, do_download = True):
def download_for_time(self, time, dst, timeout=30, retries=3, retry_wait=20, do_download = True, progress = None):
""" Download files needed to process data a time, writing into dst
If a desired file is already present, the download of that file is
......@@ -908,6 +939,10 @@ class Downloader:
If False, if the desired files are not already present, it's a
failure
:param csppfetch.Progress progress: If not None, a
csppfetch.Progress-like object which will be called with
regular progress updates.
:return: csppfetch.DownloadStatistics
**timeout**, **retries**, and **retry_wait** are overridden by environment
......@@ -923,12 +958,12 @@ class Downloader:
retries = self.get_retries(retries)
else:
retries = 1
stats = DownloadStatistics()
stats = DownloadStatistics(progress=progress)
self.download_first_available(filesets, dst, timeout=timeout, retries=retries, retry_wait=retry_wait, do_download = do_download, download_stats = stats)
return stats
def update_cache(self, dst, start_time=None, end_time=None, timeout=30):
def update_cache(self, dst, start_time=None, end_time=None, timeout=30, progress = None):
""" Cache all files for processing times beween start_time and end_time
If a desired file is already present, the download of that file is
......@@ -944,10 +979,14 @@ class Downloader:
abandoning it as failed. Can be overridden by environment variable
package_env_id+"TIMEOUT"
:param csppfetch.Progress progress: If not None, a
csppfetch.Progress-like object which will be called with
regular progress updates.
:return: csppfetch.DownloadStatistics
"""
download_stats = DownloadStatistics()
download_stats = DownloadStatistics(progress=progress)
if end_time is not None and start_time is not None:
time = self._nearest_preceeding_time(end_time)
if time < start_time:
......
......@@ -25,6 +25,7 @@ import re
import aitf.ancil
import aitf.conlog
import aitf.progress
csppfetch = aitf.ancil.csppfetch
......@@ -84,6 +85,8 @@ def log_download_report(name, stats):
for x in stats.report():
logging.summary(" "+x)
def main():
aitf.conlog.setup_logging()
args = parse_args()
......@@ -98,17 +101,22 @@ def main():
if not do_download:
logging.info(f'Download disabled. Hopefully everything I need is available in "{cache_dir}".')
if args.want_progress:
progress = aitf.progress.Progress()
else:
progress = None
try:
logging.progress("Acquiring SST")
sststats = aitf.ancil.SST.download_for_time(args.scan_time, args.cache, do_download = do_download)
sststats = aitf.ancil.SST.download_for_time(args.scan_time, args.cache, do_download = do_download, progress=progress)
sst_files = sststats.all_files()
log_download_report("SST Download Summary", sststats)
# using SST instead of GFS to show that
# DownloadStatistics are interchangeable.
logging.progress("Acquiring GFS")
gfsstats = aitf.ancil.GFS.download_for_time(args.scan_time, args.cache, do_download = do_download)
gfsstats = aitf.ancil.GFS.download_for_time(args.scan_time, args.cache, do_download = do_download, progress=progress)
gfs_files = gfsstats.all_files()
log_download_report("GFS Download Summary", gfsstats)
......
......@@ -24,6 +24,7 @@ import logging
import aitf.ancil
import aitf.conlog
import aitf.progress
def parse_args():
......@@ -183,9 +184,15 @@ def main():
aitf.conlog.setup_logging()
args = parse_args()
if args.want_progress:
progress = aitf.progress.Progress()
else:
progress = None
sststats = aitf.ancil.SST.update_cache(args.dir,
start_time=args.oldest,
end_time=args.newest,
progress=progress,
)
log_download_report("SST Download Summary", sststats)
......@@ -194,6 +201,7 @@ def main():
gfsstats = aitf.ancil.GFS.update_cache(args.dir,
start_time=args.oldest,
end_time=args.newest,
progress=progress,
)
log_download_report("GFS Download Summary", gfsstats)
......
......@@ -72,6 +72,10 @@ def process_verbosity(args):
args.log_level = logging_levels[args.verbosity][1]
if args.silent:
args.log_level = logging.ERROR
if args.verbosity >= 2:
args.want_progress = True
else:
args.want_progress = False
logging.getLogger().setLevel(args.log_level)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment