Skip to content

Commit

Permalink
Social links module - mtg-bi.com (smicallef#1142)
Browse files Browse the repository at this point in the history
* Social Links module
  • Loading branch information
krishnasism authored Mar 22, 2021
1 parent f108f73 commit 0762311
Show file tree
Hide file tree
Showing 3 changed files with 344 additions and 0 deletions.
263 changes: 263 additions & 0 deletions modules/sfp_sociallinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# Name: sfp_sociallinks
# Purpose: Spiderfoot plugin to query mtg-bi.com to gather intelligence from
# social media platforms and dark web.
#
# Author: Krishnasis Mandal <krishnasis@hotmail.com>
#
# Created: 20/02/2021
# Copyright: (c) Steve Micallef
# Licence: GPL
# -------------------------------------------------------------------------------

import json

from spiderfoot import SpiderFootEvent, SpiderFootPlugin


class sfp_sociallinks(SpiderFootPlugin):

meta = {
'name': "Social Links",
'summary': "Queries mtg-bi.com to gather intelligence from social media platforms and dark web",
'flags': ["apikey"],
'useCases': ["Footprint", "Investigate", "Passive"],
'categories': ["Real World"],
'dataSource': {
'website': "https://mtg-bi.com/",
'model': "COMMERCIAL_ONLY",
'references': [
"https://docs.osint.rest/"
],
'favIcon': "view-source:https://static.tildacdn.com/tild6563-6633-4533-b362-663333656461/favicon.ico",
'logo': "https://static.tildacdn.com/tild3935-6136-4330-b561-643034663032/LogoSL.svg",
'description': "Social Links provides instruments for OSINT methods "
"that are used by the world's leading investigation and law enforcement agencies",
}
}

# Default options
opts = {
'api_key': '',
}

# Option descriptions
optdescs = {
'api_key': "mtg-bi.com API Key",
}

results = None

def setup(self, sfc, userOpts=dict()):
self.sf = sfc
self.results = self.tempStorage()

for opt in list(userOpts.keys()):
self.opts[opt] = userOpts[opt]

# What events is this module interested in for input
def watchedEvents(self):
return [
"USERNAME",
"EMAILADDR",
"PHONE_NUMBER"
]

# What events this module produces
# This is to support the end user in selecting modules based on events
# produced.
def producedEvents(self):
return [
"GEOINFO",
"SOCIAL_MEDIA",
"HUMAN_NAME",
"JOB_TITLE",
"COMPANY_NAME",
"PHONE_NUMBER",
"ACCOUNT_EXTERNAL_OWNED",
"RAW_RIR_DATA"
]

def query(self, queryString):
headers = {
'Accept': "application/json",
'Authorization': self.opts['api_key']
}

res = self.sf.fetchUrl(
queryString,
headers=headers,
timeout=15,
useragent=self.opts['_useragent']
)

if res['code'] == '429':
self.sf.error("You are being rate-limited by seon.io")
return None

if res['code'] != "200":
self.sf.error("Error retrieving search results from seon.io")
return None

if res['code'] == '404':
self.sf.error("API Endpoint not found")
return None

if res['content'] is None:
return None
return json.loads(res['content'])

def queryTelegram(self, qry, eventName):
if eventName == "PHONE_NUMBER":
queryString = f"https://osint.rest/api/telegram/user_by_phone?query={qry}"
elif eventName == "USERNAME":
queryString = f"https://osint.rest/api/telegram/user_by_alias?query={qry}"

return self.query(queryString)

def queryFlickr(self, qry):
queryString = f"https://osint.rest/api/flickr/email?email={qry}"

return self.query(queryString)

def querySkype(self, qry):
queryString = f"https://osint.rest/api/skype/search/v2?query={qry}"

return self.query(queryString)

def queryLinkedin(self, qry):
queryString = f"https://osint.rest/api/linkedin/lookup_by_email/v2?query={qry}"

return self.query(queryString)

# Handle events sent to this module
def handleEvent(self, event):
eventName = event.eventType
srcModuleName = event.module
eventData = event.data

self.sf.debug(f"Received event, {eventName}, from {srcModuleName}")

if self.errorState:
return None

if self.opts['api_key'] == "":
self.sf.error("You enabled sfp_sociallinks but did not set an API key!")
self.errorState = True
return

# Don't look up stuff twice
if eventData in self.results:
self.sf.debug(f"Skipping {eventData}, already checked.")
return None
else:
self.results[eventData] = True

if eventName == "PHONE_NUMBER":
data = self.queryTelegram(eventData, eventName)
if data is None:
return None

resultSet = data.get('result')
if resultSet:
if resultSet.get('first_name') and resultSet.get('last_name'):
evt = SpiderFootEvent("HUMAN_NAME", f"{resultSet.get('first_name')} {resultSet.get('last_name')}", self.__name__, event)
self.notifyListeners(evt)
if resultSet.get('username'):
evt = SpiderFootEvent("USERNAME", resultSet.get('username'), self.__name__, event)
self.notifyListeners(evt)

evt = SpiderFootEvent('RAW_RIR_DATA', str(resultSet), self.__name__, event)
self.notifyListeners(evt)

elif eventName == "USERNAME":
data = self.queryTelegram(eventData, eventName)
if data is None:
return None

resultSet = data.get('result')
if resultSet:
if resultSet.get('first_name') and resultSet.get('last_name'):
evt = SpiderFootEvent("HUMAN_NAME", f"{resultSet.get('first_name')} {resultSet.get('last_name')}", self.__name__, event)
self.notifyListeners(evt)
if resultSet.get('phone_number'):
evt = SpiderFootEvent("PHONE_NUMBER", resultSet.get('phone_number'), self.__name__, event)
self.notifyListeners(evt)

evt = SpiderFootEvent('RAW_RIR_DATA', str(resultSet), self.__name__, event)
self.notifyListeners(evt)

elif eventName == "EMAILADDR":
failedModules = 0
data = self.queryFlickr(eventData)
humanNames = set()
geoInfos = set()
if data is None:
failedModules += 1
else:
resultSet = data[0].get('entities')[0].get('data')
if resultSet:
if resultSet.get('realname').get('_content'):
humanNames.add(resultSet.get('realname').get('_content'))
if resultSet.get('location').get('_content'):
geoInfos.add(resultSet.get('location').get('_content'))
if resultSet.get('profileurl').get('_content'):
evt = SpiderFootEvent("SOCIAL_MEDIA", f"Flickr: <SFURL>{resultSet.get('profileurl').get('_content')}</SFURL>", self.__name__, event)
self.notifyListeners(evt)

evt = SpiderFootEvent('RAW_RIR_DATA', str(resultSet), self.__name__, event)
self.notifyListeners(evt)

data = self.querySkype(eventData)
if data is None:
failedModules += 1
else:
resultSet = data.get('result')
if resultSet:
resultSet = data.get('result')[0].get('nodeProfileData')
if resultSet.get('name'):
humanNames.add(resultSet.get('name'))
if resultSet.get('skypeId'):
evt = SpiderFootEvent("ACCOUNT_EXTERNAL_OWNED", f"Skype [{resultSet.get('skypeId')}]", self.__name__, event)
self.notifyListeners(evt)
evt = SpiderFootEvent("USERNAME", resultSet.get('skypeId'), self.__name__, event)
self.notifyListeners(evt)

evt = SpiderFootEvent('RAW_RIR_DATA', str(resultSet), self.__name__, event)
self.notifyListeners(evt)

data = self.queryLinkedin(eventData)
if data is None:
failedModules += 1
else:
resultSet = data.get('result')
if resultSet:
resultSet = data.get('result')[0]
if resultSet.get('displayName'):
humanNames.add(resultSet.get('displayName'))
if resultSet.get('location'):
geoInfos.add(resultSet.get('location'))
if resultSet.get('companyName'):
evt = SpiderFootEvent("COMPANY_NAME", resultSet.get('companyName'), self.__name__, event)
self.notifyListeners(evt)
if resultSet.get('headline'):
evt = SpiderFootEvent("JOB_TITLE", resultSet.get('headline'), self.__name__, event)
self.notifyListeners(evt)

evt = SpiderFootEvent('RAW_RIR_DATA', str(resultSet), self.__name__, event)
self.notifyListeners(evt)

for humanName in humanNames:
evt = SpiderFootEvent("HUMAN_NAME", humanName, self.__name__, event)
self.notifyListeners(evt)

for geoInfo in geoInfos:
evt = SpiderFootEvent("GEOINFO", geoInfo, self.__name__, event)
self.notifyListeners(evt)

if failedModules == 3:
self.sf.info(f"No data found for {eventData}")
return None

# End of sfp_sociallinks class
1 change: 1 addition & 0 deletions spiderfoot/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class SpiderFootDb:
['INTERNET_NAME_UNRESOLVED', 'Internet Name - Unresolved', 0, 'ENTITY'],
['IP_ADDRESS', 'IP Address', 0, 'ENTITY'],
['IPV6_ADDRESS', 'IPv6 Address', 0, 'ENTITY'],
['JOB_TITLE', 'Job Title', 0, 'DESCRIPTOR'],
['LINKED_URL_INTERNAL', 'Linked URL - Internal', 0, 'SUBENTITY'],
['LINKED_URL_EXTERNAL', 'Linked URL - External', 0, 'SUBENTITY'],
['MALICIOUS_ASN', 'Malicious AS', 0, 'DESCRIPTOR'],
Expand Down
80 changes: 80 additions & 0 deletions test/unit/modules/test_sfp_sociallinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# test_sfp_sociallinks.py
import unittest

from modules.sfp_sociallinks import sfp_sociallinks
from sflib import SpiderFoot
from spiderfoot import SpiderFootEvent, SpiderFootTarget


class TestModulesociallinks(unittest.TestCase):
"""
Test modules.sfp_sociallinks
"""

default_options = {
'_debug': False, # Debug
'__logging': True, # Logging in general
'__outputfilter': None, # Event types to filter from modules' output
'_useragent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0', # User-Agent to use for HTTP requests
'_dnsserver': '', # Override the default resolver
'_fetchtimeout': 5, # number of seconds before giving up on a fetch
'_internettlds': 'https://publicsuffix.org/list/effective_tld_names.dat',
'_internettlds_cache': 72,
'_genericusers': "abuse,admin,billing,compliance,devnull,dns,ftp,hostmaster,inoc,ispfeedback,ispsupport,list-request,list,maildaemon,marketing,noc,no-reply,noreply,null,peering,peering-notify,peering-request,phish,phishing,postmaster,privacy,registrar,registry,root,routing-registry,rr,sales,security,spam,support,sysadmin,tech,undisclosed-recipients,unsubscribe,usenet,uucp,webmaster,www",
'__version__': '3.3-DEV',
'__database': 'spiderfoot.test.db', # note: test database file
'__modules__': None, # List of modules. Will be set after start-up.
'_socks1type': '',
'_socks2addr': '',
'_socks3port': '',
'_socks4user': '',
'_socks5pwd': '',
'_torctlport': 9051,
'__logstdout': False
}

def test_opts(self):
module = sfp_sociallinks()
self.assertEqual(len(module.opts), len(module.optdescs))

def test_setup(self):
"""
Test setup(self, sfc, userOpts=dict())
"""
sf = SpiderFoot(self.default_options)

module = sfp_sociallinks()
module.setup(sf, dict())

def test_watchedEvents_should_return_list(self):
module = sfp_sociallinks()
self.assertIsInstance(module.watchedEvents(), list)

def test_producedEvents_should_return_list(self):
module = sfp_sociallinks()
self.assertIsInstance(module.producedEvents(), list)

@unittest.skip("todo")
def test_handleEvent(self):
"""
Test handleEvent(self, event)
"""
sf = SpiderFoot(self.default_options)

module = sfp_sociallinks()
module.setup(sf, dict())

target_value = 'example target value'
target_type = 'EMAILADDR'
target = SpiderFootTarget(target_value, target_type)
module.setTarget(target)

event_type = 'ROOT'
event_data = 'example data'
event_module = ''
source_event = ''
evt = SpiderFootEvent(event_type, event_data, event_module, source_event)

result = module.handleEvent(evt)

self.assertIsNone(result)

0 comments on commit 0762311

Please sign in to comment.