Compare commits
	
		
			4 commits
		
	
	
		
			6ce10f673e
			...
			616bb8d014
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 616bb8d014 | |||
| a7cc072777 | |||
| 7db919bcb7 | |||
| fc72f6f51c | 
					 4 changed files with 531 additions and 169 deletions
				
			
		
							
								
								
									
										57
									
								
								README.md
									
										
									
									
									
								
							
							
						
						
									
										57
									
								
								README.md
									
										
									
									
									
								
							|  | @ -85,11 +85,39 @@ Where columns are: | |||
| 4. Record Type (A, AAAA, MX, CNAME, TXT, etc.) | ||||
| 5. Record Data (IP address, hostname, or other data depending on record type) | ||||
| 
 | ||||
| ## Domain Base Name Detection | ||||
| 
 | ||||
| The application includes functionality to identify base domains from fully qualified domain names, including handling of multi-part TLDs like ".co.uk" or ".com.au". | ||||
| 
 | ||||
| ### Multi-Part TLD List | ||||
| 
 | ||||
| The application uses a hardcoded list of common multi-part TLDs to correctly extract base domains (e.g., "example.co.uk" from "mail.example.co.uk"). | ||||
| 
 | ||||
| This list can be found in `main.py` as `MULTI_PART_TLDS`. | ||||
| 
 | ||||
| ### Updating the TLD List | ||||
| 
 | ||||
| To ensure accurate domain parsing, you should periodically update the multi-part TLD list. The best sources for this information are: | ||||
| 
 | ||||
| 1. **Public Suffix List (PSL)**: The most comprehensive and authoritative source | ||||
|    - Website: https://publicsuffix.org/list/ | ||||
|    - GitHub: https://github.com/publicsuffix/list | ||||
|    - This list is maintained by Mozilla and used by browsers and DNS applications | ||||
| 
 | ||||
| 2. **IANA's TLD Database**: The official registry of top-level domains | ||||
|    - Website: https://www.iana.org/domains/root/db | ||||
| 
 | ||||
| 3. **Commercial Domain Registrars**: Often provide lists of available TLDs | ||||
|    - Examples: GoDaddy, Namecheap, etc. | ||||
| 
 | ||||
| For the most accurate and comprehensive implementation, consider implementing a parser for the Public Suffix List or using a library that maintains this list (e.g., `publicsuffix2` for Python). | ||||
| 
 | ||||
| ## API Endpoints | ||||
| 
 | ||||
| - `/api/uploads` - Get all uploads | ||||
| - `/api/slds` - Get all SLDs (Second Level Domains) | ||||
| - `/api/slds/{sld}` - Get domains by SLD | ||||
| - `/api/domains` - Get all domains | ||||
| - `/api/base-domains` - Get only unique base domains (e.g., example.com, example.co.uk) with simplified response format | ||||
| - `/api/domains/{domain}` - Get domains by name | ||||
| - `/api/dns` - Get all DNS records  | ||||
| - `/api/dns/types` - Get unique values for filters | ||||
| 
 | ||||
|  | @ -100,8 +128,27 @@ You can filter the API results using the following query parameters: | |||
| - `upload_id` - Filter by specific upload | ||||
| - `record_type` - Filter by DNS record type | ||||
| - `record_class` - Filter by DNS record class | ||||
| - `tld` - Filter by Top Level Domain | ||||
| - `sld` - Filter by Second Level Domain | ||||
| - `domain` - Search by domain name | ||||
| - `base_domains_only` - Only show base domains (e.g., example.com not mail.example.com) | ||||
| - `deduplicate` - For DNS records, control whether to show all records or deduplicate | ||||
| 
 | ||||
| Example: `/api/dns?record_type=A&tld=com&upload_id=upload_20250408120000` | ||||
| Examples: | ||||
| - `/api/domains?base_domains_only=true` - Show only base domains | ||||
| - `/api/base-domains` - Get a simplified list of unique base domains | ||||
| - `/api/dns?record_type=A&domain=example.com&deduplicate=false` - Show all A records for example.com without deduplication | ||||
| 
 | ||||
| ### Response Format Examples | ||||
| 
 | ||||
| 1. Base Domains Endpoint (`/api/base-domains`): | ||||
| ```json | ||||
| [ | ||||
|   { | ||||
|     "domain": "example.com", | ||||
|     "timestamp": "2025-04-08T12:00:00" | ||||
|   }, | ||||
|   { | ||||
|     "domain": "example.co.uk", | ||||
|     "timestamp": "2025-04-08T12:00:00" | ||||
|   } | ||||
| ] | ||||
| ``` | ||||
|  |  | |||
							
								
								
									
										351
									
								
								main.py
									
										
									
									
									
								
							
							
						
						
									
										351
									
								
								main.py
									
										
									
									
									
								
							|  | @ -3,7 +3,7 @@ import re | |||
| import io | ||||
| import datetime | ||||
| from fastapi import FastAPI, Request, HTTPException, Query, UploadFile, File, Form | ||||
| from fastapi.responses import HTMLResponse, RedirectResponse | ||||
| from fastapi.responses import HTMLResponse, RedirectResponse, Response | ||||
| from fastapi.staticfiles import StaticFiles | ||||
| from fastapi.templating import Jinja2Templates | ||||
| import uvicorn | ||||
|  | @ -32,29 +32,11 @@ def process_domain_entry(domain_entry): | |||
|     if domain_entry.endswith('.'): | ||||
|         domain_entry = domain_entry[:-1] | ||||
|      | ||||
|     # Parse domain components | ||||
|     parts = domain_entry.split('.') | ||||
|     if len(parts) > 1: | ||||
|         # For domain.tld format | ||||
|         if len(parts) == 2: | ||||
|             sld = parts[0]  # Second Level Domain | ||||
|             tld = parts[1]  # Top Level Domain | ||||
|     if domain_entry: | ||||
|         # Store only the full domain name without splitting | ||||
|         domain_info = { | ||||
|                 "sld": sld, | ||||
|                 "tld": tld, | ||||
|             "full_domain": domain_entry | ||||
|         } | ||||
|         # For subdomain.domain.tld format | ||||
|         else: | ||||
|             sld = parts[-2]  # Second Level Domain | ||||
|             tld = parts[-1]  # Top Level Domain | ||||
|             subdomain = '.'.join(parts[:-2])  # Subdomains | ||||
|             domain_info = { | ||||
|                 "sld": sld, | ||||
|                 "tld": tld, | ||||
|                 "full_domain": domain_entry, | ||||
|                 "subdomain": subdomain | ||||
|             } | ||||
|         return domain_info | ||||
|     return None | ||||
| 
 | ||||
|  | @ -90,7 +72,7 @@ async def process_csv_upload(file_content, upload_id, description=None): | |||
|             domain_info = process_domain_entry(domain_entry) | ||||
|             if domain_info: | ||||
|                 # Create a unique key to avoid duplicates within this upload | ||||
|                 unique_key = f"{domain_info['sld']}.{domain_info['tld']}" | ||||
|                 unique_key = domain_info['full_domain'] | ||||
|                  | ||||
|                 if unique_key not in unique_domains: | ||||
|                     unique_domains.add(unique_key) | ||||
|  | @ -127,22 +109,9 @@ async def process_csv_upload(file_content, upload_id, description=None): | |||
|                     "timestamp": timestamp | ||||
|                 } | ||||
|                  | ||||
|                 # Add domain components | ||||
|                 if len(domain_parts) > 1: | ||||
|                     if domain_parts[0].startswith('_'):  # Service records like _dmarc | ||||
|                 # Add special handling for service records | ||||
|                 if len(domain_parts) > 0 and domain_parts[0].startswith('_'):  # Service records like _dmarc | ||||
|                     entry["service"] = domain_parts[0] | ||||
|                         # Adjust domain parts | ||||
|                         domain_parts = domain_parts[1:] | ||||
|                      | ||||
|                     # For domain.tld format | ||||
|                     if len(domain_parts) == 2: | ||||
|                         entry["sld"] = domain_parts[0]  # Second Level Domain | ||||
|                         entry["tld"] = domain_parts[1]  # Top Level Domain | ||||
|                     # For subdomain.domain.tld format | ||||
|                     elif len(domain_parts) > 2: | ||||
|                         entry["sld"] = domain_parts[-2]  # Second Level Domain | ||||
|                         entry["tld"] = domain_parts[-1]  # Top Level Domain | ||||
|                         entry["subdomain"] = '.'.join(domain_parts[:-2])  # Subdomains | ||||
|                  | ||||
|                 dns_records_to_insert.append(entry) | ||||
|          | ||||
|  | @ -170,23 +139,42 @@ async def process_csv_upload(file_content, upload_id, description=None): | |||
|         print(traceback.format_exc()) | ||||
|         return 0, 0 | ||||
| 
 | ||||
| # Load domains from database - deduplicated by full domain name | ||||
| def load_domains(specific_upload_id: str = None) -> List[Dict]: | ||||
| # Load domains from database - deduplicated by full domain name, with optional base domain filtering | ||||
| def load_domains(specific_upload_id: str = None, base_domains_only: bool = False) -> List[Dict]: | ||||
|     try: | ||||
|         domains = domains_table.all() | ||||
|          | ||||
|         # If a specific upload ID is provided, only show domains from that upload | ||||
|         if specific_upload_id: | ||||
|             domains = [d for d in domains if d.get('upload_id') == specific_upload_id] | ||||
|             if not base_domains_only: | ||||
|                 return domains | ||||
|          | ||||
|         # Add the base_domain field to each domain | ||||
|         for domain in domains: | ||||
|             domain['base_domain'] = extract_base_domain(domain.get('full_domain', '')) | ||||
|          | ||||
|         # Sort by timestamp in descending order (newest first) | ||||
|         domains.sort(key=lambda x: x.get('timestamp', ''), reverse=True) | ||||
|          | ||||
|         # Create a dictionary to track unique domains by full domain name | ||||
|         # Create a dictionary to track unique domains | ||||
|         unique_domains = {} | ||||
|         base_domains_set = set() | ||||
|          | ||||
|         # First pass: collect all base domains | ||||
|         if base_domains_only: | ||||
|             for domain in domains: | ||||
|                 base_domains_set.add(domain.get('base_domain', '')) | ||||
|          | ||||
|         for domain in domains: | ||||
|             # If base_domains_only is True, only keep domains that are base domains themselves | ||||
|             if base_domains_only: | ||||
|                 full_domain = domain.get('full_domain', '') | ||||
|                 base_domain = domain.get('base_domain', '') | ||||
|                  | ||||
|                 if full_domain != base_domain: | ||||
|                     continue | ||||
|              | ||||
|             # Create a unique key based on the full domain name | ||||
|             unique_key = domain.get('full_domain', '') | ||||
|              | ||||
|  | @ -201,25 +189,26 @@ def load_domains(specific_upload_id: str = None) -> List[Dict]: | |||
|         print(f"Error loading domains from database: {e}") | ||||
|         return [] | ||||
| 
 | ||||
| # Load DNS entries from database - deduplicated by domain, class, and type (no history) | ||||
| def load_dns_entries(specific_upload_id: str = None) -> List[Dict]: | ||||
| # Load DNS entries from database - with optional deduplication | ||||
| def load_dns_entries(specific_upload_id: str = None, deduplicate: bool = False) -> List[Dict]: | ||||
|     try: | ||||
|         entries = dns_records_table.all() | ||||
|          | ||||
|         # If a specific upload ID is provided, only show records from that upload | ||||
|         if specific_upload_id: | ||||
|             entries = [e for e in entries if e.get('upload_id') == specific_upload_id] | ||||
|             return entries | ||||
|          | ||||
|         # Sort by timestamp in descending order (newest first) | ||||
|         entries.sort(key=lambda x: x.get('timestamp', ''), reverse=True) | ||||
|          | ||||
|         # If deduplication is requested, only keep the most recent entry for each unique combination | ||||
|         if deduplicate: | ||||
|             # Create a dictionary to track unique entries (most recent only) | ||||
|             unique_entries = {} | ||||
|              | ||||
|             for entry in entries: | ||||
|             # Create a unique key based on domain, class, and type | ||||
|             unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}" | ||||
|                 # Create a unique key based on domain, class, type, TTL, and data | ||||
|                 unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}:{entry.get('ttl')}:{entry.get('record_data')}" | ||||
|                  | ||||
|                 # Only keep the most recent entry for each unique combination | ||||
|                 if unique_key not in unique_entries: | ||||
|  | @ -229,17 +218,102 @@ def load_dns_entries(specific_upload_id: str = None) -> List[Dict]: | |||
|              | ||||
|             # Return the deduplicated list with only the most recent entries | ||||
|             return list(unique_entries.values()) | ||||
|         else: | ||||
|             # No deduplication - return all entries | ||||
|             return entries | ||||
|     except Exception as e: | ||||
|         print(f"Error loading DNS records from database: {e}") | ||||
|         return [] | ||||
| 
 | ||||
| # List of known multi-part TLDs | ||||
| MULTI_PART_TLDS = [ | ||||
|     'co.uk', 'org.uk', 'me.uk', 'ac.uk', 'gov.uk', 'net.uk', 'sch.uk', | ||||
|     'com.au', 'net.au', 'org.au', 'edu.au', 'gov.au', 'asn.au', 'id.au', | ||||
|     'co.nz', 'net.nz', 'org.nz', 'govt.nz', 'ac.nz', 'school.nz', 'geek.nz', | ||||
|     'com.sg', 'edu.sg', 'gov.sg', 'net.sg', 'org.sg', 'per.sg', | ||||
|     'co.za', 'org.za', 'web.za', 'net.za', 'gov.za', 'ac.za', | ||||
|     'com.br', 'net.br', 'org.br', 'gov.br', 'edu.br', | ||||
|     'co.jp', 'ac.jp', 'go.jp', 'or.jp', 'ne.jp', 'gr.jp', | ||||
|     'co.in', 'firm.in', 'net.in', 'org.in', 'gen.in', 'ind.in', | ||||
|     'edu.cn', 'gov.cn', 'net.cn', 'org.cn', 'com.cn', 'ac.cn', | ||||
|     'com.mx', 'net.mx', 'org.mx', 'edu.mx', 'gob.mx' | ||||
| ] | ||||
| 
 | ||||
| # Extract the base domain (SLD+TLD) from a full domain name | ||||
| def extract_base_domain(domain: str) -> str: | ||||
|     if not domain: | ||||
|         return domain | ||||
|          | ||||
|     # Remove trailing dot if present | ||||
|     if domain.endswith('.'): | ||||
|         domain = domain[:-1] | ||||
|          | ||||
|     parts = domain.split('.') | ||||
|      | ||||
|     # Check if the domain has enough parts | ||||
|     if len(parts) <= 1: | ||||
|         return domain | ||||
|          | ||||
|     # Check for known multi-part TLDs first | ||||
|     for tld in MULTI_PART_TLDS: | ||||
|         tld_parts = tld.split('.') | ||||
|         if len(parts) > len(tld_parts) and '.'.join(parts[-len(tld_parts):]) == tld: | ||||
|             # The domain has a multi-part TLD, extract SLD + multi-part TLD | ||||
|             return parts[-len(tld_parts)-1] + '.' + tld | ||||
|      | ||||
|     # Default case: extract last two parts | ||||
|     if len(parts) > 1: | ||||
|         return '.'.join(parts[-2:]) | ||||
|          | ||||
|     return domain | ||||
| 
 | ||||
| # Get all unique base domains from the database | ||||
| def get_unique_base_domains(specific_upload_id: str = None) -> List[Dict]: | ||||
|     try: | ||||
|         domains = domains_table.all() | ||||
|          | ||||
|         # If a specific upload ID is provided, only show domains from that upload | ||||
|         if specific_upload_id: | ||||
|             domains = [d for d in domains if d.get('upload_id') == specific_upload_id] | ||||
|          | ||||
|         # Add the base_domain field to each domain | ||||
|         for domain in domains: | ||||
|             domain['base_domain'] = extract_base_domain(domain.get('full_domain', '')) | ||||
|          | ||||
|         # Sort by timestamp in descending order (newest first) | ||||
|         domains.sort(key=lambda x: x.get('timestamp', ''), reverse=True) | ||||
|          | ||||
|         # Create dictionaries to track unique base domains | ||||
|         unique_base_domains = {} | ||||
|          | ||||
|         # Process each domain and keep only unique base domains | ||||
|         for domain in domains: | ||||
|             base_domain = domain.get('base_domain', '') | ||||
|              | ||||
|             # Skip if no base domain | ||||
|             if not base_domain: | ||||
|                 continue | ||||
|                  | ||||
|             # Check if this base domain has been seen before | ||||
|             if base_domain not in unique_base_domains: | ||||
|                 # Create a new entry for this base domain - with simplified fields | ||||
|                 base_domain_entry = { | ||||
|                     'domain': base_domain, | ||||
|                     'timestamp': domain.get('timestamp') | ||||
|                 } | ||||
|                 unique_base_domains[base_domain] = base_domain_entry | ||||
|          | ||||
|         # Return the list of unique base domains | ||||
|         return list(unique_base_domains.values()) | ||||
|     except Exception as e: | ||||
|         print(f"Error getting unique base domains: {e}") | ||||
|         return [] | ||||
| 
 | ||||
| # Get unique values for filter dropdowns | ||||
| def get_unique_values(entries: List[Dict]) -> Dict[str, Set]: | ||||
|     unique_values = { | ||||
|         "record_type": set(), | ||||
|         "record_class": set(), | ||||
|         "tld": set(), | ||||
|         "sld": set() | ||||
|         "record_class": set() | ||||
|     } | ||||
|      | ||||
|     for entry in entries: | ||||
|  | @ -276,18 +350,77 @@ def delete_upload(upload_id): | |||
|         print(f"Error deleting upload {upload_id}: {e}") | ||||
|         return False | ||||
| 
 | ||||
| # CSV Export Functions | ||||
| def domains_to_csv(domains: List[Dict]) -> str: | ||||
|     """Convert domains data to CSV format""" | ||||
|     csv_output = io.StringIO() | ||||
|      | ||||
|     if not domains: | ||||
|         return "" | ||||
|          | ||||
|     # Determine fields based on data | ||||
|     # Always include the full_domain field | ||||
|     fields = ["full_domain", "timestamp"] | ||||
|     if "base_domain" in domains[0]: | ||||
|         fields.insert(1, "base_domain") | ||||
|      | ||||
|     # Add headers | ||||
|     writer = csv.DictWriter(csv_output, fieldnames=fields, extrasaction='ignore') | ||||
|     writer.writeheader() | ||||
|      | ||||
|     # Add data | ||||
|     for domain in domains: | ||||
|         # Create a row dict with formatted timestamp | ||||
|         row = {k: domain.get(k) for k in fields} | ||||
|         if "timestamp" in row and row["timestamp"]: | ||||
|             # Format timestamp nicely for CSV | ||||
|             row["timestamp"] = row["timestamp"].replace('T', ' ').split('.')[0] | ||||
|         writer.writerow(row) | ||||
|      | ||||
|     return csv_output.getvalue() | ||||
| 
 | ||||
| def dns_records_to_csv(records: List[Dict]) -> str: | ||||
|     """Convert DNS records data to CSV format""" | ||||
|     csv_output = io.StringIO() | ||||
|      | ||||
|     if not records: | ||||
|         return "" | ||||
|      | ||||
|     # Define the fields to include in the CSV | ||||
|     fields = ["domain", "ttl", "record_class", "record_type", "record_data", "timestamp"] | ||||
|      | ||||
|     # Add headers | ||||
|     writer = csv.DictWriter(csv_output, fieldnames=fields, extrasaction='ignore') | ||||
|     writer.writeheader() | ||||
|      | ||||
|     # Add data | ||||
|     for record in records: | ||||
|         # Create a row dict with formatted timestamp | ||||
|         row = {k: record.get(k) for k in fields} | ||||
|         if "timestamp" in row and row["timestamp"]: | ||||
|             # Format timestamp nicely for CSV | ||||
|             row["timestamp"] = row["timestamp"].replace('T', ' ').split('.')[0] | ||||
|         writer.writerow(row) | ||||
|      | ||||
|     return csv_output.getvalue() | ||||
| 
 | ||||
| # Routes | ||||
| @app.get("/", response_class=HTMLResponse) | ||||
| async def home(request: Request, upload_id: Optional[str] = None): | ||||
|     """Home page with upload form and SLD listing""" | ||||
|     domains = load_domains(upload_id) | ||||
| async def home( | ||||
|     request: Request,  | ||||
|     upload_id: Optional[str] = None, | ||||
|     base_domains_only: Optional[bool] = False | ||||
| ): | ||||
|     """Home page with upload form and domain listing""" | ||||
|     domains = load_domains(upload_id, base_domains_only) | ||||
|     uploads = get_uploads() | ||||
|     return templates.TemplateResponse( | ||||
|         "index.html",  | ||||
|         { | ||||
|             "request": request,  | ||||
|             "domains": domains, | ||||
|             "uploads": uploads | ||||
|             "uploads": uploads, | ||||
|             "base_domains_only": base_domains_only | ||||
|         } | ||||
|     ) | ||||
| 
 | ||||
|  | @ -361,28 +494,23 @@ async def dns_records( | |||
|     upload_id: Optional[str] = None, | ||||
|     record_type: Optional[str] = None, | ||||
|     record_class: Optional[str] = None, | ||||
|     tld: Optional[str] = None, | ||||
|     sld: Optional[str] = None, | ||||
|     domain: Optional[str] = None | ||||
|     domain: Optional[str] = None, | ||||
|     deduplicate: Optional[bool] = True  # Default to showing only unique latest entries | ||||
| ): | ||||
|     """DNS Records page with filtering""" | ||||
|     # Get all entries first, based on upload_id if provided | ||||
|     entries = load_dns_entries(upload_id) | ||||
|     # Get all entries first, based on upload_id if provided, with deduplication option | ||||
|     entries = load_dns_entries(upload_id, deduplicate) | ||||
|      | ||||
|     # Apply additional filters if provided | ||||
|     if record_type: | ||||
|         entries = [e for e in entries if e.get("record_type") == record_type] | ||||
|     if record_class: | ||||
|         entries = [e for e in entries if e.get("record_class") == record_class] | ||||
|     if tld: | ||||
|         entries = [e for e in entries if e.get("tld") == tld] | ||||
|     if sld: | ||||
|         entries = [e for e in entries if e.get("sld") == sld] | ||||
|     if domain: | ||||
|         entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()] | ||||
|      | ||||
|     # Get unique values for filter dropdowns from all entries (not filtered) | ||||
|     all_entries = load_dns_entries(upload_id) | ||||
|     all_entries = load_dns_entries(upload_id, deduplicate=False) | ||||
|     unique_values = get_unique_values(all_entries) | ||||
|     uploads = get_uploads() | ||||
|      | ||||
|  | @ -392,34 +520,96 @@ async def dns_records( | |||
|             "request": request,  | ||||
|             "entries": entries, | ||||
|             "unique_values": unique_values, | ||||
|             "uploads": uploads | ||||
|             "uploads": uploads, | ||||
|             "deduplicate": deduplicate | ||||
|         } | ||||
|     ) | ||||
| 
 | ||||
| @app.get("/export-domains-csv") | ||||
| async def export_domains_csv( | ||||
|     upload_id: Optional[str] = None, | ||||
|     base_domains_only: Optional[bool] = False | ||||
| ): | ||||
|     """Export domains as CSV""" | ||||
|     domains = load_domains(upload_id, base_domains_only) | ||||
|     csv_content = domains_to_csv(domains) | ||||
|      | ||||
|     # Generate a filename with timestamp | ||||
|     filename = f"domains_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | ||||
|      | ||||
|     # Return the CSV as a downloadable file | ||||
|     return Response( | ||||
|         content=csv_content, | ||||
|         media_type="text/csv", | ||||
|         headers={"Content-Disposition": f"attachment; filename={filename}"} | ||||
|     ) | ||||
| 
 | ||||
| @app.get("/export-dns-csv") | ||||
| async def export_dns_csv( | ||||
|     upload_id: Optional[str] = None, | ||||
|     record_type: Optional[str] = None, | ||||
|     record_class: Optional[str] = None, | ||||
|     domain: Optional[str] = None, | ||||
|     deduplicate: Optional[bool] = True | ||||
| ): | ||||
|     """Export DNS records as CSV""" | ||||
|     # Get entries with applied filters | ||||
|     entries = load_dns_entries(upload_id, deduplicate) | ||||
|      | ||||
|     # Apply additional filters if provided | ||||
|     if record_type: | ||||
|         entries = [e for e in entries if e.get("record_type") == record_type] | ||||
|     if record_class: | ||||
|         entries = [e for e in entries if e.get("record_class") == record_class] | ||||
|     if domain: | ||||
|         entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()] | ||||
|      | ||||
|     csv_content = dns_records_to_csv(entries) | ||||
|      | ||||
|     # Generate a filename with timestamp | ||||
|     filename = f"dns_records_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | ||||
|      | ||||
|     # Return the CSV as a downloadable file | ||||
|     return Response( | ||||
|         content=csv_content, | ||||
|         media_type="text/csv", | ||||
|         headers={"Content-Disposition": f"attachment; filename={filename}"} | ||||
|     ) | ||||
| 
 | ||||
| # API Routes | ||||
| @app.get("/api/uploads", response_model=List[Dict]) | ||||
| async def get_all_uploads(): | ||||
|     """API endpoint that returns all uploads""" | ||||
|     return get_uploads() | ||||
| 
 | ||||
| @app.get("/api/slds", response_model=List[Dict]) | ||||
| async def get_slds(upload_id: Optional[str] = None): | ||||
|     """API endpoint that returns all SLDs with optional filter by upload_id""" | ||||
|     # The load_domains function now handles deduplication and upload_id filtering | ||||
|     domains = load_domains(upload_id) | ||||
| @app.get("/api/domains", response_model=List[Dict]) | ||||
| async def get_domains( | ||||
|     upload_id: Optional[str] = None, | ||||
|     base_domains_only: Optional[bool] = False | ||||
| ): | ||||
|     """API endpoint that returns all domains with optional filtering""" | ||||
|     # The load_domains function handles deduplication and filtering | ||||
|     domains = load_domains(upload_id, base_domains_only) | ||||
|     return domains | ||||
| 
 | ||||
| @app.get("/api/slds/{sld}", response_model=List[Dict]) | ||||
| async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None): | ||||
|     """API endpoint that returns domains for a specific SLD with optional filter by upload_id""" | ||||
| @app.get("/api/base-domains", response_model=List[Dict]) | ||||
| async def get_base_domains(upload_id: Optional[str] = None): | ||||
|     """API endpoint that returns only unique base domains""" | ||||
|     # Get only the unique base domains | ||||
|     base_domains = get_unique_base_domains(upload_id) | ||||
|     return base_domains | ||||
| 
 | ||||
| @app.get("/api/domains/{domain}", response_model=List[Dict]) | ||||
| async def get_domains_by_name(domain: str, upload_id: Optional[str] = None): | ||||
|     """API endpoint that returns domains matching a specific domain name with optional filter by upload_id""" | ||||
|     # Get domains, already deduplicated and optionally filtered by upload_id | ||||
|     all_domains = load_domains(upload_id) | ||||
|      | ||||
|     # Filter by SLD | ||||
|     filtered = [item for item in all_domains if item["sld"].lower() == sld.lower()] | ||||
|     # Filter by domain name | ||||
|     filtered = [item for item in all_domains if domain.lower() in item["full_domain"].lower()] | ||||
|      | ||||
|     if not filtered: | ||||
|         raise HTTPException(status_code=404, detail=f"No domains found with SLD: {sld}") | ||||
|         raise HTTPException(status_code=404, detail=f"No domains found matching: {domain}") | ||||
|      | ||||
|     return filtered | ||||
| 
 | ||||
|  | @ -427,24 +617,19 @@ async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None): | |||
| async def get_dns_entries( | ||||
|     record_type: Optional[str] = None, | ||||
|     record_class: Optional[str] = None, | ||||
|     tld: Optional[str] = None, | ||||
|     sld: Optional[str] = None, | ||||
|     domain: Optional[str] = None, | ||||
|     upload_id: Optional[str] = None | ||||
|     upload_id: Optional[str] = None, | ||||
|     deduplicate: Optional[bool] = True | ||||
| ): | ||||
|     """API endpoint that returns filtered DNS entries""" | ||||
|     """API endpoint that returns filtered DNS entries with optional deduplication""" | ||||
|     # Get entries - if upload_id is specified, only those entries are returned | ||||
|     entries = load_dns_entries(upload_id) | ||||
|     entries = load_dns_entries(upload_id, deduplicate) | ||||
|      | ||||
|     # Apply additional filters if provided | ||||
|     if record_type: | ||||
|         entries = [e for e in entries if e.get("record_type") == record_type] | ||||
|     if record_class: | ||||
|         entries = [e for e in entries if e.get("record_class") == record_class] | ||||
|     if tld: | ||||
|         entries = [e for e in entries if e.get("tld") == tld] | ||||
|     if sld: | ||||
|         entries = [e for e in entries if e.get("sld") == sld] | ||||
|     if domain: | ||||
|         entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()] | ||||
|      | ||||
|  |  | |||
|  | @ -115,14 +115,10 @@ | |||
|             background-color: #e0e0e0; | ||||
|             color: #333; | ||||
|         } | ||||
|         .sld-badge { | ||||
|         .domain-badge { | ||||
|             background-color: #d1e7dd; | ||||
|             color: #0f5132; | ||||
|         } | ||||
|         .tld-badge { | ||||
|             background-color: #cfe2ff; | ||||
|             color: #0a58ca; | ||||
|         } | ||||
|         .service-badge { | ||||
|             background-color: #fff3cd; | ||||
|             color: #664d03; | ||||
|  | @ -149,6 +145,13 @@ | |||
|             margin-left: 10px; | ||||
|             font-weight: normal; | ||||
|         } | ||||
|         .dedup-note { | ||||
|             display: block; | ||||
|             font-size: 0.7em; | ||||
|             color: #666; | ||||
|             font-weight: normal; | ||||
|             margin-top: 5px; | ||||
|         } | ||||
|         .tooltip { | ||||
|             position: relative; | ||||
|             display: inline-block; | ||||
|  | @ -181,14 +184,43 @@ | |||
|                 width: 100%; | ||||
|             } | ||||
|         } | ||||
|         .table-header { | ||||
|             display: flex; | ||||
|             justify-content: space-between; | ||||
|             align-items: flex-start; | ||||
|             margin-bottom: 15px; | ||||
|         } | ||||
|         .table-actions { | ||||
|             display: flex; | ||||
|             gap: 10px; | ||||
|             margin-top: 10px; | ||||
|         } | ||||
|         .btn-export { | ||||
|             display: inline-flex; | ||||
|             align-items: center; | ||||
|             gap: 5px; | ||||
|             padding: 8px 15px; | ||||
|             background-color: #4CAF50; | ||||
|             color: white; | ||||
|             border-radius: 4px; | ||||
|             text-decoration: none; | ||||
|             font-weight: bold; | ||||
|             font-size: 0.9em; | ||||
|         } | ||||
|         .btn-export:hover { | ||||
|             background-color: #45a049; | ||||
|         } | ||||
|         .icon { | ||||
|             font-size: 1.2em; | ||||
|         } | ||||
|     </style> | ||||
| </head> | ||||
| <body> | ||||
|     <div class="container"> | ||||
|         <h1>DNS Entry Viewer</h1> | ||||
|         <h1>DNS Records Viewer {% if deduplicate %}(Deduplicated){% else %}(All Records){% endif %}</h1> | ||||
|          | ||||
|         <div class="nav"> | ||||
|             <a href="/" class="nav-link">SLD View</a> | ||||
|             <a href="/" class="nav-link">Domains</a> | ||||
|             <a href="/dns-records" class="nav-link">DNS Records</a> | ||||
|         </div> | ||||
|          | ||||
|  | @ -222,35 +254,46 @@ | |||
|                     {% endfor %} | ||||
|                 </select> | ||||
|             </div> | ||||
|             <div class="filter-group"> | ||||
|                 <label for="tld">TLD:</label> | ||||
|                 <select id="tld" name="tld"> | ||||
|                     <option value="">All TLDs</option> | ||||
|                     {% for tld in unique_values.tld %} | ||||
|                     <option value="{{ tld }}" {% if request.query_params.get('tld') == tld %}selected{% endif %}>{{ tld }}</option> | ||||
|                     {% endfor %} | ||||
|                 </select> | ||||
|             </div> | ||||
|             <div class="filter-group"> | ||||
|                 <label for="sld">SLD:</label> | ||||
|                 <select id="sld" name="sld"> | ||||
|                     <option value="">All SLDs</option> | ||||
|                     {% for sld in unique_values.sld %} | ||||
|                     <option value="{{ sld }}" {% if request.query_params.get('sld') == sld %}selected{% endif %}>{{ sld }}</option> | ||||
|                     {% endfor %} | ||||
|                 </select> | ||||
|             </div> | ||||
|             <div class="filter-group"> | ||||
|                 <label for="domain-search">Domain Search:</label> | ||||
|                 <input type="text" id="domain-search" name="domain" placeholder="Enter domain name..." value="{{ request.query_params.get('domain', '') }}"> | ||||
|             </div> | ||||
|             <div class="filter-group"> | ||||
|                 <label for="deduplicate">Deduplicate Records:</label> | ||||
|                 <select id="deduplicate" name="deduplicate" class="filter-select"> | ||||
|                     <option value="true" {% if request.query_params.get('deduplicate', 'true') == 'true' %}selected{% endif %}>Yes (Deduplicate Identical Records)</option> | ||||
|                     <option value="false" {% if request.query_params.get('deduplicate') == 'false' %}selected{% endif %}>No (Show All)</option> | ||||
|                 </select> | ||||
|             </div> | ||||
|             <div class="filter-buttons"> | ||||
|                 <button type="submit">Apply Filters</button> | ||||
|                 <a href="/dns-records" class="reset-button" style="padding: 8px 16px; background-color: #f44336; color: white; text-decoration: none; border-radius: 4px; font-weight: bold; display: inline-block;">Reset</a> | ||||
|             </div> | ||||
|         </form> | ||||
| 
 | ||||
|         <h2>DNS Records <span class="count-badge">{{ entries|length }}</span></h2> | ||||
|         <div class="api-section"> | ||||
|             <h3>API Endpoints</h3> | ||||
|             <p>Get all DNS entries: <code>/api/dns</code></p> | ||||
|             <p>Get filtered DNS entries: <code>/api/dns?record_type=A&domain=example.com</code></p> | ||||
|             <p>Filter by upload: <code>/api/dns?upload_id={upload_id}</code></p> | ||||
|             <p>Show all records (no deduplication): <code>/api/dns?deduplicate=false</code></p> | ||||
|             <p>Get unique filter values: <code>/api/dns/types</code></p> | ||||
|         </div> | ||||
|          | ||||
|         <div class="table-header"> | ||||
|             <h2>DNS Records <span class="count-badge">{{ entries|length }}</span> | ||||
|                 {% if deduplicate %} | ||||
|                 <small class="dedup-note">(Showing most recent entries for each Domain+Class+Type+TTL+Data combination)</small> | ||||
|                 {% endif %} | ||||
|             </h2> | ||||
|             {% if entries %} | ||||
|             <div class="table-actions"> | ||||
|                 <a href="/export-dns-csv?{% if request.query_params.get('upload_id') %}upload_id={{ request.query_params.get('upload_id') }}{% endif %}{% if request.query_params.get('record_type') %}&record_type={{ request.query_params.get('record_type') }}{% endif %}{% if request.query_params.get('record_class') %}&record_class={{ request.query_params.get('record_class') }}{% endif %}{% if request.query_params.get('domain') %}&domain={{ request.query_params.get('domain') }}{% endif %}{% if request.query_params.get('deduplicate') == 'false' %}&deduplicate=false{% endif %}" class="btn-export" title="Export to CSV"> | ||||
|                     <span class="icon">📥</span> Export CSV | ||||
|                 </a> | ||||
|             </div> | ||||
|             {% endif %} | ||||
|         </div> | ||||
|          | ||||
|         {% if entries %} | ||||
|             <table id="dns-table"> | ||||
|  | @ -271,11 +314,7 @@ | |||
|                             {% if entry.get('service') %} | ||||
|                             <span class="badge service-badge">{{ entry.service }}</span> | ||||
|                             {% endif %} | ||||
|                             {% if entry.get('subdomain') %} | ||||
|                             {{ entry.subdomain }}. | ||||
|                             {% endif %} | ||||
|                             <span class="badge sld-badge">{{ entry.sld }}</span>. | ||||
|                             <span class="badge tld-badge">{{ entry.tld }}</span> | ||||
|                             <span class="badge domain-badge">{{ entry.domain }}</span> | ||||
|                         </td> | ||||
|                         <td>{{ entry.ttl }}</td> | ||||
|                         <td>{{ entry.record_class }}</td> | ||||
|  | @ -298,14 +337,6 @@ | |||
|         {% else %} | ||||
|             <p>No DNS entries found. Please upload a CSV file to get started.</p> | ||||
|         {% endif %} | ||||
|          | ||||
|         <div class="api-section"> | ||||
|             <h2>API Endpoints</h2> | ||||
|             <p>Get all DNS entries: <code>/api/dns</code></p> | ||||
|             <p>Get filtered DNS entries: <code>/api/dns?record_type=A&tld=de</code></p> | ||||
|             <p>Filter by upload: <code>/api/dns?upload_id={upload_id}</code></p> | ||||
|             <p>Get unique filter values: <code>/api/dns/types</code></p> | ||||
|         </div> | ||||
|     </div> | ||||
|      | ||||
|     <!-- All JavaScript removed, using server-side FastAPI for filtering --> | ||||
|  |  | |||
|  | @ -53,7 +53,7 @@ | |||
|         tr:hover { | ||||
|             background-color: #f5f5f5; | ||||
|         } | ||||
|         .sld-badge { | ||||
|         .domain-badge { | ||||
|             display: inline-block; | ||||
|             padding: 3px 7px; | ||||
|             background-color: #d1e7dd; | ||||
|  | @ -61,7 +61,7 @@ | |||
|             font-size: 0.9em; | ||||
|             color: #0f5132; | ||||
|         } | ||||
|         .tld-badge { | ||||
|         .base-domain-badge { | ||||
|             display: inline-block; | ||||
|             padding: 3px 7px; | ||||
|             background-color: #cfe2ff; | ||||
|  | @ -69,6 +69,15 @@ | |||
|             font-size: 0.9em; | ||||
|             color: #0a58ca; | ||||
|         } | ||||
|         .same-domain-badge { | ||||
|             display: inline-block; | ||||
|             padding: 3px 7px; | ||||
|             background-color: #e9ecef; | ||||
|             border-radius: 4px; | ||||
|             font-size: 0.9em; | ||||
|             color: #6c757d; | ||||
|             font-style: italic; | ||||
|         } | ||||
|         .api-section { | ||||
|             margin-top: 30px; | ||||
|             padding: 15px; | ||||
|  | @ -135,12 +144,76 @@ | |||
|         } | ||||
|         .filter-form { | ||||
|             margin-bottom: 20px; | ||||
|             background-color: #f9f9f9; | ||||
|             padding: 15px; | ||||
|             border-radius: 5px; | ||||
|         } | ||||
|         .filter-row { | ||||
|             display: flex; | ||||
|             flex-wrap: wrap; | ||||
|             gap: 15px; | ||||
|             align-items: flex-end; | ||||
|         } | ||||
|         .filter-group { | ||||
|             display: flex; | ||||
|             flex-direction: column; | ||||
|         } | ||||
|         .filter-group label { | ||||
|             font-weight: bold; | ||||
|             margin-bottom: 5px; | ||||
|             font-size: 0.9em; | ||||
|         } | ||||
|         .filter-select { | ||||
|             padding: 8px 12px; | ||||
|             border: 1px solid #ddd; | ||||
|             border-radius: 4px; | ||||
|             margin-right: 10px; | ||||
|             min-width: 150px; | ||||
|         } | ||||
|         .btn-sm { | ||||
|             padding: 8px 16px; | ||||
|             font-size: 0.9em; | ||||
|         } | ||||
|         .reset-button { | ||||
|             display: inline-block; | ||||
|             padding: 8px 16px; | ||||
|             background-color: #f44336; | ||||
|             color: white; | ||||
|             text-decoration: none; | ||||
|             border-radius: 4px; | ||||
|             font-weight: bold; | ||||
|             font-size: 0.9em; | ||||
|         } | ||||
|         .reset-button:hover { | ||||
|             background-color: #e53935; | ||||
|             color: white; | ||||
|         } | ||||
|         .table-header { | ||||
|             display: flex; | ||||
|             justify-content: space-between; | ||||
|             align-items: center; | ||||
|             margin-bottom: 15px; | ||||
|         } | ||||
|         .table-actions { | ||||
|             display: flex; | ||||
|             gap: 10px; | ||||
|         } | ||||
|         .btn-export { | ||||
|             display: inline-flex; | ||||
|             align-items: center; | ||||
|             gap: 5px; | ||||
|             padding: 8px 15px; | ||||
|             background-color: #4CAF50; | ||||
|             color: white; | ||||
|             border-radius: 4px; | ||||
|             text-decoration: none; | ||||
|             font-weight: bold; | ||||
|             font-size: 0.9em; | ||||
|         } | ||||
|         .btn-export:hover { | ||||
|             background-color: #45a049; | ||||
|         } | ||||
|         .icon { | ||||
|             font-size: 1.2em; | ||||
|         } | ||||
|     </style> | ||||
| </head> | ||||
|  | @ -149,7 +222,7 @@ | |||
|         <h1>Domain Management System</h1> | ||||
|          | ||||
|         <div class="nav"> | ||||
|             <a href="/" class="nav-link">SLD View</a> | ||||
|             <a href="/" class="nav-link">Domains</a> | ||||
|             <a href="/dns-records" class="nav-link">DNS Records</a> | ||||
|         </div> | ||||
|          | ||||
|  | @ -207,8 +280,10 @@ | |||
|         <div class="filter-form"> | ||||
|             <h2>Domain List</h2> | ||||
|             <form id="filterForm" method="get"> | ||||
|                 <div class="filter-row"> | ||||
|                     <div class="filter-group"> | ||||
|                         <label for="upload_filter">Filter by upload:</label> | ||||
|                 <select id="upload_filter" name="upload_id" class="filter-select" onchange="this.form.submit()"> | ||||
|                         <select id="upload_filter" name="upload_id" class="filter-select"> | ||||
|                             <option value="">All uploads</option> | ||||
|                             {% for upload in uploads %} | ||||
|                             <option value="{{ upload.id }}" {% if request.query_params.get('upload_id') == upload.id %}selected{% endif %}> | ||||
|  | @ -216,36 +291,60 @@ | |||
|                             </option> | ||||
|                             {% endfor %} | ||||
|                         </select> | ||||
|                     </div> | ||||
|                      | ||||
|                     <div class="filter-group"> | ||||
|                         <label for="base_domains_only">Show base domains only:</label> | ||||
|                         <select id="base_domains_only" name="base_domains_only" class="filter-select"> | ||||
|                             <option value="false" {% if request.query_params.get('base_domains_only', 'false') == 'false' %}selected{% endif %}>No (Show All)</option> | ||||
|                             <option value="true" {% if request.query_params.get('base_domains_only') == 'true' %}selected{% endif %}>Yes (example.com only)</option> | ||||
|                         </select> | ||||
|                     </div> | ||||
|                      | ||||
|                     <div class="filter-buttons"> | ||||
|                         <button type="submit" class="btn btn-sm">Apply Filters</button> | ||||
|                         <a href="/" class="reset-button">Reset</a> | ||||
|                     </div> | ||||
|                 </div> | ||||
|             </form> | ||||
|         </div> | ||||
|          | ||||
|         <div class="api-section"> | ||||
|             <h3>API Endpoints</h3> | ||||
|             <p>Get all uploads: <code>/api/uploads</code></p> | ||||
|             <p>Get all domains: <code>/api/slds</code></p> | ||||
|             <p>Get domains by SLD: <code>/api/slds/{sld}</code></p> | ||||
|             <p>Filter by upload: <code>/api/slds?upload_id={upload_id}</code></p> | ||||
|             <p>Get all domains: <code>/api/domains</code></p> | ||||
|             <p>Get only base domains: <code>/api/base-domains</code> (simplified format: <code>{"domain": "example.com", "timestamp": "..."}</code>)</p> | ||||
|             <p>Get domains by name: <code>/api/domains/{domain}</code></p> | ||||
|             <p>Filter by upload: <code>/api/domains?upload_id={upload_id}</code></p> | ||||
|             <p>Show base domains only: <code>/api/domains?base_domains_only=true</code></p> | ||||
|         </div> | ||||
|          | ||||
|         {% if domains %} | ||||
|             <div class="table-header"> | ||||
|                 <p>Found {{ domains|length }} domains{% if request.query_params.get('upload_id') %} in this upload{% endif %}.</p> | ||||
|                 <div class="table-actions"> | ||||
|                     <a href="/export-domains-csv?{% if request.query_params.get('upload_id') %}upload_id={{ request.query_params.get('upload_id') }}{% endif %}{% if request.query_params.get('base_domains_only') == 'true' %}&base_domains_only=true{% endif %}" class="btn-export" title="Export to CSV"> | ||||
|                         <span class="icon">📥</span> Export CSV | ||||
|                     </a> | ||||
|                 </div> | ||||
|             </div> | ||||
|             <table> | ||||
|                 <thead> | ||||
|                     <tr> | ||||
|                         <th>SLD</th> | ||||
|                         <th>TLD</th> | ||||
|                         <th>Subdomain</th> | ||||
|                         <th>Full Domain</th> | ||||
|                         <th>Domain</th> | ||||
|                         {% if not base_domains_only %} | ||||
|                         <th>Base Domain</th> | ||||
|                         {% endif %} | ||||
|                         <th>Upload Date</th> | ||||
|                     </tr> | ||||
|                 </thead> | ||||
|                 <tbody> | ||||
|                     {% for item in domains %} | ||||
|                     <tr> | ||||
|                         <td><span class="sld-badge">{{ item.sld }}</span></td> | ||||
|                         <td><span class="tld-badge">{{ item.tld }}</span></td> | ||||
|                         <td>{{ item.get('subdomain', 'N/A') }}</td> | ||||
|                         <td>{{ item.full_domain }}</td> | ||||
|                         <td><span class="domain-badge">{{ item.full_domain }}</span></td> | ||||
|                         {% if not base_domains_only %} | ||||
|                         <td>{% if item.base_domain != item.full_domain %}<span class="base-domain-badge">{{ item.base_domain }}</span>{% else %}<span class="same-domain-badge">Same as domain</span>{% endif %}</td> | ||||
|                         {% endif %} | ||||
|                         <td>{{ item.timestamp.replace('T', ' ').split('.')[0] if item.get('timestamp') else 'N/A' }}</td> | ||||
|                     </tr> | ||||
|                     {% endfor %} | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue