# This file is part of dnmgmt, a number resource management system # Licensed under GNU General Public License v3 or later # Written by Sebastian Lohff (seba@someserver.de) import dns.name import dns.message import dns.query import dns.resolver #from collections import defaultdict # FIXME: DNS timeouts def compareRecords(rrset, expected): result = { "nameMissing": [], "rrMissing": [], "rrExtra": [], } for domain, rrtype, content in expected: for rrrec in rrset: if domain == rrrec.name.to_text() and dns.rdatatype.from_text(rrtype) == rrrec.rdtype: for name in content: if name not in map(lambda _x: _x.to_text(), rrrec.items): # record missing result["rrMissing"].append((domain, rrtype, name)) for item in rrrec.items: if item.to_text() not in content: # superfluous record result["rrExtra"].append((domain, rrtype, item.to_text())) break else: # domain + rr nicht in nameserver result["nameMissing"].append((domain, rrtype)) success = not any(len(_x) > 0 for _x in result.values()) return success, result def dnsQuery(domain, rrType, nameserverIp): dname = dns.name.from_text(domain) req = dns.message.make_query(dname, dns.rdatatype.from_text(rrType)) resp = dns.query.udp(req, nameserverIp, timeout=2.0) if resp.rcode() != dns.rcode.NXDOMAIN: rrset = resp.answer + resp.authority + resp.additional return True, rrset else: return False, [] def checkDomain(domain, tldNameserver, nameservers): result = [] if nameservers.count() == 0: return [("err", "Domain %s has no nameservers attached to it, nothing to check" % domain)] # build record set nsRecords = [(domain, "NS", list(ns.name for ns in nameservers))] glueRecords = [] for ns in nameservers: if ns.name.endswith("." + domain): if ns.glueIPv4 or ns.glueIPv6: if ns.glueIPv4: glueRecords.append((ns.name, "A", [ns.glueIPv4])) if ns.glueIPv6: glueRecords.append((ns.name, "AAAA", [ns.glueIPv6])) else: result.append(("err", "Nameserver %s is under domain %s, but has no glue entries." % (ns.name, domain))) # 1. TLD nameserver try: found, rrset = dnsQuery(domain, "ANY", tldNameserver) if found: success, errors = compareRecords(rrset, nsRecords + glueRecords) if success: result.append(("succ", "All records present in TLD nameserver")) else: result.append(("err", "Record mismatch between TLD nameserver and WHOIS database", errors)) else: result.append(("err", "Domain %s not found in TLD nameserver" % (domain,))) except (dns.exception.Timeout, OSError): result.append(("err", "TLD nameserver is currently not reachable")) # find other records... # 2. your nameservers for ns in nameservers: addr = None if ns.glueIPv4: addr = ns.glueIPv4 elif ns.glueIPv6: addr = ns.glueIPv6 else: for rrType in ("A", "AAAA"): try: r = dns.resolver.Resolver() r.timeout = 2.0 q = r.query(ns.name, rdtype=dns.rdatatype.from_text(rrType)) addr = q.response.answer[0].items[0].address except (dns.exception.DNSException, OSError): pass if addr: err = False errDict = {"nameMissing": [], "rrMissing": [], "rrExtra": []} try: for rec in (nsRecords + glueRecords): found, rrset = dnsQuery(rec[0], rec[1], addr) #success, errors = compareRecords(rrset, nsRecords + glueRecords) success, errors = compareRecords(rrset, [rec]) if not success: err = True for k in errors.keys(): errDict[k].extend(errors[k]) if not err: result.append(("succ", "Nameserver %s is configured correctly" % ns.name)) else: result.append(("err", "Nameserver %s (via %s) recordset does not match the database" % (ns.name, addr), errDict)) except (dns.exception.DNSException, OSError): result.append(("err", "Nameserver %s is not reachable (via %s)" % (ns.name, addr))) else: result.append(("err", "Can't resolv an ip address for nameserver %s" % ns.name)) return result