#!/usr/bin/env python
# encoding: utf-8
"""
data_util.py
Purpose: Provide data related utility methods that should work for multiple
types of products.
Created by Eva Schiffer <eva.schiffer@ssec.wisc.edu> on 2017-09-08.
Copyright (c) 2017 University of Wisconsin Regents. All rights reserved.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
# Note this only handles GOES-R related output at the moment
import datetime
import logging
import os
import re

import netCDF4
import numpy

from constants import *
import framework_data
import lvl1_data
# every module should have a LOG object
# NOTE(review): getLogger(__file__) names the logger by the module's file path;
# getLogger(__name__) is the usual convention -- confirm nothing keys off the
# path-based name before changing it
LOG = logging.getLogger(__file__)
# the types of files we expect; these string constants are returned by the
# per-module determine_file_type helpers to tag each recognized input file
AIT_FRAMEWORK_FILE_TYPE = "AIT Framework Output File"
GOESR_LEVEL1_FILE_TYPE = "GOES 16 Level 1 Output File"
# the time when GOES-16 drifted to a new position
# Note: this is not an exact time, but it's during the period when the satellite was not transmitting data
TRANSITION_DATETIME = datetime.datetime.strptime("12 01 2017", "%m %d %Y")
def determine_file_type (input_file_name) :
    """
    Given an input file name, figure out what kind of file it is.
    :param input_file_name: the name of the file (not the full path)
    :return: type constant for this file and the module to use to process it,
             or (None, None) if the name matches no known file pattern
    """
    # does this look like a PUG formatted Framework file?
    if PUG_FORMAT_FW_PATTERN.match(input_file_name) is not None :
        return framework_data.determine_file_type(input_file_name), framework_data
    # if it isn't a PUG formatted FW file, it might still be CMIChanger output or
    # a basic level 1 file; both of those are handled by the lvl1_data module,
    # so the two previously duplicated branches are folded into one check here
    if (TEMP_CMI_PATTERN.match(input_file_name) is not None
            or BASIC_LVL1_PATTERN.match(input_file_name) is not None) :
        return lvl1_data.determine_file_type(input_file_name), lvl1_data
    # nothing we recognize
    return None, None
def get_satellite_name (input_file_path) :
    """
    Given an input file path, figure out the name of the satellite it's data came from
    :param input_file_path: the full path of the file
    :return: "GOES-16", "GOES-17", or "" #Future, if we use this for more satellites, do that here!
    """
    basename = os.path.basename(input_file_path)
    to_return = ""
    # the satellite code is the 6th dash/underscore delimited token of the name,
    # e.g. "G16" in "OR_ABI-L1b-RadF-M3C02_G16_s..._e..._c....nc"
    # (note: this relies on the module level "import re", which was missing before)
    name_tokens = re.split('[-_]', basename)
    # guard against names too short to contain a satellite code; previously this
    # raised an IndexError instead of falling through to the "" return
    sat_code = name_tokens[5] if len(name_tokens) > 5 else ""
    if sat_code == "G16" :
        to_return = "GOES-16"
    elif sat_code == "G17" :
        to_return = "GOES-17"
    return to_return
# a formatting pattern for parsing PUG time strings like "date_created" and "time_coverage_start"
PUG_TIME_FORMAT_PATTERN = "%Y-%m-%dT%H:%M:%S.%fZ" # the fractional seconds are given as 1/10ths, so we must add or remove zeros when using this pattern
# a formatting pattern for building human readable display times; code using this
# trims the six %f digits down to a single 1/10ths-of-a-second digit
DISP_TIME_FORMAT_PATTERN = "%Y-%m-%d %H:%M:%S.%fZ"
# the name of the global attribute with the starting time in PUG formatted files
START_TIME_ATTR = "time_coverage_start"
def get_datetime_from_PUG_time_format (pug_formatted_time_str) :
    """
    Given a string with a pug formatted time string, return a datetime object
    :param pug_formatted_time_str: A time str in the format "2017-02-18T00:11:24.2Z"
    :return: the datetime corresponding to the input string
    """
    # strptime's %f directive accepts 1 to 6 fractional-second digits and
    # right-pads them to microseconds itself, so the file's 1/10th-second value
    # parses directly; the old manual zero-padding assumed exactly one fractional
    # digit and produced an unparseable 7+ digit field for anything longer
    return datetime.datetime.strptime(pug_formatted_time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
def get_disp_datetime_str_from_datetime (datetime_obj) :
    """
    Given a datetime object, make a nice formatted string out of it.
    :param datetime_obj: The datetime object to format as a string
    :return: The string representing the formatted time, e.g. "2017-04-01 18:42:04.9Z"
    """
    # compute the 1/10ths-of-a-second digit directly rather than string-slicing a
    # six digit %f field out of the formatted result; the old "[:-6] + [-1]"
    # slice was silently coupled to the display pattern ending in ".%fZ" and
    # would corrupt the output if that constant ever changed
    tenths_digit = datetime_obj.microsecond // 100000
    return datetime_obj.strftime("%Y-%m-%d %H:%M:%S") + ".%dZ" % tenths_digit
def get_start_datetime_from_file (full_file_path) :
    """
    Get the starting datetime from a PUG formatted FW file.
    :param full_file_path: The full path to the file.
    :return: A datetime object representing the date when the file's data was
             started being collected, or None if the file has no
             time_coverage_start global attribute
    """
    time_to_return = None
    temp_nc_file = netCDF4.Dataset(full_file_path, 'r')
    try :
        # only attempt to parse the start time if the global attribute exists
        if START_TIME_ATTR in temp_nc_file.ncattrs() :
            time_to_return = get_datetime_from_PUG_time_format(getattr(temp_nc_file, START_TIME_ATTR))
    finally :
        # make sure the file handle is released even if the time string fails to parse
        temp_nc_file.close()
    return time_to_return
def get_navigation_info_from_file (full_file_path) :
    """
    Get navigation information from a PUG formatted FW file.
    :param full_file_path: The full path to the file.
    :return: the bounds, satellite height, and satellite subpoint longitude
    """
    temp_nc_file = netCDF4.Dataset(full_file_path, 'r')
    try :
        variable_list = temp_nc_file.variables
        # load the projection specific info
        # NOTE(review): the nominal_satellite_height / nominal_satellite_subpoint_lon
        # variables could be used as an alternate source for these values
        proj_var = variable_list["goes_imager_projection"]
        height_m = getattr(proj_var, "perspective_point_height") # this is already in meters
        subpoint_lon = getattr(proj_var, "longitude_of_projection_origin")
        # load the x and y and multiply them by the height in meters
        # (presumably x/y are fixed-grid scan angles -- TODO confirm units)
        x_data = variable_list["x"][:] * height_m
        y_data = variable_list["y"][:] * height_m
        # calculate our bounds in x and y
        bounds_to_return = [min(x_data), max(x_data), min(y_data), max(y_data)]
    finally :
        # release the file handle even if a variable or attribute is missing
        temp_nc_file.close()
    LOG.debug("navigation from file: bounds=" + str(bounds_to_return) + " height=" + str(height_m) + "m lon_0=" + str(subpoint_lon))
    return bounds_to_return, height_m, subpoint_lon
def load_variable_data_from_PUG_nc_file (file_path, variable_name) :
    """
    Load variable data and related informational attributes from a PUG formatted netCDF file.
    This method will handle the scaling and offsets automatically.
    :param file_path: The path of the file to get data from
    :param variable_name: The name of the variable to load
    :return: A copy of the variable data, the long name of the variable, the units of the variable,
             the valid range of the data, a list of flag values or None if this is not a flag variable,
             a list of flag names or None if this is not a flag variable
    """
    temp_nc_file = netCDF4.Dataset(file_path, 'r')
    try :
        # get the variable object and use it to get our raw data
        variable_object = temp_nc_file.variables[variable_name]
        data_copy = variable_object[:] # Note: this should be a masked array, but in some cases it is not returning a masked array
        # grab a temp copy of the attrs list
        temp_var_attrs_list = variable_object.ncattrs()
        # small helper so every optional attribute lookup reads the same way
        def _attr_or_none (attr_name) :
            return variable_object.getncattr(attr_name) if attr_name in temp_var_attrs_list else None
        # get the fill value and long name
        fill_value = _attr_or_none("_FillValue")
        long_name = _attr_or_none("long_name")
        # get other misc info about the attributes
        units_str = _attr_or_none("units")
        range_info = _attr_or_none("valid_range")
        flag_vals = _attr_or_none("flag_values")
        flag_names = _attr_or_none("flag_meanings")
        scale = _attr_or_none("scale_factor")
        offset = _attr_or_none("add_offset")
    finally :
        # release the file handle even if the variable or an attribute read fails
        temp_nc_file.close()
    # TODO, note this does not handle unsigned conversions, so it won't properly handle the operational L2 stuff
    # if we got the valid range, we may need to unpack it: u = p * scale + off
    if range_info is not None :
        if scale is not None :
            range_info = [x * scale for x in range_info]
        if offset is not None :
            range_info = [x + offset for x in range_info]
        LOG.debug("Expected range for this variable's data: " + str(range_info))
    # mask the data based on the fill value if the library didn't already;
    # guard the comparison so a missing _FillValue doesn't trigger a deprecated
    # elementwise "== None" comparison -- use the empty nomask instead
    if not numpy.ma.is_masked(data_copy) :
        fill_data_mask = (data_copy == fill_value) if fill_value is not None else numpy.ma.nomask
        data_copy = numpy.ma.MaskedArray(data=data_copy, mask=fill_data_mask)
    return data_copy, long_name, units_str, range_info, flag_vals, flag_names
def get_attr_only (file_path, attr_name) :
    """
    Get a global attribute from a file.
    :param file_path: The full path to the nc file
    :param attr_name: The name of the global attribute we expect to get
    :return: The value of the global attribute or None if that attr doesn't exist in the file
    """
    temp_nc_file = netCDF4.Dataset(file_path, 'r')
    try :
        # try to get the global attribute if it exists
        to_return = getattr(temp_nc_file, attr_name) if attr_name in temp_nc_file.ncattrs() else None
    finally :
        # always release the file handle, matching the other readers in this module
        temp_nc_file.close()
    return to_return