diff --git a/README.md b/README.md index c0577e1..f6357eb 100644 --- a/README.md +++ b/README.md @@ -85,11 +85,39 @@ Where columns are: 4. Record Type (A, AAAA, MX, CNAME, TXT, etc.) 5. Record Data (IP address, hostname, or other data depending on record type) +## Domain Base Name Detection + +The application includes functionality to identify base domains from fully qualified domain names, including handling of multi-part TLDs like ".co.uk" or ".com.au". + +### Multi-Part TLD List + +The application uses a hardcoded list of common multi-part TLDs to correctly extract base domains (e.g., "example.co.uk" from "mail.example.co.uk"). + +This list can be found in `main.py` as `MULTI_PART_TLDS`. + +### Updating the TLD List + +To ensure accurate domain parsing, you should periodically update the multi-part TLD list. The best sources for this information are: + +1. **Public Suffix List (PSL)**: The most comprehensive and authoritative source + - Website: https://publicsuffix.org/list/ + - GitHub: https://github.com/publicsuffix/list + - This list is maintained by Mozilla and used by browsers and DNS applications + +2. **IANA's TLD Database**: The official registry of top-level domains + - Website: https://www.iana.org/domains/root/db + +3. **Commercial Domain Registrars**: Often provide lists of available TLDs + - Examples: GoDaddy, Namecheap, etc. + +For the most accurate and comprehensive implementation, consider implementing a parser for the Public Suffix List or using a library that maintains this list (e.g., `publicsuffix2` for Python). + ## API Endpoints - `/api/uploads` - Get all uploads -- `/api/slds` - Get all SLDs (Second Level Domains) -- `/api/slds/{sld}` - Get domains by SLD +- `/api/domains` - Get all domains +- `/api/base-domains` - Get only unique base domains (e.g., example.com, example.co.uk) with simplified response format +- `/api/domains/{domain}` - Get domains by name - `/api/dns` - Get all DNS records - `/api/dns/types` - Get unique values for filters @@ -100,8 +128,27 @@ You can filter the API results using the following query parameters: - `upload_id` - Filter by specific upload - `record_type` - Filter by DNS record type - `record_class` - Filter by DNS record class -- `tld` - Filter by Top Level Domain -- `sld` - Filter by Second Level Domain - `domain` - Search by domain name +- `base_domains_only` - Only show base domains (e.g., example.com not mail.example.com) +- `deduplicate` - For DNS records, control whether to show all records or deduplicate -Example: `/api/dns?record_type=A&tld=com&upload_id=upload_20250408120000` +Examples: +- `/api/domains?base_domains_only=true` - Show only base domains +- `/api/base-domains` - Get a simplified list of unique base domains +- `/api/dns?record_type=A&domain=example.com&deduplicate=false` - Show all A records for example.com without deduplication + +### Response Format Examples + +1. Base Domains Endpoint (`/api/base-domains`): +```json +[ + { + "domain": "example.com", + "timestamp": "2025-04-08T12:00:00" + }, + { + "domain": "example.co.uk", + "timestamp": "2025-04-08T12:00:00" + } +] +``` diff --git a/main.py b/main.py index cf5991f..0d2cc7a 100644 --- a/main.py +++ b/main.py @@ -139,23 +139,42 @@ async def process_csv_upload(file_content, upload_id, description=None): print(traceback.format_exc()) return 0, 0 -# Load domains from database - deduplicated by full domain name -def load_domains(specific_upload_id: str = None) -> List[Dict]: +# Load domains from database - deduplicated by full domain name, with optional base domain filtering +def load_domains(specific_upload_id: str = None, base_domains_only: bool = False) -> List[Dict]: try: domains = domains_table.all() # If a specific upload ID is provided, only show domains from that upload if specific_upload_id: domains = [d for d in domains if d.get('upload_id') == specific_upload_id] - return domains + if not base_domains_only: + return domains + + # Add the base_domain field to each domain + for domain in domains: + domain['base_domain'] = extract_base_domain(domain.get('full_domain', '')) # Sort by timestamp in descending order (newest first) domains.sort(key=lambda x: x.get('timestamp', ''), reverse=True) - # Create a dictionary to track unique domains by full domain name + # Create a dictionary to track unique domains unique_domains = {} + base_domains_set = set() + + # First pass: collect all base domains + if base_domains_only: + for domain in domains: + base_domains_set.add(domain.get('base_domain', '')) for domain in domains: + # If base_domains_only is True, only keep domains that are base domains themselves + if base_domains_only: + full_domain = domain.get('full_domain', '') + base_domain = domain.get('base_domain', '') + + if full_domain != base_domain: + continue + # Create a unique key based on the full domain name unique_key = domain.get('full_domain', '') @@ -206,6 +225,90 @@ def load_dns_entries(specific_upload_id: str = None, deduplicate: bool = False) print(f"Error loading DNS records from database: {e}") return [] +# List of known multi-part TLDs +MULTI_PART_TLDS = [ + 'co.uk', 'org.uk', 'me.uk', 'ac.uk', 'gov.uk', 'net.uk', 'sch.uk', + 'com.au', 'net.au', 'org.au', 'edu.au', 'gov.au', 'asn.au', 'id.au', + 'co.nz', 'net.nz', 'org.nz', 'govt.nz', 'ac.nz', 'school.nz', 'geek.nz', + 'com.sg', 'edu.sg', 'gov.sg', 'net.sg', 'org.sg', 'per.sg', + 'co.za', 'org.za', 'web.za', 'net.za', 'gov.za', 'ac.za', + 'com.br', 'net.br', 'org.br', 'gov.br', 'edu.br', + 'co.jp', 'ac.jp', 'go.jp', 'or.jp', 'ne.jp', 'gr.jp', + 'co.in', 'firm.in', 'net.in', 'org.in', 'gen.in', 'ind.in', + 'edu.cn', 'gov.cn', 'net.cn', 'org.cn', 'com.cn', 'ac.cn', + 'com.mx', 'net.mx', 'org.mx', 'edu.mx', 'gob.mx' +] + +# Extract the base domain (SLD+TLD) from a full domain name +def extract_base_domain(domain: str) -> str: + if not domain: + return domain + + # Remove trailing dot if present + if domain.endswith('.'): + domain = domain[:-1] + + parts = domain.split('.') + + # Check if the domain has enough parts + if len(parts) <= 1: + return domain + + # Check for known multi-part TLDs first + for tld in MULTI_PART_TLDS: + tld_parts = tld.split('.') + if len(parts) > len(tld_parts) and '.'.join(parts[-len(tld_parts):]) == tld: + # The domain has a multi-part TLD, extract SLD + multi-part TLD + return parts[-len(tld_parts)-1] + '.' + tld + + # Default case: extract last two parts + if len(parts) > 1: + return '.'.join(parts[-2:]) + + return domain + +# Get all unique base domains from the database +def get_unique_base_domains(specific_upload_id: str = None) -> List[Dict]: + try: + domains = domains_table.all() + + # If a specific upload ID is provided, only show domains from that upload + if specific_upload_id: + domains = [d for d in domains if d.get('upload_id') == specific_upload_id] + + # Add the base_domain field to each domain + for domain in domains: + domain['base_domain'] = extract_base_domain(domain.get('full_domain', '')) + + # Sort by timestamp in descending order (newest first) + domains.sort(key=lambda x: x.get('timestamp', ''), reverse=True) + + # Create dictionaries to track unique base domains + unique_base_domains = {} + + # Process each domain and keep only unique base domains + for domain in domains: + base_domain = domain.get('base_domain', '') + + # Skip if no base domain + if not base_domain: + continue + + # Check if this base domain has been seen before + if base_domain not in unique_base_domains: + # Create a new entry for this base domain - with simplified fields + base_domain_entry = { + 'domain': base_domain, + 'timestamp': domain.get('timestamp') + } + unique_base_domains[base_domain] = base_domain_entry + + # Return the list of unique base domains + return list(unique_base_domains.values()) + except Exception as e: + print(f"Error getting unique base domains: {e}") + return [] + # Get unique values for filter dropdowns def get_unique_values(entries: List[Dict]) -> Dict[str, Set]: unique_values = { @@ -249,16 +352,21 @@ def delete_upload(upload_id): # Routes @app.get("/", response_class=HTMLResponse) -async def home(request: Request, upload_id: Optional[str] = None): - """Home page with upload form and SLD listing""" - domains = load_domains(upload_id) +async def home( + request: Request, + upload_id: Optional[str] = None, + base_domains_only: Optional[bool] = False +): + """Home page with upload form and domain listing""" + domains = load_domains(upload_id, base_domains_only) uploads = get_uploads() return templates.TemplateResponse( "index.html", { "request": request, "domains": domains, - "uploads": uploads + "uploads": uploads, + "base_domains_only": base_domains_only } ) @@ -370,12 +478,22 @@ async def get_all_uploads(): return get_uploads() @app.get("/api/domains", response_model=List[Dict]) -async def get_domains(upload_id: Optional[str] = None): - """API endpoint that returns all domains with optional filter by upload_id""" - # The load_domains function now handles deduplication and upload_id filtering - domains = load_domains(upload_id) +async def get_domains( + upload_id: Optional[str] = None, + base_domains_only: Optional[bool] = False +): + """API endpoint that returns all domains with optional filtering""" + # The load_domains function handles deduplication and filtering + domains = load_domains(upload_id, base_domains_only) return domains +@app.get("/api/base-domains", response_model=List[Dict]) +async def get_base_domains(upload_id: Optional[str] = None): + """API endpoint that returns only unique base domains""" + # Get only the unique base domains + base_domains = get_unique_base_domains(upload_id) + return base_domains + @app.get("/api/domains/{domain}", response_model=List[Dict]) async def get_domains_by_name(domain: str, upload_id: Optional[str] = None): """API endpoint that returns domains matching a specific domain name with optional filter by upload_id""" diff --git a/templates/index.html b/templates/index.html index 91846fb..7d0bef1 100644 --- a/templates/index.html +++ b/templates/index.html @@ -61,6 +61,23 @@ font-size: 0.9em; color: #0f5132; } + .base-domain-badge { + display: inline-block; + padding: 3px 7px; + background-color: #cfe2ff; + border-radius: 4px; + font-size: 0.9em; + color: #0a58ca; + } + .same-domain-badge { + display: inline-block; + padding: 3px 7px; + background-color: #e9ecef; + border-radius: 4px; + font-size: 0.9em; + color: #6c757d; + font-style: italic; + } .api-section { margin-top: 30px; padding: 15px; @@ -127,12 +144,48 @@ } .filter-form { margin-bottom: 20px; + background-color: #f9f9f9; + padding: 15px; + border-radius: 5px; + } + .filter-row { + display: flex; + flex-wrap: wrap; + gap: 15px; + align-items: flex-end; + } + .filter-group { + display: flex; + flex-direction: column; + } + .filter-group label { + font-weight: bold; + margin-bottom: 5px; + font-size: 0.9em; } .filter-select { padding: 8px 12px; border: 1px solid #ddd; border-radius: 4px; - margin-right: 10px; + min-width: 150px; + } + .btn-sm { + padding: 8px 16px; + font-size: 0.9em; + } + .reset-button { + display: inline-block; + padding: 8px 16px; + background-color: #f44336; + color: white; + text-decoration: none; + border-radius: 4px; + font-weight: bold; + font-size: 0.9em; + } + .reset-button:hover { + background-color: #e53935; + color: white; } @@ -199,15 +252,32 @@

Domain List

- - +
+
+ + +
+ +
+ + +
+ +
+ + Reset +
+
@@ -215,8 +285,10 @@

API Endpoints

Get all uploads: /api/uploads

Get all domains: /api/domains

+

Get only base domains: /api/base-domains (simplified format: {"domain": "example.com", "timestamp": "..."})

Get domains by name: /api/domains/{domain}

Filter by upload: /api/domains?upload_id={upload_id}

+

Show base domains only: /api/domains?base_domains_only=true

{% if domains %} @@ -225,6 +297,9 @@ Domain + {% if not base_domains_only %} + Base Domain + {% endif %} Upload Date @@ -232,6 +307,9 @@ {% for item in domains %} {{ item.full_domain }} + {% if not base_domains_only %} + {% if item.base_domain != item.full_domain %}{{ item.base_domain }}{% else %}Same as domain{% endif %} + {% endif %} {{ item.timestamp.replace('T', ' ').split('.')[0] if item.get('timestamp') else 'N/A' }} {% endfor %}