dnmgmt/api/dnshelper.py

125 lines
3.6 KiB
Python
Raw Normal View History

2017-05-01 06:11:54 +02:00
import dns.name
import dns.message
import dns.query
import dns.resolver
#from collections import defaultdict
# FIXME: DNS timeouts
def compareRecords(rrset, expected):
    """Compare RRsets returned by a nameserver with the expected records.

    Args:
        rrset: Iterable of RRset-like objects. Each one is read through
            ``name.to_text()``, ``rdtype`` and ``items`` (an iterable of
            rdata objects providing ``to_text()``).
        expected: Iterable of ``(domain, rrtype, content)`` tuples, where
            ``rrtype`` is the textual record type and ``content`` is the
            collection of expected rdata values in text form.

    Returns:
        A ``(success, result)`` tuple. ``result`` is a dict with the keys
        ``"nameMissing"`` (``(domain, rrtype)`` pairs with no matching
        RRset), ``"rrMissing"`` (expected values absent from the matching
        RRset) and ``"rrExtra"`` (values in the matching RRset that were
        not expected). ``success`` is True only if all three lists are
        empty.
    """
    result = {
        "nameMissing": [],
        "rrMissing": [],
        "rrExtra": [],
    }

    for domain, rrtype, content in expected:
        for rrrec in rrset:
            # The name is compared first so the record type is only parsed
            # for RRsets that already match by name.
            if domain != rrrec.name.to_text():
                continue
            if dns.rdatatype.from_text(rrtype) != rrrec.rdtype:
                continue

            # Render the RRset's rdata to text once instead of once per
            # expected value.
            present = [item.to_text() for item in rrrec.items]

            # Expected values the nameserver did not return.
            result["rrMissing"].extend(
                (domain, rrtype, name)
                for name in content
                if name not in present
            )
            # Superfluous values the nameserver returned.
            result["rrExtra"].extend(
                (domain, rrtype, text)
                for text in present
                if text not in content
            )
            # Only the first RRset matching name and type is compared.
            break
        else:
            # No RRset for this domain + record type in the nameserver's
            # response.
            result["nameMissing"].append((domain, rrtype))

    success = not any(result.values())
    return success, result
def dnsQuery(domain, rrType, nameserverIp, timeout=2.0):
    """Send one UDP DNS query directly to a specific nameserver.

    Args:
        domain: Name to query, in text form.
        rrType: Record type in text form (the callers in this file pass
            values such as ``"NS"``, ``"A"``, ``"AAAA"`` and ``"ANY"``).
        nameserverIp: Address of the nameserver to send the query to.
        timeout: Seconds to wait for the response, passed straight to
            ``dns.query.udp``. Defaults to 2.0.

    Returns:
        ``(True, rrset)`` when the response code is anything other than
        NXDOMAIN, where ``rrset`` is the answer, authority and additional
        sections concatenated in that order; ``(False, [])`` on NXDOMAIN.

    Exceptions raised by ``dns.query.udp`` are not caught here; they
    propagate to the caller.
    """
    qname = dns.name.from_text(domain)
    request = dns.message.make_query(qname, dns.rdatatype.from_text(rrType))
    response = dns.query.udp(request, nameserverIp, timeout=timeout)

    if response.rcode() == dns.rcode.NXDOMAIN:
        return False, []

    # Every other rcode is reported as "found"; the sections may be empty.
    return True, response.answer + response.authority + response.additional
def _resolveNameserverAddress(nsName, timeout=2.0):
    """Return an address for *nsName* (A tried before AAAA), or None."""
    try:
        resolver = dns.resolver.Resolver()
    except (dns.exception.DNSException, OSError):
        return None
    resolver.timeout = timeout

    for rrType in ("A", "AAAA"):
        try:
            answer = resolver.query(nsName, rdtype=dns.rdatatype.from_text(rrType))
            # Index the answer object itself: this reads from the RRset of
            # the queried type rather than from whichever RRset happens to
            # be first in the answer section (which may be a CNAME and then
            # has no ``address``).
            return answer[0].address
        except (dns.exception.DNSException, OSError):
            # Best effort: fall through to the next record type.
            continue
    return None


def checkDomain(domain, tldNameserver, nameservers):
    """Check a domain's delegation at the TLD nameserver and at its own nameservers.

    Args:
        domain: Domain name in text form.
        tldNameserver: Address of the TLD nameserver to query.
        nameservers: Iterable of objects with the attributes ``name``,
            ``glueIPv4`` and ``glueIPv6``.

    Returns:
        A list of result tuples. Each is ``("succ", message)``,
        ``("err", message)`` or ``("err", message, details)``, where
        ``details`` is the mismatch dict produced by ``compareRecords``.
    """
    result = []

    # Build the expected record set: one NS record set for the domain plus
    # glue A/AAAA records for every nameserver located under the domain.
    nsRecords = [(domain, "NS", [ns.name for ns in nameservers])]
    glueRecords = []
    for ns in nameservers:
        if not ns.name.endswith("." + domain):
            continue
        if not (ns.glueIPv4 or ns.glueIPv6):
            result.append(("err", "Nameserver %s is under domain %s, but has no glue entries." % (ns.name, domain)))
            continue
        if ns.glueIPv4:
            glueRecords.append((ns.name, "A", [ns.glueIPv4]))
        if ns.glueIPv6:
            glueRecords.append((ns.name, "AAAA", [ns.glueIPv6]))
    expectedRecords = nsRecords + glueRecords

    # 1. TLD nameserver
    try:
        found, rrset = dnsQuery(domain, "ANY", tldNameserver)
    except (dns.exception.Timeout, OSError):
        result.append(("err", "TLD nameserver is currently not reachable"))
    else:
        if not found:
            result.append(("err", "Domain %s not found in TLD nameserver" % (domain,)))
        else:
            success, errors = compareRecords(rrset, expectedRecords)
            if success:
                result.append(("succ", "All records present in TLD nameserver"))
            else:
                result.append(("err", "Record mismatch between TLD nameserver and WHOIS database", errors))

    # 2. The domain's own nameservers
    for ns in nameservers:
        # Prefer glue (IPv4 first, then IPv6); only resolve the name when
        # no glue address is stored.
        addr = ns.glueIPv4 or ns.glueIPv6 or _resolveNameserverAddress(ns.name)
        if not addr:
            result.append(("err", "Can't resolv an ip address for nameserver %s" % ns.name))
            continue

        # Mismatches accumulated over all expected records for this server.
        errDict = {"nameMissing": [], "rrMissing": [], "rrExtra": []}
        try:
            for rec in expectedRecords:
                # Each expected record is queried and compared on its own;
                # NXDOMAIN yields an empty rrset, which compareRecords
                # reports as a missing name.
                _found, rrset = dnsQuery(rec[0], rec[1], addr)
                success, errors = compareRecords(rrset, [rec])
                if not success:
                    for key, entries in errors.items():
                        errDict[key].extend(entries)
        except (dns.exception.DNSException, OSError):
            result.append(("err", "Nameserver %s is not reachable (via %s)" % (ns.name, addr)))
            continue

        if any(errDict.values()):
            result.append(("err", "Nameserver %s (via %s) recordset does not match the database" % (ns.name, addr), errDict))
        else:
            result.append(("succ", "Nameserver %s is configured correctly" % ns.name))

    return result