diff --git a/main.py b/main.py
index 4baf1cb..cf5991f 100644
--- a/main.py
+++ b/main.py
@@ -32,29 +32,11 @@ def process_domain_entry(domain_entry):
if domain_entry.endswith('.'):
domain_entry = domain_entry[:-1]
- # Parse domain components
- parts = domain_entry.split('.')
- if len(parts) > 1:
- # For domain.tld format
- if len(parts) == 2:
- sld = parts[0] # Second Level Domain
- tld = parts[1] # Top Level Domain
- domain_info = {
- "sld": sld,
- "tld": tld,
- "full_domain": domain_entry
- }
- # For subdomain.domain.tld format
- else:
- sld = parts[-2] # Second Level Domain
- tld = parts[-1] # Top Level Domain
- subdomain = '.'.join(parts[:-2]) # Subdomains
- domain_info = {
- "sld": sld,
- "tld": tld,
- "full_domain": domain_entry,
- "subdomain": subdomain
- }
+ if domain_entry:
+ # Store only the full domain name without splitting
+ domain_info = {
+ "full_domain": domain_entry
+ }
return domain_info
return None
@@ -90,7 +72,7 @@ async def process_csv_upload(file_content, upload_id, description=None):
domain_info = process_domain_entry(domain_entry)
if domain_info:
# Create a unique key to avoid duplicates within this upload
- unique_key = f"{domain_info['sld']}.{domain_info['tld']}"
+ unique_key = domain_info['full_domain']
if unique_key not in unique_domains:
unique_domains.add(unique_key)
@@ -127,22 +109,9 @@ async def process_csv_upload(file_content, upload_id, description=None):
"timestamp": timestamp
}
- # Add domain components
- if len(domain_parts) > 1:
- if domain_parts[0].startswith('_'): # Service records like _dmarc
- entry["service"] = domain_parts[0]
- # Adjust domain parts
- domain_parts = domain_parts[1:]
-
- # For domain.tld format
- if len(domain_parts) == 2:
- entry["sld"] = domain_parts[0] # Second Level Domain
- entry["tld"] = domain_parts[1] # Top Level Domain
- # For subdomain.domain.tld format
- elif len(domain_parts) > 2:
- entry["sld"] = domain_parts[-2] # Second Level Domain
- entry["tld"] = domain_parts[-1] # Top Level Domain
- entry["subdomain"] = '.'.join(domain_parts[:-2]) # Subdomains
+ # Add special handling for service records
+ if len(domain_parts) > 0 and domain_parts[0].startswith('_'): # Service records like _dmarc
+ entry["service"] = domain_parts[0]
dns_records_to_insert.append(entry)
@@ -201,34 +170,38 @@ def load_domains(specific_upload_id: str = None) -> List[Dict]:
print(f"Error loading domains from database: {e}")
return []
-# Load DNS entries from database - deduplicated by domain, class, and type (no history)
-def load_dns_entries(specific_upload_id: str = None) -> List[Dict]:
+# Load DNS entries from database - with optional deduplication
+def load_dns_entries(specific_upload_id: str = None, deduplicate: bool = False) -> List[Dict]:
try:
entries = dns_records_table.all()
# If a specific upload ID is provided, only show records from that upload
if specific_upload_id:
entries = [e for e in entries if e.get('upload_id') == specific_upload_id]
- return entries
# Sort by timestamp in descending order (newest first)
entries.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
- # Create a dictionary to track unique entries (most recent only)
- unique_entries = {}
-
- for entry in entries:
- # Create a unique key based on domain, class, and type
- unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}"
+ # If deduplication is requested, only keep the most recent entry for each unique combination
+ if deduplicate:
+ # Create a dictionary to track unique entries (most recent only)
+ unique_entries = {}
- # Only keep the most recent entry for each unique combination
- if unique_key not in unique_entries:
- # Mark as most recent entry
- entry['is_latest'] = True
- unique_entries[unique_key] = entry
-
- # Return the deduplicated list with only the most recent entries
- return list(unique_entries.values())
+ for entry in entries:
+ # Create a unique key based on domain, class, type, TTL, and data
+ unique_key = f"{entry.get('domain')}:{entry.get('record_class')}:{entry.get('record_type')}:{entry.get('ttl')}:{entry.get('record_data')}"
+
+ # Only keep the most recent entry for each unique combination
+ if unique_key not in unique_entries:
+ # Mark as most recent entry
+ entry['is_latest'] = True
+ unique_entries[unique_key] = entry
+
+ # Return the deduplicated list with only the most recent entries
+ return list(unique_entries.values())
+ else:
+ # No deduplication - return all entries
+ return entries
except Exception as e:
print(f"Error loading DNS records from database: {e}")
return []
@@ -237,9 +210,7 @@ def load_dns_entries(specific_upload_id: str = None) -> List[Dict]:
def get_unique_values(entries: List[Dict]) -> Dict[str, Set]:
unique_values = {
"record_type": set(),
- "record_class": set(),
- "tld": set(),
- "sld": set()
+ "record_class": set()
}
for entry in entries:
@@ -361,28 +332,23 @@ async def dns_records(
upload_id: Optional[str] = None,
record_type: Optional[str] = None,
record_class: Optional[str] = None,
- tld: Optional[str] = None,
- sld: Optional[str] = None,
- domain: Optional[str] = None
+ domain: Optional[str] = None,
+ deduplicate: Optional[bool] = True # Default to showing only unique latest entries
):
"""DNS Records page with filtering"""
- # Get all entries first, based on upload_id if provided
- entries = load_dns_entries(upload_id)
+ # Get all entries first, based on upload_id if provided, with deduplication option
+ entries = load_dns_entries(upload_id, deduplicate)
# Apply additional filters if provided
if record_type:
entries = [e for e in entries if e.get("record_type") == record_type]
if record_class:
entries = [e for e in entries if e.get("record_class") == record_class]
- if tld:
- entries = [e for e in entries if e.get("tld") == tld]
- if sld:
- entries = [e for e in entries if e.get("sld") == sld]
if domain:
entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()]
# Get unique values for filter dropdowns from all entries (not filtered)
- all_entries = load_dns_entries(upload_id)
+ all_entries = load_dns_entries(upload_id, deduplicate=False)
unique_values = get_unique_values(all_entries)
uploads = get_uploads()
@@ -392,7 +358,8 @@ async def dns_records(
"request": request,
"entries": entries,
"unique_values": unique_values,
- "uploads": uploads
+ "uploads": uploads,
+ "deduplicate": deduplicate
}
)
@@ -402,24 +369,24 @@ async def get_all_uploads():
"""API endpoint that returns all uploads"""
return get_uploads()
-@app.get("/api/slds", response_model=List[Dict])
-async def get_slds(upload_id: Optional[str] = None):
- """API endpoint that returns all SLDs with optional filter by upload_id"""
+@app.get("/api/domains", response_model=List[Dict])
+async def get_domains(upload_id: Optional[str] = None):
+ """API endpoint that returns all domains with optional filter by upload_id"""
# The load_domains function now handles deduplication and upload_id filtering
domains = load_domains(upload_id)
return domains
-@app.get("/api/slds/{sld}", response_model=List[Dict])
-async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None):
- """API endpoint that returns domains for a specific SLD with optional filter by upload_id"""
+@app.get("/api/domains/{domain}", response_model=List[Dict])
+async def get_domains_by_name(domain: str, upload_id: Optional[str] = None):
+ """API endpoint that returns domains matching a specific domain name with optional filter by upload_id"""
# Get domains, already deduplicated and optionally filtered by upload_id
all_domains = load_domains(upload_id)
- # Filter by SLD
- filtered = [item for item in all_domains if item["sld"].lower() == sld.lower()]
+ # Filter by domain name
+ filtered = [item for item in all_domains if domain.lower() in item["full_domain"].lower()]
if not filtered:
- raise HTTPException(status_code=404, detail=f"No domains found with SLD: {sld}")
+ raise HTTPException(status_code=404, detail=f"No domains found matching: {domain}")
return filtered
@@ -427,24 +394,19 @@ async def get_domains_by_sld(sld: str, upload_id: Optional[str] = None):
async def get_dns_entries(
record_type: Optional[str] = None,
record_class: Optional[str] = None,
- tld: Optional[str] = None,
- sld: Optional[str] = None,
domain: Optional[str] = None,
- upload_id: Optional[str] = None
+ upload_id: Optional[str] = None,
+ deduplicate: Optional[bool] = True
):
- """API endpoint that returns filtered DNS entries"""
+ """API endpoint that returns filtered DNS entries with optional deduplication"""
# Get entries - if upload_id is specified, only those entries are returned
- entries = load_dns_entries(upload_id)
+ entries = load_dns_entries(upload_id, deduplicate)
# Apply additional filters if provided
if record_type:
entries = [e for e in entries if e.get("record_type") == record_type]
if record_class:
entries = [e for e in entries if e.get("record_class") == record_class]
- if tld:
- entries = [e for e in entries if e.get("tld") == tld]
- if sld:
- entries = [e for e in entries if e.get("sld") == sld]
if domain:
entries = [e for e in entries if domain.lower() in e.get("domain", "").lower()]
diff --git a/templates/dns_records.html b/templates/dns_records.html
index f28ec83..9b39169 100644
--- a/templates/dns_records.html
+++ b/templates/dns_records.html
@@ -115,14 +115,10 @@
background-color: #e0e0e0;
color: #333;
}
- .sld-badge {
+ .domain-badge {
background-color: #d1e7dd;
color: #0f5132;
}
- .tld-badge {
- background-color: #cfe2ff;
- color: #0a58ca;
- }
.service-badge {
background-color: #fff3cd;
color: #664d03;
@@ -149,6 +145,13 @@
margin-left: 10px;
font-weight: normal;
}
+ .dedup-note {
+ display: block;
+ font-size: 0.7em;
+ color: #666;
+ font-weight: normal;
+ margin-top: 5px;
+ }
.tooltip {
position: relative;
display: inline-block;
@@ -185,10 +188,10 @@
-
DNS Entry Viewer
+
DNS Records Viewer {% if deduplicate %}(Deduplicated){% else %}(All Records){% endif %}
DNS Records {{ entries|length }}
+ {% if deduplicate %}
+ (Showing most recent entries for each Domain+Class+Type+TTL+Data combination)
+ {% endif %}
+