# sat_db_functions.py
# Author: Yuan Gao
#!/home/ygao/latency/venv/bin/python
import json
import logging
import os
import shlex
import subprocess
import tempfile

import pandas as pd
# Set up logging.
# NOTE(review): this grabs the *root* logger, so these messages inherit whatever
# handlers the embedding application configures; logging.getLogger(__name__) is
# the usual per-module convention — confirm before changing.
logger = logging.getLogger()
# Path to conda environment and database
# NOTE(review): CONDA_ENV_PATH is not referenced anywhere in this file's visible
# code (the command string hard-codes ~/.mdrexler_conda instead) — presumably
# kept for external callers; verify before removing.
CONDA_ENV_PATH = "/home/oper/.mdrexler_conda"
SATELLITE_DATA_DIR = "/data/sat_latency" # Path to your latency database
RELATIONSHIPS_FILE = "satellite_relationships.json" # Path to your prebuilt relationships file
# Hard-coded mapping of satellite ID variations to canonical IDs
# This makes it easy for future developers to add or modify mappings
SATELLITE_ID_MAPPINGS = {
    # 'variant': 'canonical' — extend this table when new spellings appear.
    'G16': 'G16', 'g16': 'G16',
    'G18': 'G18', 'g18': 'G18',
    'G19': 'G19', 'g19': 'G19',
    'DMSP-17': 'DMSP-17', 'dmsp17': 'DMSP-17',
    'DMSP-18': 'DMSP-18', 'dmsp18': 'DMSP-18',
    'DMSP-16': 'DMSP-16', 'dmsp16': 'DMSP-16',
    'NOAA-19': 'NOAA-19', 'n19': 'NOAA-19',
    'NOAA-20': 'NOAA-20', 'n20': 'NOAA-20',
    'NOAA-21': 'NOAA-21', 'n21': 'NOAA-21',
}

# Inverted view of the table above: canonical ID -> list of known variant
# spellings, in the order they appear in SATELLITE_ID_MAPPINGS.
CANONICAL_TO_VARIANTS = {}
for _variant, _canonical in SATELLITE_ID_MAPPINGS.items():
    CANONICAL_TO_VARIANTS.setdefault(_canonical, []).append(_variant)
def get_canonical_id(satellite_id):
    """Return the canonical form of *satellite_id*.

    IDs with no entry in SATELLITE_ID_MAPPINGS pass through unchanged.
    """
    try:
        return SATELLITE_ID_MAPPINGS[satellite_id]
    except KeyError:
        return satellite_id
def get_all_variants(canonical_id):
    """Return every known spelling of *canonical_id* (or just the ID itself
    when it is not in the canonical table)."""
    if canonical_id in CANONICAL_TO_VARIANTS:
        return CANONICAL_TO_VARIANTS[canonical_id]
    return [canonical_id]
def consolidate_satellite_data(original_data):
    """
    Merge satellite records whose IDs are spelling variants of one another.

    Args:
        original_data (dict): Relationship data with keys "satellites",
            "coverages", "instruments", and "relationships" (a per-satellite
            dict of "coverages", "instruments", "coverage_instruments").

    Returns:
        dict | None: A new structure keyed by canonical satellite IDs, with
        each canonical ID's relationships unioned across its variants and
        every list sorted for deterministic output. Also carries
        "satellite_variants" (canonical ID -> source variant IDs).
        None when original_data is falsy.
    """
    if not original_data:
        return None

    # Group every reported satellite ID under its canonical form.
    satellite_groups = {}
    for sat_id in original_data.get("satellites", []):
        satellite_groups.setdefault(get_canonical_id(sat_id), []).append(sat_id)

    all_relationships = original_data.get("relationships", {})
    merged_relationships = {
        canonical_id: _merge_variant_relationships(variants, all_relationships)
        for canonical_id, variants in satellite_groups.items()
    }

    return {
        "satellites": sorted(satellite_groups),
        "coverages": original_data.get("coverages", []),
        "instruments": original_data.get("instruments", []),
        "relationships": merged_relationships,
        "satellite_variants": satellite_groups,
    }


def _merge_variant_relationships(variants, all_relationships):
    """Union the coverage/instrument relationships of all *variants*.

    Accumulates into sets — the previous list-membership scans made the merge
    O(n^2) per canonical ID — then returns sorted lists, which matches the
    original sort-at-the-end output exactly.
    """
    coverages = set()
    instruments = set()
    coverage_instruments = {}
    for variant_id in variants:
        rel = all_relationships.get(variant_id)
        if rel is None:
            continue
        coverages.update(rel.get("coverages", []))
        instruments.update(rel.get("instruments", []))
        for coverage, covered in rel.get("coverage_instruments", {}).items():
            # A coverage key with no instruments is still kept (empty list).
            coverage_instruments.setdefault(coverage, set()).update(covered)
    return {
        "coverages": sorted(coverages),
        "instruments": sorted(instruments),
        "coverage_instruments": {
            coverage: sorted(instrs)
            for coverage, instrs in coverage_instruments.items()
        },
    }
def load_relationship_data():
    """
    Read the prebuilt satellite relationship JSON file and merge ID variants.

    Returns the consolidated structure on success, the raw (unconsolidated)
    data when consolidation fails, or None when the file is missing or any
    error occurs.
    """
    try:
        if not os.path.exists(RELATIONSHIPS_FILE):
            logger.warning(f"Relationships file not found: {RELATIONSHIPS_FILE}")
            return None
        with open(RELATIONSHIPS_FILE, 'r') as f:
            raw_relationships = json.load(f)
        # Collapse satellite ID spelling variants into canonical entries.
        consolidated = consolidate_satellite_data(raw_relationships)
        if not consolidated:
            logger.warning("Failed to consolidate relationships data")
            return raw_relationships
        logger.info(f"Loaded and consolidated {len(consolidated['satellites'])} unique satellites from relationships")
        return consolidated
    except Exception as e:
        logger.error(f"Error loading relationship data: {str(e)}")
        return None
def run_sat_latency_query(start_time, end_time, filters=None):
    """
    Query the satellite latency database via the sat_latency_interface CLI.

    The CLI lives in a conda environment belonging to the "oper" user, so the
    composed command is written to a temporary shell script and executed with
    ``sudo -u oper -i``.

    Args:
        start_time (str): Start time in ISO format (YYYY-MM-DDTHH:MM:SS)
        end_time (str): End time in ISO format (YYYY-MM-DDTHH:MM:SS)
        filters (dict): Optional filters for satellite-id, coverage,
            instrument, etc. The caller's dict is NOT modified (the previous
            version mutated it in place when expanding satellite IDs).

    Returns:
        list: Latency records parsed from the CLI's JSON output; [] on any
        failure (non-zero exit, unparseable output, or execution error).
    """
    # Work on a copy so satellite-ID expansion never leaks back to the caller.
    filters = dict(filters) if filters else {}

    # Expand satellite IDs so every known spelling variant is queried.
    satellite_id = filters.get("satellite-id")
    if isinstance(satellite_id, str):
        requested = [s.strip() for s in satellite_id.split(',')]
        expanded = set()
        for sat_id in requested:
            expanded.update(get_all_variants(get_canonical_id(sat_id)))
        filters["satellite-id"] = list(expanded)

    cmd = _build_query_command(start_time, end_time, filters)
    logger.info(f"Running command: {cmd}")
    return _run_command_as_oper(cmd)


def _build_query_command(start_time, end_time, filters):
    """Compose the shell command line, quoting every caller-supplied value.

    shlex.quote prevents shell injection: the previous version interpolated
    raw values into a bash script that runs with oper's privileges.
    """
    cmd = (
        "module load miniconda/3.6-base && source activate ~/.mdrexler_conda"
        " && sat_latency_interface"
        f" -d {SATELLITE_DATA_DIR}"
        f" --from {shlex.quote(str(start_time))}"
        f" --until {shlex.quote(str(end_time))}"
        " --output-type json"
    )
    for key, values in filters.items():
        if not values or not isinstance(values, (list, str)):
            continue
        if isinstance(values, str):
            values = [values]
        quoted = " ".join(shlex.quote(str(v)) for v in values if v)
        if quoted:
            cmd += f" --{key} {quoted}"
    return cmd


def _run_command_as_oper(cmd):
    """Write *cmd* to a private temp script, run it as oper, parse JSON stdout.

    A tempfile replaces the old fixed /tmp/run_sat_latency.sh path, which was
    predictable and racy between concurrent callers.
    """
    script = tempfile.NamedTemporaryFile(
        'w', prefix='run_sat_latency_', suffix='.sh', delete=False
    )
    try:
        with script:
            script.write("#!/bin/bash\n")
            script.write(cmd + "\n")
        # World-readable/executable so the oper user can run it.
        os.chmod(script.name, 0o755)
        sudo_cmd = ["sudo", "-u", "oper", "-i", script.name]
        logger.info(f"Executing: {' '.join(sudo_cmd)}")
        result = subprocess.run(
            sudo_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if result.returncode != 0:
            logger.error(f"Command failed with exit code {result.returncode}: {result.stderr}")
            return []
        if result.stdout:
            logger.info(f"Command output (first 200 chars): {result.stdout[:200]}...")
        else:
            logger.warning("Command returned empty output")
        try:
            data = json.loads(result.stdout)
            logger.info(f"Successfully parsed JSON data: {len(data)} records found")
            return data
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON output: {e}")
            logger.error(f"Raw output (first 500 chars): {result.stdout[:500]}...")
            return []
    except Exception as e:
        logger.error(f"Error executing command: {str(e)}")
        return []
    finally:
        # Best-effort cleanup of the temporary script.
        if os.path.exists(script.name):
            os.remove(script.name)