Skip to content
Snippets Groups Projects
Commit 0267cf6e authored by shadow's avatar shadow
Browse files

Attempt to decouple functionality. Will not work - Dockerfile is unfinished

parent 0c3f6722
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,7 @@ FROM python:3.5-alpine
RUN apk add nmap nmap-scripts git
RUN pip install xmltodict google-cloud-storage boto3
RUN pip install requirements.txt
RUN git clone https://github.com/vulnersCom/nmap-vulners /usr/share/nmap/scripts/vulners && nmap --script-updatedb
RUN mkdir /shared
......
......@@ -8,6 +8,6 @@ s3_client = boto3.client('s3')
try:
response = s3_client.upload_file("/" + filename, os.getenv('bucket'),
filename)
except Exception, e:
except Exception as e:
print('Error uploading to s3')
print(e)
from .description_provider import VulnDescriptionProvider, CveProjectProvider
import abc
from requests import Session, HTTPError
__all__ = ['VulnDescriptionProvider', 'CveProjectProvider']
class VulnDescriptionProvider(metaclass=abc.ABCMeta):
"""
Provides extended vulnerability description by vulnerablity identifier and type
"""
@abc.abstractmethod
def get_description(self, vuln: str, vuln_type: str) -> str:
pass
class CveProjectProvider(VulnDescriptionProvider):
"""
Provides vulnerability descriptions using requests to CVEProject
"""
uri_template = 'https://raw.githubusercontent.com/CVEProject/cvelist/master/{}/{}/{}.json'
def __init__(self, session: Session):
self.sess = session
def get_description(self, vuln: str, vuln_type: str) -> str:
try:
if vuln_type == 'cve':
year = vuln[4:8]
section = vuln[9:-3] + 'xxx'
url = self.uri_template.format(year, section, vuln)
response = self.sess.get(url)
response.raise_for_status()
cve_json = response.json()
return cve_json['description']['description_data'][0]['value']
except HTTPError as he:
return 'Description fetching error: ' + str(he)
return ''
from .flan_types import *
from collections import defaultdict
__all__ = ['SeverityLevels', 'Vuln', 'ScanResult']
class SeverityLevels:
"""
Just constants
"""
Low = 'Low'
Medium = 'Medium'
High = 'High'
class Vuln:
"""
Descriptor for vulnerability
"""
def __init__(self, name: str, vuln_type: str, severity: float):
self.name = name
self.vuln_type = vuln_type
self.severity = severity
@staticmethod
def convert_severity(severity: float) -> str:
"""
:return: Float severity value to text
"""
if severity < 4:
return 'Low'
elif severity < 7:
return 'Medium'
else:
return 'High'
@property
def severity_str(self) -> str:
"""
:return: Text severity representation
"""
return self.convert_severity(self.severity)
class ScanResult:
"""
Scan result representation
"""
def __init__(self):
self.locations = defaultdict(list)
self.vulns = []
import sys
import json
import urllib.request as urllib
import os
import xmltodict
results = {}
vulnerable_services = []
colors = {'High': 'FD6864', 'Medium': 'F8A102', 'Low': '34CDF9'}
def parse_vuln(ip_addr, port, app_name, vuln):
vuln_name = ''
severity = ''
type = ''
for field in vuln:
if field['@key'] == 'cvss':
severity = float(field['#text'])
elif field['@key'] == 'id':
vuln_name = field['#text']
elif field['@key'] == 'type':
type = field['#text']
if 'vulns'in results[app_name].keys():
results[app_name]['vulns'].append({'name': vuln_name,
'type': type,
'severity': severity})
else:
results[app_name]['vulns'] = [{'name': vuln_name,
'type': type,
'severity': severity}]
def parse_script(ip_addr, port, app_name, script):
vulnerable_services.append(app_name)
script_table = script['table']['table']
if isinstance(script_table, list):
for vuln in script_table:
parse_vuln(ip_addr, port, app_name, vuln['elem'])
else:
parse_vuln(ip_addr, port, app_name, script_table['elem'])
def get_app_name(service):
app_name = ''
if '@product' in service.keys():
app_name += service['@product'] + " "
if '@version' in service.keys():
app_name += service['@version'] + " "
elif '@name' in service.keys():
app_name += service['@name'] + " "
if('cpe' in service.keys()):
if isinstance(service['cpe'], list):
for cpe in service['cpe']:
app_name += '(' + cpe + ") "
else:
app_name += '(' + service['cpe'] + ") "
return app_name
def parse_port(ip_addr, port):
if port['state']['@state'] == 'closed':
return
app_name = get_app_name(port['service'])
port_num = port['@portid']
if app_name in results.keys():
if ip_addr in results[app_name]['locations'].keys():
results[app_name]['locations'][ip_addr].append(port_num)
else:
results[app_name]['locations'][ip_addr] = [port_num]
else:
results[app_name] = {'locations': {ip_addr: [port_num]}}
if 'script' in port.keys():
scripts = port['script']
if isinstance(scripts, list):
for s in scripts:
if s['@id'] == 'vulners':
parse_script(ip_addr, port_num, app_name, s)
else:
if scripts['@id'] == 'vulners':
parse_script(ip_addr, port_num, app_name, scripts)
def parse_host(host):
ip_addr = host['address']['@addr']
if host['status']['@state'] == 'up' and 'port' in host['ports'].keys():
ports = host['ports']['port']
if isinstance(ports, list):
for p in ports:
parse_port(ip_addr, p)
else:
parse_port(ip_addr, ports)
def parse_results(data):
hosts = data['nmaprun']['host']
if isinstance(hosts, list):
for h in hosts:
parse_host(h)
else:
parse_host(hosts)
def convert_severity(sev):
if sev < 4:
return 'Low'
elif sev < 7:
return 'Medium'
else:
return 'High'
def get_description(vuln, type):
if type == 'cve':
year = vuln[4:8]
section = vuln[9:-3] + 'xxx'
url = """https://raw.githubusercontent.com/CVEProject/cvelist/master/{}/{}/{}.json""".format(year, section, vuln)
cve_json = json.loads(urllib.urlopen(url).read().decode("utf-8"))
return cve_json["description"]["description_data"][0]["value"]
else:
return ''
def create_latex(nmap_command, start_date):
f = open('./latex_header.tex')
write_buffer = f.read()
f.close()
output_file = sys.argv[2]
ip_file = sys.argv[3]
write_buffer += "Flan Scan ran a network vulnerability scan with the following Nmap command on " \
+ start_date \
+ "UTC.\n\\begin{lstlisting}\n" \
+ nmap_command \
+ "\n\end{lstlisting}\nTo find out what IPs were scanned see the end of this report.\n"
write_buffer += "\section*{Services with Vulnerabilities}"
if vulnerable_services:
write_buffer += """\\begin{enumerate}[wide, labelwidth=!, labelindent=0pt,
label=\\textbf{\large \\arabic{enumi} \large}]\n"""
for s in vulnerable_services:
write_buffer += '\item \\textbf{\large ' + s + ' \large}'
vulns = results[s]['vulns']
locations = results[s]['locations']
num_vulns = len(vulns)
for i, v in enumerate(vulns):
write_buffer += '\\begin{figure}[h!]\n'
severity_name = convert_severity(v['severity'])
write_buffer += '\\begin{tabular}{|p{16cm}|}\\rowcolor[HTML]{' \
+ colors[severity_name] \
+ """} \\begin{tabular}{@{}p{15cm}>{\\raggedleft\\arraybackslash}
p{0.5cm}@{}}\\textbf{""" \
+ v['name'] + ' ' + severity_name + ' (' \
+ str(v['severity']) \
+ ')} & \href{https://nvd.nist.gov/vuln/detail/' \
+ v['name'] + '}{\large \\faicon{link}}' \
+ '\end{tabular}\\\\\n Summary:' \
+ get_description(v['name'], v['type']) \
+ '\\\\ \hline \end{tabular} '
write_buffer += '\end{figure}\n'
write_buffer += '\FloatBarrier\n\\textbf{The above ' \
+ str(num_vulns) \
+ """ vulnerabilities apply to these network locations:}\n
\\begin{itemize}\n"""
for addr in locations.keys():
write_buffer += '\item ' + addr + ' Ports: ' + str(locations[addr])+ '\n'
write_buffer += '\\\\ \\\\ \n \end{itemize}\n'
write_buffer += '\end{enumerate}\n'
non_vuln_services = list(set(results.keys()) - set(vulnerable_services))
write_buffer += '\section*{Services With No Known Vulnerabilities}'
if non_vuln_services:
write_buffer += """\\begin{enumerate}[wide, labelwidth=!, labelindent=0pt,
label=\\textbf{\large \\arabic{enumi} \large}]\n"""
for ns in non_vuln_services:
write_buffer += '\item \\textbf{\large ' + ns \
+ ' \large}\n\\begin{itemize}\n'
locations = results[ns]['locations']
for addr in locations.keys():
write_buffer += '\item ' + addr + ' Ports: ' + str(locations[addr])+ '\n'
write_buffer += '\end{itemize}\n'
write_buffer += '\end{enumerate}\n'
write_buffer += '\section*{List of IPs Scanned}'
write_buffer += '\\begin{itemize}\n'
f = open(ip_file)
for line in f:
write_buffer += '\item ' + line + '\n'
f.close()
write_buffer += '\end{itemize}\n'
write_buffer += '\end{document}'
latex_file = open(output_file, "w+")
latex_file.write(write_buffer)
latex_file.close()
def parse_nmap_command(raw_command):
nmap_split = raw_command.split()[:-1] #remove last element, ip address
nmap_split[3] = "<output-file>"
return " ".join(nmap_split)
def main():
dirname = sys.argv[1]
nmap_command = ""
start_date = ""
for i, filename in enumerate(os.listdir(dirname)):
f = open(dirname + "/" + filename)
xml_content = f.read()
f.close()
data = xmltodict.parse(xml_content)
parse_results(data)
if i == 0:
nmap_command = parse_nmap_command(data['nmaprun']['@args'])
start_date = data['nmaprun']['@startstr']
create_latex(nmap_command, start_date)
if __name__ == "__main__":
main()
import sys
from typing import IO
from requests import Session
from descriptions import CveProjectProvider
from parsers import FlanXmlParser
from report_builders import ReportBuilder, LatexReportBuilder
def read_file(path: str):
with open(path) as f:
return f.read()
def create_report(parser: FlanXmlParser, builder: ReportBuilder, nmap_command: str, start_date: str, output_writer: IO,
ip_reader: IO):
builder.init_report(start_date, nmap_command)
if parser.vulnerable_services:
builder.add_vulnerable_section()
builder.initialize_section()
builder.add_vulnerable_services(parser.vulnerable_dict)
if parser.non_vuln_services:
builder.add_non_vulnerable_section()
builder.initialize_section()
builder.add_non_vulnerable_services(parser.non_vulnerable_dict)
builder.add_ips_section()
for ip in ip_reader:
builder.add_ip_address(ip)
builder.finalize()
output_writer.write(builder.build())
def parse_nmap_command(raw_command: str) -> str:
nmap_split = raw_command.split()[:-1] # remove last element, ip address
nmap_split[3] = '<output-file>'
return ' '.join(nmap_split)
def create_report_builder(report_type: str) -> ReportBuilder:
if report_type == 'latex':
session = Session()
description_provider = CveProjectProvider(session)
report_bilder = LatexReportBuilder(description_provider)
return report_bilder
raise NotImplementedError(report_type)
def main(dirname: str, output_file: str, ip_file: str, report_type: str = 'latex'):
nmap_command = ''
start_date = ''
builder = create_report_builder(report_type)
parser = FlanXmlParser()
for entry in os.scandir(dirname): # type: os.DirEntry
if not (entry.is_file() and entry.name.endswith('.xml')):
continue
data = parser.read_xml_file(entry.path)
parser.parse(data)
nmap_command = parse_nmap_command(data['nmaprun']['@args'])
start_date = data['nmaprun']['@startstr']
with open(output_file, 'w+') as output, open(ip_file) as ip_source:
create_report(parser, builder, nmap_command, start_date, output, ip_source)
if __name__ == '__main__':
main(*sys.argv[1:4], report_type='latex')
from .flan_xml_parser import FlanXmlParser
from collections import defaultdict
from typing import Dict, Any, Collection, List, Set
import xmltodict
from internal_types import ScanResult, Vuln
__all__ = ['FlanXmlParser']
class FlanXmlParser:
"""
NMAP XML file reader and contents parser
"""
def __init__(self):
self.results = defaultdict(ScanResult)
self.vulnerable_services = [] # type: List[str]
@property
def vulnerable_dict(self) -> Dict[str, ScanResult]:
"""
:return: Map {app_name -> scan result} for vulnerable services
"""
return {service: self.results[service] for service in self.vulnerable_services}
@property
def non_vulnerable_dict(self) -> Dict[str, ScanResult]:
"""
:return: Map {app_name -> scan result} for services without detected vulnerabilities
"""
return {service: self.results[service] for service in self.non_vuln_services}
@property
def non_vuln_services(self) -> Set[str]:
"""
:return: App names for services without detected vulnerabilities
"""
return set(self.results) - set(self.vulnerable_services)
def parse(self, data: Dict[str, Any]):
"""
Parse xmltodict output and fill internal collections
:param data: xmltodict output
"""
hosts = data['nmaprun']['host']
if isinstance(hosts, list):
for h in hosts:
self.parse_host(h)
else:
self.parse_host(hosts)
def parse_vuln(self, app_name: str, vuln: Collection[Dict[str, Any]]):
vuln_name = ''
severity = ''
vuln_type = ''
for field in vuln:
if field['@key'] == 'cvss':
severity = float(field['#text'])
elif field['@key'] == 'id':
vuln_name = field['#text']
elif field['@key'] == 'type':
vuln_type = field['#text']
self.results[app_name].vulns.append(Vuln(vuln_name, vuln_type, severity))
def parse_script(self, app_name: str, script: Dict[str, Any]):
self.vulnerable_services.append(app_name)
script_table = script['table']['table']
if isinstance(script_table, list):
for vuln in script_table:
self.parse_vuln(app_name, vuln['elem'])
else:
self.parse_vuln(app_name, script_table['elem'])
def parse_port(self, ip_addr: str, port: Dict[str, Any]):
if port['state']['@state'] == 'closed':
return
app_name = self.get_app_name(port['service'])
port_num = port['@portid']
self.results[app_name].locations[ip_addr].append(port_num)
if 'script' in port:
scripts = port['script']
if isinstance(scripts, list):
for s in scripts:
if s['@id'] == 'vulners':
self.parse_script(app_name, s)
else:
if scripts['@id'] == 'vulners':
self.parse_script(app_name, scripts)
def parse_host(self, host: Dict[str, Any]):
ip_addr = host['address']['@addr']
if host['status']['@state'] == 'up' and 'port' in host['ports']:
ports = host['ports']['port']
if isinstance(ports, list):
for p in ports:
self.parse_port(ip_addr, p)
else:
self.parse_port(ip_addr, ports)
def read_xml_file(self, path: str) -> Dict[str, Any]:
"""
Read file and convert to dictionary. To read raw contents use `read_xml_contents`
:param path: path to .xml file
:return: parsed contents
"""
with open(path) as f:
contents = f.read()
return self.read_xml_contents(contents)
@staticmethod
def get_app_name(service: Dict[str, Any]) -> str:
app_name = ''
if '@product' in service:
app_name += service['@product'] + ' '
if '@version' in service:
app_name += service['@version'] + ' '
elif '@name' in service:
app_name += service['@name'] + ' '
if 'cpe' in service:
if isinstance(service['cpe'], list):
for cpe in service['cpe']:
app_name += '(' + cpe + ') '
else:
app_name += '(' + service['cpe'] + ') '
return app_name
@staticmethod
def read_xml_contents(contents: str) -> Dict[str, Any]:
return xmltodict.parse(contents)
from .report_builder import ReportBuilder
from .latex_report_builder import LatexReportBuilder
from typing import Any, Dict
from descriptions import VulnDescriptionProvider
from report_builders import ReportBuilder
from internal_types import ScanResult, SeverityLevels
__all__ = ['LatexReportBuilder']
class LatexReportBuilder(ReportBuilder):
"""
Report builer for LaTeX format. Returns raw contents as string
"""
def __init__(self, description_provider: VulnDescriptionProvider):
"""
:param description_provider: A provider of vulnerability full description
"""
self.description_provider = description_provider
self.buffer = self.header
self.colors = {SeverityLevels.High: 'FD6864',
SeverityLevels.Medium: 'F8A102',
SeverityLevels.Low: '34CDF9'}
def init_report(self, start_date: str, nmap_command: str):
self._append('Flan Scan ran a network vulnerability scan with the following Nmap command on '
+ start_date
+ 'UTC.\n\\begin{lstlisting}\n'
+ nmap_command
+ '\n\end{lstlisting}\nTo find out what IPs were scanned see the end of this report.\n')
def build(self) -> Any:
return self.buffer
@property
def header(self) -> str:
return self.report_header
def add_vulnerable_services(self, scan_results: Dict[str, ScanResult]):
for s, report in scan_results.items():
self._append('\item \\textbf{\large ' + s + ' \large}')
vulns = report.vulns
locations = report.locations
num_vulns = len(vulns)
for i, v in enumerate(vulns):
severity_name = v.severity_str
self._append('\\begin{figure}[h!]\n')
self._append('\\begin{tabular}{|p{16cm}|}\\rowcolor[HTML]{'
+ self.colors[severity_name]
+ '''} \\begin{tabular}{@{}p{15cm}>{\\raggedleft\\arraybackslash}
p{0.5cm}@{}}\\textbf{'''
+ v.name + ' ' + severity_name + ' ('
+ str(v.severity)
+ ')} & \href{https://nvd.nist.gov/vuln/detail/'
+ v.name + '}{\large \\faicon{link}}'
+ '\end{tabular}\\\\\n Summary:'
+ self.description_provider.get_description(v.name, v.vuln_type)
+ '\\\\ \hline \end{tabular} ')
self._append('\end{figure}\n')
self._append('\FloatBarrier\n\\textbf{The above '
+ str(num_vulns)
+ ''' vulnerabilities apply to these network locations:}\n
\\begin{itemize}\n''')
for addr in locations:
self._append('\item ' + addr + ' Ports: ' + str(locations[addr]) + '\n')
self._append('\\\\ \\\\ \n \end{itemize}\n')
self._append('\end{enumerate}\n')
def add_non_vulnerable_services(self, scan_results: Dict[str, ScanResult]):
self._append('''\\begin{enumerate}[wide, labelwidth=!, labelindent=0pt,
label=\\textbf{\large \\arabic{enumi} \large}]\n''')
for app_name, result in scan_results.items():
self._append('\item \\textbf{\large ' + app_name + ' \large}\n\\begin{itemize}\n')
locations = result.locations
for addr in locations:
self._append('\item ' + addr + ' Ports: ' + str(locations[addr]) + '\n')
self._append('\end{itemize}\n')
self._append('\end{enumerate}\n')
def initialize_section(self):
self._append('''\\begin{enumerate}[wide, labelwidth=!, labelindent=0pt,
label=\\textbf{\large \\arabic{enumi} \large}]\n''')
def add_vulnerable_section(self):
self._append('\section*{Services with Vulnerabilities}')
def add_non_vulnerable_section(self):
self._append('\section*{Services With No Known Vulnerabilities}')
def add_ips_section(self):
self._append('\section*{List of IPs Scanned}')
self._append('\\begin{itemize}\n')
def add_ip_address(self, ip: str):
self._append('\item ' + ip + '\n')
def finalize(self):
self._append('\end{itemize}\n')
self._append('\end{document}')
def _append(self, text: str):
self.buffer += text
# Don't want to depend on external file since this header is not so big.
report_header = r"""\documentclass{article}
\usepackage{enumitem}
\usepackage[margin=1in]{geometry}
\usepackage[utf8]{inputenc}
\usepackage[table,xcdraw]{xcolor}
\usepackage{placeins}
\usepackage{hyperref}
\usepackage{fontawesome}
\usepackage{listings}
\lstset{
basicstyle=\small\ttfamily,
columns=flexible,
breaklines=true
}
\title{Flan Scan Report\\}
\date{\today}
\begin{document}
\maketitle
\section*{Summary}
"""
import abc
from typing import Any, Dict
from internal_types import ScanResult
__all__ = ['ReportBuilder']
class ReportBuilder(metaclass=abc.ABCMeta):
@abc.abstractmethod
def init_report(self, start_date: str, nmap_command: str):
"""
Creates document section with report overview
"""
pass
@abc.abstractmethod
def build(self) -> Any:
"""
:return: Ready report in specific format
"""
pass
@abc.abstractmethod
def add_vulnerable_section(self):
"""
Adds header for section with vulnerable services
"""
pass
@abc.abstractmethod
def add_non_vulnerable_section(self):
"""
Adds header for section with services without detected vulnerabilities
"""
pass
@abc.abstractmethod
def add_vulnerable_services(self, scan_results: Dict[str, ScanResult]):
"""
Adds descriptions of vulnerable services
"""
pass
@abc.abstractmethod
def add_non_vulnerable_services(self, scan_results: Dict[str, ScanResult]):
"""
Adds descriptions of services without detected vulnerabilities
"""
pass
@abc.abstractmethod
def initialize_section(self):
"""
Adds begin of report section
"""
pass
@abc.abstractmethod
def add_ips_section(self):
"""
Adds section with list of scanned ip addresses
"""
pass
@abc.abstractmethod
def add_ip_address(self, ip: str):
"""
Adds IP-address to scanned addresses section
"""
pass
@abc.abstractmethod
def finalize(self):
"""
Adds report footer
"""
pass
@property
@abc.abstractmethod
def header(self) -> Any:
"""
:return: Common document header for format type (e.g. for latex report)
"""
pass
boto3==1.10.26
botocore==1.13.26
cachetools==3.1.1
certifi==2019.9.11
chardet==3.0.4
docutils==0.15.2
google-api-core==1.14.3
google-auth==1.7.1
google-cloud-core==1.0.3
google-cloud-storage==1.23.0
google-resumable-media==0.5.0
googleapis-common-protos==1.6.0
idna==2.8
jmespath==0.9.4
protobuf==3.10.0
pyasn1==0.4.8
pyasn1-modules==0.2.7
python-dateutil==2.8.0
pytz==2019.3
requests==2.22.0
rsa==4.0
s3transfer==0.2.1
six==1.13.0
urllib3==1.25.7
xmltodict==0.12.0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment