#!/usr/bin/env python # -*- coding: utf-8 -*- import numpy as np import os, sys, re from collections import namedtuple import logging try: from .HimawariInterface import HIMAWARI_INTERFACE except (ImportError, ValueError, SystemError): # non-module case ("classic" make instead of python setup.py install) from HimawariInterface import HIMAWARI_INTERFACE LOG = logging.getLogger(__name__) from cffi import FFI ffi = FFI() def libHimawariPath(name='libHimawari.so'): "FUTURE: use packaging tools to find a better location for this, or use LD_LIBRARY_PATH search" base,_ = os.path.split(__file__) opif = os.path.isfile opj = lambda b,d: os.path.abspath(os.path.join(b,d,name)) path = opj('.', '.') if opif(path): return path path = opj(base, '.') if opif(path): return path path = opj(base, '../include') if opif(path): return path path = opj(base, '../src') if opif(path): return path path = opj(base, '../lib') if opif(path): return path raise EnvironmentError('unable to find %s' % name) def libHimawariIncludes(): for fn in ('Types', 'Errors', 'Constants', 'Functions'): yield libHimawariPath('Himawari%s.h' % fn) # define C interfaces using header files # for inc in libHimawariIncludes(): # ffi.cdef(open(inc, 'r').read()) ffi.cdef(HIMAWARI_INTERFACE) try: import _himawari # if we built an embedded lib using setup.py _hsd = ffi.dlopen(_himawari.__file__) # we can grab it this way except ImportError: try: _hsd = ffi.dlopen('libHimawari.so') except OSError as notfound: _hsd = None if _hsd is None: _hsd = ffi.dlopen(libHimawariPath()) NAVOPT_CORRECT_DATA, NAVOPT_CORRECT_FGF = 1,2 DEFAULT_NAVOPT = NAVOPT_CORRECT_DATA fgf_yxmb = namedtuple('fgf', ['y', 'x', 'my', 'mx', 'by', 'bx']) class HimawariSceneError(Exception): def __init__(self, error_code, *args, **kwargs): super(HimawariSceneError, self).__init__(*args, **kwargs) self.error_code = error_code def __repr__(self): return "" % self.error_code class HimawariScene(object): _handle = None _nav_options = DEFAULT_NAVOPT _metadata = None @property def stem(self): return os.path.split(self._path)[-1] @property def path(self): return self._path @property def satellite_name(self): meta = self.metadata name = ffi.string(meta.satellite_name).decode('UTF-8').strip(' \0') return name @property def processing_center_name(self): meta = self.metadata return ffi.string(meta.processing_center_name).decode('UTF-8').strip(' \0') @property def observation_area(self): meta = self.metadata return ffi.string(meta.observation_area).decode('UTF-8').strip(' \0') def __init__(self, path, sentinel_invalid=None, sentinel_outside_scan=None, downsample_to_band=0, nav_options=DEFAULT_NAVOPT): sentinels = ffi.new('struct HimawariSentinels *') sentinels.count_invalid = 0 # use value specified in file sentinels.count_outside_scan = 0 # use value specified in file sentinels.derived_invalid = sentinel_invalid if (sentinel_invalid is not None) else np.NAN sentinels.derived_outside_scan = sentinel_outside_scan if (sentinel_outside_scan is not None) else np.NAN self._handle = _hsd.openHimawariScene(path.encode('UTF-8'), ffi.NULL, sentinels) self._path = path self._match_band_sampling = downsample_to_band self._nav_options = nav_options if not self._handle: self._handle = None raise IOError('unable to open %s' % path) def close(self): """ close the scene, rendering it invalid """ if self._handle is not None: handle = self._handle self._handle = None _hsd.closeHimawariScene(handle) def __del__(self): if self._handle is not None: _hsd.closeHimawariScene(self._handle) @property def metadata(self): if self._metadata is not None: # cache metadata since HRIT reader segfaults on multiple metadata queries return self._metadata LOG.debug('fetching metadata') meta = ffi.new('struct HimawariMetadata *') rc = _hsd.queryHimawariSceneMetadata(self._handle, meta) if rc==0: self._metadata = meta # print meta return meta elif rc==_hsd.ERROR_NOT_AVAILABLE: return None else: raise HimawariSceneError(error_code=rc) @property def calibration(self): LOG.debug('fetching calibration') cal = ffi.new('struct HimawariCalibration *') rc = _hsd.queryHimawariSceneCalibration(self._handle, cal) if rc==0: return cal elif rc==_hsd.ERROR_NOT_AVAILABLE: return None else: raise HimawariSceneError(error_code=rc) @property def navigation(self): LOG.debug('fetching navigation') nav = ffi.new('struct HimawariNavigation *') rc = _hsd.queryHimawariSceneNavigation(self._handle, nav) if rc==0: return nav elif rc==_hsd.ERROR_NOT_AVAILABLE: return None else: raise HimawariSceneError(error_code=rc) @property def shape(self): """shape of the scene from 0,0 """ meta = self.metadata return (meta.lines, meta.columns) @property def extents(self): """actual subset of shape which is available """ meta = self.metadata return (meta.end_line - meta.begin_line, meta.columns) @property def band(self): meta = self.metadata return meta.band @property def kind(self): meta = self.metadata return {_hsd.AHI_BAND_ALBEDO: 'albedo', _hsd.AHI_BAND_BRIGHTNESS_TEMP: "brightness_temp" }.get(meta.band_type, None) @property def fgf_y(self): y,x = self.coords(columns=1) return y[:,0].squeeze() @property def fgf_x(self): meta = self.metadata # y,x = self.coords() y,x = self.coords(line_offset=meta.begin_line, lines=1) return x[0,:].squeeze() @property def fgf(self): y,_,_ = self.coords(columns=1, unscaled=True) meta = self.metadata _,x,mb = self.coords(line_offset=meta.begin_line, lines=1, unscaled=True) return fgf_yxmb(y[:,0].squeeze(),x[0,:].squeeze(),mb.my,mb.mx,mb.by,mb.bx) @property def line_times(self): meta = self.metadata lines = meta.lines start = meta.start_time.unix_time data = np.empty(shape=(lines,), dtype=np.float64) dataptr = ffi.cast('double *', data.ctypes.data) LOG.debug('fetching line time offsets') rc = _hsd.queryHimawariSceneLineTimeOffsets(self._handle, dataptr) if rc<0: raise HimawariSceneError(rc) elif rc>0: LOG.warning("_copyData rc = %d" % rc) return start, data def _copyData(self, routine, line_stride=1, column_stride=1, line_offset=0, column_offset=0, lines=0, columns=0, downsample_to_band=0): if lines<=0: lines = int((self.shape[0] - line_offset) / line_stride) if columns<=0: columns = int((self.shape[1] - column_offset) / column_stride) # DEBUG LOG.debug('fetching %dx%d LxC data using %r' % (lines, columns, routine)) # data = np.empty(shape=(lines+100, columns), dtype=np.float32) data = np.empty(shape=(lines, columns), dtype=np.float32) LOG.debug('allocating shape %s' % repr(data.shape)) dataptr = ffi.cast('float *', data.ctypes.data) src = ffi.new('struct HimawariSourceLocation *') src.line_stride = line_stride src.column_stride = column_stride src.lines = lines src.columns = columns src.line_offset = line_offset src.column_offset = column_offset src.nav_options = self._nav_options dst = ffi.new('struct HimawariDestinationLocation *') # default, packed dst.downsample_to_match_band = downsample_to_band or self._match_band_sampling rc = routine(self._handle, src, dst, dataptr) if rc<0: raise HimawariSceneError(rc) elif rc>0: LOG.warning("_copyData rc = %d" % rc) return np.ma.masked_array(data, np.isnan(data), fill_value=np.NAN) def coords(self, line_stride=1, column_stride=1, line_offset=0, column_offset=0, lines=0, columns=0, unscaled=False, downsample_to_band=0, **extra_args): """ returns y, x, scale if unscaled=True else returns y,x prescaled to radians """ LOG.debug('fetching %dx%d LxC coords' % (lines, columns)) if lines==0: lines = int((self.shape[0] - line_offset) / line_stride) if columns==0: columns = int((self.shape[1] - column_offset) / column_stride) ydata = np.empty(shape=(lines, columns), dtype=np.float32) xdata = np.empty(shape=(lines, columns), dtype=np.float32) LOG.debug('allocating shape %s' % repr(ydata.shape)) ydataptr = ffi.cast('float *', ydata.ctypes.data) xdataptr = ffi.cast('float *', xdata.ctypes.data) src = ffi.new('struct HimawariSourceLocation *') src.line_stride = line_stride src.column_stride = column_stride src.line_offset = line_offset src.column_offset = column_offset src.lines = lines src.columns = columns src.nav_options = self._nav_options # JMA default dst = ffi.new('struct HimawariDestinationLocation *') # default, packed dst.downsample_to_match_band = downsample_to_band or self._match_band_sampling or 0 LOG.debug('copying x/y coordinate angles') if unscaled: scale = ffi.new('struct HimawariCoordsScale *') rc = _hsd.copyHimawariSceneCoords(self._handle, src, dst, ydataptr, xdataptr, scale) else: rc = _hsd.copyHimawariSceneCoords(self._handle, src, dst, ydataptr, xdataptr, ffi.NULL) if rc<0: raise HimawariSceneError(rc) elif rc>0: LOG.warning("copyHimawariSceneCoords rc = %d" % rc) # DEBUG # return data[:lines, :columns] if unscaled: return ydata, xdata, scale else: return ydata, xdata def geo(self, **kwargs): try: from .cgms_nav import cgms_nav except (ImportError, ValueError, SystemError): # non-module run case from cgms_nav import cgms_nav nav = self.navigation fgf_y, fgf_x = self.coords(**kwargs) args = dict( coff=nav.COFF, loff=nav.LOFF, cfac=nav.CFAC, lfac=nav.LFAC, sub_lon=nav.sub_lon, sat_height=nav.distance_from_earth_center_to_virtual_satellite, r_eq=nav.earth_equatorial_radius, r_pol=nav.earth_polar_radius ) for k in args.keys(): if k in kwargs: args[k] = kwargs[k] LOG.debug(args) LOG.debug(fgf_y) LOG.debug(fgf_x) return cgms_nav(fgf_y, fgf_x, **args) def counts(self, **kwargs): LOG.debug('copying raw counts') return self._copyData(_hsd.copyHimawariSceneCounts, **kwargs) def brightnessTemps(self, **kwargs): LOG.debug('copying brightness temps') return self._copyData(_hsd.copyHimawariSceneBrightnessTemps, **kwargs) def albedos(self, **kwargs): LOG.debug('copying albedos') return self._copyData(_hsd.copyHimawariSceneAlbedos, **kwargs) def radiances(self, **kwargs): LOG.debug('copying radiances') return self._copyData(_hsd.copyHimawariSceneRadiances, **kwargs) def translate(self, file_number, line, column): plin = ffi.new('int *') pcol = ffi.new('int *') plin[0] = line pcol[0] = column LOG.debug('debugging nav correction') fp = _hsd._debugHimawariFileFromScene(self._handle, file_number) _hsd._debugHimawariFileNavigationCorrection(fp, plin, pcol) return plin[0], pcol[0] # def fgf_nav(self, x, y): # """ # use geos_transform module to perform single point fixed grid format calculation # returns longitude, latitude # see ssh://git.ssec.wisc.edu/~rayg/repos/git/geos_transform.git # """ # from geos_transform import fgf_to_earth_, sat_to_earth # nav = self.navigation # # ref pixcoord2geocoord.pro, WCS3 # two16 = float(2**16) # lam = (x - nav.COFF) * two16 / nav.CFAC * (np.pi / 180.0) # tht = (y - nav.LOFF) * two16 / nav.LFAC * (np.pi / 180.0) # fgf = np.vectorize(sat_to_earth) # return fgf(lam, tht, nav.sub_lon) # # fgf = np.vectorize(fgf_to_earth_) # # return fgf(x,y, nav.CFAC, nav.COFF, nav.LFAC, nav.LOFF, nav.sub_lon) def report(self, line, column): self._repcount = counts = getattr(self, '_repcount', self.counts()) self._reprad = rad = getattr(self, '_reprad', self.radiances()) try: self._reptemp = temp = getattr(self, '_reptemp', self.brightnessTemps()) except AssertionError: self._reptemp = temp = None try: self._repalb = alb = getattr(self, '_repalb', self.albedos()) except AssertionError: self._repalb = alb = None print("counts[%d,%d] = %.4f" % (line, column, counts[line, column])) print("rad[%d,%d] = %.4f" % (line, column, rad[line, column])) if temp is not None: print("btemp[%d,%d] = %.4f" % (line, column, temp[line, column])) elif alb is not None: print("alb[%d,%d] = %.4f" % (line, column, alb[line, column])) def downsample(data, factor, sentinel=np.NAN, first_row=0, first_col=0): # http://stackoverflow.com/questions/16276268/how-to-pass-a-numpy-array-into-a-cffi-function-and-how-to-get-one-back-out pdata = ffi.cast("const float *", data.ctypes.data) new_shape = tuple(int(x / factor) for x in data.shape) zult = np.zeros(new_shape, dtype=np.float32) pzult = ffi.cast("float *", zult.ctypes.data) rc = _hsd.averageHimawariFloatArray(factor, np.float32(sentinel), pdata, data.shape[0], data.shape[1], # lines, columns data.shape[1], 1, # line_incr, col_incr first_row, first_col, # first line, first column 0, # source_band pzult, zult.shape[0], zult.shape[1], # destination lines, columns zult.shape[1], 1, # line and column increment 0 # destination band ) return zult def test_partial_scene(stem, remove_count=2): """ create a temporary directory of softlinks to some (but not all) files matching a pattern open a HimawariScene with that subset of files unlink and remove the directory """ from random import randint from tempfile import mktemp from shutil import rmtree import os, glob dirname = mktemp() os.mkdir(dirname) tolink = [os.path.abspath(x) for x in glob.glob(stem + '*.DAT')] while remove_count>0: remove_count -= 1 gbye = randint(0, len(tolink)-1) print("dropping %s" % tolink[gbye]) del tolink[gbye] for apath in sorted(tolink): _,fn = os.path.split(apath) lpath = os.path.join(dirname, fn) os.symlink(apath, lpath) print("%s -> %s" % (apath, lpath)) toload = os.path.join(dirname, os.path.split(stem)[-1]) fob = HimawariScene(toload) rmtree(dirname) return fob def test_stride_scene(stem, stride=2): hime = HimawariScene(stem) if isinstance(stem,str) else stem radA = hime.radiances() assert(radA.shape == hime.shape) r,c = hime.shape # in cases where stride does not evenly divide, [::n] slice results in extra row/column r -= r % stride c -= c % stride radA = np.ma.masked_array(radA, np.isnan(radA)) radA = radA[:r:stride, :c:stride] radB = hime.radiances(line_stride=stride, column_stride=stride) radB = np.ma.masked_array(radB, np.isnan(radB)) # from pprint import pprint # pprint(hime.shape) # pprint(radA.shape) # pprint(radB.shape) # pprint(radA[1500:1504,1500:1504]) # pprint(radB[1500:1504,1500:1504]) assert(radB.shape[0] == int(hime.shape[0]/stride)) assert(radB.shape[1] == int(hime.shape[1]/stride)) assert(radA.shape[0] == int(hime.shape[0]/stride)) assert(radA.shape[1] == int(hime.shape[1]/stride)) return np.all(np.equal(radA, radB).ravel()) def test_offset_scene(stem, offset=3, line_offset=0, column_offset=0): line_offset = line_offset or offset column_offset = column_offset or offset hime = HimawariScene(stem) if isinstance(stem,str) else stem radA = hime.radiances()[line_offset:, column_offset:] radA = np.ma.masked_array(radA, np.isnan(radA)) radB = hime.radiances(line_offset=line_offset, column_offset=column_offset) radB = np.ma.masked_array(radB, np.isnan(radB)) # from pprint import pprint # pprint(hime.shape) # pprint(radA.shape) # pprint(radB.shape) # pprint(radA[1500:1504,1500:1504]) # pprint(radB[1500:1504,1500:1504]) assert(radA.shape == radB.shape) assert(radA.shape[0] == hime.shape[0]-line_offset) assert(radA.shape[1] == hime.shape[1]-column_offset) return True # FIXME np.all(np.equal(radA, radB).ravel()) def test_quicklook_radiance(stem='sample/HS_H08_20130710_0300_B01_FLDK_R10', offset=0, line_offset=0, column_offset=0, factor=4, downsample_to_band=0): from cspp_quicklooks.core import ql_common as qlc line_offset = line_offset or offset column_offset = column_offset or offset hime = HimawariScene(stem) if isinstance(stem,str) else stem down = lambda q: downsample(q, factor=factor) if factor>1 else q rad = hime.radiances(line_offset=line_offset, column_offset=column_offset, downsample_to_band=downsample_to_band) #, line_stride=4, column_stride=4) rad = down(rad) rad = np.ma.masked_array(rad, np.isnan(rad)) geo = hime.geo(line_offset=line_offset, column_offset=column_offset, downsample_to_band=downsample_to_band) #, line_stride=4, column_stride=4) lat,lon = geo.latitude, geo.longitude lat = lat[::factor, ::factor] lon = lon[::factor, ::factor] qlc.map_image('test.png', swath=rad, lat=lat, lon=lon) # swath=rad[2000:3000,2000:3000], # lat=lat[2000:3000,2000:3000], # lon=lon[2000:3000,2000:3000]) def test_downsample_scene(stem, factor=2): hime = HimawariScene(stem) if isinstance(stem,str) else stem radA = radOrig = hime.radiances() r,c = radA.shape # in cases where stride does not evenly divide, [::n] slice results in extra row/column x = np.ma.masked_array(radA, np.isnan(radA)) radA = (x[0::2,0::2] + x[1::2,1::2] + x[0::2,1::2] + x[1::2,0::2]) / 4.0 radB = downsample(radOrig, factor=2) radB = np.ma.masked_array(radB, np.isnan(radB)) assert(radA.shape == radB.shape) # tst = np.abs(radA-radB) < 0.001 tst = np.any((radA - radB) < 0.001, np.ma.is_masked(radA) | np.ma.is_masked(radB)) assert(np.isfinite(radA[radA.shape[0]/2, radA.shape[1]/2])) return np.all(tst.ravel()) def test_coords(stem, stride=1, debug=None): hime = HimawariScene(stem) if isinstance(stem,str) else stem sy,sx = hime.coords(line_stride=stride, column_stride=stride) ty,tx = hime.coords() ty = ty[::stride, ::stride] tx = tx[::stride, ::stride] y_anti_tst = np.abs(ty - sy) > 0.00001 x_anti_tst = np.abs(tx - sx) > 0.00001 if debug is not None: debug['sx'] = sx debug['sy'] = sy debug['tx'] = tx debug['ty'] = ty return np.any(y_anti_tst) == False and np.any(x_anti_tst) == False def test_coords_unscaled(stem, stride=1, debug=None): hime = HimawariScene(stem) if isinstance(stem,str) else stem sy,sx,scale = hime.coords(line_stride=stride, column_stride=stride, unscaled=True) ty,tx = hime.coords(line_stride=stride, column_stride=stride) sx = sx * scale.mx + scale.bx sy = sy * scale.my + scale.by y_anti_tst = np.abs(ty - sy) > 0.00001 x_anti_tst = np.abs(tx - sx) > 0.00001 if debug is not None: debug['sx'] = sx debug['sy'] = sy debug['tx'] = tx debug['ty'] = ty debug['scale'] = scale return np.any(y_anti_tst) == False and np.any(x_anti_tst) == False def test_rad_refl(stem, debug=None): hime = HimawariScene(stem) if isinstance(stem,str) else stem if hime.kind=='albedo': LOG.debug('reading albedos') data = hime.albedos() elif hime.kind=='brightness_temp': LOG.debug('reading brightness temps') return True def test_downsample_scene_cxx(stem, stride=2): # FIXME: this has an assertion failure since the downsampling APIs need work raise NotImplementedError("this feature requires ENABLE_DOWNSAMPLE_TO_BAND be turned on after an API review") hime = HimawariScene(stem) if isinstance(stem,str) else stem radA = hime.radiances() r,c = radA.shape # in cases where stride does not evenly divide, [::n] slice results in extra row/column x = np.ma.masked_array(radA, np.isnan(radA)) radA = (x[0::2,0::2] + x[1::2,1::2] + x[0::2,1::2] + x[1::2,0::2]) / 4.0 radB = hime.radiances(downsample_to_band=5) radB = np.ma.masked_array(radB, np.isnan(radB)) # FIXME: find a better way to convey the downsampled size lines,columns = radA.shape radB = radB[:lines, :columns] # assert(radA.shape == radB.shape) return np.all(np.equal(radA, radB).ravel()) def test_radiance_scaling(stem, debug=None): hime = HimawariScene(stem) if isinstance(stem,str) else stem radA = hime.radiances() cal = hime.calibration radB = np.require(hime.counts(), dtype=np.float32) * cal.rad_m + cal.rad_b tst = np.any((radA - radB) < 0.00001, np.ma.is_masked(radA) | np.ma.is_masked(radB)) if debug: debug['radA'] = radA debug['radB'] = radB debug['cal'] = cal return np.any(tst.ravel()) # column, line values from test pattern when run on # /mnt/presto/data/users/rayg/Workspace/himawari_broken_case/HS_H08_20150120_0710_B03_FLDK_R05_S0101.DAT # MD5(/data/users/rayg/Workspace/himawari_broken_case/HS_H08_20150120_0710_B03_FLDK_R05_S0101.DAT)= 7a00fbeccea6a92a63f1ecdef7f35d21 # convert from (pixel,line) to (longitude,latitude) --- # (Pix,Lin)( 7332.0, 3667.0) ==> (Lon,Lat)( 118.412, 37.462) # (Pix,Lin)( 9776.0, 7333.0) ==> (Lon,Lat)( 134.891, 16.987) # (Pix,Lin)( 12220.0, 10999.0) ==> (Lon,Lat)( 146.191, 0.007) # (Pix,Lin)( 14664.0, 14665.0) ==> (Lon,Lat)( 158.488, -17.121) # (Pix,Lin)( 2575.0, 501.0) ==> (Lon,Lat)(-9999.000,-9999.000) # # convert from (longitude,latitude) to (pixel,line) --- # (Lon,Lat)( 118.412, 37.462) ==> (Pix,Lin)(7332.0,3667.0) # (Lon,Lat)( 134.891, 16.987) ==> (Pix,Lin)(9776.0,7333.0) # (Lon,Lat)( 146.191, 0.007) ==> (Pix,Lin)(12220.0,10999.0) # (Lon,Lat)( 158.488, -17.121) ==> (Pix,Lin)(14664.0,14665.0) # (Lon,Lat)(-9999.000,-9999.000) ==> (Pix,Lin)(-9999.0,-9999.0) # # Digital count of (longitude,latitude) --- # hPix=7332.000000 hLin=3667.000000 ll=3666 pp=7332 # (Lon,Lat)( 118.412, 37.462) ==> count value = 337 # hPix=9776.000000 hLin=7333.000000 ll=7332 pp=9776 # (Lon,Lat)( 134.891, 16.987) ==> count value = 328 # hPix=12220.000000 hLin=10999.000000 ll=10998 pp=12220 # (Lon,Lat)( 146.191, 0.007) ==> count value = 45 # hPix=14664.000000 hLin=14665.000000 ll=14664 pp=14664 # (Lon,Lat)( 158.488, -17.121) ==> count value = 79 # (Lon,Lat)(-9999.000,-9999.000) ==> count value = 65534 # # Radiance of (longitude,latitude) --- # (Lon,Lat)( 118.412, 37.462) ==> radiance = 82.3163 # (Lon,Lat)( 134.891, 16.987) ==> radiance = 79.9792 # (Lon,Lat)( 146.191, 0.007) ==> radiance = 6.4918 # (Lon,Lat)( 158.488, -17.121) ==> radiance = 15.3207 # (Lon,Lat)(-9999.000,-9999.000) ==> radiance = -10000000000.0000 TEST_COORDS = [ (7332,3667), (9776,7333), (12220,10999), (14664,14665), (2575,501)] def test_jma_nav(stem): hime = HimawariScene(stem) if isinstance(stem,str) else stem print("extracting coords") sy,sx,scale = hime.coords(unscaled=True) print("computing geo") geo = hime.geo() print("extracting counts") counts = hime.counts() print("extracting radiances") rad = hime.radiances() lines,columns = hime.shape coords = [(c,l) for (c,l) in TEST_COORDS if (c (Lon,Lat)(%9.3f,%9.3f)" % ( sx[pl, pc], sy[pl, pc], geo.longitude[pl, pc], geo.latitude[pl, pc])) for c,l in coords: pc,pl = (c-1),(l-1) # convert to offsets properly # pc,pl = (c-0),(l-1) # FIXME convert to offsets with extra WTF print("hPix=%10.5f hLin=%10.5f pl=%d pc=%d" % (sx[pl, pc], sy[pl, pc], pl, pc)) print("(Lon,Lat)(%9.3f,%9.3f) --> count value = %s" % ( geo.longitude[pl, pc], geo.latitude[pl, pc], counts[pl, pc])) for c,l in coords: pc,pl = (c-1),(l-1) # convert to offsets properly # pc,pl = (c-0),(l-1) # FIXME convert to offsets with extra WTF print( "(Lon,Lat)(%9.3f,%9.3f) --> radiance = %s" % (geo.longitude[pl, pc], geo.latitude[pl, pc], rad[pl, pc])) return True def test_line_times(stem): hime = HimawariScene(stem) if isinstance(stem,str) else stem from functools import reduce base,lt = hime.line_times print(lt) return(np.all(lt[1:]>=lt[:-1])) # for a,b in zip(lt[1:], lt[:-1]): # assert(a>=b) def test_all(args): # FUTURE: make this a real test pattern for fn in (args or ['sample/HS_H08_20130710_0300_B01_FLDK_R10']): hs = HimawariScene(fn) print(fn, hs.shape) del hs # check decompression cache hs = HimawariScene(fn) print("Satellite: %r" % hs.satellite_name) assert(test_line_times(hs)) assert(test_rad_refl(hs)) assert(test_jma_nav(hs)) assert(test_radiance_scaling(hs)) assert(test_coords_unscaled(hs, stride=2)) assert(test_stride_scene(hs,3)) assert(test_stride_scene(hs,2)) assert(test_offset_scene(hs)) assert(test_coords(hs, stride=4)) assert(test_downsample_scene(hs)) def _debug(type, value, tb): "enable with sys.excepthook = debug" if not sys.stdin.isatty(): sys.__excepthook__(type, value, tb) else: import traceback, pdb traceback.print_exception(type, value, tb) # …then start the debugger in post-mortem mode. pdb.post_mortem(tb) # more “modern” def _update_interface_file(path, text): with open(path, 'wt') as fp: fp.write('HIMAWARI_INTERFACE = """' + text + '\n"""\n') def main(): import argparse parser = argparse.ArgumentParser( description="PURPOSE", epilog="", fromfile_prefix_chars='@') parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0, help='each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG') parser.add_argument('-d', '--debug', dest='debug', action='store_true', help="enable interactive PDB debugger on exception") parser.add_argument('-I', '--interface', dest='interface', help="update interface file given preprocessor output on stdin (internal use only)") # http://docs.python.org/2.7/library/argparse.html#nargs # parser.add_argument('--stuff', nargs='5', dest='my_stuff', # help="one or more random things") parser.add_argument('inputs', nargs='*', help="input files to process") args = parser.parse_args() levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG] logging.basicConfig(level=levels[min(3, args.verbosity)]) if args.debug: sys.excepthook = _debug if args.interface: _update_interface_file(args.interface, sys.stdin.read()) return 0 test_all(args.inputs) return 0 if __name__ == '__main__': sys.exit(main())