Skip to content
Snippets Groups Projects
sat_db_functions.py 12.65 KiB
#!/home/ygao/miniconda/envs/py39env/bin/python
import os
import json
import logging
import pandas as pd
from datetime import datetime, timezone,timedelta
from sat_latency.interface import satellite_data_from_filters

# Custom JSON encoder to handle datetime objects
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that writes datetime and pandas Timestamp values as ISO-8601 strings.

    Any other type the standard encoder cannot handle is delegated to
    ``json.JSONEncoder.default``, which raises ``TypeError``.
    """

    def default(self, obj):
        """Return the ISO-8601 string for date-like *obj*; defer everything else to the base class."""
        is_date_like = isinstance(obj, (datetime, pd.Timestamp))
        if not is_date_like:
            return super().default(obj)
        return obj.isoformat()
                               
# Set up logging
# NOTE: getLogger() called without a name returns the root logger, so every
# record emitted by this module goes through the root logger's handlers.
logger = logging.getLogger()

# Locations of the data this module reads
SATELLITE_DATA_DIR = "/data/sat_latency"  # Latency database location handed to satellite_data_from_filters()
RELATIONSHIPS_FILE = "satellite_relationships.json"  # Prebuilt relationships file; relative path, so it is resolved against the current working directory

# Every known spelling of a satellite ID, mapped to the canonical ID used by
# the rest of this module. To support a new satellite or alias, add a
# 'variant': 'canonical' pair below (the canonical ID maps to itself).
SATELLITE_ID_MAPPINGS = {
    # GOES
    'G16': 'G16', 'g16': 'G16',
    'G18': 'G18', 'g18': 'G18',
    'G19': 'G19', 'g19': 'G19',
    # DMSP
    'DMSP-17': 'DMSP-17', 'dmsp17': 'DMSP-17',
    'DMSP-18': 'DMSP-18', 'dmsp18': 'DMSP-18',
    'DMSP-16': 'DMSP-16', 'dmsp16': 'DMSP-16',
    # NOAA
    'NOAA-19': 'NOAA-19', 'n19': 'NOAA-19',
    'NOAA-20': 'NOAA-20', 'n20': 'NOAA-20',
    'NOAA-21': 'NOAA-21', 'n21': 'NOAA-21',
}

# Inverse lookup: canonical ID -> list of all variants that map to it, in the
# order the variants are declared in SATELLITE_ID_MAPPINGS.
CANONICAL_TO_VARIANTS = {}
for variant, canonical in SATELLITE_ID_MAPPINGS.items():
    CANONICAL_TO_VARIANTS.setdefault(canonical, []).append(variant)

def get_canonical_id(satellite_id):
    """Return the canonical ID for *satellite_id*, or *satellite_id* itself when it has no mapping."""
    try:
        return SATELLITE_ID_MAPPINGS[satellite_id]
    except KeyError:
        # Unknown IDs are passed through unchanged.
        return satellite_id

def get_all_variants(canonical_id):
    """Return the list of known variants of *canonical_id*.

    An ID with no entry in CANONICAL_TO_VARIANTS yields a one-element list
    containing just that ID.
    """
    if canonical_id in CANONICAL_TO_VARIANTS:
        return CANONICAL_TO_VARIANTS[canonical_id]
    return [canonical_id]

def consolidate_satellite_data(original_data):
    """
    Collapse satellite ID variants into canonical IDs and merge their relationships.

    Args:
        original_data (dict): Relationship data with optional "satellites",
            "coverages", "instruments" and "relationships" keys.

    Returns:
        dict | None: None when *original_data* is empty/None. Otherwise a dict with
            "satellites"         - sorted canonical IDs,
            "coverages"          - taken as-is from the input,
            "instruments"        - taken as-is from the input,
            "relationships"      - per canonical ID, the merged and sorted
                                   "coverages", "instruments" and
                                   "coverage_instruments" of all its variants,
            "satellite_variants" - canonical ID -> variants seen in the input.
    """
    if not original_data:
        return None

    def add_missing(target, items):
        # Append each item not yet in target, keeping first-seen order.
        for item in items:
            if item not in target:
                target.append(item)

    # Bucket every satellite ID found in the input under its canonical ID.
    variants_by_canonical = {}
    for sat_id in original_data.get("satellites", []):
        variants_by_canonical.setdefault(get_canonical_id(sat_id), []).append(sat_id)

    source_relationships = original_data.get("relationships", {})
    merged_relationships = {}

    for canonical_id, variants in variants_by_canonical.items():
        coverages = []
        instruments = []
        instruments_by_coverage = {}

        # Fold in the relationship entry of every variant that has one.
        for variant_id in variants:
            if variant_id not in source_relationships:
                continue
            entry = source_relationships[variant_id]

            add_missing(coverages, entry.get("coverages", []))
            add_missing(instruments, entry.get("instruments", []))
            for coverage, coverage_instruments in entry.get("coverage_instruments", {}).items():
                add_missing(instruments_by_coverage.setdefault(coverage, []), coverage_instruments)

        # Sort so the output does not depend on the order variants were merged in.
        coverages.sort()
        instruments.sort()
        for instrument_list in instruments_by_coverage.values():
            instrument_list.sort()

        merged_relationships[canonical_id] = {
            "coverages": coverages,
            "instruments": instruments,
            "coverage_instruments": instruments_by_coverage,
        }

    return {
        "satellites": sorted(variants_by_canonical.keys()),
        "coverages": original_data.get("coverages", []),
        "instruments": original_data.get("instruments", []),
        "relationships": merged_relationships,
        "satellite_variants": variants_by_canonical,
    }

def load_relationship_data():
    """
    Load the prebuilt satellite relationship data and consolidate ID variants.

    Reads RELATIONSHIPS_FILE as UTF-8 encoded JSON and passes the parsed
    content through consolidate_satellite_data().

    Returns:
        dict | None: The consolidated data on success. If consolidation yields
        nothing (it returns None for empty input), the parsed JSON is returned
        unchanged. None when the file does not exist or any error occurs while
        reading, parsing or consolidating it (the error is logged).
    """
    try:
        if not os.path.exists(RELATIONSHIPS_FILE):
            logger.warning(f"Relationships file not found: {RELATIONSHIPS_FILE}")
            return None

        # Decode explicitly as UTF-8 rather than relying on the locale's
        # default encoding, which varies between hosts.
        with open(RELATIONSHIPS_FILE, 'r', encoding='utf-8') as f:
            relationships = json.load(f)

        # Consolidate satellite data to merge variants
        consolidated = consolidate_satellite_data(relationships)
        if consolidated:
            logger.info(f"Loaded and consolidated {len(consolidated['satellites'])} unique satellites from relationships")
            return consolidated

        logger.warning("Failed to consolidate relationships data")
        return relationships
    except Exception as e:
        # Best-effort loader: callers get None instead of an exception.
        logger.error(f"Error loading relationship data: {str(e)}")
        return None

def _to_utc_datetime(value):
    """Return *value* as a timezone-aware datetime; ISO strings are parsed, naive values are tagged UTC."""
    if isinstance(value, str):
        # datetime.fromisoformat() before Python 3.11 does not accept a
        # trailing 'Z', so spell it as an explicit UTC offset first.
        value = datetime.fromisoformat(value.replace('Z', '+00:00'))
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value


def _split_filter_values(value):
    """Turn a (possibly comma-separated) string into a list of stripped values; pass other types through."""
    if isinstance(value, str):
        return [part.strip() for part in value.split(',')]
    return value  # Already a list or None


def _expand_satellite_ids(satellite_id):
    """Expand the requested satellite ID(s) into all known variants, de-duplicated in first-seen order.

    Accepts a list of IDs or a string holding one ID or a comma-separated
    list of IDs. Any other type yields None (no satellite filter).
    """
    requested = _split_filter_values(satellite_id)
    if not isinstance(requested, list):
        return None

    # Build a fresh list so the shared module-level variant lists are never
    # handed to (and possibly mutated by) the query backend.
    expanded = []
    for sat_id in requested:
        for variant in get_all_variants(get_canonical_id(sat_id)):
            if variant not in expanded:
                expanded.append(variant)
    return expanded


def run_sat_latency_query(start_time, end_time, filters=None):
    """
    Query the satellite latency database using sat_latency.interface package

    Args:
        start_time (str | datetime): Start of the query window. Strings must be
            in ISO format (YYYY-MM-DDTHH:MM:SS, optional offset or trailing 'Z').
            Values without a timezone are treated as UTC.
        end_time (str | datetime): End of the query window, same rules as
            start_time.
        filters (dict): Optional filters. Recognised keys:
            "satellite-id" - list, single ID or comma-separated string; each ID
                             is expanded to all of its known variants,
            "coverage"     - list, single value or comma-separated string,
            "instrument"   - list, single value or comma-separated string.

    Returns:
        list: List of latency records as dictionaries with datetime values
        converted to strings. An empty list is returned when the query yields
        None or when any error occurs (errors are logged, never raised).
    """
    try:
        logger.info(f"Querying satellite latency data from {start_time} to {end_time}")

        start_datetime = _to_utc_datetime(start_time)
        end_datetime = _to_utc_datetime(end_time)
        logger.info(f"Converted timestamps: {start_datetime} to {end_datetime}")

        # Filter parameters for the sat_latency API; None means "no filter".
        satellite_ids = None
        coverage = None
        instrument = None

        if filters:
            if "satellite-id" in filters:
                satellite_ids = _expand_satellite_ids(filters["satellite-id"])
            if "coverage" in filters:
                coverage = _split_filter_values(filters["coverage"])
            if "instrument" in filters:
                instrument = _split_filter_values(filters["instrument"])

        # Log the query parameters
        logger.info(f"Query parameters: database={SATELLITE_DATA_DIR}, start_date={start_datetime}, end_date={end_datetime}")
        logger.info(f"Filters: satellite_ids={satellite_ids}, coverage={coverage}, instrument={instrument}")

        # Call the sat_latency.interface function
        data = satellite_data_from_filters(
            SATELLITE_DATA_DIR,
            start_date=start_datetime,
            end_date=end_datetime,
            satellite_ids=satellite_ids,
            coverages=coverage,
            instruments=instrument
        )

        if data is None:
            logger.warning("Query returned None")
            return []

        # Convert the result (presumably a Polars DataFrame - it is expected
        # to offer to_dicts()/to_pandas()) to JSON-serializable dictionaries.
        try:
            records = data.to_dicts()

            # Replace datetime values with their ISO-8601 string form.
            processed_records = [
                {
                    key: value.isoformat() if isinstance(value, (datetime, pd.Timestamp)) else value
                    for key, value in record.items()
                }
                for record in records
            ]

            logger.info(f"Successfully converted data: {len(processed_records)} records found")
            return processed_records

        except Exception as e:
            logger.error(f"Error converting Polars DataFrame to dict: {str(e)}")
            # Fallback method if to_dicts() is not available
            try:
                pandas_df = data.to_pandas()

                # Convert datetime columns to strings. 'datetime64' alone only
                # matches timezone-naive columns; 'datetimetz' is needed to
                # catch timezone-aware ones as well.
                datetime_columns = pandas_df.select_dtypes(include=['datetime64', 'datetimetz']).columns
                for col in datetime_columns:
                    pandas_df[col] = pandas_df[col].astype(str)

                records = pandas_df.to_dict(orient='records')
                logger.info(f"Successfully converted data via pandas: {len(records)} records found")
                return records
            except Exception as e2:
                logger.error(f"Error in pandas conversion fallback: {str(e2)}")
                return []
    except Exception as e:
        logger.error(f"Error executing satellite latency query: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return []