Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: ygao/latency

Commits on Source (2)
#!/home/ygao/latency/venv/bin/python
#!/home/ygao/miniconda/envs/py39env/bin/python
import cgi
import cgitb
import json
@@ -7,13 +7,12 @@ import os
import logging
import pandas as pd
import traceback
import subprocess
# Enable detailed CGI error reporting
cgitb.enable()
# Set up logging - use absolute path to ensure writability
LOG_DIR = ""
LOG_DIR = "./" # Using /tmp which should be writable
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "latency_viewer.log")
@@ -33,54 +32,61 @@ logger.info(f"Script path: {os.path.abspath(__file__)}")
logger.info(f"Python version: {sys.version}")
logger.info(f"User running script: {os.getenv('USER') or 'Unknown'}")
# Import functions from our shared module
# Define fallback functions in case import fails
def fallback_get_canonical_id(satellite_id):
"""Fallback function if import fails - returns the input as-is"""
logger.warning(f"Using fallback get_canonical_id for {satellite_id}")
return satellite_id
def fallback_get_all_variants(canonical_id):
"""Fallback function if import fails - returns the canonical ID in a list"""
logger.warning(f"Using fallback get_all_variants for {canonical_id}")
return [canonical_id]
def fallback_run_sat_latency_query(start_time, end_time, filters=None):
"""Fallback function if import fails - returns empty list"""
logger.error("Using fallback run_sat_latency_query - no data will be returned")
return []
# Set default functions to fallbacks
get_canonical_id = fallback_get_canonical_id
get_all_variants = fallback_get_all_variants
run_sat_latency_query = fallback_run_sat_latency_query
# Try to import the real functions
try:
# Log system information
logger.info("System information:")
whoami_output = subprocess.check_output(["whoami"], stderr=subprocess.STDOUT).decode().strip()
logger.info(f"Current user from whoami: {whoami_output}")
# Check script permissions
logger.info(f"Script permissions: {oct(os.stat(__file__).st_mode)}")
# Add possible module locations to the path
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)
# Check if we can write to tmp directory
tmp_test_path = "/tmp/data_py_test.txt"
try:
with open(tmp_test_path, 'w') as f:
f.write("Test")
os.remove(tmp_test_path)
logger.info("Successfully wrote to and removed test file in /tmp")
except Exception as e:
logger.error(f"Failed to write to /tmp: {str(e)}")
# Also try parent directory
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
# Check if sudo is available
try:
sudo_test = subprocess.run(["sudo", "-l"], capture_output=True, text=True)
logger.info(f"Sudo permissions: {sudo_test.stdout}")
if sudo_test.returncode != 0:
logger.error(f"Sudo test failed: {sudo_test.stderr}")
except Exception as e:
logger.error(f"Error checking sudo permissions: {str(e)}")
except Exception as e:
logger.error(f"Error during environment checks: {str(e)}")
# Import functions from our shared module
# Adjust the path as needed to find the module
try:
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
logger.info(f"Looking for sat_db_functions in: {os.path.dirname(os.path.abspath(__file__))}")
logger.info(f"Looking for sat_db_functions in: {current_dir} and {parent_dir}")
# List directory contents to verify module existence
dir_contents = os.listdir(os.path.dirname(os.path.abspath(__file__)))
dir_contents = os.listdir(current_dir)
logger.info(f"Directory contents: {dir_contents}")
from sat_db_functions import run_sat_latency_query, get_canonical_id, get_all_variants
# Try to import the module
import sat_db_functions
# If successful, override the fallback functions
get_canonical_id = sat_db_functions.get_canonical_id
get_all_variants = sat_db_functions.get_all_variants
run_sat_latency_query = sat_db_functions.run_sat_latency_query
logger.info("Successfully imported from sat_db_functions")
except ImportError as e:
logger.error(f"Import error: {str(e)}")
logger.error(traceback.format_exc())
logger.error("Will use fallback functions that provide limited functionality")
except Exception as e:
logger.error(f"Unexpected error during import: {str(e)}")
logger.error(traceback.format_exc())
logger.error("Will use fallback functions that provide limited functionality")
def data_endpoint():
"""
@@ -189,7 +195,8 @@ def data_endpoint():
# Convert timestamps to string for JSON serialization
if 'start_time' in df.columns:
logger.info("Converting timestamps...")
df['start_time'] = pd.to_datetime(df['start_time']).astype(str)
# format='mixed' (pandas >= 2.0) parses heterogeneous timestamp strings; errors='coerce' turns unparseable values into NaT
df['start_time'] = pd.to_datetime(df['start_time'], format='mixed', errors='coerce').astype(str)
# Convert to records and handle NaN values
logger.info("Converting to records...")
......
This diff is collapsed.
#!/home/ygao/latency/venv/bin/python
#!/home/ygao/miniconda/envs/py39env/bin/python
import cgi
import json
import os
......
#!/home/ygao/latency/venv/bin/python
#!/home/ygao/miniconda/envs/py39env/bin/python
import os
import json
import subprocess
import logging
import pandas as pd
from datetime import datetime, timezone, timedelta
from sat_latency.interface import satellite_data_from_filters
# Custom JSON encoder to handle datetime objects
class DateTimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (datetime, pd.Timestamp)):
return obj.isoformat()
return super().default(obj)
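# Illustrative usage (hypothetical values, shown for documentation only):
# DateTimeEncoder lets json.dumps serialize datetime/Timestamp values that
# would otherwise raise a TypeError, e.g.:
#   payload = {"generated_at": datetime.now(timezone.utc), "count": 3}
#   json.dumps(payload, cls=DateTimeEncoder)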
# Set up logging
logger = logging.getLogger()
# Path to conda environment and database
CONDA_ENV_PATH = "/home/oper/.mdrexler_conda"
# Paths to the latency database and the prebuilt relationships file
SATELLITE_DATA_DIR = "/data/sat_latency"
RELATIONSHIPS_FILE = "satellite_relationships.json"
@@ -154,7 +161,7 @@ def load_relationship_data():
def run_sat_latency_query(start_time, end_time, filters=None):
"""
Directly query the satellite latency database using sat_latency_interface
Query the satellite latency database using the sat_latency.interface package
Args:
start_time (str): Start time in ISO format (YYYY-MM-DDTHH:MM:SS)
@@ -164,99 +171,136 @@ def run_sat_latency_query(start_time, end_time, filters=None):
Returns:
list: List of latency records as dictionaries
"""
# Expand satellite IDs to include all variants
if filters and "satellite-id" in filters:
satellite_id = filters["satellite-id"]
# Handle comma-separated list of satellites
if isinstance(satellite_id, str) and ',' in satellite_id:
satellite_ids = [s.strip() for s in satellite_id.split(',')]
expanded_ids = []
for sat_id in satellite_ids:
# Get the canonical form
canonical_id = get_canonical_id(sat_id)
# Add all variants of this canonical ID
expanded_ids.extend(get_all_variants(canonical_id))
# Remove duplicates
expanded_ids = list(set(expanded_ids))
filters["satellite-id"] = expanded_ids
# Handle single satellite ID
elif isinstance(satellite_id, str):
canonical_id = get_canonical_id(satellite_id)
filters["satellite-id"] = get_all_variants(canonical_id)
# Build the command exactly as specified
base_cmd = "module load miniconda/3.6-base && source activate ~/.mdrexler_conda && sat_latency_interface"
# Add database path and time parameters
cmd = f"{base_cmd} -d {SATELLITE_DATA_DIR} --from '{start_time}' --until '{end_time}' --output-type json"
# Add filters if provided
if filters:
for key, values in filters.items():
if values and (isinstance(values, list) or isinstance(values, str)):
if isinstance(values, str):
values = [values]
filter_values = " ".join(f'"{v}"' for v in values if v)
if filter_values:
cmd += f" --{key} {filter_values}"
logger.info(f"Running command: {cmd}")
try:
# Create a temporary shell script to run the command
script_path = "/tmp/run_sat_latency.sh"
with open(script_path, 'w') as f:
f.write("#!/bin/bash\n")
f.write(cmd + "\n")
logger.info(f"Querying satellite latency data from {start_time} to {end_time}")
# Make the script executable
os.chmod(script_path, 0o755)
# Run the script using sudo as the oper user
sudo_cmd = ["sudo", "-u", "oper", "-i", script_path]
# Convert string ISO timestamps to datetime objects if they are strings
if isinstance(start_time, str):
start_datetime = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
else:
start_datetime = start_time
if isinstance(end_time, str):
end_datetime = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
else:
end_datetime = end_time
logger.info(f"Executing: {' '.join(sudo_cmd)}")
# Ensure timezone is set
if start_datetime.tzinfo is None:
start_datetime = start_datetime.replace(tzinfo=timezone.utc)
if end_datetime.tzinfo is None:
end_datetime = end_datetime.replace(tzinfo=timezone.utc)
logger.info(f"Converted timestamps: {start_datetime} to {end_datetime}")
# Use PIPE for stdout and stderr
process = subprocess.Popen(
sudo_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
# Initialize filter parameters for the sat_latency API
satellite_ids = None
coverage = None
instrument = None
# Process filters
if filters:
# Expand satellite IDs to include all variants
if "satellite-id" in filters:
satellite_id = filters["satellite-id"]
# Handle list or comma-separated list of satellites
if isinstance(satellite_id, list):
expanded_ids = []
for sat_id in satellite_id:
canonical_id = get_canonical_id(sat_id)
expanded_ids.extend(get_all_variants(canonical_id))
# Remove duplicates
satellite_ids = list(set(expanded_ids))
elif isinstance(satellite_id, str) and ',' in satellite_id:
satellite_ids_list = [s.strip() for s in satellite_id.split(',')]
expanded_ids = []
for sat_id in satellite_ids_list:
canonical_id = get_canonical_id(sat_id)
expanded_ids.extend(get_all_variants(canonical_id))
satellite_ids = list(set(expanded_ids))
elif isinstance(satellite_id, str):
canonical_id = get_canonical_id(satellite_id)
satellite_ids = get_all_variants(canonical_id)
# Get coverage filter
if "coverage" in filters:
coverage_value = filters["coverage"]
# Convert coverage to a list if it's a string
if isinstance(coverage_value, str):
if ',' in coverage_value:
coverage = [c.strip() for c in coverage_value.split(',')]
else:
coverage = [coverage_value]
else:
coverage = coverage_value # Already a list or None
# Get instrument filter
if "instrument" in filters:
instrument_value = filters["instrument"]
# Convert instrument to a list if it's a string
if isinstance(instrument_value, str):
if ',' in instrument_value:
instrument = [i.strip() for i in instrument_value.split(',')]
else:
instrument = [instrument_value]
else:
instrument = instrument_value # Already a list or None
# Log the query parameters
logger.info(f"Query parameters: database={SATELLITE_DATA_DIR}, start_date={start_datetime}, end_date={end_datetime}")
logger.info(f"Filters: satellite_ids={satellite_ids}, coverage={coverage}, instrument={instrument}")
# Call the sat_latency.interface function
data = satellite_data_from_filters(
SATELLITE_DATA_DIR,
start_date=start_datetime,
end_date=end_datetime,
satellite_ids=satellite_ids,
coverages=coverage,
instruments=instrument
)
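# Note: satellite_data_from_filters is expected to return a Polars DataFrame
# (or None); the records are converted to plain Python dicts below before
# being JSON-serialized.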
# Get the output and error
stdout, stderr = process.communicate()
# Check if the command was successful
if process.returncode != 0:
logger.error(f"Command failed with exit code {process.returncode}: {stderr}")
return []
# Log the first part of the output
if stdout:
logger.info(f"Command output (first 200 chars): {stdout[:200]}...")
# Convert result to a list of dictionaries for JSON serialization
if data is not None:
try:
# Convert Polars DataFrame to list of dictionaries
records = data.to_dicts()
# Process datetime objects for JSON serialization
processed_records = []
for record in records:
processed_record = {}
for key, value in record.items():
if isinstance(value, (datetime, pd.Timestamp)):
processed_record[key] = value.isoformat()
else:
processed_record[key] = value
processed_records.append(processed_record)
logger.info(f"Successfully converted data: {len(processed_records)} records found")
return processed_records
except Exception as e:
logger.error(f"Error converting Polars DataFrame to dict: {str(e)}")
# Fallback method if to_dicts() is not available
try:
pandas_df = data.to_pandas()
# Convert datetime columns to strings
for col in pandas_df.select_dtypes(include=['datetime64']).columns:
pandas_df[col] = pandas_df[col].astype(str)
records = pandas_df.to_dict(orient='records')
logger.info(f"Successfully converted data via pandas: {len(records)} records found")
return records
except Exception as e2:
logger.error(f"Error in pandas conversion fallback: {str(e2)}")
return []
else:
logger.warning("Command returned empty output")
# Parse the JSON output
try:
data = json.loads(stdout)
logger.info(f"Successfully parsed JSON data: {len(data)} records found")
return data
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON output: {e}")
logger.error(f"Raw output (first 500 chars): {stdout[:500]}...")
logger.warning("Query returned None")
return []
except Exception as e:
logger.error(f"Error executing command: {str(e)}")
return []
finally:
# Clean up temporary script
if os.path.exists(script_path):
os.remove(script_path)
\ No newline at end of file
logger.error(f"Error executing satellite latency query: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return []
\ No newline at end of file
#!/home/ygao/latency/venv/bin/python
#!/home/ygao/miniconda/envs/py39env/bin/python
import cgi
import json
import sys
......
#!/home/ygao/miniconda/envs/py39env/bin/python
"""
CGI script to test satellite data access with proper JSON serialization
"""
import cgi
import cgitb
import os
import sys
import json
from datetime import datetime, timezone, timedelta
import traceback
# Enable CGI error reporting to browser
cgitb.enable()
# Set up basic logging to a specific file
import logging
LOG_FILE = "./test_log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename=LOG_FILE,
filemode='a'
)
logger = logging.getLogger()
# Log script start and environment
logger.info("=" * 80)
logger.info("Test CGI script starting")
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Script path: {os.path.abspath(__file__)}")
logger.info(f"Python executable: {sys.executable}")
logger.info(f"Python version: {sys.version}")
logger.info(f"User running script: {os.getenv('USER') or 'Unknown'}")
logger.info(f"Environment variables: {dict(os.environ)}")
def check_imports():
"""Test importing required modules and log results"""
modules_to_check = [
'pandas',
'sat_latency.interface'
]
results = {}
for module_name in modules_to_check:
try:
logger.info(f"Attempting to import {module_name}...")
if module_name == 'pandas':
import pandas
results[module_name] = f"Success (version: {pandas.__version__})"
elif module_name == 'sat_latency.interface':
from sat_latency.interface import satellite_data_from_filters
results[module_name] = "Success"
except Exception as e:
logger.error(f"Failed to import {module_name}: {str(e)}")
results[module_name] = f"Error: {str(e)}"
return results
def check_directory_permissions():
"""Check if the current user has access to the satellite data directory"""
data_dir = "/data/sat_latency"
# Get current user info
current_user = os.getenv('USER') or "Unknown"
try:
import pwd
uid = os.getuid()
current_user = pwd.getpwuid(uid).pw_name
except:
pass
# Check if directory exists
dir_exists = os.path.exists(data_dir)
logger.info(f"Directory exists: {dir_exists}")
# Check if it's readable
dir_readable = os.access(data_dir, os.R_OK)
logger.info(f"Directory readable: {dir_readable}")
# Get directory stats if possible
try:
dir_stat = os.stat(data_dir)
dir_mode = dir_stat.st_mode
dir_perms = oct(dir_mode & 0o777)
logger.info(f"Directory permissions: {dir_perms}")
try:
import pwd, grp
dir_owner = pwd.getpwuid(dir_stat.st_uid).pw_name
dir_group = grp.getgrgid(dir_stat.st_gid).gr_name
logger.info(f"Directory owner: {dir_owner}")
logger.info(f"Directory group: {dir_group}")
except:
pass
except Exception as e:
logger.error(f"Error getting directory stats: {str(e)}")
# Try to list directory contents
try:
dir_contents = os.listdir(data_dir)
logger.info(f"Successfully listed directory with {len(dir_contents)} items")
logger.info(f"First few items: {dir_contents[:5] if len(dir_contents) > 0 else 'empty'}")
return {
"success": True,
"user": current_user,
"exists": dir_exists,
"readable": dir_readable,
"items_count": len(dir_contents)
}
except Exception as e:
logger.error(f"Failed to list directory contents: {str(e)}")
return {
"success": False,
"user": current_user,
"exists": dir_exists,
"readable": dir_readable,
"error": str(e)
}
def run_diagnostics():
"""Run all diagnostic tests and return results"""
results = {
"timestamp": datetime.now().isoformat(),
"import_checks": None,
"directory_permissions": None,
"success": False,
"error": None
}
try:
# Check imports
results["import_checks"] = check_imports()
# Check directory permissions
results["directory_permissions"] = check_directory_permissions()
# Consider test successful if we got this far
results["success"] = True
except Exception as e:
logger.error(f"Error in diagnostics: {str(e)}")
logger.error(traceback.format_exc())
results["error"] = str(e)
return results
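# Example response shape returned to the browser (illustrative values):
#   {
#     "timestamp": "2025-01-01T00:00:00",
#     "import_checks": {"pandas": "Success (version: 2.0.3)", "sat_latency.interface": "Success"},
#     "directory_permissions": {"success": true, "user": "apache", "exists": true, "readable": true, "items_count": 5},
#     "success": true,
#     "error": null
#   }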
# Main CGI handler
if __name__ == "__main__":
try:
# Print HTTP headers
print("Content-Type: application/json")
print("Cache-Control: no-cache")
print() # Empty line after headers
# Run diagnostics
results = run_diagnostics()
# Custom JSON encoder to handle datetime objects
class DateTimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
# Output JSON response using custom encoder
print(json.dumps(results, cls=DateTimeEncoder, indent=2))
except Exception as e:
# Handle any final errors
error_response = {
"success": False,
"message": f"Critical error: {str(e)}",
"error_type": type(e).__name__,
"traceback": traceback.format_exc()
}
# Log the error
logger.error(f"Critical error in main block: {str(e)}")
logger.error(traceback.format_exc())
try:
# Try to output JSON response
print(json.dumps(error_response))
except:
# If that fails, output plain text
print(f"Critical error: {str(e)}")
\ No newline at end of file
This diff is collapsed.