A few years back, mainly out of frustration, I developed something that started as a poor man’s asset management system for a company’s perimeter and turned into a poor man’s perimeter security tool. Right now, I’m doing the same again, so I guess it’s time for a short write-up.
Perimeter?
For me, in this context, the perimeter is the public-facing presence of a company. This may be websites and web services, mail servers, routers and, well, everything that can be reached from the Internet. I’m explicitly not saying “everything that >should< be reachable”, because often these systems are misconfigured and expose far too much information.
In this context, the perimeter does not contain phones, laptops etc. Even though they’re technically part of a company’s perimeter, they’re not part of this post.
Getting Started
The first information I usually have is a set of websites, thus a set of URLs and thus a set of domains. During my first project I knew from the beginning that I might be running into hundreds of domains and thus potentially thousands of URLs. The decision to work with a database was a no-brainer. Working with Python, coding on the go and not having any technical requirements, I chose SQLite.
Tables and Columns
Domains turn into subdomains, subdomains into IP addresses. Each IP can serve multiple subdomains and, on top of that, even more services on different ports. I quickly lost joy in creating a data structure that would be flexible enough to cover all my unexpected future ideas. So… I decided to use Python Django! I started by designing models for domains, which had underlying subdomains, which could be resolved into IP addresses. These then had ports with various potential details.
Initial Dataset
Starting from Domains, the initial steps should be obvious:
- Run a few of the various dnsrecon tools to get more names
- Resolve all names into IP addresses
- Do reverse lookups for all of the IP addresses
- Access each one of them via http and https and store the resulting URLs
- Extract domains and subdomains
- Recursively repeat until done
- Run portscans
- Try not to drown in the results :)
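The "resolve, reverse-lookup, repeat until done" part of the list can be sketched roughly as below. This is a minimal illustration, not the code from the tool: the function names `resolve`, `reverse` and `expand` and the `max_rounds` safety limit are my own assumptions, and only forward and reverse DNS lookups from the standard library are used.

```python
import socket


def resolve(name):
    """Return the set of IPv4 addresses for a hostname (empty on failure)."""
    try:
        return set(socket.gethostbyname_ex(name)[2])
    except (OSError, UnicodeError):
        return set()


def reverse(ip):
    """Return the set of names from a reverse lookup (empty on failure)."""
    try:
        primary, aliases, _ = socket.gethostbyaddr(ip)
        return {primary, *aliases}
    except (OSError, UnicodeError):
        return set()


def expand(seed_names, resolve_fn=resolve, reverse_fn=reverse, max_rounds=10):
    """Alternate forward and reverse lookups until nothing new turns up."""
    names, ips = set(), set()
    todo = set(seed_names)
    for _ in range(max_rounds):  # hypothetical safety limit against runaway recursion
        if not todo:
            break
        names |= todo
        new_ips = set()
        for name in todo:
            new_ips |= resolve_fn(name)
        new_ips -= ips          # only follow addresses not seen before
        ips |= new_ips
        todo = set()
        for ip in new_ips:
            todo |= reverse_fn(ip)
        todo -= names           # only follow names not seen before
    return names, ips
```

Reverse lookups on shared hosting will happily return names that belong to somebody else, so in practice every new name still needs a check before it is scanned.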
When dealing with a larger company, it also makes sense to have a closer look at the subnets in which each one of the identified IP addresses resides. You might very well identify a company-owned subnet/ASN and thus want to scan all included IPs.
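One cheap way to spot candidate subnets is to count how many known addresses fall into the same network. The sketch below assumes IPv4 and a fixed /24 prefix; `group_by_network`, `dense_networks` and the `threshold` value are hypothetical names and numbers chosen for illustration. A dense network is only a hint, so ownership still has to be confirmed (for example via whois) before scanning it.

```python
import ipaddress
from collections import Counter


def group_by_network(ips, prefix=24):
    """Count how many known addresses fall into each /prefix network."""
    counts = Counter()
    for ip in ips:
        # strict=False lets us pass a host address and get its network back
        net = ipaddress.ip_network(f"{ip}/{prefix}", strict=False)
        counts[net] += 1
    return counts


def dense_networks(ips, prefix=24, threshold=3):
    """Networks with at least `threshold` known hosts, busiest first."""
    counts = group_by_network(ips, prefix)
    return [net for net, n in counts.most_common() if n >= threshold]
```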
Models
class domain(models.Model):
    # data
    text = models.CharField(max_length=200)
    tld = models.ForeignKey(tld, on_delete=models.CASCADE, related_name='domain_tld')
    ip = models.ForeignKey(ip, on_delete=models.CASCADE, related_name='domain_ip', blank=True, null=True)
    # owner & contact
    owner = models.ForeignKey(organization, on_delete=models.CASCADE, null=True, blank=True,
                              related_name='domain_owner')
    operator = models.ForeignKey(organization, on_delete=models.CASCADE, null=True, blank=True,
                                 related_name='domain_operator')
    registrar = models.ForeignKey(organization, on_delete=models.CASCADE, null=True, blank=True,
                                  related_name='domain_registrar')
    adm_contact = models.ForeignKey(person, on_delete=models.CASCADE, null=True, blank=True,
                                    related_name='domain_adm_contact')
    sec_contact = models.ForeignKey(person, on_delete=models.CASCADE, null=True, blank=True,
                                    related_name='domain_sec_contact')
    # status
    scan_blacklist = models.BooleanField(default=False)
    active = models.BooleanField(default=True)
    r_added = models.ForeignKey(run, on_delete=models.CASCADE, null=True, blank=True, related_name='domain_r_added')
    r_last_seen = models.ForeignKey(run, on_delete=models.CASCADE, null=True, blank=True, related_name='domain_r_last_seen')
    r_touched = models.ForeignKey(run, on_delete=models.CASCADE, null=True, blank=True, related_name='domain_r_touched')
    alert = models.ForeignKey(alert, on_delete=models.CASCADE, blank=True, null=True)
    # dns
    dns_txt = models.TextField(blank=True)
    dns_spf = models.TextField(blank=True)
    dnstwist = models.BooleanField(default=False)
    confirmed_ours = models.BooleanField(default=False)
    # rest
    source = models.ForeignKey(datasource, on_delete=models.CASCADE)
    source_reference = models.TextField(default="{}")
    comment = models.TextField(blank=True)
    # alerts
    fields.GenericRelation(alert)
    # classification
    classification = TaggableManager(blank=True)

    class Meta:
        unique_together = ["text", "tld"]

    def __str__(self):
        return "%s.%s" % (self.text, self.tld)
The models for Subdomains, CNAME, Hosts and IPs are nearly the same.
The model for port simply contains the fields an nmap scan would return, thus a banner and a service name. In addition, it has a boolean value for “webscan”. This is set based on a static list containing e.g. 80, 443, 8443, 8080 and the other usual suspects which are often used for web interfaces. For me the list grew a lot over time.
The “active” field is used to filter identified ports for further scanning. I didn’t want to delete things I had seen in the past, so I decided to simply mark them as inactive.
class port(models.Model):
    number = models.IntegerField()
    ip = models.ForeignKey(ip, on_delete=models.CASCADE, null=True, blank=True, related_name='port_ip')
    banner = models.TextField(blank=True, null=True)
    service = models.TextField(blank=True, null=True)
    # status
    active = models.BooleanField(default=True)
    r_added = models.ForeignKey(run, on_delete=models.CASCADE, null=True, blank=True, related_name='port_r_added')
    r_last_seen = models.ForeignKey(run, on_delete=models.CASCADE, null=True, blank=True, related_name='port_r_last_seen')
    r_touched = models.ForeignKey(run, on_delete=models.CASCADE, null=True, blank=True, related_name='port_r_touched')
    alert = models.ForeignKey(alert, on_delete=models.CASCADE, null=True, blank=True)
    # rest
    source = models.ForeignKey(datasource, on_delete=models.CASCADE)
    comment = models.TextField(blank=True)
    # tagging
    classification = TaggableManager(blank=True)
    webscan = models.BooleanField(default=False)
    # notes
    # port has no certificate! Due to SNI certificate has to be URL specific

    def __str__(self):
        return "%s:%s" % (self.ip, self.number)
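How the “webscan” and “active” flags could work together is sketched below. The port list is only a hypothetical starting point (the real list is not part of this post), and `is_web_port` and `web_targets` are illustrative names working on plain dicts rather than on the Django model.

```python
# Hypothetical starting list; the list used in the tool grew well beyond this.
WEB_PORTS = frozenset({80, 443, 8080, 8443})


def is_web_port(number, web_ports=WEB_PORTS):
    """True if the port number is on the static list of typical web ports."""
    return int(number) in web_ports


def web_targets(ports):
    """Yield (ip, number) for ports that are still active and look like web ports."""
    for p in ports:
        if p["active"] and is_web_port(p["number"]):
            yield p["ip"], p["number"]
```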
In addition, I added datasets for MX entries, as mail servers are always interesting and might expose further external resources.
I also added the model “certificate”. On the one hand, this was to identify old certificates and thus potentially vulnerable systems; on the other hand, I used the subjects and alternative names to extract further interesting addresses.
class certificate(models.Model):
    subject = models.CharField(max_length=100)
    issuer = models.CharField(max_length=200)
    expires = models.CharField(max_length=50)
    sha1 = models.CharField(max_length=60)
    alternative_names = models.CharField(max_length=300)
    r_added = models.ForeignKey(run, on_delete=models.CASCADE, null=True, blank=True, related_name='certificate_r_added')
    r_last_seen = models.ForeignKey(run, on_delete=models.CASCADE, null=True, blank=True, related_name='certificate_r_last_seen')
    r_touched = models.ForeignKey(run, on_delete=models.CASCADE, null=True, blank=True, related_name='certificate_r_touched')
    raw = models.TextField(blank=True)
    comment = models.TextField(blank=True)

    def __str__(self):
        return "%s:%s" % (self.subject, self.sha1)
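Pulling further names out of a certificate might look like the sketch below. It assumes the certificate is available as a dict shaped like the return value of `ssl.SSLSocket.getpeercert()`; note that this call only returns a filled dict when the certificate was validated, so with verification switched off the binary form would have to be parsed differently. The function name `names_from_cert` is made up for this example.

```python
def names_from_cert(cert):
    """Collect host names from a certificate's subject and alternative names.

    `cert` is assumed to be a dict in the format of ssl.SSLSocket.getpeercert().
    """
    names = set()
    # The subject is a tuple of RDNs, each a tuple of (key, value) pairs.
    for rdn in cert.get("subject", ()):
        for key, value in rdn:
            if key == "commonName":
                names.add(value.lower())
    # Alternative names are (type, value) pairs; only DNS entries are names.
    for kind, value in cert.get("subjectAltName", ()):
        if kind == "DNS":
            names.add(value.lower())
    # A wildcard entry cannot be resolved itself, so keep its parent name.
    return {n[2:] if n.startswith("*.") else n for n in names}
```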
Full Datasets
To cover a company’s complete perimeter when choosing a whitebox approach, which is a must for monitoring systems, get full dumps from your brand management, DNS servers and other domain providers. You will still want to run the recon approach in addition because, well, you’ll see.
Scanning
Importing nmap scans with python is luckily super trivial! The scan itself is based on a set of IP addresses exported from the tool itself. Here it can be helpful to directly split the list into multiple files and thus use parallel nmap instances for scanning. This might save a lot of time.
# import nmap scan
# (NmapParser is part of python-libnmap: from libnmap.parser import NmapParser)
def import_nmap_scan(self, path, run):
    report = NmapParser.parse_fromfile(path)
    for hst in report.hosts:
        ip = self.g_ip(hst.address)
        self.s_ip_last_scanned(ip, run)
        if hst.is_up():
            if ip != -1:
                for s in hst.services:
                    if s.open():
                        self.a_port(s.port, ip, s.banner, str(s.cpelist), self.g_src("nmap"), run)
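Splitting the exported IP list for parallel nmap instances, as mentioned above, could be done along these lines. This is a minimal sketch; `split_targets`, the file naming and the round-robin distribution are assumptions of mine, not the tool's actual export.

```python
from pathlib import Path


def split_targets(ips, parts, out_dir, stem="targets"):
    """Write `ips` round-robin into up to `parts` files, one address per line."""
    ips = list(ips)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    paths = []
    for i in range(parts):
        chunk = ips[i::parts]  # every parts-th address, starting at offset i
        if not chunk:
            continue  # fewer addresses than parts: skip empty files
        path = out_dir / f"{stem}_{i}.txt"
        path.write_text("\n".join(chunk) + "\n")
        paths.append(path)
    return paths
```

Each resulting file can then be handed to its own nmap process via `-iL`, with `-oX` writing the XML report that the import function reads.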
The fun part starts when looking into the actual web content. The following function is called for each URL in the database.
# Single step for fetching site data
def fetch_url(self, target, port, port_id, tid, ttype, bssl, qr):
    if self.noisy:
        try:
            print("Thread (" + str(threading.get_native_id()) + " / " + str(
                threading.current_thread()) + "): is starting with target: " + str(target) + ":" + str(port))
        except Exception as e:
            pass
    try:
        if bssl:
            turl = "https://"
        else:
            turl = "http://"
        turl = turl + str(target)
        turl = turl + ":" + str(port)  # port may arrive as an integer
        requests.packages.urllib3.disable_warnings()
        response = requests.get(turl, timeout=3, verify=False, allow_redirects=True)
        final_url = response.url
        content = response.content
        # flatten the headers into "Name:Value" lines without quotes
        headers = ""
        headers_raw = response.headers
        for header in headers_raw.items():
            th = header[0] + ":" + header[1]
            th = th.replace("\"", "")
            th = th.replace("\'", "")
            th = th.strip()
            headers += str(th) + "\n"
        # a redirect may have upgraded the connection to https
        if final_url.startswith("https://"):
            bssl = True
        r = [final_url, tid, ttype, port_id, bssl, headers, content]
        # the queue holds a single shared result list
        xr = qr.get()
        xr.append(r)
        qr.put(xr)
        if self.noisy:
            try:
                print("Thread (" + str(threading.get_native_id()) + " / " + str(
                    threading.current_thread()) + "): has finished with target: " + str(target) + ":" + str(port))
            except Exception as e:
                pass
    except Exception as e:
        if self.noisy:
            try:
                print("Thread (" + str(threading.get_native_id()) + " / " + str(
                    threading.current_thread()) + "): Exception")
            except Exception as e:
                pass
    return
It makes calls via both HTTP and HTTPS, ignoring SSL errors, and simply gives it a try. In the first step, using python requests, it extracts all headers and stores a full copy of the returned site data.
Afterwards a few further steps are performed:
- Using python ssdeep, a fuzzy hash is calculated for each one of the pages
  - This is then used to identify URLs with the same content
  - Why ssdeep? Dynamic content makes direct comparisons impossible, and ssdeep works reliably for comparing content that differs only in parts
- The certificate for each established SSL connection is fetched and the contained data extracted
- Using selenium and chrome, a screenshot of every single website is fetched
  - As much as automation is key, when doing quality assurance and manual inspection, a screenshot is key!
- Tagging & Classification
  - Yes, I wrote them all by hand
  - Based on:
    - Headers
    - String matching on contents
    - Banners
  - Examples:
    ("Basic Auth", "WWW-Authenticate:Basic", "authbasic", "header_keyword", "", False),
    ("Version PHP 5.3.29", "X-Powered-By:PHP/5.3.29", "versionphp5329", "header_keyword", "", False),
    ("Sharepoint", "Sharepoint Portal", "softwaresharepoint", "body_keyword", "", False),
    ("Apache Default", "<IMG SRC=\"apache_pb.gif\" ALT=\"Powered by Apache!\">", "apachedefault", "body_keyword", "", False),
    ("QMail", "qmail", "softwareqmail", "banner", "", False),
    ("Git: .git/config found", '{"path":".git/config", "keyword": "[HEAD]"}', "issuegit", "path", "", True),
The last one is an example of a path tagger I added later on. It will try to access the path on all identified URLs and check whether the keyword exists in the response.
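To illustrate how such rules could be applied, here is a minimal sketch of a matcher. It reuses the tuple layout from the examples, but the meaning of the last two fields is not explained in this post, so they are ignored; `match_tags` and the `fetch_path` callback are hypothetical names, and matching is plain case-sensitive substring search.

```python
import json

# Layout as in the examples above: (label, pattern, tag, kind, <unused>, <unused>)
RULES = [
    ("Basic Auth", "WWW-Authenticate:Basic", "authbasic", "header_keyword", "", False),
    ("Sharepoint", "Sharepoint Portal", "softwaresharepoint", "body_keyword", "", False),
    ("QMail", "qmail", "softwareqmail", "banner", "", False),
    ("Git: .git/config found", '{"path":".git/config", "keyword": "[HEAD]"}', "issuegit", "path", "", True),
]


def match_tags(rules, headers="", body="", banner="", fetch_path=None):
    """Return the set of tags whose rule matches the given page data.

    `fetch_path` is a hypothetical callable(path) -> str that requests the
    path relative to the URL being tagged; without it, path rules are skipped.
    """
    haystacks = {"header_keyword": headers, "body_keyword": body, "banner": banner}
    tags = set()
    for _label, pattern, tag, kind, *_unused in rules:
        if kind == "path":
            if fetch_path is None:
                continue
            spec = json.loads(pattern)  # {"path": ..., "keyword": ...}
            if spec["keyword"] in fetch_path(spec["path"]):
                tags.add(tag)
        elif pattern in haystacks.get(kind, ""):
            tags.add(tag)
    return tags
```

The header rules line up with the "Name:Value" lines produced while fetching, which is why the patterns contain no space after the colon.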
The following block shows how the screenshots were made:
def fetch_screenshot(self, url, qr, tc):
    PROJECT_ROOT = os.path.abspath(os.path.dirname(__file__))
    DRIVER_BIN = os.path.join(PROJECT_ROOT, "external/chromedriver_85")
    WINDOW_SIZE = "1280,720"
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    # chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--window-size=%s" % WINDOW_SIZE)
    fail = True
    browser = None
    # keep retrying until a browser instance could be started
    while fail:
        try:
            browser = webdriver.Chrome(executable_path=DRIVER_BIN, options=chrome_options)
            fail = False
        except Exception as e:
            print(str(tc) + ": booom: " + str(e))
            try:
                browser.close()
                browser.quit()
            except Exception as e:
                pass
            fail = True
            time.sleep(4)
    browser.implicitly_wait(3)
    try:
        if self.noisy:
            print(str(tc) + ": Fetching Screenshot for " + str(url))
        browser.get(url.url)
        # the file name is derived from the URL, so each URL maps to one file
        file_name = hashlib.sha1(url.url.encode('utf-8')).hexdigest() + ".png"
        browser.save_screenshot(os.path.join(self.settings.MEDIA_ROOT, file_name))
        r = [url, file_name]
        browser.close()
        browser.quit()
        # Write data to Queue
        xr = qr.get()
        xr.append(r)
        qr.put(xr)
        if self.noisy:
            print(str(tc) + ": Done Screenshot for " + str(url))
    except Exception as e:
        browser.close()
        browser.quit()
        if self.noisy:
            print(str(tc) + ": Fetch for " + str(url) + " didn't work, because: " + str(e))
Interface
Since I used Django as an easy database with extended features, its website part offers the option for direct data access. I created views based on tags, similar content and, well, an option to just click through all the screenshots and do a manual assessment.
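The “similar content” view boils down to grouping URLs whose stored pages are nearly identical. The sketch below shows that grouping step only, and it deliberately uses `difflib` from the standard library as a stand-in for ssdeep so that it runs without extra packages. It is much slower than comparing fuzzy hashes, and `similarity`, `group_similar` and the threshold of 90 are assumptions for illustration.

```python
from difflib import SequenceMatcher


def similarity(a, b):
    """Similarity of two page bodies as an integer from 0 to 100."""
    # autojunk=False keeps long, repetitive HTML from being treated as noise
    return round(SequenceMatcher(None, a, b, autojunk=False).ratio() * 100)


def group_similar(pages, threshold=90):
    """Greedily group URLs whose content is at least `threshold` similar.

    `pages` maps URL -> page body. Each page joins the first group whose
    representative (its first member) is similar enough.
    """
    groups = []  # list of (representative_body, [urls])
    for url, body in pages.items():
        for rep, urls in groups:
            if similarity(rep, body) >= threshold:
                urls.append(url)
                break
        else:
            groups.append((body, [url]))
    return [urls for _, urls in groups]
```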
Further Thoughts
Having all domains in one place, I decided to add dnstwist to also identify typosquatting domains etc. I also played with a direct integration of the ZAP attack proxy for scanning all identified websites for low-hanging fruit. While it technically worked, it did draw a lot of resources and took a LOAD of time :)
Finally
The described tool was initially named Peri-Peri, or rather Piri-Piri, after a meal next to the Thames. Version two was Spicy-Chicken and version three ended up being Spicy-Rooster! Having started as a weekend and after-work project and then being used at work, the visualization and the tagging saved me days of time and allowed the identification of hundreds of potential security issues in addition to a few significant flaws.
Sadly, but as expected, addressing the issues took up far more time than finding them, even including coding.
And, as initially stated, I’m just reading through my old code, finding out why I did the things I did the way that I did them and working out what I should do differently this time.