#!/usr/bin/env python
# encoding: utf-8
"""
data_util.py

Purpose: Provide data related utility methods that should work for multiple
types of products.

Created by Eva Schiffer <eva.schiffer@ssec.wisc.edu> on 2017-09-08.
Copyright (c) 2017 University of Wisconsin Regents. All rights reserved.

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

# Note this only handles GOES-R related output at the moment

import datetime
import logging
import os
import re

import netCDF4
import numpy

from constants import *
import framework_data, lvl1_data

# every module should have a LOG object
# Note: use the module name (not the file path) as the logger name, per the logging convention
LOG = logging.getLogger(__name__)

# the types of files we expect
AIT_FRAMEWORK_FILE_TYPE     = "AIT Framework Output File"
GOESR_LEVEL1_FILE_TYPE      = "GOES 16 Level 1 Output File"

# the time when GOES-16 drifted to a new position
# Note: this is not an exact time, but it's during the period when the satellite was not transmitting data
TRANSITION_DATETIME = datetime.datetime.strptime("12 01 2017", "%m %d %Y")

def determine_file_type (input_file_name) :
    """
    Given an input file name, figure out what kind of file it is.

    :param input_file_name: the name of the file (not the full path)
    :return: type constant for this file and the module to use to process it
    """

    # default to "unrecognized" until one of the patterns matches
    file_type      = None
    handler_module = None

    # PUG formatted Framework files are handled by the framework_data module
    if PUG_FORMAT_FW_PATTERN.match(input_file_name) is not None :
        file_type      = framework_data.determine_file_type(input_file_name)
        handler_module = framework_data

    # both CMIChanger output and basic level 1 files go to the lvl1_data module
    elif (TEMP_CMI_PATTERN.match(input_file_name) is not None
          or BASIC_LVL1_PATTERN.match(input_file_name) is not None) :
        file_type      = lvl1_data.determine_file_type(input_file_name)
        handler_module = lvl1_data

    return file_type, handler_module

def get_satellite_name (input_file_path) :
    """
    Given an input file path, figure out the name of the satellite its data came from.

    :param input_file_path: the full path of the file
    :return: "GOES-16", "GOES-17", or "" if the satellite code is missing or
             unrecognized #Future, if we use this for more satellites, do that here!
    """

    basename = os.path.basename(input_file_path)

    to_return = ""

    # file names are delimited by "-" and "_"; the satellite code is expected
    # to be the 6th field (e.g. "G16" in "OR_ABI-L2-CMIPF-M3C01_G16_s...")
    name_parts = re.split('[-_]', basename)
    sat_code = name_parts[5] if len(name_parts) > 5 else None

    # check the code in the name
    if sat_code == "G16" :
        to_return = "GOES-16"
    elif sat_code == "G17" :
        to_return = "GOES-17"

    return to_return

# a formatting pattern for parsing PUG time strings like "date_created" and "time_coverage_start"
PUG_TIME_FORMAT_PATTERN = "%Y-%m-%dT%H:%M:%S.%fZ" # the fractional seconds are given as 1/10ths, so we must add or remove zeros when using this pattern
# a formatting pattern for making display strings; note %f produces six digits,
# so callers trim the result down to 1/10ths of a second
DISP_TIME_FORMAT_PATTERN = "%Y-%m-%d %H:%M:%S.%fZ"

# the name of the global attribute with the starting time in PUG formatted files
START_TIME_ATTR = "time_coverage_start"

def get_datetime_from_PUG_time_format (pug_formatted_time_str) :
    """
    Given a string with a pug formatted time string, return a datetime object.

    PUG times normally carry a single fractional-seconds digit (1/10ths of a
    second), but this also tolerates zero to six fractional digits.

    :param pug_formatted_time_str: A time str in the format "2017-02-18T00:11:24.2Z"
    :return: the corresponding datetime object
    """

    # separate the whole-seconds part from the fractional part, dropping the trailing "Z"
    whole_part, _, frac_part = pug_formatted_time_str.rstrip("Z").partition(".")

    # zero pad (or truncate) the fraction to six digits so strptime can parse it as microseconds
    micro_str = (frac_part + "000000")[:6]

    datetime_to_return = datetime.datetime.strptime(whole_part + "." + micro_str + "Z",
                                                    "%Y-%m-%dT%H:%M:%S.%fZ")

    return datetime_to_return

def get_disp_datetime_str_from_datetime (datetime_obj) :
    """
    Given a datetime object, make a nice formatted string out of it.

    :param datetime_obj: The datetime object to format as a string
    :return: The string representing the formatted time, e.g. "2017-04-01 18:42:04.9Z"
    """

    # format the whole seconds, then append just the 1/10ths-of-a-second digit
    # (the leading digit of the microseconds) and the trailing "Z"
    whole_part = datetime_obj.strftime("%Y-%m-%d %H:%M:%S")
    tenths_digit = datetime_obj.microsecond // 100000

    return "{0}.{1:d}Z".format(whole_part, tenths_digit)

def get_start_datetime_from_file (full_file_path) :
    """
    Get the starting datetime from a PUG formatted FW file.

    :param full_file_path:   The full path to the file.
    :return: A datetime object representing the date when the file's data was
             started being collected, or None if the file does not have a
             time_coverage_start global attribute
    """

    time_to_return = None

    temp_nc_file = netCDF4.Dataset(full_file_path, 'r')
    try :
        # only attempt to parse the start time if the global attribute exists
        all_attr_keys = temp_nc_file.ncattrs()
        if START_TIME_ATTR in all_attr_keys :
            time_to_return = get_datetime_from_PUG_time_format(getattr(temp_nc_file, START_TIME_ATTR))
    finally :
        # close the file again, even if the attribute read or parse failed
        temp_nc_file.close()

    return time_to_return

def get_navigation_info_from_file (full_file_path) :
    """
    Get navigation information from a PUG formatted FW file.

    :param full_file_path:   The full path to the file.
    :return: the bounds ([min x, max x, min y, max y] in meters), satellite
             height in meters, and satellite subpoint longitude
    """

    temp_nc_file = netCDF4.Dataset(full_file_path, 'r')
    try :
        variable_list = temp_nc_file.variables

        # load the projection specific info
        proj_var = variable_list["goes_imager_projection"]
        height_m     = getattr(proj_var, "perspective_point_height") # this is already in meters
        subpoint_lon = getattr(proj_var, "longitude_of_projection_origin")

        # load the x and y and multiply them by the height in meters
        x_data = variable_list["x"][:] * height_m
        y_data = variable_list["y"][:] * height_m

        # calculate our bounds in x and y
        bounds_to_return = [min(x_data), max(x_data), min(y_data), max(y_data)]
    finally :
        # close the file again, even if any of the reads above failed
        temp_nc_file.close()

    LOG.debug("navigation from file: bounds=" + str(bounds_to_return) + " height=" + str(height_m) + "m lon_0=" + str(subpoint_lon))

    return bounds_to_return, height_m, subpoint_lon

def load_variable_data_from_PUG_nc_file (file_path, variable_name) :
    """
    Load variable data and related informational attributes from a PUG formatted netCDF file.
    This method will handle the scaling and offsets automatically.

    :param file_path: The path of the file to get data from
    :param variable_name: The name of the variable to load
    :return: A copy of the variable data, the long name of the variable, the units of the variable,
    the valid range of the data, a list of flag values or None if this is not a flag variable,
    a list of flag names or None if this is not a flag variable
    """

    temp_nc_file = netCDF4.Dataset(file_path, 'r')
    try :
        # get the variable object and use it to get our raw data
        variable_object = temp_nc_file.variables[variable_name]
        data_copy = variable_object[:] # Note: this should be a masked array, but in some cases it is not returning a masked array

        # pull the attributes we care about, or None where they're missing
        fill_value = _get_var_attr(variable_object, "_FillValue")
        long_name  = _get_var_attr(variable_object, "long_name")
        units_str  = _get_var_attr(variable_object, "units")
        range_info = _get_var_attr(variable_object, "valid_range")
        flag_vals  = _get_var_attr(variable_object, "flag_values")
        flag_names = _get_var_attr(variable_object, "flag_meanings")
        scale      = _get_var_attr(variable_object, "scale_factor")
        offset     = _get_var_attr(variable_object, "add_offset")
    finally :
        # close the file again, even if any of the reads above failed
        temp_nc_file.close()

    # TODO, note this does not handle unsigned conversions, so it won't properly handle the operational L2 stuff

    # if we got the valid range, we may need to unpack it: u = p * scale + off
    if range_info is not None :
        if scale is not None :
            range_info = [x * scale for x in range_info]
        if offset is not None :
            range_info = [x + offset for x in range_info]

    LOG.debug("Expected range for this variable's data: " + str(range_info))

    # mask the data based on the fill value; when there's no fill value there's nothing to mask
    if not numpy.ma.is_masked(data_copy) :
        fill_data_mask = (data_copy == fill_value) if fill_value is not None else False
        data_copy = numpy.ma.MaskedArray(data=data_copy, mask=fill_data_mask)

    return data_copy, long_name, units_str, range_info, flag_vals, flag_names

def _get_var_attr (variable_object, attr_name) :
    """
    Get an attribute value from a netCDF variable object, or None if it is not present.

    :param variable_object: the netCDF4 variable to read from
    :param attr_name: the name of the attribute to look up
    :return: the attribute's value, or None when the variable doesn't have that attribute
    """

    return variable_object.getncattr(attr_name) if attr_name in variable_object.ncattrs() else None


def get_attr_only (file_path, attr_name) :
    """
    Get a global attribute from a file.

    :param file_path: The full path to the nc file
    :param attr_name: The name of the global attribute we expect to get
    :return: The value of the global attribute or None if that attr doesn't exist in the file
    """

    temp_nc_file = netCDF4.Dataset(file_path, 'r')
    try :
        # try to get the global attribute if it exists
        temp_attr_keys = temp_nc_file.ncattrs()
        to_return = getattr(temp_nc_file, attr_name) if attr_name in temp_attr_keys else None
    finally :
        # close the file again, even if the attribute read failed
        temp_nc_file.close()

    return to_return