#!/usr/bin/env python # -*- coding: utf-8 -*- """ goesr.l1b ========= PURPOSE Toolbox for extracting imagery from GOES-16 PUG L1B NetCDF4 files originally goesr_pug.py as part of SIFT. REFERENCES GOES-R Product User's Guide Rev E REQUIRES numpy netCDF4 :author: R.K.Garcia <rayg@ssec.wisc.edu> :copyright: 2016 by University of Wisconsin Regents, see AUTHORS for more details :license: GPLv3, see LICENSE for more details """ import os, sys import logging, unittest, argparse from collections import namedtuple import numpy as np # import numba # from numba.decorators import jit import re from datetime import datetime, timedelta import netCDF4 as nc4 __author__ = 'rayg' __docformat__ = 'reStructuredText' LOG = logging.getLogger(__name__) # default variable names for PUG L1b files DEFAULT_RADIANCE_VAR_NAME = 'Rad' DEFAULT_CMI_VAR_NAME = 'CMI' DEFAULT_Y_CENTER_VAR_NAME = 'y_image' DEFAULT_X_CENTER_VAR_NAME = 'x_image' geolocation = namedtuple('geolocation', ['lat', 'lon']) # ref https://gitlab.ssec.wisc.edu/scottm/QL_package/blob/master/cspp_ql/geo_proj.py def proj4_params(longitude_of_projection_origin=0.0, perspective_point_height=0.0, semi_major_axis=0.0, semi_minor_axis=0.0, sweep_angle_axis='x', latitude_of_projection_origin=0.0, y_0=0, x_0=0, **etc): """ Generate PROJ.4 parameters for Fixed Grid projection :param longitude_of_projection_origin: longitude at center of projection, related to sub-satellite point :param perspective_point_height: effective projection height in m :param semi_major_axis: ellipsoid semi-major axis in m :param semi_minor_axis: ellipsoid semi-minor axis in m :param sweep_angle_axis: 'x' for GOES-R, 'y' for original CGMS :param y_0: :param x_0: :return: tuple of (key,value) pairs """ assert(latitude_of_projection_origin==0.0) # assert your assumptions: need to revisit this routine if this is not the case return (('proj', 'geos'), ('lon_0', longitude_of_projection_origin), ('h', perspective_point_height), ('a', semi_major_axis), ('b', semi_minor_axis), ('x_0', x_0), ('y_0', y_0), ('sweep', sweep_angle_axis), ('units', 'm'), # ('ellps', 'GRS80'), # redundant given +a and +b, PUG states GRS80 and not WGS84 ) def proj4_string(**kwargs): """ see proj4_params() """ p4p = proj4_params(**kwargs) return ' '.join( ('+%s=%s' % q) for q in p4p ) # return '+proj=geos +sweep=x +lon_0=%f +h=%f +x_0=%d +y_0=%d +a=%f +b=%f +units=m +ellps=GRS80' % ( # lon_center_of_projection, # perspective_point_height, # x_plus, # y_plus, # semi_major_axis, # semi_minor_axis # ) # @jit((numba.float32[:,:], numba.double, numba.double, numba.double, numba.double)) def _calc_bt(L: np.ndarray, fk1: float, fk2: float, bc1: float, bc2: float): return (fk2 / np.log((fk1 / L) + 1.0) - bc1) / bc2 def calc_bt(L: (np.ndarray, np.ma.masked_array), fk1: float, fk2: float, bc1: float, bc2: float, mask_invalid: bool=True, **etc): """ convert brightness temperature bands from radiance ref PUG Vol4 7.1.3.1 Radiances Product : Description Note: marginally negative radiances can occur and will result in NaN values! We handle this by setting anything with radiance <=0.0 to return 0K :param L: raw radiance image data, preferably as masked array :param fk1: calibration constant :param fk2: calibration constant :param bc1: calibration constant :param bc2: calibration constant :param mask_invalid: bool, whether to include radiances <=0.0 in array mask - iff Lis masked array :return: BT converted radiance data, K; if input is masked_array, L<=0.0 will also be masked """ T = _calc_bt(L, fk1, fk2, bc1, bc2) if isinstance(L, np.ma.masked_array): if mask_invalid: T = np.ma.masked_array(T, (L.data <= 0.0) | L.mask) else: T = np.ma.masked_array(T, L.mask) else: # Tmin = -bc1 / bc2 # when L<=0 T[L <= 0.0] = 0.0 # Tmin, but for now truncate to absolute 0 return T # @jit((numba.float32[:,:], numba.double)) # def _calc_refl(L, kappa0): # return L * kappa0 def calc_refl(L: np.ndarray, kappa0: float, **etc): """ convert reflectance bands from radiance ref PUG Vol4 7.1.3.1 Radiances Product : Description :param L: raw radiance image data as masked array :param kappa0: conversion factor radiance to reflectance :return: """ return L * kappa0 # if isinstance(L, np.ma.masked_array): # refl = _calc_refl(L.data, kappa0) # return np.ma.masked_array(refl, L.mask) # return _calc_refl(L, kappa0) def is_band_refl_or_bt(band: int): """ :param band: band number, 1..16 :return: "refl", "bt" or None """ return 'refl' if (1 <= band <= 6) else 'bt' if (7 <= band <= 16) else None def nc_cal_values(nc: nc4.Dataset): """ extract a dictionary of calibration parameters to use :param nc: PUG netCDF4 instance :return: dict """ # grab = lambda x: x.filled(np.NAN)[...] if isinstance(x, np.ma.masked_array) else x[...] grab = lambda vn: np.ma.fix_invalid(nc[vn][:], fill_value=np.NAN)[...] return { 'fk1': grab('planck_fk1'), # nc['planck_fk1'][:].filled(np.NAN)[...], 'fk2': grab('planck_fk2'), # nc['planck_fk2'][:].filled(np.NAN)[...], 'bc1': grab('planck_bc1'), # nc['planck_bc1'][:].filled(np.NAN)[...], 'bc2': grab('planck_bc2'), # nc['planck_bc2'][:].filled(np.NAN)[...], 'kappa0': grab('kappa0'), # nc['kappa0'][:].filled(np.NAN)[...] } def nc_nav_values(nc, primary_var_name: str=None, proj_var_name: str=None, y_image_var_name: str=DEFAULT_Y_CENTER_VAR_NAME, x_image_var_name: str=DEFAULT_X_CENTER_VAR_NAME): """ extract a dictionary of navigation parameters to use :param primary_var_name: radiance variable name to follow .grid_mapping of, optional with default :param proj_var_name: alternately, projection variable to fetch parameters from :param y_image_var_name: variable holding center of image in FGF y/x radiance, if available :param x_image_var_name: variable holding center of image in FGF y/x radiance, if available :param nc: PUG netCDF4 instance :return: dict FUTURE: distinguish navigation from projection """ if proj_var_name is None: v = nc.variables.get(primary_var_name or DEFAULT_RADIANCE_VAR_NAME, None) if v is not None: a = getattr(v, 'grid_mapping', None) if a is not None: proj_var_name = a if proj_var_name is None: raise ValueError('unknown projection variable to read') proj = nc[proj_var_name] nav = dict((name, getattr(proj, name, None)) for name in ( 'grid_mapping_name', 'perspective_point_height', 'semi_major_axis', 'semi_minor_axis', 'inverse_flattening', 'latitude_of_projection_origin', 'longitude_of_projection_origin', 'sweep_angle_axis')) # x_0, y_0 = 0, 0 # false easting / false northing # if y_image_var_name in nc.variables.keys(): # y_0 = float(nc[y_image_var_name][:]) * nav['perspective_point_height'] # convert to nadir-meter for proj4 # if x_image_var_name in nc.variables.keys(): # x_0 = float(nc[x_image_var_name][:]) * nav['perspective_point_height'] # nav['y_0'], nav['x_0'] = y_0, x_0 return nav def nc_y_x_names(nc, primary_var_name: str=None): """ read .coordinates from variable to get names of y and x variables note that PUG files may have multiple named coordinate variables for band/time preceding :param nc: netcdf file :param primary_var_name: variable of interest, typically DEFAULT_RADIANCE_VAR_NAME or DEFAULT_CMI_VAR_NAME :return: """ v = nc.variables.get(primary_var_name or DEFAULT_RADIANCE_VAR_NAME, None) c = v.coordinates.split() return c[-2:] RE_PUG_ISO = re.compile(r'(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)\.(\d+)Z') def timecode_to_datetime(iso): m = RE_PUG_ISO.match(iso.strip()) nums = list(m.groups()) nums[6] += '0' * max(0,6-len(nums[6])) # .9 -> 900000µs yyyy, mm, dd, h, m, s, t = map(int, nums) return datetime(yyyy, mm, dd, h, m, s, t) DEFAULT_SNAP_SECONDS = 60 SNAP_SECONDS = {'Full Disk': 5*60, 'Mesoscale': 15, } def snap_scene_onto_schedule(start: datetime, end: datetime, scene_id: str=None, timeline_id: str=None): """ assign a "timeline" time to an image by looking at its coverage currently this is done by taking the center time and dropping it to the previous :param start: datetime object, time_coverage_start :param end: datetime :param scene_id: string, typically 'Full Disk' :param timeline_id: instrument mode, typically "Mode 3" or "Mode 4" :return: datetime of 'timeline' time """ c = start # + (end - start)/2 b = datetime(c.year, c.month, c.day, c.hour) d = (c - b).total_seconds() # seconds within hour of image start time sns = SNAP_SECONDS.get(scene_id.strip(), DEFAULT_SNAP_SECONDS) # resolution to snap to d = int(d) - (int(d) % sns) m = b + timedelta(seconds=d) return m def resolution_from_attr(spatial_resolution: str): return {'4km at nadir': 4000, '4.0km at nadir': 4000, '2km at nadir': 2000, '2.0km at nadir': 2000, '1km at nadir': 1000, '1.0km at nadir': 1000, '500m at nadir': 500, '0.5km at nadir': 500,}[spatial_resolution] def mode_from_timeline_id(timeline_id): m = re.match(r'ABI Mode (\d+)', timeline_id) return int(m.group(1)) def scene_from_dataset_name(dataset_name): """ M1, M2, F, C, or None """ m = re.search(r'-(?:CMIP|Rad)(\w+)-M', dataset_name or "") return None if m is None else m.group(1) class PugFile(object): """ PUG helper routines for a single band, given a single-band NetCDF4 file or a multi-band NetCDF4 file and an offset in the band dimension """ path = None # file or glob used to access this data nc = None # if we opened the file for you, it's here _band_offset = None band = None # band number central_wavelength = None # um platform_id = None # platform_ID as string sched_time = None # datetime object, nominal/normalized/snapped time of the image onto a 1min/5min/10min/15min timeline based on mode/scene time_span = None # (start,end) datetime pair timeline_id = None # string, instrument mode bit_depth = None # bit depth shape_m = None # nadir-meter dy,dx approximation : FUTURE: deprecate this toward cell_size, too confusing vs .shape being whole array scene_id = None # string, type of scene e.g. "Full Disk" resolution = None # approximate nadir meters: 500, 1000, 2000, 4000 display_time = None # str form of timeline YYYY-mm-dd HH:MM display_name = None # str form of platform + scene + band instrument_type = None # long identifier for instrument instrument_name = None # ABI, AHI, AMI, etc cal = None # dictionary of cal parameters, only present on l1b files nav = None # dictionary of nav parameters shape = None # (lines, elements) primary_var_name = None # Rad or CMI variable to focus on @property def bt_or_refl(self): return is_band_refl_or_bt(self.band) @property def kind(self): return self.bt_or_refl def __init__(self, nc: (nc4.Dataset, str), primary_var_name: str=None, band_offset: int=0): """ Initialize cal and nav helper object from a PUG NetCDF4 L1B file. Can be used as a cal/nav/metadata fetch, or as a file wrapper. :param nc: netCDF4 Dataset if only metadata is desired; netcdf pathname if file should be opened and retained :param primary_var_name: radiance variable to convert to BT/Refl :param band_offset: typically 0, eventually nonzero for multi-band files """ # FUTURE: handle band dimension more effectively, including surveying metadata for multi-band files super(PugFile, self).__init__() if not isinstance(nc, nc4.Dataset): # then we can open and retain the file for our use self.path = nc nc = nc4.Dataset(nc) self.nc = nc else: self.path = nc.filepath() self.primary_var_name = primary_var_name primary_var = nc[primary_var_name] self._band_offset = band_offset self.band = int(nc['band_id'][band_offset]) # FUTURE: this has a band dimension self.nav = nc_nav_values(nc, primary_var_name) self.shape = primary_var.shape[-2:] self.platform_id = nc.platform_ID self.instrument_type = nc.instrument_type self.instrument_name = "ABI" if "Himawari" not in self.instrument_type else "AHI" self.time_span = st, en = timecode_to_datetime(nc.time_coverage_start), timecode_to_datetime(nc.time_coverage_end) try: self.timeline_id = nc.timeline_id self.mode = mode_from_timeline_id(self.timeline_id) scene_from_dataset = scene_from_dataset_name(nc.dataset_name) self.meso_scene = int(scene_from_dataset[1:]) if scene_from_dataset.startswith('M') else None except AttributeError as probably_himawari: LOG.warning("no timeline_id; assuming AXI Mode 1 placeholder") self.timeline_id = "AXI Mode 1 presumed" self.mode = 1 self.meso_scene = None raise self.scene_id = nc.scene_id self.resolution = resolution_from_attr(nc.spatial_resolution) self.sched_time = snap_scene_onto_schedule(st, en, self.scene_id, self.timeline_id) self.display_time = st.strftime('%Y-%m-%d %H:%M:%S') self.display_name = '{} {} B{:02d} {} {}'.format( self.platform_id, self.instrument_name, self.band, self.scene_id, self.display_time) self.y_var_name, self.x_var_name = nc_y_x_names(nc, self.primary_var_name) self.bit_depth = nc[self.primary_var_name].sensor_band_bit_depth self.central_wavelength = nc['band_wavelength'][...] # um try: dy, dx = float(nc[self.y_var_name].scale_factor), float(nc[self.x_var_name].scale_factor) hgt = float(self.nav['perspective_point_height']) self.shape_m = hgt * dy, hgt * dx except AttributeError as not_scaled: LOG.warning('no scale_factor on y and x variables') pass @property def proj4(self): return proj4_string(**self.nav) @property def proj4_params(self): return proj4_params(**self.nav) @property def proj4_string(self): return proj4_string(**self.nav) @property def data(self): raise NotImplementedError('not implemented in PugFile') @property def bt(self): raise NotImplementedError('not implemented in PugFile') @property def refl(self): raise NotImplementedError('not implemented in PugFile') @property def rad(self): raise NotImplementedError('not implemented in PugFile') @property def y(self): if None is self.nc: return None return self.nc[self.y_var_name][:] @property def x(self): if None is self.nc: return None return self.nc[self.x_var_name][:] @property def yx_center(self): if None is self.nc: return None, None y, x = self.y, self.x return np.nanmean(y), np.nanmean(x) # return self.nc['y_image'][...], self.nc['x_image'][...] # GOTCHA: returns projection center, not scene center @property def perspective_point_height(self): return float(self.nav['perspective_point_height']) @property def proj_y(self): """ projection coordinate as cartesian nadir-meters as used by PROJ.4 ref http://proj4.org/projections/geos.html scanning_angle (radians) = projection_coordinate / h """ return np.require(self.y, dtype=np.float64) * self.perspective_point_height @property def proj_x(self): """ projection coordinate as cartesian nadir-meters as used by PROJ.4 ref http://proj4.org/projections/geos.html scanning_angle (radians) = projection_coordinate / h """ return np.require(self.x, dtype=np.float64) * self.perspective_point_height @property def cell_size(self): """ :return: (y, x) nadir-meters nominal size of a cell """ return self.shape_m # if None is self.nc: # return None, None # ym = float(self.nc[self.y_var_name].scale_factor) # xm = float(self.nc[self.x_var_name].scale_factor) # h = self.perspective_point_height # return ym * h, xm * h @property def origin(self): """ :return: (y, x) nadir-meters nominal size of a cell """ if None is self.nc: return None, None h = self.perspective_point_height yo, xo = h * self.y[0], h * self.x[0] return yo, xo @property def proj(self): from pyproj import Proj return Proj(projparams=dict(self.proj4_params)) @property def latlon_center(self): proj = self.proj h = float(dict(self.proj4_params)['h']) y, x = self.yx_center # gotcha: gives projection center for meso scenes lon, lat = proj(x * h, y * h, inverse=True) return geolocation(lon=lon, lat=lat) @property def geo(self): proj = self.proj y_m = self.proj_y # nadir-meter x_m = self.proj_x w,h = len(x_m), len(y_m) y = np.broadcast_to(y_m, (w, h)).transpose() x = np.broadcast_to(x_m, (h, w)) lon, lat = proj(x,y, inverse=True) return geolocation(lon=lon, lat=lat) def lat_lon_to_l_c(self, lat, lon, round=True): """ convert latitude-longitude all the way back to line-column by reversing projection and y/x scaling :param lat: latitude value or array in degrees :param lon: longitude value or array in degrees :return: line, column array indexed 0..height-1, 0..width-1 """ proj = self.proj # convert to nadir-meters using proj4 xm, ym = proj(lon, lat, inverse=False) # divide out height to get back to projection view angles h = self.perspective_point_height y, x = ym / h, xm / h # extract scale factor and add offset to back out to line-from-0, column-from-0 yvar, xvar = self.nc[self.y_var_name], self.nc[self.x_var_name] ysf, yao = yvar.scale_factor, yvar.add_offset xsf, xao = xvar.scale_factor, xvar.add_offset # reverse scale factor and add offset to get line and column line, column = (y - yao) / ysf, (x - xao) / xsf # round to nearest integer if round: return np.round(line).astype(np.int32), np.round(column).astype(np.int32) else: # return floating point values return line, column def walk(self, as_cmi=False): from goesr.rockfall import nc_walk # LOG.warning("as_cmi band_id dimension not yet implemented") for pvda in nc_walk(self.nc): yield pvda @staticmethod def attach(path): basename = os.path.split(path)[-1] is_l1b = 'L1b-Rad' in basename return PugL1bTools(path) if is_l1b else PugCmiTools(path) class PugL1bTools(PugFile): """ PUG helper routines for a single band, given a single-band NetCDF4 file or a multi-band NetCDF4 file and an offset in the band dimension """ def __init__(self, nc: (nc4.Dataset, str), primary_var_name: str=DEFAULT_RADIANCE_VAR_NAME, band_offset: int=0): """ Initialize cal and nav helper object from a PUG NetCDF4 L1B file. Can be used as a cal/nav/metadata fetch, or as a file wrapper. :param nc: netCDF4 Dataset if only metadata is desired; netcdf pathname if file should be opened and retained :param primary_var_name: radiance variable to convert to BT/Refl :param band_offset: typically 0, eventually nonzero for multi-band files """ owns = False if not isinstance(nc, nc4.Dataset): # then we can open and retain the file for our use nc = nc4.Dataset(nc) owns = True super(PugL1bTools, self).__init__(nc, primary_var_name, band_offset) if not owns: self.nc = None else: self.nc = nc self.cal = nc_cal_values(nc) def convert_radiances(self, rad): """ calculate BT or Refl based on band :param rad: radiance array from file :return: ('refl' or 'bt', measurement array, 'K' or '') """ kind = self.bt_or_refl if kind == 'refl': return kind, calc_refl(rad, **self.cal), '' elif kind == 'bt': return kind, calc_bt(rad, **self.cal), 'K' def convert_from_nc(self, nc: nc4.Dataset=None): nc = nc or self.nc if not nc: raise ValueError('must provide a valid netCDF file') rad = nc[self.primary_var_name][:] return self.convert_radiances(rad) @property def data(self): if None is self.nc: return None _, measurement_values, _ = self.convert_from_nc(self.nc) return measurement_values @property def bt(self): if None is self.nc: return None if 'bt' == self.bt_or_refl: return self.convert_from_nc(self.nc)[1] LOG.warning('cannot request bt from non-emissive band') return None @property def refl(self): if None is self.nc: return None if 'refl' == self.bt_or_refl: return self.convert_from_nc(self.nc)[1] LOG.warning('cannot request refl from non-reflectance band') return None @property def rad(self): if None is self.nc: return None return self.nc[self.primary_var_name][:] class PugCmiTools(PugFile): """ PUG helper routines for a single band, given a single-band NetCDF4 file or a multi-band NetCDF4 file and an offset in the band dimension """ def __init__(self, nc: (nc4.Dataset, str), primary_var_name: str=DEFAULT_CMI_VAR_NAME, band_offset: int=0): """ Initialize cal and nav helper object from a PUG NetCDF4 L1B file. Can be used as a cal/nav/metadata fetch, or as a file wrapper. :param nc: netCDF4 Dataset if only metadata is desired; netcdf pathname if file should be opened and retained :param primary_var_name: radiance variable to convert to BT/Refl :param band_offset: typically 0, eventually nonzero for multi-band files """ owns = False if not isinstance(nc, nc4.Dataset): # then we can open and retain the file for our use nc = nc4.Dataset(nc) owns = True super(PugCmiTools, self).__init__(nc, primary_var_name, band_offset) if not owns: self.nc = None else: self.nc = nc self.cal = {} # nc_cal_values(nc) def _data_for_band(self, band_offset=None): band_offset = band_offset or self._band_offset ncv = self.nc[self.primary_var_name] if len(ncv.shape)==3: return ncv[band_offset,:,:].squeeze() else: if band_offset != 0: raise ValueError("single-band file cannot have band_offset != 0") return ncv[:,:] @property def data(self): if None is self.nc: return None return self._data_for_band() @property def bt(self): if None is self.nc: return None if 'bt' == self.bt_or_refl: return self._data_for_band() LOG.warning('cannot request bt from non-emissive band') return None @property def refl(self): if None is self.nc: return None if 'refl' == self.bt_or_refl: return self._data_for_band() LOG.warning('cannot request refl from non-reflectance band') return None @property def rad(self): return None def goesr_to_fbf(filename): pug = PugFile.attach(filename) h,w = pug.shape geo = pug.geo latitude = np.memmap('latitude.real4.{}.{}'.format(w,h), dtype=np.float32, shape=(h,w), mode='write') longitude = np.memmap('longitude.real4.{}.{}'.format(w,h), dtype=np.float32, shape=(h,w), mode='write') latitude[:] = geo.lat longitude[:] = geo.lon latitude.flush() longitude.flush() if pug.bt_or_refl=='bt': data = pug.bt vn = 'brightness_temp' elif pug.bt_or_refl=='refl': data = pug.refl vn = 'reflectance' else: data = None output = np.memmap('{}.real4.{}.{}'.format(vn, w, h), dtype=np.float32, shape=(h,w), mode='write') output[:] = data output.flush() prad = pug.rad if prad is not None: rad = np.memmap('radiance.real4.{}.{}'.format(w, h), dtype=np.float32, shape=(h,w), mode='write') rad[:] = np.ma.fix_invalid(pug.rad, fill_value=np.NAN) rad.flush() def _debug(type, value, tb): "enable with sys.excepthook = debug" if not sys.stdin.isatty(): sys.__excepthook__(type, value, tb) else: import traceback, pdb traceback.print_exception(type, value, tb) # …then start the debugger in post-mortem mode. pdb.post_mortem(tb) # more “modern” def main(): parser = argparse.ArgumentParser( description="PURPOSE", epilog="", fromfile_prefix_chars='@') parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0, help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG') parser.add_argument('-d', '--debug', dest='debug', action='store_true', help="enable interactive PDB debugger on exception") # http://docs.python.org/2.7/library/argparse.html#nargs # parser.add_argument('--stuff', nargs='5', dest='my_stuff', # help="one or more random things") parser.add_argument('inputs', nargs='*', help="input file to process") args = parser.parse_args() levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG] logging.basicConfig(level=levels[min(3, args.verbosity)]) if args.debug: sys.excepthook = _debug if not args.inputs: unittest.main() return 0 for pn in args.inputs: goesr_to_fbf(pn) return 0 if __name__ == '__main__': sys.exit(main())