Manual Gateway Certificate Downloader (Obsolete)

Manual Gateway Certificate Downloader (Obsolete)

Note: This script is obsolete. It was superseded by the process documented at https://wiki.inside.milvian.group/wiki/spaces/SE/pages/607354910

Overview
This Python script downloads gateway certificates from an S3 bucket in the AWS Prod Hub account (172670235729).

What This Script Does

  • Downloads all gateway certificates from the S3 bucket location metering-project-files-us-east-1-172670235729/Inbound

  • Converts .cer files to .pem when appropriate

  • Creates a timestamped ZIP archive

  • Uploads the ZIP back to S3 metering-project-files-us-east-1-172670235729/all_certificates

  • Provides detailed logging of all operations

Prerequisites (When running through AWS CloudShell)

  • Access to AWS CloudShell

  • Read/write permissions on the S3 bucket, granted via the 'MilvianRole' role. The GREF team controls these permissions.

""" Gateway Certificate DownloaderPyt """ import boto3 import os import zipfile from pathlib import Path import logging import concurrent.futures import sys import datetime from typing import List, Tuple # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Configuration BUCKET_NAME = "metering-project-files-us-east-1-172670235729" PREFIX = "Inbound/" OUTPUT_DIR = "./gateway_certs" MAX_WORKERS = 10 # Generate timestamp for zip file TIMESTAMP = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") ZIP_FILE = f"gateway_certificates_{TIMESTAMP}.zip" S3_UPLOAD_PATH = f"all_certificates/gateway_certificates_{TIMESTAMP}.zip" def create_directories(): """Create necessary directories""" Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True) logger.info(f"Created directory: {OUTPUT_DIR}") def list_gateway_folders() -> List[str]: """List all gateway EUI folders in S3""" s3 = boto3.client('s3') try: response = s3.list_objects_v2( Bucket=BUCKET_NAME, Prefix=PREFIX, Delimiter='/' ) gateway_folders = [] if 'CommonPrefixes' in response: for prefix in response['CommonPrefixes']: folder_name = prefix['Prefix'].split('/')[-2] if folder_name: gateway_folders.append(folder_name) logger.info(f"Found {len(gateway_folders)} gateway folders") return gateway_folders except Exception as e: logger.error(f"Error listing gateway folders: {e}") return [] def list_files_for_gateway(gateway_eui: str) -> List[str]: """List all files for a specific gateway (for dry run)""" s3 = boto3.client('s3') try: response = s3.list_objects_v2( Bucket=BUCKET_NAME, Prefix=f"{PREFIX}{gateway_eui}/" ) files = [] if 'Contents' in response: for obj in response['Contents']: file_name = obj['Key'].split('/')[-1] # Skip directories if file_name and not obj['Key'].endswith('/'): files.append(file_name) return files except Exception as e: logger.error(f"Error listing files for gateway {gateway_eui}: {e}") return [] def 
convert_cer_to_pem(cer_path: Path, gateway_dir: Path, file_name: str) -> bool: """Convert .cer file to .pem if it's a PEM certificate""" try: with open(cer_path, 'r') as f: content = f.read() if '-----BEGIN CERTIFICATE-----' in content: # It's a PEM certificate, rename to .pem pem_name = file_name.replace('.cer', '.pem') pem_path = gateway_dir / pem_name with open(pem_path, 'w') as pem_file: pem_file.write(content) logger.info(f"Converted {file_name} to {pem_name} (PEM certificate)") return True else: logger.warning(f"{file_name} doesn't appear to be a PEM certificate") return False except Exception as e: logger.error(f"Error converting {file_name}: {e}") return False def download_gateway_files(gateway_eui: str) -> Tuple[bool, str]: """Download all files for a specific gateway - idempotent""" s3 = boto3.client('s3') gateway_dir = Path(OUTPUT_DIR) / gateway_eui # Create directory if it doesn't exist, or use existing one try: gateway_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Using directory for gateway {gateway_eui}: {gateway_dir}") except Exception as e: return False, f"Error creating directory for gateway {gateway_eui}: {e}" try: response = s3.list_objects_v2( Bucket=BUCKET_NAME, Prefix=f"{PREFIX}{gateway_eui}/" ) if 'Contents' not in response: return False, f"No files found for gateway {gateway_eui}" downloaded_files = [] updated_files = [] for obj in response['Contents']: file_key = obj['Key'] file_name = file_key.split('/')[-1] # Skip directories if file_name == '' or file_key.endswith('/'): continue local_path = gateway_dir / file_name # Check if file already exists if local_path.exists(): logger.info(f"File {file_name} already exists for gateway {gateway_eui}, updating...") updated_files.append(file_name) else: logger.info(f"Downloading new file {file_name} for gateway {gateway_eui}") downloaded_files.append(file_name) # Download/update the file s3.download_file(BUCKET_NAME, file_key, str(local_path)) # Handle .cer/.pem conversion if 
file_name.endswith('.cer'): success = convert_cer_to_pem(local_path, gateway_dir, file_name) if success: pem_name = file_name.replace('.cer', '.pem') if file_name in downloaded_files: downloaded_files.append(pem_name) else: updated_files.append(pem_name) total_files = len(downloaded_files) + len(updated_files) if downloaded_files and updated_files: message = f"Downloaded {len(downloaded_files)} new files, updated {len(updated_files)} existing files for gateway {gateway_eui}" elif downloaded_files: message = f"Downloaded {len(downloaded_files)} files for gateway {gateway_eui}" elif updated_files: message = f"Updated {len(updated_files)} files for gateway {gateway_eui}" else: message = f"No files processed for gateway {gateway_eui}" return True, message except Exception as e: return False, f"Error downloading files for gateway {gateway_eui}: {e}" def download_gateways_parallel(gateway_folders: List[str]) -> Tuple[int, int]: """Download all gateways using parallel processing""" successful_downloads = 0 failed_downloads = 0 with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_gateway = { executor.submit(download_gateway_files, gateway): gateway for gateway in gateway_folders } for future in concurrent.futures.as_completed(future_to_gateway): gateway = future_to_gateway[future] try: success, message = future.result() if success: successful_downloads += 1 logger.info(f"✅ {message}") else: failed_downloads += 1 logger.error(f"❌ {message}") except Exception as e: failed_downloads += 1 logger.error(f"❌ Exception for gateway {gateway}: {e}") return successful_downloads, failed_downloads def create_zip_archive() -> bool: """Create a zip file of all downloaded certificates""" try: with zipfile.ZipFile(ZIP_FILE, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(OUTPUT_DIR): for file in files: file_path = Path(root) / file arcname = file_path.relative_to(OUTPUT_DIR) zipf.write(file_path, arcname) logger.info(f"Created zip 
archive: {ZIP_FILE}") return True except Exception as e: logger.error(f"Error creating zip archive: {e}") return False def upload_zip_to_s3() -> bool: """Upload zip file to S3 with timestamp""" try: s3 = boto3.client('s3') # Upload the zip file to S3 s3.upload_file(ZIP_FILE, BUCKET_NAME, S3_UPLOAD_PATH) # Get the S3 URL s3_url = f"https://{BUCKET_NAME}.s3.amazonaws.com/{S3_UPLOAD_PATH}" logger.info(f"✅ Uploaded zip file to S3: {S3_UPLOAD_PATH}") logger.info(f"📎 S3 URL: {s3_url}") logger.info(f"📁 Local file: {ZIP_FILE}") return True except Exception as e: logger.error(f"Error uploading zip to S3: {e}") return False def print_file_structure(): """Print the file structure of downloaded files""" logger.info("📁 File structure:") for root, dirs, files in os.walk(OUTPUT_DIR): level = root.replace(OUTPUT_DIR, '').count(os.sep) indent = ' ' * 2 * level logger.info(f"{indent}{os.path.basename(root)}/") subindent = ' ' * 2 * (level + 1) for file in files: logger.info(f"{subindent}{file}") def print_summary(gateway_folders: List[str], successful_downloads: int, failed_downloads: int): """Print summary of the download process""" logger.info("📊 Summary:") logger.info(f" Total gateways: {len(gateway_folders)}") logger.info(f" Successful downloads: {successful_downloads}") logger.info(f" Failed downloads: {failed_downloads}") logger.info(f" Output directory: {OUTPUT_DIR}") logger.info(f" Local zip file: {ZIP_FILE}") logger.info(f" S3 zip file: {S3_UPLOAD_PATH}") def dry_run(): """Show what would be downloaded without actually downloading""" logger.info("🔍 DRY RUN MODE - No files will be downloaded") gateway_folders = list_gateway_folders() if not gateway_folders: logger.error("No gateway folders found!") return logger.info(f"Would download files for {len(gateway_folders)} gateways:") for i, gateway_eui in enumerate(gateway_folders, 1): logger.info(f" {i}. 
{gateway_eui}") # List files that would be downloaded files = list_files_for_gateway(gateway_eui) if files: for file_name in files: logger.info(f" - {file_name}") else: logger.info(f" - No files found") def main(): """Main function""" # Check for dry run flag if len(sys.argv) > 1 and sys.argv[1] == '--dry-run': dry_run() return logger.info("🚀 Starting Gateway Certificate Downloader") logger.info(f"📅 Timestamp: {TIMESTAMP}") # Create directories create_directories() # List all gateway folders gateway_folders = list_gateway_folders() if not gateway_folders: logger.error("No gateway folders found!") return # Download all gateways successful_downloads, failed_downloads = download_gateways_parallel(gateway_folders) # Create zip archive logger.info("📦 Creating zip archive...") create_zip_archive() # Upload zip to S3 logger.info("☁️ Uploading zip to S3...") upload_zip_to_s3() # Print summary print_summary(gateway_folders, successful_downloads, failed_downloads) # Print file structure print_file_structure() if __name__ == "__main__": main()