Skip to content
Snippets Groups Projects
Select Git revision
  • 7e29200f3209b6df8178dee80fae4434619ff796
  • main default protected
  • v1.0.1
3 results

sat_db_functions.py

Blame
  • ygao's avatar
    Yuan Gao authored
    7e29200f
    History
    sat_db_functions.py 9.91 KiB
    #!/home/ygao/latency/venv/bin/python
import json
import logging
import os
import subprocess
import tempfile

import pandas as pd
    
    # Set up logging
    logger = logging.getLogger()
    
    # Path to conda environment and database
    CONDA_ENV_PATH = "/home/oper/.mdrexler_conda"
    SATELLITE_DATA_DIR = "/data/sat_latency"  # Path to your latency database
    RELATIONSHIPS_FILE = "satellite_relationships.json"  # Path to your prebuilt relationships file
    
    # Hard-coded mapping of satellite ID variations to canonical IDs
    # This makes it easy for future developers to add or modify mappings
    SATELLITE_ID_MAPPINGS = {
        # Format: 'variant': 'canonical'
        'G16': 'G16',
        'g16': 'G16',
        'G18': 'G18',
        'g18': 'G18',
        'G19': 'G19',
        'g19': 'G19',
        'DMSP-17': 'DMSP-17',
        'dmsp17': 'DMSP-17',
        'DMSP-18': 'DMSP-18',
        'dmsp18': 'DMSP-18',
        'DMSP-16': 'DMSP-16',
        'dmsp16': 'DMSP-16',
        'NOAA-19': 'NOAA-19',
        'n19': 'NOAA-19',
        'NOAA-20': 'NOAA-20',
        'n20': 'NOAA-20',
        'NOAA-21': 'NOAA-21',
        'n21': 'NOAA-21'
    }
    
    # Create reverse mapping (canonical to variants)
    CANONICAL_TO_VARIANTS = {}
    for variant, canonical in SATELLITE_ID_MAPPINGS.items():
        if canonical not in CANONICAL_TO_VARIANTS:
            CANONICAL_TO_VARIANTS[canonical] = []
        CANONICAL_TO_VARIANTS[canonical].append(variant)
    
    def get_canonical_id(satellite_id):
        """Get canonical ID for a satellite ID variant"""
        return SATELLITE_ID_MAPPINGS.get(satellite_id, satellite_id)
    
    def get_all_variants(canonical_id):
        """Get all variants for a canonical satellite ID"""
        return CANONICAL_TO_VARIANTS.get(canonical_id, [canonical_id])
    
    def consolidate_satellite_data(original_data):
        """
        Consolidate satellite data using the hard-coded mapping
        """
        if not original_data:
            return None
            
        normalized_data = {
            "satellites": [],
            "coverages": original_data.get("coverages", []),
            "instruments": original_data.get("instruments", []),
            "relationships": {},
            "satellite_variants": {}  # Maps canonical IDs to their variants
        }
        
        # Group satellites by canonical ID
        satellite_groups = {}
        
        for sat_id in original_data.get("satellites", []):
            canonical_id = get_canonical_id(sat_id)
            
            if canonical_id not in satellite_groups:
                satellite_groups[canonical_id] = []
            
            satellite_groups[canonical_id].append(sat_id)
        
        # Use canonical IDs as the satellite list
        normalized_data["satellites"] = sorted(satellite_groups.keys())
        
        # Store variant mapping
        normalized_data["satellite_variants"] = satellite_groups
        
        # Merge relationships for each canonical ID
        for canonical_id, variants in satellite_groups.items():
            normalized_data["relationships"][canonical_id] = {
                "coverages": [],
                "instruments": [],
                "coverage_instruments": {}
            }
            
            # Merge relationship data from all variants
            for variant_id in variants:
                if variant_id not in original_data.get("relationships", {}):
                    continue
                    
                original_relationship = original_data["relationships"][variant_id]
                
                # Merge coverages
                for coverage in original_relationship.get("coverages", []):
                    if coverage not in normalized_data["relationships"][canonical_id]["coverages"]:
                        normalized_data["relationships"][canonical_id]["coverages"].append(coverage)
                
                # Merge instruments
                for instrument in original_relationship.get("instruments", []):
                    if instrument not in normalized_data["relationships"][canonical_id]["instruments"]:
                        normalized_data["relationships"][canonical_id]["instruments"].append(instrument)
                
                # Merge coverage_instruments
                for coverage, instruments in original_relationship.get("coverage_instruments", {}).items():
                    if coverage not in normalized_data["relationships"][canonical_id]["coverage_instruments"]:
                        normalized_data["relationships"][canonical_id]["coverage_instruments"][coverage] = []
                    
                    for instrument in instruments:
                        if instrument not in normalized_data["relationships"][canonical_id]["coverage_instruments"][coverage]:
                            normalized_data["relationships"][canonical_id]["coverage_instruments"][coverage].append(instrument)
            
            # Sort arrays for consistent output
            normalized_data["relationships"][canonical_id]["coverages"].sort()
            normalized_data["relationships"][canonical_id]["instruments"].sort()
            
            for coverage in normalized_data["relationships"][canonical_id]["coverage_instruments"]:
                normalized_data["relationships"][canonical_id]["coverage_instruments"][coverage].sort()
        
        return normalized_data
    
    def load_relationship_data():
        """
        Load prebuilt satellite relationship data from JSON file and consolidate duplicates.
        """
        try:
            if os.path.exists(RELATIONSHIPS_FILE):
                with open(RELATIONSHIPS_FILE, 'r') as f:
                    relationships = json.load(f)
                    
                # Consolidate satellite data to merge variants
                consolidated = consolidate_satellite_data(relationships)
                
                if consolidated:
                    logger.info(f"Loaded and consolidated {len(consolidated['satellites'])} unique satellites from relationships")
                    return consolidated
                else:
                    logger.warning("Failed to consolidate relationships data")
                    return relationships
            else:
                logger.warning(f"Relationships file not found: {RELATIONSHIPS_FILE}")
                return None
        except Exception as e:
            logger.error(f"Error loading relationship data: {str(e)}")
            return None
    
    def run_sat_latency_query(start_time, end_time, filters=None):
        """
        Directly query the satellite latency database using sat_latency_interface
        
        Args:
            start_time (str): Start time in ISO format (YYYY-MM-DDTHH:MM:SS)
            end_time (str): End time in ISO format (YYYY-MM-DDTHH:MM:SS)
            filters (dict): Optional filters for satellite_id, coverage, instrument, etc.
            
        Returns:
            list: List of latency records as dictionaries
        """
        # Expand satellite IDs to include all variants
        if filters and "satellite-id" in filters:
            satellite_id = filters["satellite-id"]
            
            # Handle comma-separated list of satellites
            if isinstance(satellite_id, str) and ',' in satellite_id:
                satellite_ids = [s.strip() for s in satellite_id.split(',')]
                expanded_ids = []
                
                for sat_id in satellite_ids:
                    # Get the canonical form
                    canonical_id = get_canonical_id(sat_id)
                    
                    # Add all variants of this canonical ID
                    expanded_ids.extend(get_all_variants(canonical_id))
                
                # Remove duplicates
                expanded_ids = list(set(expanded_ids))
                filters["satellite-id"] = expanded_ids
            # Handle single satellite ID
            elif isinstance(satellite_id, str):
                canonical_id = get_canonical_id(satellite_id)
                filters["satellite-id"] = get_all_variants(canonical_id)
        
        # Build the command exactly as specified
        base_cmd = "module load miniconda/3.6-base && source activate ~/.mdrexler_conda && sat_latency_interface"
        
        # Add database path and time parameters
        cmd = f"{base_cmd} -d {SATELLITE_DATA_DIR} --from '{start_time}' --until '{end_time}' --output-type json"
        
        # Add filters if provided
        if filters:
            for key, values in filters.items():
                if values and (isinstance(values, list) or isinstance(values, str)):
                    if isinstance(values, str):
                        values = [values]
                    filter_values = " ".join(f'"{v}"' for v in values if v)
                    if filter_values:
                        cmd += f" --{key} {filter_values}"
        
        logger.info(f"Running command: {cmd}")
        
        try:
            # Create a temporary shell script to run the command
            script_path = "/tmp/run_sat_latency.sh"
            with open(script_path, 'w') as f:
                f.write("#!/bin/bash\n")
                f.write(cmd + "\n")
            
            # Make the script executable
            os.chmod(script_path, 0o755)
            
            # Run the script using sudo as the oper user
            sudo_cmd = ["sudo", "-u", "oper", "-i", script_path]
            
            logger.info(f"Executing: {' '.join(sudo_cmd)}")
            
            # Use PIPE for stdout and stderr
            process = subprocess.Popen(
                sudo_cmd, 
                stdout=subprocess.PIPE, 
                stderr=subprocess.PIPE,
                universal_newlines=True
            )
            
            # Get the output and error
            stdout, stderr = process.communicate()
            
            # Check if the command was successful
            if process.returncode != 0:
                logger.error(f"Command failed with exit code {process.returncode}: {stderr}")
                return []
            
            # Log the first part of the output
            if stdout:
                logger.info(f"Command output (first 200 chars): {stdout[:200]}...")
            else:
                logger.warning("Command returned empty output")
                
            # Parse the JSON output
            try:
                data = json.loads(stdout)
                logger.info(f"Successfully parsed JSON data: {len(data)} records found")
                return data
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse JSON output: {e}")
                logger.error(f"Raw output (first 500 chars): {stdout[:500]}...")
                return []
                
        except Exception as e:
            logger.error(f"Error executing command: {str(e)}")
            return []
        finally:
            # Clean up temporary script
            if os.path.exists(script_path):
                os.remove(script_path)