Manual Gateway Certificate Downloader (Obsolete)
Note: This script is obsolete. It was used before the automated process documented at https://wiki.inside.milvian.group/wiki/spaces/SE/pages/607354910 was created.
Overview
This Python script downloads gateway certificates from S3 bucket in AWS Prod Hub account (172670235729).
What This Script Does
Downloads all gateway certificates from the S3 bucket location
metering-project-files-us-east-1-172670235729/Inbound
Converts .cer files to .pem when appropriate
Creates a timestamped ZIP archive
Uploads the ZIP back to S3 under metering-project-files-us-east-1-172670235729/all_certificates
Provides detailed logging of all operations
Prerequisites (When running through AWS CloudShell)
Access to AWS CloudShell
Necessary read/write permissions on the S3 bucket via the 'MilvianRole' role. The GREF team controls these permissions.
"""
Gateway Certificate Downloader
"""
import boto3
import os
import zipfile
from pathlib import Path
import logging
import concurrent.futures
import sys
import datetime
from typing import List, Tuple
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration
BUCKET_NAME = "metering-project-files-us-east-1-172670235729"  # AWS Prod Hub account bucket
PREFIX = "Inbound/"  # S3 prefix containing one folder per gateway EUI
OUTPUT_DIR = "./gateway_certs"  # local staging directory for downloaded certificates
MAX_WORKERS = 10  # thread-pool size for parallel per-gateway downloads

# Generate timestamp for zip file
# NOTE(review): local (naive) time is used here — entries uploaded from different
# machines/timezones will not sort consistently; confirm whether UTC is wanted.
TIMESTAMP = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
ZIP_FILE = f"gateway_certificates_{TIMESTAMP}.zip"
S3_UPLOAD_PATH = f"all_certificates/gateway_certificates_{TIMESTAMP}.zip"
def create_directories():
    """Ensure the local staging directory (OUTPUT_DIR) exists."""
    target = Path(OUTPUT_DIR)
    target.mkdir(parents=True, exist_ok=True)
    logger.info(f"Created directory: {OUTPUT_DIR}")
def list_gateway_folders() -> List[str]:
    """List all gateway EUI folders directly under PREFIX in the S3 bucket.

    Uses a list_objects_v2 paginator: a single call returns at most 1000
    results, so the original unpaginated version silently dropped any
    folders past the first page.

    Returns:
        Folder names (gateway EUIs); empty list on error.
    """
    s3 = boto3.client('s3')
    gateway_folders: List[str] = []
    try:
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=PREFIX, Delimiter='/'):
            for prefix in page.get('CommonPrefixes', []):
                # 'Inbound/<eui>/' -> '<eui>'
                folder_name = prefix['Prefix'].split('/')[-2]
                if folder_name:
                    gateway_folders.append(folder_name)
        logger.info(f"Found {len(gateway_folders)} gateway folders")
        return gateway_folders
    except Exception as e:
        logger.error(f"Error listing gateway folders: {e}")
        return []
def list_files_for_gateway(gateway_eui: str) -> List[str]:
    """List all file names under one gateway's S3 prefix (used by dry run).

    Paginated so gateways with more than 1000 objects are fully listed
    (the original single list_objects_v2 call truncated at 1000 keys).

    Args:
        gateway_eui: Gateway folder name under PREFIX.

    Returns:
        File names (without path); empty list when none exist or on error.
    """
    s3 = boto3.client('s3')
    files: List[str] = []
    try:
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=f"{PREFIX}{gateway_eui}/"):
            for obj in page.get('Contents', []):
                file_name = obj['Key'].split('/')[-1]
                # Skip directory placeholder keys (empty name or trailing '/')
                if file_name and not obj['Key'].endswith('/'):
                    files.append(file_name)
        return files
    except Exception as e:
        logger.error(f"Error listing files for gateway {gateway_eui}: {e}")
        return []
def convert_cer_to_pem(cer_path: Path, gateway_dir: Path, file_name: str) -> bool:
    """Write a .pem copy of *cer_path* if it contains a PEM certificate.

    The original .cer file is kept; a sibling file with a .pem extension is
    written next to it. Reading as bytes fixes two defects in the previous
    version: binary DER .cer files no longer raise UnicodeDecodeError (they
    now hit the intended "not PEM" warning), and the file content is copied
    byte-for-byte.

    Args:
        cer_path: Path of the downloaded .cer file.
        gateway_dir: Directory where the .pem copy should be written.
        file_name: Base name of the .cer file.

    Returns:
        True if a .pem copy was written, False otherwise.
    """
    try:
        data = Path(cer_path).read_bytes()
        if b'-----BEGIN CERTIFICATE-----' in data:
            # Swap only the trailing extension; str.replace('.cer', '.pem')
            # would also rewrite a '.cer' occurring earlier in the name.
            if file_name.endswith('.cer'):
                pem_name = file_name[:-len('.cer')] + '.pem'
            else:
                pem_name = file_name + '.pem'
            pem_path = gateway_dir / pem_name
            pem_path.write_bytes(data)
            logger.info(f"Converted {file_name} to {pem_name} (PEM certificate)")
            return True
        else:
            logger.warning(f"{file_name} doesn't appear to be a PEM certificate")
            return False
    except Exception as e:
        logger.error(f"Error converting {file_name}: {e}")
        return False
def download_gateway_files(gateway_eui: str) -> Tuple[bool, str]:
    """Download all S3 files for one gateway into OUTPUT_DIR/<eui> - idempotent.

    Existing local files are overwritten (counted as updates, not new
    downloads). For .cer files that contain PEM data, a .pem copy is also
    created. Listing is paginated so gateways with more than 1000 objects
    are fully downloaded (a single list_objects_v2 call caps at 1000 keys).

    Args:
        gateway_eui: Gateway folder name under PREFIX.

    Returns:
        (success, human-readable status message).
    """
    s3 = boto3.client('s3')
    gateway_dir = Path(OUTPUT_DIR) / gateway_eui
    # Create directory if it doesn't exist, or use existing one
    try:
        gateway_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Using directory for gateway {gateway_eui}: {gateway_dir}")
    except Exception as e:
        return False, f"Error creating directory for gateway {gateway_eui}: {e}"
    try:
        paginator = s3.get_paginator('list_objects_v2')
        objects = []
        for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=f"{PREFIX}{gateway_eui}/"):
            objects.extend(page.get('Contents', []))
        if not objects:
            return False, f"No files found for gateway {gateway_eui}"
        downloaded_files = []
        updated_files = []
        for obj in objects:
            file_key = obj['Key']
            file_name = file_key.split('/')[-1]
            # Skip directory placeholder keys
            if file_name == '' or file_key.endswith('/'):
                continue
            local_path = gateway_dir / file_name
            # Check if file already exists so the summary can distinguish
            # fresh downloads from refreshes of existing files
            if local_path.exists():
                logger.info(f"File {file_name} already exists for gateway {gateway_eui}, updating...")
                updated_files.append(file_name)
            else:
                logger.info(f"Downloading new file {file_name} for gateway {gateway_eui}")
                downloaded_files.append(file_name)
            # Download/update the file
            s3.download_file(BUCKET_NAME, file_key, str(local_path))
            # Handle .cer/.pem conversion
            if file_name.endswith('.cer'):
                if convert_cer_to_pem(local_path, gateway_dir, file_name):
                    # Replace only the trailing extension; str.replace would
                    # also rewrite a '.cer' occurring mid-name.
                    pem_name = file_name[:-len('.cer')] + '.pem'
                    if file_name in downloaded_files:
                        downloaded_files.append(pem_name)
                    else:
                        updated_files.append(pem_name)
        if downloaded_files and updated_files:
            message = f"Downloaded {len(downloaded_files)} new files, updated {len(updated_files)} existing files for gateway {gateway_eui}"
        elif downloaded_files:
            message = f"Downloaded {len(downloaded_files)} files for gateway {gateway_eui}"
        elif updated_files:
            message = f"Updated {len(updated_files)} files for gateway {gateway_eui}"
        else:
            message = f"No files processed for gateway {gateway_eui}"
        return True, message
    except Exception as e:
        return False, f"Error downloading files for gateway {gateway_eui}: {e}"
def download_gateways_parallel(gateway_folders: List[str]) -> Tuple[int, int]:
    """Fan out download_gateway_files over a thread pool.

    Args:
        gateway_folders: Gateway EUIs to download.

    Returns:
        (successful_count, failed_count).
    """
    ok_count = 0
    fail_count = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        pending = {pool.submit(download_gateway_files, eui): eui for eui in gateway_folders}
        for done in concurrent.futures.as_completed(pending):
            eui = pending[done]
            try:
                success, message = done.result()
            except Exception as e:
                # A worker raised instead of returning (False, msg)
                fail_count += 1
                logger.error(f"❌ Exception for gateway {eui}: {e}")
                continue
            if success:
                ok_count += 1
                logger.info(f"✅ {message}")
            else:
                fail_count += 1
                logger.error(f"❌ {message}")
    return ok_count, fail_count
def create_zip_archive() -> bool:
    """Zip everything under OUTPUT_DIR into ZIP_FILE (paths kept relative).

    Returns:
        True on success, False if archiving raised.
    """
    try:
        with zipfile.ZipFile(ZIP_FILE, 'w', zipfile.ZIP_DEFLATED) as archive:
            for dirpath, _dirs, filenames in os.walk(OUTPUT_DIR):
                for name in filenames:
                    full_path = Path(dirpath) / name
                    # Store paths relative to OUTPUT_DIR inside the archive
                    archive.write(full_path, full_path.relative_to(OUTPUT_DIR))
        logger.info(f"Created zip archive: {ZIP_FILE}")
        return True
    except Exception as e:
        logger.error(f"Error creating zip archive: {e}")
        return False
def upload_zip_to_s3() -> bool:
    """Upload the local ZIP_FILE to the bucket at S3_UPLOAD_PATH.

    Returns:
        True on success, False if the upload raised.
    """
    try:
        client = boto3.client('s3')
        client.upload_file(ZIP_FILE, BUCKET_NAME, S3_UPLOAD_PATH)
        s3_url = f"https://{BUCKET_NAME}.s3.amazonaws.com/{S3_UPLOAD_PATH}"
        for line in (
            f"✅ Uploaded zip file to S3: {S3_UPLOAD_PATH}",
            f"📎 S3 URL: {s3_url}",
            f"📁 Local file: {ZIP_FILE}",
        ):
            logger.info(line)
        return True
    except Exception as e:
        logger.error(f"Error uploading zip to S3: {e}")
        return False
def print_file_structure():
    """Log an indented tree of everything under OUTPUT_DIR."""
    logger.info("📁 File structure:")
    for dirpath, _dirs, filenames in os.walk(OUTPUT_DIR):
        # Depth relative to OUTPUT_DIR drives the indentation width
        depth = dirpath.replace(OUTPUT_DIR, '').count(os.sep)
        logger.info(f"{'  ' * depth}{os.path.basename(dirpath)}/")
        file_pad = '  ' * (depth + 1)
        for name in filenames:
            logger.info(f"{file_pad}{name}")
def print_summary(gateway_folders: List[str], successful_downloads: int, failed_downloads: int):
    """Log the end-of-run totals and output locations.

    Args:
        gateway_folders: All gateway EUIs that were attempted.
        successful_downloads: Count of gateways downloaded successfully.
        failed_downloads: Count of gateways that failed.
    """
    summary_lines = (
        "📊 Summary:",
        f"  Total gateways: {len(gateway_folders)}",
        f"  Successful downloads: {successful_downloads}",
        f"  Failed downloads: {failed_downloads}",
        f"  Output directory: {OUTPUT_DIR}",
        f"  Local zip file: {ZIP_FILE}",
        f"  S3 zip file: {S3_UPLOAD_PATH}",
    )
    for line in summary_lines:
        logger.info(line)
def dry_run():
    """Preview mode: list every gateway and its files without downloading."""
    logger.info("🔍 DRY RUN MODE - No files will be downloaded")
    gateway_folders = list_gateway_folders()
    if not gateway_folders:
        logger.error("No gateway folders found!")
        return
    logger.info(f"Would download files for {len(gateway_folders)} gateways:")
    for index, gateway_eui in enumerate(gateway_folders, start=1):
        logger.info(f"  {index}. {gateway_eui}")
        # Show which files the real run would fetch for this gateway
        file_names = list_files_for_gateway(gateway_eui)
        if not file_names:
            logger.info(f"    - No files found")
            continue
        for name in file_names:
            logger.info(f"    - {name}")
def main():
    """Entry point: download all certs, zip them, upload, and report.

    Pass --dry-run as the first argument to preview without downloading.
    """
    # Dry-run short-circuits before any directories are created
    if len(sys.argv) > 1 and sys.argv[1] == '--dry-run':
        dry_run()
        return
    logger.info("🚀 Starting Gateway Certificate Downloader")
    logger.info(f"📅 Timestamp: {TIMESTAMP}")
    create_directories()
    gateway_folders = list_gateway_folders()
    if not gateway_folders:
        logger.error("No gateway folders found!")
        return
    successful_downloads, failed_downloads = download_gateways_parallel(gateway_folders)
    logger.info("📦 Creating zip archive...")
    create_zip_archive()
    logger.info("☁️ Uploading zip to S3...")
    upload_zip_to_s3()
    print_summary(gateway_folders, successful_downloads, failed_downloads)
    print_file_structure()


if __name__ == "__main__":
    main()