Skip to content

Commit

Permalink
sflib: Code cleanup (smicallef#1673)
Browse files (browse the repository at this point in the history)
* sflib: Use f-strings and resolve pylint violations

* sflib: Store dictnames and dictwords words in set()
  • Loading branch information
bcoles committed May 8, 2022
1 parent e7deced commit 8f70de2
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 72 deletions.
113 changes: 53 additions & 60 deletions sflib.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(self, options: dict) -> None:
TypeError: options argument was invalid type
"""
if not isinstance(options, dict):
raise TypeError("options is %s; expected dict()" % type(options))
raise TypeError(f"options is {type(options)}; expected dict()")

self.opts = deepcopy(options)
self.log = logging.getLogger(f"spiderfoot.{__name__}")
Expand Down Expand Up @@ -307,17 +307,17 @@ def cacheGet(self, label: str, timeoutHrs: int) -> str:
pathLabel = hashlib.sha224(label.encode('utf-8')).hexdigest()
cacheFile = SpiderFootHelpers.cachePath() + "/" + pathLabel
try:
(m, i, d, n, u, g, sz, atime, mtime, ctime) = os.stat(cacheFile)

if sz == 0:
return None
cache_stat = os.stat(cacheFile)
except OSError:
return None

if mtime > time.time() - timeoutHrs * 3600 or timeoutHrs == 0:
with open(cacheFile, "r") as fp:
return fp.read()
except BaseException:
if cache_stat.st_size == 0:
return None

if cache_stat.st_mtime > time.time() - timeoutHrs * 3600 or timeoutHrs == 0:
with open(cacheFile, "r") as fp:
return fp.read()

return None

def configSerialize(self, opts: dict, filterSystem: bool = True):
Expand Down Expand Up @@ -361,7 +361,7 @@ def configSerialize(self, opts: dict, filterSystem: bool = True):
return storeopts

if not isinstance(opts['__modules__'], dict):
raise TypeError("opts['__modules__'] is %s; expected dict()" % type(opts['__modules__']))
raise TypeError(f"opts['__modules__'] is {type(opts['__modules__'])}; expected dict()")

for mod in opts['__modules__']:
for opt in opts['__modules__'][mod]['opts']:
Expand Down Expand Up @@ -401,9 +401,9 @@ def configUnserialize(self, opts: dict, referencePoint: dict, filterSystem: bool
"""

if not isinstance(opts, dict):
raise TypeError("opts is %s; expected dict()" % type(opts))
raise TypeError(f"opts is {type(opts)}; expected dict()")
if not isinstance(referencePoint, dict):
raise TypeError("referencePoint is %s; expected dict()" % type(referencePoint))
raise TypeError(f"referencePoint is {type(referencePoint)}; expected dict()")

returnOpts = referencePoint

Expand Down Expand Up @@ -614,7 +614,7 @@ def urlRelativeToAbsolute(self, url: str) -> str:
str: URL relative path
"""
if not url:
self.error("Invalid URL: %s" % url)
self.error(f"Invalid URL: {url}")
return None

finalBits = list()
Expand Down Expand Up @@ -651,7 +651,7 @@ def urlBaseDir(self, url: str) -> str:
str: base directory
"""
if not url:
self.error("Invalid URL: %s" % url)
self.error(f"Invalid URL: {url}")
return None

bits = url.split('/')
Expand Down Expand Up @@ -680,7 +680,7 @@ def urlBaseUrl(self, url: str) -> str:
str: base URL without trailing slash
"""
if not url:
self.error("Invalid URL: %s" % url)
self.error(f"Invalid URL: {url}")
return None

if '://' in url:
Expand Down Expand Up @@ -754,14 +754,14 @@ def domainKeywords(self, domainList: list, tldList: list) -> set:
set: List of keywords
"""
if not domainList:
self.error("Invalid domain list: %s" % domainList)
self.error(f"Invalid domain list: {domainList}")
return set()

keywords = list()
for domain in domainList:
keywords.append(self.domainKeyword(domain, tldList))

self.debug("Keywords: %s" % keywords)
self.debug(f"Keywords: {keywords}")
return set([k for k in keywords if k])

def hostDomain(self, hostname: str, tldList: list) -> str:
Expand Down Expand Up @@ -978,53 +978,47 @@ def validPhoneNumber(self, phone: str) -> bool:
except Exception:
return False

def dictwords(self) -> list:
"""Return dictionary words and/or names from several language dictionaries.
def dictwords(self) -> set:
"""Return dictionary words from several language dictionaries.
Returns:
list: words and names from dictionaries
set: words from dictionaries
"""
wd = dict()
words = set()

dicts = ["english", "german", "french", "spanish"]

for d in dicts:
try:
with io.open(f"{self.myPath()}/spiderfoot/dicts/ispell/{d}.dict", 'r', encoding='utf8', errors='ignore') as wdct:
dlines = wdct.readlines()
with io.open(f"{self.myPath()}/spiderfoot/dicts/ispell/{d}.dict", 'r', encoding='utf8', errors='ignore') as dict_file:
for w in dict_file.readlines():
words.add(w.strip().lower().split('/')[0])
except BaseException as e:
self.debug(f"Could not read dictionary: {e}")
continue

for w in dlines:
w = w.strip().lower()
wd[w.split('/')[0]] = True
return words

return list(wd.keys())

def dictnames(self) -> list:
"""Return names of available dictionary files.
def dictnames(self) -> set:
"""Return list of human names.
Returns:
list: list of dictionary file names.
set: human names
"""
wd = dict()
words = set()

dicts = ["names"]

for d in dicts:
try:
with open(f"{self.myPath()}/spiderfoot/dicts/ispell/{d}.dict", 'r') as wdct:
dlines = wdct.readlines()
with open(f"{self.myPath()}/spiderfoot/dicts/ispell/{d}.dict", 'r') as dict_file:
for w in dict_file.readlines():
words.add(w.strip().lower().split('/')[0])
except BaseException as e:
self.debug(f"Could not read dictionary: {e}")
continue

for w in dlines:
w = w.strip().lower()
wd[w.split('/')[0]] = True

return list(wd.keys())
return words

def resolveHost(self, host: str) -> list:
"""Return a normalised IPv4 resolution of a hostname.
Expand Down Expand Up @@ -1726,7 +1720,7 @@ def parseCert(self, rawcert: str, fqdn: str = None, expiringdays: int = 30) -> d
Args:
rawcert (str): PEM-format SSL certificate
fqdn (str): TBD
fqdn (str): expected FQDN for certificate
expiringdays (int): The certificate will be considered as "expiring" if within this number of days of expiry.
Returns:
Expand Down Expand Up @@ -1779,8 +1773,7 @@ def parseCert(self, rawcert: str, fqdn: str = None, expiringdays: int = 30) -> d
if isinstance(x, cryptography.x509.DNSName):
ret['altnames'].append(x.value.lower().encode('raw_unicode_escape').decode("ascii", errors='replace'))
except BaseException as e:
self.debug("Problem processing certificate: " + str(e))
pass
self.debug(f"Problem processing certificate: {e}")

certhosts = list()
try:
Expand All @@ -1792,8 +1785,7 @@ def parseCert(self, rawcert: str, fqdn: str = None, expiringdays: int = 30) -> d
if name not in ret['altnames']:
certhosts.append(name)
except BaseException as e:
self.debug("Problem processing certificate: " + str(e))
pass
self.debug(f"Problem processing certificate: {e}")

# Check for mismatch
if fqdn and ret['issued']:
Expand All @@ -1810,7 +1802,7 @@ def parseCert(self, rawcert: str, fqdn: str = None, expiringdays: int = 30) -> d

ret['hosts'] = certhosts

self.debug("Checking for " + fqdn + " in certificate subject")
self.debug(f"Checking for {fqdn} in certificate subject")
fqdn_tld = ".".join(fqdn.split(".")[1:]).lower()

found = False
Expand All @@ -1825,7 +1817,7 @@ def parseCert(self, rawcert: str, fqdn: str = None, expiringdays: int = 30) -> d
if not found:
ret['mismatch'] = True
except BaseException as e:
self.error("Error processing certificate: " + str(e))
self.error(f"Error processing certificate: {e}")
ret['certerror'] = True

return ret
Expand All @@ -1843,10 +1835,11 @@ def extractUrls(self, content: str) -> list:
# https://tools.ietf.org/html/rfc3986#section-3.3
return re.findall(r"(https?://[a-zA-Z0-9-\.:]+/[\-\._~!\$&'\(\)\*\+\,\;=:@/a-zA-Z0-9]*)", html.unescape(content))

def parseLinks(self, url: str, data: str, domains: list) -> list:
def parseLinks(self, url: str, data: str, domains: list) -> dict:
"""Find all URLs within the supplied content.
This does not fetch any URLs!
This function does not fetch any URLs.
A dictionary will be returned, where each link will have the keys
'source': The URL where the link was obtained from
'original': What the link looked like in the content it was obtained from
Expand All @@ -1860,12 +1853,12 @@ def parseLinks(self, url: str, data: str, domains: list) -> list:
domains: TBD
Returns:
list: links
dict: links
"""
returnLinks = dict()

if not isinstance(data, str):
self.debug("parseLinks() data is %s; expected str()" % type(data))
self.debug(f"parseLinks() data is {type(data)}; expected str()")
return returnLinks

if not data:
Expand Down Expand Up @@ -1900,7 +1893,7 @@ def parseLinks(self, url: str, data: str, domains: list) -> list:
if lnk.has_attr(tags[t]):
urlsRel.append(lnk[tags[t]])
except BaseException as e:
self.error("Error parsing with BeautifulSoup: " + str(e))
self.error(f"Error parsing with BeautifulSoup: {e}")
return returnLinks

# Loop through all the URLs/links found
Expand Down Expand Up @@ -2272,15 +2265,15 @@ def fetchUrl(

try:
result['headers'] = dict()
result['realurl'] = res.url
result['code'] = str(res.status_code)

for header, value in res.headers.items():
result['headers'][str(header).lower()] = str(value)

# Sometimes content exceeds the size limit after decompression
if sizeLimit and len(res.content) > sizeLimit:
self.debug(f"Content exceeded size limit ({sizeLimit}), so returning no data just headers")
result['realurl'] = res.url
result['code'] = str(res.status_code)
return result

refresh_header = result['headers'].get('refresh')
Expand All @@ -2307,8 +2300,6 @@ def fetchUrl(
headOnly
)

result['realurl'] = res.url
result['code'] = str(res.status_code)
if disableContentEncoding:
result['content'] = res.content
else:
Expand Down Expand Up @@ -2344,10 +2335,10 @@ def fetchUrl(
return result

def checkDnsWildcard(self, target: str) -> bool:
"""Check if wildcard DNS is enabled by looking up a random subdomain.
"""Check if wildcard DNS is enabled for a domain by looking up a random subdomain.
Args:
target (str): TBD
target (str): domain
Returns:
bool: Domain returns DNS records for any subdomains
Expand Down Expand Up @@ -2460,7 +2451,7 @@ def googleIterate(self, searchString: str, opts: dict = None) -> dict:
timeout: API call timeout
Args:
searchString (str) :TBD
searchString (str): Google search query
opts (dict): TBD
Returns:
Expand Down Expand Up @@ -2506,7 +2497,7 @@ def googleIterate(self, searchString: str, opts: dict = None) -> dict:
"webSearchUrl": f"https://www.google.com/search?q={search_string}&{params}"
}

def bingIterate(self, searchString: str, opts: dict = {}) -> dict:
def bingIterate(self, searchString: str, opts: dict = None) -> dict:
"""Request search results from the Bing API.
Will return a dict:
Expand All @@ -2521,12 +2512,14 @@ def bingIterate(self, searchString: str, opts: dict = {}) -> dict:
timeout: API call timeout
Args:
searchString (str): TBD
searchString (str): Bing search query
opts (dict): TBD
Returns:
dict: Search results as {"webSearchUrl": "URL", "urls": [results]}
"""
if opts is None:
opts = {}

search_string = searchString.replace(" ", "%20")
params = urllib.parse.urlencode({
Expand Down
16 changes: 4 additions & 12 deletions test/unit/test_spiderfoot.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -722,23 +722,15 @@ def test_normalize_dns_should_return_list(self):
dns = sf.normalizeDNS(invalid_type)
self.assertIsInstance(dns, list)

def test_dictwords_should_return_a_list(self):
"""
Test dictwords(self)
"""
def test_dictwords_should_return_a_set(self):
sf = SpiderFoot(dict())

dict_words = sf.dictwords()
self.assertIsInstance(dict_words, list)
self.assertIsInstance(dict_words, set)

def test_dictnames_should_return_a_list(self):
"""
Test dictnames(self)
"""
def test_dictnames_should_return_a_set(self):
sf = SpiderFoot(dict())

dict_names = sf.dictnames()
self.assertIsInstance(dict_names, list)
self.assertIsInstance(dict_names, set)

def test_resolve_host_should_return_list(self):
"""
Expand Down

0 comments on commit 8f70de2

Please sign in to comment.