Skip to content

Commit

Permalink
Seon.io module (smicallef#1141)
Browse files Browse the repository at this point in the history
* SEON.io module
  • Loading branch information
krishnasism committed Mar 27, 2021
1 parent b14d1d5 commit d2a4935
Show file tree
Hide file tree
Showing 3 changed files with 378 additions and 0 deletions.
297 changes: 297 additions & 0 deletions modules/sfp_seon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# Name: sfp_seon
# Purpose: Spiderfoot plugin to query seon.io to gather intelligence about
# IP Addresses, email addresses, and phone numbers.
#
# Author: Krishnasis Mandal <krishnasis@hotmail.com>
#
# Created: 08/02/2021
# Copyright: (c) Steve Micallef
# Licence: GPL
# -------------------------------------------------------------------------------

import json

from spiderfoot import SpiderFootEvent, SpiderFootPlugin


class sfp_seon(SpiderFootPlugin):

meta = {
'name': "Seon",
'summary': "Queries seon.io to gather intelligence about IP Addresses, email addresses, and phone numbers",
'flags': ["apikey"],
'useCases': ["Footprint", "Investigate", "Passive"],
'categories': ["Real World"],
'dataSource': {
'website': "https://seon.io/",
'model': "COMMERCIAL_ONLY",
'references': [
"https://developers.seon.io/"
],
'favIcon': "https://seon.io/assets/favicons/favicon-16x16.png",
'logo': "https://seon.io/assets/favicons/apple-touch-icon-152.png",
'description': "SEON Fraud Prevention tools help organisations reduce "
"the costs and resources lost to fraud. Spot fake accounts, slash manual reviews and cut chargebacks now.",
}
}

# Default options
opts = {
'api_key': '',
'fraud_threshold': 80,
}

# Option descriptions
optdescs = {
'api_key': "seon.io API Key",
'fraud_threshold': 'Minimum fraud score for target to be marked as malicious (0-100)',
}

results = None

def setup(self, sfc, userOpts=dict()):
self.sf = sfc
self.results = self.tempStorage()

for opt in list(userOpts.keys()):
self.opts[opt] = userOpts[opt]

# What events is this module interested in for input
def watchedEvents(self):
return [
"IP_ADDRESS",
"IPV6_ADDRESS",
"EMAILADDR",
"PHONE_NUMBER"
]

# What events this module produces
# This is to support the end user in selecting modules based on events
# produced.
def producedEvents(self):
return [
"GEOINFO",
"MALICIOUS_IPADDR",
"TCP_PORT_OPEN",
"MALICIOUS_EMAILADDR",
"EMAILADDR_DELIVERABLE",
"EMAILADDR_UNDELIVERABLE",
"SOCIAL_MEDIA",
"HUMAN_NAME",
"COMPANY_NAME",
"EMAILADDR_COMPROMISED",
"MALICIOUS_PHONE_NUMBER",
"PROVIDER_TELCO",
"PHONE_NUMBER_TYPE",
"WEBSERVER_TECHNOLOGY",
"RAW_RIR_DATA"
]

def query(self, qry, eventName):
if eventName == "IP_ADDRESS" or eventName == "IPV6_ADDRESS":
queryString = f"https://api.seon.io/SeonRestService/ip-api/v1.0/{qry}"
elif eventName == "EMAILADDR":
queryString = f"https://api.seon.io/SeonRestService/email-api/v2.0/{qry}"
elif eventName == "PHONE_NUMBER":
queryString = f"https://api.seon.io/SeonRestService/phone-api/v1.0/{qry}"

headers = {
'Accept': "application/json",
'X-API-KEY': self.opts['api_key']
}

res = self.sf.fetchUrl(
queryString,
headers=headers,
timeout=15,
useragent=self.opts['_useragent']
)

if res['code'] == '429':
self.sf.error("You are being rate-limited by seon.io")
return None

if res['code'] != "200":
self.sf.error("Error retrieving search results from seon.io")
return None

if res['code'] == '404':
self.sf.error("API Endpoint not found")
return None

return json.loads(res['content'])

# Handle events sent to this module
def handleEvent(self, event):
eventName = event.eventType
srcModuleName = event.module
eventData = event.data

self.sf.debug(f"Received event, {eventName}, from {srcModuleName}")

if self.errorState:
return None

if self.opts['api_key'] == "":
self.sf.error("You enabled sfp_seon but did not set an API key!")
self.errorState = True
return

# Don't look up stuff twice
if eventData in self.results:
self.sf.debug(f"Skipping {eventData}, already checked.")
return None
else:
self.results[eventData] = True

dataFound = False
if eventName == "IP_ADDRESS" or eventName == "IPV6_ADDRESS":
data = self.query(eventData, eventName)
if data is None:
return None

resultSet = data.get('data')
if resultSet:
if resultSet.get('score', 0) >= self.opts['fraud_threshold']:
maliciousDesc = f"SEON [{eventData}]\n - FRAUD SCORE: {resultSet.get('score')}"
evt = SpiderFootEvent("MALICIOUS_IPADDR", maliciousDesc, self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('tor'):
evt = SpiderFootEvent("WEBSERVER_TECHNOLOGY", f"Server is TOR node: {resultSet.get('tor')}", self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('vpn'):
evt = SpiderFootEvent("WEBSERVER_TECHNOLOGY", f"Server is VPN: {resultSet.get('vpn')}", self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('web_proxy'):
evt = SpiderFootEvent("WEBSERVER_TECHNOLOGY", f"Server is Web Proxy: {resultSet.get('web_proxy')}", self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('public_proxy'):
evt = SpiderFootEvent("WEBSERVER_TECHNOLOGY", f"Server is Public Proxy: {resultSet.get('public_proxy')}", self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('country'):
location = ', '.join(filter(None, [resultSet.get('city'), resultSet.get('state_prov'), resultSet.get('country')]))
evt = SpiderFootEvent('GEOINFO', location, self.__name__, event)
self.notifyListeners(evt)

evt = SpiderFootEvent('PHYSICAL_COORDINATES', f"{resultSet.get('latitude')}, {resultSet.get('longitude')}", self.__name__, event)
self.notifyListeners(evt)
dataFound = True
if resultSet.get('open_ports'):
for port in resultSet.get('open_ports'):
evt = SpiderFootEvent('TCP_PORT_OPEN', f"{eventData}:{port}", self.__name__, event)
self.notifyListeners(evt)
dataFound = True
if dataFound:
evt = SpiderFootEvent('RAW_RIR_DATA', str(resultSet), self.__name__, event)
self.notifyListeners(evt)

elif eventName == "EMAILADDR":
data = self.query(eventData, eventName)
if data is None:
return None

resultSet = data.get('data')
if resultSet:
if resultSet.get('score') >= self.opts['fraud_threshold']:
maliciousDesc = f"SEON [{eventData}]\n - FRAUD SCORE: {resultSet.get('score')}"
evt = SpiderFootEvent("MALICIOUS_EMAILADDR", maliciousDesc, self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('deliverable'):
evt = SpiderFootEvent("EMAILADDR_DELIVERABLE", eventData, self.__name__, event)
self.notifyListeners(evt)
dataFound = True
else:
evt = SpiderFootEvent("EMAILADDR_UNDELIVERABLE", eventData, self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('domain_details'):
if resultSet.get('domain_details').get('disposable'):
evt = SpiderFootEvent("EMAILADDR_DISPOSABLE", eventData, self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('account_details'):
socialMediaList = resultSet.get('account_details').keys()
for site in socialMediaList:
if resultSet.get('account_details').get(site):
if resultSet.get('account_details').get(site).get('url'):
evt = SpiderFootEvent("SOCIAL_MEDIA", f"{site}: <SFURL>{resultSet.get('account_details').get(site).get('url')}</SFURL>", self.__name__, event)
self.notifyListeners(evt)
elif resultSet.get('account_details').get(site).get('registered'):
evt = SpiderFootEvent("SOCIAL_MEDIA", f"Registered on {site}", self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if site == 'linkedin':
if resultSet.get('account_details').get(site).get('company'):
evt = SpiderFootEvent("COMPANY_NAME", resultSet.get('account_details').get(site).get('company'), self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('account_details').get(site).get('name'):
evt = SpiderFootEvent("HUMAN_NAME", resultSet.get('account_details').get(site).get('name'), self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('breach_details').get('breaches'):
breachList = resultSet.get('breach_details').get('breaches')
for breachSet in breachList:
evt = SpiderFootEvent("EMAILADDR_COMPROMISED", f"{eventData} [{breachSet.get('name')}]", self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if dataFound:
evt = SpiderFootEvent('RAW_RIR_DATA', str(resultSet), self.__name__, event)
self.notifyListeners(evt)

elif eventName == "PHONE_NUMBER":
data = self.query(eventData, eventName)
if data is None:
return None

resultSet = data.get('data')
if resultSet:
if resultSet.get('score') >= self.opts['fraud_threshold']:
maliciousDesc = f"SEON [{eventData}]\n - FRAUD SCORE: {resultSet.get('score')}"
evt = SpiderFootEvent("MALICIOUS_PHONE_NUMBER", maliciousDesc, self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('account_details'):
socialMediaList = resultSet.get('account_details').keys()
for site in socialMediaList:
if resultSet.get('account_details').get(site).get('registered'):
evt = SpiderFootEvent("SOCIAL_MEDIA", f"Registered on {site}", self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('type'):
evt = SpiderFootEvent("PHONE_NUMBER_TYPE", resultSet.get('type'), self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if resultSet.get('carrier'):
evt = SpiderFootEvent("PROVIDER_TELCO", resultSet.get('carrier'), self.__name__, event)
self.notifyListeners(evt)
dataFound = True

if dataFound:
evt = SpiderFootEvent('RAW_RIR_DATA', str(resultSet), self.__name__, event)
self.notifyListeners(evt)

# End of sfp_seon class
1 change: 1 addition & 0 deletions spiderfoot/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class SpiderFootDb:
['EMAILADDR_COMPROMISED', 'Hacked Email Address', 0, 'DESCRIPTOR'],
['EMAILADDR_DISPOSABLE', 'Disposable Email Address', 0, 'DESCRIPTOR'],
['EMAILADDR_GENERIC', 'Email Address - Generic', 0, 'ENTITY'],
['EMAILADDR_UNDELIVERABLE', 'Undeliverable Email Address', 0, 'DESCRIPTOR'],
['ERROR_MESSAGE', 'Error Message', 0, 'DATA'],
['ETHEREUM_ADDRESS', 'Ethereum Address', 0, 'ENTITY'],
['ETHEREUM_BALANCE', 'Ethereum Balance', 0, 'DESCRIPTOR'],
Expand Down
80 changes: 80 additions & 0 deletions test/unit/modules/test_sfp_seon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# test_sfp_seon.py
import unittest

from modules.sfp_seon import sfp_seon
from sflib import SpiderFoot
from spiderfoot import SpiderFootEvent, SpiderFootTarget


class TestModuleseon(unittest.TestCase):
"""
Test modules.sfp_seon
"""

default_options = {
'_debug': False, # Debug
'__logging': True, # Logging in general
'__outputfilter': None, # Event types to filter from modules' output
'_useragent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0', # User-Agent to use for HTTP requests
'_dnsserver': '', # Override the default resolver
'_fetchtimeout': 5, # number of seconds before giving up on a fetch
'_internettlds': 'https://publicsuffix.org/list/effective_tld_names.dat',
'_internettlds_cache': 72,
'_genericusers': "abuse,admin,billing,compliance,devnull,dns,ftp,hostmaster,inoc,ispfeedback,ispsupport,list-request,list,maildaemon,marketing,noc,no-reply,noreply,null,peering,peering-notify,peering-request,phish,phishing,postmaster,privacy,registrar,registry,root,routing-registry,rr,sales,security,spam,support,sysadmin,tech,undisclosed-recipients,unsubscribe,usenet,uucp,webmaster,www",
'__version__': '3.3-DEV',
'__database': 'spiderfoot.test.db', # note: test database file
'__modules__': None, # List of modules. Will be set after start-up.
'_socks1type': '',
'_socks2addr': '',
'_socks3port': '',
'_socks4user': '',
'_socks5pwd': '',
'_torctlport': 9051,
'__logstdout': False
}

def test_opts(self):
module = sfp_seon()
self.assertEqual(len(module.opts), len(module.optdescs))

def test_setup(self):
"""
Test setup(self, sfc, userOpts=dict())
"""
sf = SpiderFoot(self.default_options)

module = sfp_seon()
module.setup(sf, dict())

def test_watchedEvents_should_return_list(self):
module = sfp_seon()
self.assertIsInstance(module.watchedEvents(), list)

def test_producedEvents_should_return_list(self):
module = sfp_seon()
self.assertIsInstance(module.producedEvents(), list)

def test_handleEvent_no_api_key_should_set_errorState(self):
"""
Test handleEvent(self, event)
"""
sf = SpiderFoot(self.default_options)

module = sfp_seon()
module.setup(sf, dict())

target_value = 'example target value'
target_type = 'IP_ADDRESS'
target = SpiderFootTarget(target_value, target_type)
module.setTarget(target)

event_type = 'ROOT'
event_data = 'example data'
event_module = ''
source_event = ''
evt = SpiderFootEvent(event_type, event_data, event_module, source_event)

result = module.handleEvent(evt)

self.assertIsNone(result)
self.assertTrue(module.errorState)

0 comments on commit d2a4935

Please sign in to comment.