Skip to content

Commit

Permalink
Ipqualityscore module (smicallef#1104)
Browse files Browse the repository at this point in the history
* IP Quality Score module
  • Loading branch information
krishnasism authored Oct 9, 2020
1 parent 4179112 commit 8fda91e
Show file tree
Hide file tree
Showing 2 changed files with 256 additions and 0 deletions.
176 changes: 176 additions & 0 deletions modules/sfp_ipqualityscore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# Name: sfp_ipqualityscore
# Purpose: Spiderfoot module to check whether a target is malicious
# using IPQualityScore API
#
# Author: Krishnasis Mandal <krishnasis@hotmail.com>
#
# Created: 2020-10-07
# Copyright: (c) Steve Micallef
# Licence: GPL
# -------------------------------------------------------------------------------
import json

from spiderfoot import SpiderFootEvent, SpiderFootPlugin


class sfp_ipqualityscore(SpiderFootPlugin):

meta = {
"name": "IPQualityScore",
"summary": "Determine if target is malicious using IPQualityScore API",
"flags": ["apikey"],
"useCases": ["Investigate", "Passive"],
"categories": ["Reputation Systems"],
"dataSource": {
"website": "https://www.ipqualityscore.com/",
"model": "FREE_AUTH_LIMITED",
"references": [
"https://www.ipqualityscore.com/documentation/overview"
],
"apiKeyInstructions": [
"Visit https://www.ipqualityscore.com/",
"Click on 'Plans'",
"Register a free account",
"Visit https://www.ipqualityscore.com/user/settings",
"Your API key will be listed under 'API Key'"
],
"favIcon": "https://www.ipqualityscore.com/templates/img/icons/fav/favicon-32x32.png",
"logo": "https://www.ipqualityscore.com/templates/img/logo.png",
"description": "IPQualityScore's suite of fraud prevention tools automate quality control "
"to prevent bots, fake accounts, fraudsters, suspicious transactions, "
"& malicious users without interrupting the user experience.",
},
}

opts = {
"api_key": "",
"abuse_score_threshold": 85
}

optdescs = {
"api_key": "IPQualityScore API Key",
"abuse_score_threshold": "Minimum abuse score for target to be considered malicious"
}

errorState = False

def setup(self, sfc, userOpts=None):
if userOpts is None:
userOpts = {}
self.sf = sfc
self.results = self.tempStorage()
self.opts.update(userOpts)

def watchedEvents(self):
return [
"PHONE_NUMBER",
"EMAILADDR",
"IP_ADDRESS",
"DOMAIN_NAME"
]

def producedEvents(self):
return [
"MALICIOUS_PHONE_NUMBER",
"MALICIOUS_EMAILADDR",
"MALICIOUS_IPADDR",
"MALICIOUS_INTERNET_NAME",
"RAW_RIR_DATA"
]

def handle_error_response(self, qry, res):
try:
error_info = json.loads(res["content"])
except Exception:
error_info = None
if error_info:
error_message = error_info.get("message")
else:
error_message = None
if error_message:
error_str = f", message {error_message}"
else:
error_str = ""
self.sf.error(f"Failed to get results for {qry}, code {res['code']}{error_str}")

def query(self, qry, eventName):
queryString = ""
if eventName == "PHONE_NUMBER":
queryString = f"https://ipqualityscore.com/api/json/phone/{self.opts['api_key']}/{qry}"
elif eventName == "EMAILADDR":
queryString = f"https://ipqualityscore.com/api/json/email/{self.opts['api_key']}/{qry}"
elif eventName == "IP_ADDRESS" or eventName == "DOMAIN_NAME":
queryString = f"https://ipqualityscore.com/api/json/ip/{self.opts['api_key']}/{qry}"

res = self.sf.fetchUrl(
queryString,
timeout=self.opts["_fetchtimeout"],
useragent="SpiderFoot",
)

success = json.loads(res["content"]).get("success")
if res["code"] != "200" or not success:
self.handle_error_response(qry, res)
return None

if res['content'] is None:
self.sf.info(f"No IPQualityScore info found for {qry}")
return None

try:
return json.loads(res['content'])
except Exception as e:
self.sf.error(f"Error processing JSON response from IPQualityScore: {e}")

return None

def handleEvent(self, event):
eventName = event.eventType
srcModuleName = event.module
eventData = event.data

if self.errorState:
return None

self.sf.debug(f"Received event, {eventName}, from {srcModuleName}")

if self.opts["api_key"] == "":
self.sf.error(
f"You enabled {self.__class__.__name__} but did not set an API Key!"
)
self.errorState = True
return None

if eventData in self.results:
self.sf.debug(f"Skipping {eventData} as already mapped.")
return None
self.results[eventData] = True

data = self.query(eventData, eventName)

if data is None:
return

fraudScore = data.get('fraud_score')
recentAbuse = data.get('recent_abuse')
botStatus = data.get('bot_status')
if fraudScore >= self.opts['abuse_score_threshold'] or recentAbuse or botStatus:
evt = SpiderFootEvent("RAW_RIR_DATA", str(data), self.__name__, event)
self.notifyListeners(evt)

if eventName == "PHONE_NUMBER":
evt = SpiderFootEvent("MALICIOUS_PHONE_NUMBER", "IPQualityScore [ " + eventData + " ]", self.__name__, event)
self.notifyListeners(evt)
elif eventName == "EMAILADDR":
evt = SpiderFootEvent("MALICIOUS_EMAILADDR", "IPQualityScore [ " + eventData + " ]", self.__name__, event)
self.notifyListeners(evt)
elif eventName == "IP_ADDRESS":
evt = SpiderFootEvent("MALICIOUS_IPADDR", "IPQualityScore [ " + eventData + " ]", self.__name__, event)
self.notifyListeners(evt)
elif eventName == "DOMAIN_NAME":
evt = SpiderFootEvent("MALICIOUS_INTERNET_NAME", "IPQualityScore [ " + eventData + " ]", self.__name__, event)
self.notifyListeners(evt)

# End of sfp_ipqualityscore class
80 changes: 80 additions & 0 deletions test/unit/modules/test_sfp_ipqualityscore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# test_sfp_ipqualityscore.py
import unittest

from modules.sfp_ipqualityscore import sfp_ipqualityscore
from sflib import SpiderFoot
from spiderfoot import SpiderFootEvent, SpiderFootTarget


class TestModuleipqualityscore(unittest.TestCase):
"""
Test modules.sfp_ipqualityscore
"""

default_options = {
'_debug': False, # Debug
'__logging': True, # Logging in general
'__outputfilter': None, # Event types to filter from modules' output
'_useragent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0', # User-Agent to use for HTTP requests
'_dnsserver': '', # Override the default resolver
'_fetchtimeout': 5, # number of seconds before giving up on a fetch
'_internettlds': 'https://publicsuffix.org/list/effective_tld_names.dat',
'_internettlds_cache': 72,
'_genericusers': "abuse,admin,billing,compliance,devnull,dns,ftp,hostmaster,inoc,ispfeedback,ispsupport,list-request,list,maildaemon,marketing,noc,no-reply,noreply,null,peering,peering-notify,peering-request,phish,phishing,postmaster,privacy,registrar,registry,root,routing-registry,rr,sales,security,spam,support,sysadmin,tech,undisclosed-recipients,unsubscribe,usenet,uucp,webmaster,www",
'__version__': '3.3-DEV',
'__database': 'spiderfoot.test.db', # note: test database file
'__modules__': None, # List of modules. Will be set after start-up.
'_socks1type': '',
'_socks2addr': '',
'_socks3port': '',
'_socks4user': '',
'_socks5pwd': '',
'_torctlport': 9051,
'__logstdout': False
}

def test_opts(self):
module = sfp_ipqualityscore()
self.assertEqual(len(module.opts), len(module.optdescs))

def test_setup(self):
"""
Test setup(self, sfc, userOpts=dict())
"""
sf = SpiderFoot(self.default_options)

module = sfp_ipqualityscore()
module.setup(sf, dict())

def test_watchedEvents_should_return_list(self):
module = sfp_ipqualityscore()
self.assertIsInstance(module.watchedEvents(), list)

def test_producedEvents_should_return_list(self):
module = sfp_ipqualityscore()
self.assertIsInstance(module.producedEvents(), list)

def test_handleEvent_no_api_key_should_set_errorState(self):
"""
Test handleEvent(self, event)
"""
sf = SpiderFoot(self.default_options)

module = sfp_ipqualityscore()
module.setup(sf, dict())

target_value = 'example target value'
target_type = 'PHONE_NUMBER'
target = SpiderFootTarget(target_value, target_type)
module.setTarget(target)

event_type = 'ROOT'
event_data = 'example data'
event_module = ''
source_event = ''
evt = SpiderFootEvent(event_type, event_data, event_module, source_event)

result = module.handleEvent(evt)

self.assertIsNone(result)
self.assertTrue(module.errorState)

0 comments on commit 8fda91e

Please sign in to comment.