From fc72f6f51c5ffe8de6757bf48abbb65e7be29cc5 Mon Sep 17 00:00:00 2001 From: CaffeineFueled Date: Wed, 9 Apr 2025 19:38:03 +0200 Subject: [PATCH] CHANGE SLD+TLD to FQDN, CHANGE dedublication to show all entries --- main.py | 140 ++++++++++++++----------------------- templates/dns_records.html | 57 +++++++-------- templates/index.html | 28 ++------ 3 files changed, 83 insertions(+), 142 deletions(-) diff --git a/main.py b/main.py index 4baf1cb..cf5991f 100644 --- a/main.py +++ b/main.py @@ -32,29 +32,11 @@ def process_domain_entry(domain_entry): if domain_entry.endswith('.'): domain_entry = domain_entry[:-1] - # Parse domain components - parts = domain_entry.split('.') - if len(parts) > 1: - # For domain.tld format - if len(parts) == 2: - sld = parts[0] # Second Level Domain - tld = parts[1] # Top Level Domain - domain_info = { - "sld": sld, - "tld": tld, - "full_domain": domain_entry - } - # For subdomain.domain.tld format - else: - sld = parts[-2] # Second Level Domain - tld = parts[-1] # Top Level Domain - subdomain = '.'.join(parts[:-2]) # Subdomains - domain_info = { - "sld": sld, - "tld": tld, - "full_domain": domain_entry, - "subdomain": subdomain - } + if domain_entry: + # Store only the full domain name without splitting + domain_info = { + "full_domain": domain_entry + } return domain_info return None @@ -90,7 +72,7 @@ async def process_csv_upload(file_content, upload_id, description=None): domain_info = process_domain_entry(domain_entry) if domain_info: # Create a unique key to avoid duplicates within this upload - unique_key = f"{domain_info['sld']}.{domain_info['tld']}" + unique_key = domain_info['full_domain'] if unique_key not in unique_domains: unique_domains.add(unique_key) @@ -127,22 +109,9 @@ async def process_csv_upload(file_content, upload_id, description=None): "timestamp": timestamp } - # Add domain components - if len(domain_parts) > 1: - if domain_parts[0].startswith('_'): # Service records like _dmarc - entry["service"] = domain_parts[0] - # Adjust domain parts - domain_parts = domain_parts[1:] - - # For domain.tld format - if len(domain_parts) == 2: - entry["sld"] = domain_parts[0] # Second Level Domain - entry["tld"] = domain_parts[1] # Top Level Domain - # For subdomain.domain.tld format - elif len(domain_parts) > 2: - entry["sld"] = domain_parts[-2] # Second Level Domain - entry["tld"] = domain_parts[-1] # Top Level Domain - entry["subdomain"] = '.'.join(domain_parts[:-2]) # Subdomains + # Add special handling for service records + if len(domain_parts) > 0 and domain_parts[0].startswith('_'): # Service records like _dmarc + entry["service"] = domain_parts[0] dns_records_to_insert.append(entry) @@ -201,34 +170,38 @@ def load_domains(specific_upload_id: str = None) -> List[Dict]: print(f"Error loading domains from database: {e}") return [] -# Load DNS entries from database - deduplicated by domain, class, and type (no history) -def load_dns_entries(specific_upload_id: str = None) -> List[Dict]: +# Load DNS entries from database - with optional deduplication +def load_dns_entries(specific_upload_id: str = None, deduplicate: bool = False) -> List[Dict]: try: entries = dns_records_table.all() # If a specific upload ID is provided, only show records from that upload if specific_upload_id: entries = [e for e in entries if e.get('upload_id') == specific_upload_id] - return entries # Sort by timestamp in descending order (newest first) entries.sort(key=lambda x: x.get('timestamp', ''), reverse=True) - # Create a dictionary to track unique entries (most recent only) - unique_entries = {} - - for entry in entries: - # Create a unique key based on domain, class, and type - unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}" + # If deduplication is requested, only keep the most recent entry for each unique combination + if deduplicate: + # Create a dictionary to track unique entries (most recent only) + unique_entries = {} - # Only keep the most recent entry for each unique combination - if unique_key not in unique_entries: - # Mark as most recent entry - entry['is_latest'] = True - unique_entries[unique_key] = entry - - # Return the deduplicated list with only the most recent entries - return list(unique_entries.values()) + for entry in entries: + # Create a unique key based on domain, class, type, TTL, and data + unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}:{entry.get('ttl')}:{entry.get('record_data')}" + + # Only keep the most recent entry for each unique combination + if unique_key not in unique_entries: + # Mark as most recent entry + entry['is_latest'] = True + unique_entries[unique_key] = entry + + # Return the deduplicated list with only the most recent entries + return list(unique_entries.values()) + else: + # No deduplication - return all entries + return entries except Exception as e: print(f"Error loading DNS records from database: {e}") return [] @@ -237,9 +210,7 @@ def load_dns_entries(specific_upload_id: str = None) -> List[Dict]: def get_unique_values(entries: List[Dict]) -> Dict[str, Set]: unique_values = { "record_type": set(), - "record_class": set(), - "tld": set(), - "sld": set() + "record_class": set() } for entry in entries: @@ -361,28 +332,23 @@ async def dns_records( upload_id: Optional[str] = None, record_type: Optional[str] = None, record_class: Optional[str] = None, - tld: Optional[str] = None, - sld: Optional[str] = None, - domain: Optional[str] = None + domain: Optional[str] = None, + deduplicate: Optional[bool] = True # Default to showing only unique latest entries ): """DNS Records page with filtering""" - # Get all entries first, based on upload_id if provided - entries = load_dns_entries(upload_id) + # Get all entries first, based on upload_id if provided, with deduplication option + entries = load_dns_entries(upload_id, deduplicate) # Apply additional filters if provided if record_type: entries = [e for e in entries if e.get("record_type") == record_type] if record_class: entries = [e for e in entries if e.get("record_class") == record_class] - if tld: - entries = [e for e in entries if e.get("tld") == tld] - if sld: - entries = [e for e in entries if e.get("sld") == sld] if domain: entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()] # Get unique values for filter dropdowns from all entries (not filtered) - all_entries = load_dns_entries(upload_id) + all_entries = load_dns_entries(upload_id, deduplicate=False) unique_values = get_unique_values(all_entries) uploads = get_uploads() @@ -392,7 +358,8 @@ async def dns_records( "request": request, "entries": entries, "unique_values": unique_values, - "uploads": uploads + "uploads": uploads, + "deduplicate": deduplicate } ) @@ -402,24 +369,24 @@ async def get_all_uploads(): """API endpoint that returns all uploads""" return get_uploads() -@app.get("/api/slds", response_model=List[Dict]) -async def get_slds(upload_id: Optional[str] = None): - """API endpoint that returns all SLDs with optional filter by upload_id""" +@app.get("/api/domains", response_model=List[Dict]) +async def get_domains(upload_id: Optional[str] = None): + """API endpoint that returns all domains with optional filter by upload_id""" # The load_domains function now handles deduplication and upload_id filtering domains = load_domains(upload_id) return domains -@app.get("/api/slds/{sld}", response_model=List[Dict]) -async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None): - """API endpoint that returns domains for a specific SLD with optional filter by upload_id""" +@app.get("/api/domains/{domain}", response_model=List[Dict]) +async def get_domains_by_name(domain: str, upload_id: Optional[str] = None): + """API endpoint that returns domains matching a specific domain name with optional filter by upload_id""" # Get domains, already deduplicated and optionally filtered by upload_id all_domains = load_domains(upload_id) - # Filter by SLD - filtered = [item for item in all_domains if item["sld"].lower() == sld.lower()] + # Filter by domain name + filtered = [item for item in all_domains if domain.lower() in item["full_domain"].lower()] if not filtered: - raise HTTPException(status_code=404, detail=f"No domains found with SLD: {sld}") + raise HTTPException(status_code=404, detail=f"No domains found matching: {domain}") return filtered @@ -427,24 +394,19 @@ async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None): async def get_dns_entries( record_type: Optional[str] = None, record_class: Optional[str] = None, - tld: Optional[str] = None, - sld: Optional[str] = None, domain: Optional[str] = None, - upload_id: Optional[str] = None + upload_id: Optional[str] = None, + deduplicate: Optional[bool] = True ): - """API endpoint that returns filtered DNS entries""" + """API endpoint that returns filtered DNS entries with optional deduplication""" # Get entries - if upload_id is specified, only those entries are returned - entries = load_dns_entries(upload_id) + entries = load_dns_entries(upload_id, deduplicate) # Apply additional filters if provided if record_type: entries = [e for e in entries if e.get("record_type") == record_type] if record_class: entries = [e for e in entries if e.get("record_class") == record_class] - if tld: - entries = [e for e in entries if e.get("tld") == tld] - if sld: - entries = [e for e in entries if e.get("sld") == sld] if domain: entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()] diff --git a/templates/dns_records.html b/templates/dns_records.html index f28ec83..9b39169 100644 --- a/templates/dns_records.html +++ b/templates/dns_records.html @@ -115,14 +115,10 @@ background-color: #e0e0e0; color: #333; } - .sld-badge { + .domain-badge { background-color: #d1e7dd; color: #0f5132; } - .tld-badge { - background-color: #cfe2ff; - color: #0a58ca; - } .service-badge { background-color: #fff3cd; color: #664d03; @@ -149,6 +145,13 @@ margin-left: 10px; font-weight: normal; } + .dedup-note { + display: block; + font-size: 0.7em; + color: #666; + font-weight: normal; + margin-top: 5px; + } .tooltip { position: relative; display: inline-block; @@ -185,10 +188,10 @@
-

DNS Entry Viewer

+

DNS Records Viewer {% if deduplicate %}(Deduplicated){% else %}(All Records){% endif %}

@@ -222,35 +225,28 @@ {% endfor %}
-
- - -
-
- - -
+
+ + +
Reset
-

DNS Records {{ entries|length }}

+

DNS Records {{ entries|length }} + {% if deduplicate %} + (Showing most recent entries for each Domain+Class+Type+TTL+Data combination) + {% endif %} +

{% if entries %} @@ -271,11 +267,7 @@ {% if entry.get('service') %} {{ entry.service }} {% endif %} - {% if entry.get('subdomain') %} - {{ entry.subdomain }}. - {% endif %} - {{ entry.sld }}. - {{ entry.tld }} + {{ entry.domain }} @@ -302,8 +294,9 @@

API Endpoints

Get all DNS entries: /api/dns

-

Get filtered DNS entries: /api/dns?record_type=A&tld=de

+

Get filtered DNS entries: /api/dns?record_type=A&domain=example.com

Filter by upload: /api/dns?upload_id={upload_id}

+

Show all records (no deduplication): /api/dns?deduplicate=false

Get unique filter values: /api/dns/types

diff --git a/templates/index.html b/templates/index.html index 6b4f35f..91846fb 100644 --- a/templates/index.html +++ b/templates/index.html @@ -53,7 +53,7 @@ tr:hover { background-color: #f5f5f5; } - .sld-badge { + .domain-badge { display: inline-block; padding: 3px 7px; background-color: #d1e7dd; @@ -61,14 +61,6 @@ font-size: 0.9em; color: #0f5132; } - .tld-badge { - display: inline-block; - padding: 3px 7px; - background-color: #cfe2ff; - border-radius: 4px; - font-size: 0.9em; - color: #0a58ca; - } .api-section { margin-top: 30px; padding: 15px; @@ -149,7 +141,7 @@

Domain Management System

@@ -222,9 +214,9 @@

API Endpoints

Get all uploads: /api/uploads

-

Get all domains: /api/slds

-

Get domains by SLD: /api/slds/{sld}

-

Filter by upload: /api/slds?upload_id={upload_id}

+

Get all domains: /api/domains

+

Get domains by name: /api/domains/{domain}

+

Filter by upload: /api/domains?upload_id={upload_id}

{% if domains %} @@ -232,20 +224,14 @@
{{ entry.ttl }} {{ entry.record_class }}
- - - - + {% for item in domains %} - - - - + {% endfor %}
SLDTLDSubdomainFull DomainDomain Upload Date
{{ item.sld }}{{ item.tld }}{{ item.get('subdomain', 'N/A') }}{{ item.full_domain }}{{ item.full_domain }} {{ item.timestamp.replace('T', ' ').split('.')[0] if item.get('timestamp') else 'N/A' }}