125 lines
3.6 KiB
Python
125 lines
3.6 KiB
Python
import dns.exception
import dns.flags
import dns.message
import dns.name
import dns.query
import dns.rcode
import dns.rdatatype
import dns.resolver
|
|
|
|
#from collections import defaultdict
|
|
# FIXME: DNS timeouts
|
|
|
|
def compareRecords(rrset, expected):
    """Compare the records a server returned against the expected records.

    Args:
        rrset: iterable of RRset-like objects exposing ``name`` (with
            ``to_text()``), ``rdtype`` and ``items`` (iterable of rdata
            objects with ``to_text()``).
        expected: iterable of ``(domain, rrtype, content)`` tuples, where
            ``content`` is a collection of rdata values in text form.

    Returns:
        A ``(success, result)`` tuple. ``result`` is a dict with three lists:

        * ``"nameMissing"``: ``(domain, rrtype)`` pairs for which no entry
          in ``rrset`` has that owner name and type.
        * ``"rrMissing"``: ``(domain, rrtype, value)`` for every expected
          value absent from the matching entry.
        * ``"rrExtra"``: ``(domain, rrtype, value)`` for every served value
          that was not expected.

        ``success`` is True only when all three lists are empty.

    Owner names are compared as plain text against ``name.to_text()``.
    NOTE(review): this presumably requires ``domain`` to be spelled exactly
    as dnspython renders it (e.g. with the trailing dot) -- confirm callers.
    """
    nameMissing = []
    rrMissing = []
    rrExtra = []

    for domain, rrtype, content in expected:
        for candidate in rrset:
            # Only the first entry with matching owner name and type counts.
            if candidate.name.to_text() != domain:
                continue
            if candidate.rdtype != dns.rdatatype.from_text(rrtype):
                continue

            served = [rdata.to_text() for rdata in candidate.items]
            rrMissing.extend(
                (domain, rrtype, wanted)
                for wanted in content
                if wanted not in served
            )
            rrExtra.extend(
                (domain, rrtype, present)
                for present in served
                if present not in content
            )
            break
        else:
            # No entry for this name and record type in the server's reply.
            nameMissing.append((domain, rrtype))

    result = {
        "nameMissing": nameMissing,
        "rrMissing": rrMissing,
        "rrExtra": rrExtra,
    }
    success = not (nameMissing or rrMissing or rrExtra)
    return success, result
|
|
|
|
|
|
def dnsQuery(domain, rrType, nameserverIp):
    """Send one query for ``domain``/``rrType`` directly to ``nameserverIp``.

    The query goes out over UDP with a 2 second timeout. If the reply has
    the TC (truncated) flag set, the same query is retried over TCP so the
    caller does not compare against an incomplete record set.

    Args:
        domain: domain name in text form.
        rrType: record type in text form (e.g. ``"NS"``, ``"A"``, ``"ANY"``).
        nameserverIp: IP address of the server to ask.

    Returns:
        ``(False, [])`` when the server answers NXDOMAIN, otherwise
        ``(True, rrsets)`` where ``rrsets`` is the concatenation of the
        answer, authority and additional sections of the response.

    Raises:
        Whatever ``dns.query.udp`` raises for the UDP query (callers in this
        file handle ``dns.exception.DNSException`` and ``OSError``).
    """
    dname = dns.name.from_text(domain)
    req = dns.message.make_query(dname, dns.rdatatype.from_text(rrType))
    resp = dns.query.udp(req, nameserverIp, timeout=2.0)

    if resp.flags & dns.flags.TC:
        # The UDP reply was cut short; ask again over TCP for the full data.
        try:
            resp = dns.query.tcp(req, nameserverIp, timeout=2.0)
        except (dns.exception.DNSException, OSError):
            # Deliberate best effort: a server without working TCP still
            # yields the partial UDP reply, exactly as before this retry
            # existed.
            pass

    if resp.rcode() == dns.rcode.NXDOMAIN:
        return False, []

    return True, resp.answer + resp.authority + resp.additional
|
|
|
|
|
|
def _resolveNameserverAddress(hostname):
    """Look up an IP address for ``hostname`` via the system resolver.

    A records are tried first, then AAAA; the first successful lookup wins.
    Returns the address as text, or None when neither lookup succeeds.
    """
    try:
        resolver = dns.resolver.Resolver()
    except (dns.exception.DNSException, OSError):
        return None
    resolver.timeout = 2.0

    for rrType in ("A", "AAAA"):
        try:
            answer = resolver.query(hostname, rdtype=dns.rdatatype.from_text(rrType))
            # Use the answer's own rrset (the one matching the queried name
            # and type) rather than the first RRset of the raw response,
            # which may be a CNAME without an ``address`` attribute.
            return answer.rrset[0].address
        except (dns.exception.DNSException, OSError):
            continue
    return None


def checkDomain(domain, tldNameserver, nameservers):
    """Check that a domain's delegation matches its registered nameservers.

    The expected record set is built from ``nameservers``: one NS record set
    for ``domain`` plus A/AAAA glue records for every nameserver whose name
    lies under ``domain``. That set is compared first against the TLD
    nameserver and then against each registered nameserver.

    Args:
        domain: the domain name to check, in text form.
            NOTE(review): names are compared textually by ``compareRecords``,
            so this presumably has to be fully qualified -- confirm.
        tldNameserver: IP address of the TLD nameserver to query.
        nameservers: iterable of objects with ``name``, ``glueIPv4`` and
            ``glueIPv6`` attributes.

    Returns:
        A list of result tuples. Each is ``("succ", message)``,
        ``("err", message)`` or ``("err", message, details)``, where
        ``details`` is the mismatch dict produced by ``compareRecords``.
    """
    result = []

    # Build the expected record set.
    nsRecords = [(domain, "NS", [ns.name for ns in nameservers])]
    glueRecords = []
    for ns in nameservers:
        if not ns.name.endswith("." + domain):
            continue  # out-of-zone nameserver: no glue expected
        if not (ns.glueIPv4 or ns.glueIPv6):
            result.append(("err", "Nameserver %s is under domain %s, but has no glue entries." % (ns.name, domain)))
            continue
        if ns.glueIPv4:
            glueRecords.append((ns.name, "A", [ns.glueIPv4]))
        if ns.glueIPv6:
            glueRecords.append((ns.name, "AAAA", [ns.glueIPv6]))
    expectedRecords = nsRecords + glueRecords

    # 1. TLD nameserver
    try:
        found, rrset = dnsQuery(domain, "ANY", tldNameserver)
    except (dns.exception.DNSException, OSError):
        # Any DNS-level failure (timeout, malformed reply, ...) is reported
        # instead of aborting the whole check, matching the handling used
        # for the registered nameservers below.
        result.append(("err", "TLD nameserver is currently not reachable"))
    else:
        if not found:
            result.append(("err", "Domain %s not found in TLD nameserver" % (domain,)))
        else:
            success, errors = compareRecords(rrset, expectedRecords)
            if success:
                result.append(("succ", "All records present in TLD nameserver"))
            else:
                result.append(("err", "Record mismatch between TLD nameserver and WHOIS database", errors))

    # 2. The registered nameservers themselves
    for ns in nameservers:
        # Prefer glue (IPv4 before IPv6); otherwise resolve the name.
        addr = ns.glueIPv4 or ns.glueIPv6 or _resolveNameserverAddress(ns.name)
        if not addr:
            result.append(("err", "Can't resolv an ip address for nameserver %s" % ns.name))
            continue

        errDict = {"nameMissing": [], "rrMissing": [], "rrExtra": []}
        try:
            # Query each expected record individually and collect mismatches.
            for rec in expectedRecords:
                _found, rrset = dnsQuery(rec[0], rec[1], addr)
                _success, errors = compareRecords(rrset, [rec])
                for key, entries in errors.items():
                    errDict[key].extend(entries)
        except (dns.exception.DNSException, OSError):
            result.append(("err", "Nameserver %s is not reachable (via %s)" % (ns.name, addr)))
            continue

        if any(errDict.values()):
            result.append(("err", "Nameserver %s (via %s) recordset does not match the database" % (ns.name, addr), errDict))
        else:
            result.append(("succ", "Nameserver %s is configured correctly" % ns.name))

    return result
|