# Simple FastAPI Data Collector # Purpose: Accept simple comma-separated input and save to text files # Usage: curl -X POST http://localhost:8000/api/run1/ -d "host_a,is ok" from fastapi import FastAPI, HTTPException, Request, Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from datetime import datetime from pathlib import Path import re import bleach import os from dotenv import load_dotenv # Load environment variables load_dotenv() # Initialize FastAPI application app = FastAPI( title="Simple Data Collector", version="1.0.0", description="Simple API for collecting comma-separated data" ) # Define the base directory for input files INPUT_DIR = Path("input") INPUT_DIR.mkdir(exist_ok=True) # Create directory if it doesn't exist # Load API keys from environment variables (with fallback for backward compatibility) INPUT_API_KEYS = [ key.strip() for key in os.getenv("INPUT_API_KEYS").split(",") if key.strip() ] READ_API_KEYS = [ key.strip() for key in os.getenv("READ_API_KEYS").split(",") if key.strip() ] # Validate that we have at least one key for each operation if not INPUT_API_KEYS: raise ValueError("At least one INPUT_API_KEY must be configured") if not READ_API_KEYS: raise ValueError("At least one READ_API_KEY must be configured") # Security schemes input_security = HTTPBearer() read_security = HTTPBearer() def verify_input_token(credentials: HTTPAuthorizationCredentials = Depends(input_security)): """Verify bearer token for input operations (supports multiple API keys)""" if credentials.credentials not in INPUT_API_KEYS: raise HTTPException( status_code=401, detail="Invalid authentication token for input operations" ) return credentials def verify_read_token(credentials: HTTPAuthorizationCredentials = Depends(read_security)): """Verify bearer token for read operations (supports multiple API keys)""" if credentials.credentials not in READ_API_KEYS: raise HTTPException( status_code=401, detail="Invalid authentication token for read operations" ) return credentials def sanitize_input(data: str) -> str: """ Sanitize input data using bleach to prevent malicious content. Args: data (str): Raw input data Returns: str: Sanitized data safe for storage """ if not data: return "" # Clean with bleach - strip all HTML tags and dangerous content sanitized = bleach.clean(data, tags=[], attributes={}, strip=True) # Remove control characters except newlines and tabs sanitized = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', sanitized) # Normalize whitespace sanitized = re.sub(r'\s+', ' ', sanitized).strip() # Limit length to prevent DoS max_length = 1000 if len(sanitized) > max_length: sanitized = sanitized[:max_length] + "... [truncated]" return sanitized def sanitize_run_name(run_name: str) -> str: """ Sanitize run name to ensure it's safe for filesystem operations. Args: run_name (str): Raw run name Returns: str: Sanitized run name safe for filesystem """ if not run_name: raise HTTPException(status_code=400, detail="Run name cannot be empty") # Use bleach to clean any potential HTML/script content sanitized = bleach.clean(run_name, tags=[], attributes={}, strip=True) # Remove dangerous filesystem characters sanitized = re.sub(r'[^\w\-.]', '_', sanitized) # Remove leading/trailing dots and dashes sanitized = sanitized.strip('.-') # Prevent reserved names reserved_names = ['con', 'prn', 'aux', 'nul', 'com1', 'com2', 'com3', 'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7', 'lpt8', 'lpt9'] if sanitized.lower() in reserved_names: sanitized = f"safe_{sanitized}" # Limit length if len(sanitized) > 50: sanitized = sanitized[:50] if not sanitized: raise HTTPException(status_code=400, detail="Invalid run name after sanitization") return sanitized def save_to_file(run_name: str, data: str): """ Save data as a single line to input/{run_name}/results.txt Args: run_name (str): Name of the run (creates subdirectory) data (str): The data to save as a single line """ # Create run-specific directory run_dir = INPUT_DIR / run_name run_dir.mkdir(exist_ok=True) # Define the results file path results_file = run_dir / "results.txt" # Get current timestamp timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Append the data with timestamp to the file with open(results_file, "a", encoding="utf-8") as f: f.write(f"{timestamp} - {data.strip()}\n") @app.get("/") async def root(): """Root endpoint providing API information""" return { "message": "Simple Data Collector API", "version": "1.0.0", "usage": "POST /api/{run_name}/ with data in body", "example": "curl -X POST http://localhost:8000/api/run1/ -d 'host_a,is ok'", "output": "Data saved to input/{run_name}/results.txt" } @app.post("/api/{run_name}/") async def collect_data(run_name: str, request: Request, token: HTTPAuthorizationCredentials = Depends(verify_input_token)): """ Collect simple data and save to text file. Args: run_name (str): Name of the run (creates input/{run_name}/ directory) request: Raw request body containing the data Returns: dict: Confirmation message Example: curl -X POST http://localhost:8000/api/run1/ -d "host_a,is ok" -> Saves to input/run1/results.txt """ # Read the raw body data body = await request.body() data = body.decode("utf-8") # Validate that we have some data if not data.strip(): raise HTTPException(status_code=400, detail="No data provided") # Sanitize input data and run name sanitized_data = sanitize_input(data) sanitized_run_name = sanitize_run_name(run_name) # Save to file save_to_file(sanitized_run_name, sanitized_data) # Return confirmation return { "message": "Data saved successfully", "run_name": sanitized_run_name, "data": sanitized_data, "saved_to": f"input/{sanitized_run_name}/results.txt", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } @app.get("/results/{run_name}/") async def get_results(run_name: str, token: HTTPAuthorizationCredentials = Depends(verify_read_token)): """ Retrieve all results for a specific run. Args: run_name (str): Name of the run to get results for Returns: dict: All lines from the results file """ # Sanitize run name for safe filesystem access sanitized_run_name = sanitize_run_name(run_name) results_file = INPUT_DIR / sanitized_run_name / "results.txt" if not results_file.exists(): raise HTTPException( status_code=404, detail=f"No results found for run '{sanitized_run_name}'" ) # Read all lines from the file with open(results_file, "r", encoding="utf-8") as f: lines = f.readlines() return { "run_name": sanitized_run_name, "total_entries": len(lines), "results": [line.strip() for line in lines] } @app.get("/health") async def health_check(): """Health check endpoint""" return { "status": "healthy", "service": "Simple Data Collector", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } # Application entry point if __name__ == "__main__": import uvicorn print("Starting Simple Data Collector API...") print("Server will be available at: http://localhost:8000") print("API documentation at: http://localhost:8000/docs") print("Example: curl -X POST http://localhost:8000/api/run1/ -d 'host_a,is ok'") print("Results saved to: ./input/{run_name}/results.txt") uvicorn.run( app, host="0.0.0.0", port=8000, log_level="info" )