Skip to content

Commit

Permalink
sfp_gleif: Add sfp_gleif (smicallef#1194)
Browse files Browse the repository at this point in the history
  • Loading branch information
bcoles committed Jun 24, 2021
1 parent f59fa8e commit 4161d72
Show file tree
Hide file tree
Showing 2 changed files with 370 additions and 0 deletions.
310 changes: 310 additions & 0 deletions modules/sfp_gleif.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# Name: sfp_gleif
# Purpose: Search the Global LEI Index for company information using
# Global Legal Entity Identifier Foundation (GLEIF) search API.
#
# Author: <bcoles@gmail.com>
#
# Created: 2021-06-21
# Copyright: (c) bcoles 2021
# Licence: GPL
# -------------------------------------------------------------------------------

import urllib
import json

from spiderfoot import SpiderFootEvent, SpiderFootPlugin, SpiderFootHelpers


class sfp_gleif(SpiderFootPlugin):

meta = {
'name': "GLEIF",
'summary': "Look up company information from Global Legal Entity Identifier Foundation (GLEIF).",
'flags': [""],
'useCases': ["Passive", "Footprint", "Investigate"],
'categories': ["Search Engines"],
'dataSource': {
'website': "https://search.gleif.org/",
'model': "FREE_NOAUTH_LIMITED",
'references': [
"https://www.gleif.org/en/lei-data/gleif-api",
"https://api.gleif.org/docs",
],
'favIcon': "https://www.gleif.org/favicon.ico",
'logo': "https://search.gleif.org/static/img/gleif-logo.svg",
'description': "The Global Legal Entity Identifier Foundation (GLEIF) Global LEI Index contains "
"historical and current LEI records including related reference data in one authoritative, central repository."
}
}

opts = {
}

optdescs = {
}

results = None

def setup(self, sfc, userOpts=dict()):
self.sf = sfc
self.results = self.tempStorage()

for opt in list(userOpts.keys()):
self.opts[opt] = userOpts[opt]

def watchedEvents(self):
return ["COMPANY_NAME", "LEI"]

def producedEvents(self):
return ["COMPANY_NAME", "LEI", "PHYSICAL_ADDRESS", "RAW_RIR_DATA"]

def searchLegalName(self, qry):
"""Fuzzy search for legal entity by name
Args:
qry (str): legal entity name
Returns:
dict: search results
"""

params = urllib.parse.urlencode({
'q': qry.encode('raw_unicode_escape').decode("ascii", errors='replace'),
'field': "entity.legalName"
})

headers = {
'Accept': 'application/vnd.api+json'
}

res = self.sf.fetchUrl(
f"https://api.gleif.org/api/v1/fuzzycompletions?{params}",
timeout=30,
headers=headers,
useragent=self.opts['_useragent']
)

if res['code'] == "429":
self.sf.error("You are being rate-limited by GLEIF.")
return None

try:
results = json.loads(res['content'])
except Exception as e:
self.sf.debug(f"Error processing JSON response: {e}")
return None

data = results.get('data')
if not data:
return None
if not len(data):
return None

return data

def searchAutocompletions(self, qry):
"""Search for legal entity name autocompletions
Args:
qry (str): legal entity name
Returns:
dict: search results
"""

params = urllib.parse.urlencode({
'q': qry.encode('raw_unicode_escape').decode("ascii", errors='replace'),
'field': "fulltext"
})

headers = {
'Accept': 'application/vnd.api+json'
}

res = self.sf.fetchUrl(
f"https://api.gleif.org/api/v1/autocompletions?{params}",
timeout=30,
headers=headers,
useragent=self.opts['_useragent']
)

if res['code'] == "429":
self.sf.error("You are being rate-limited by GLEIF.")
return None

try:
results = json.loads(res['content'])
except Exception as e:
self.sf.debug(f"Error processing JSON response: {e}")
return None

data = results.get('data')
if not data:
return None
if not len(data):
return None

return data

def retrieveRecord(self, lei):
headers = {
'Accept': 'application/vnd.api+json'
}

res = self.sf.fetchUrl(
f"https://api.gleif.org/api/v1/lei-records/{lei}",
timeout=self.opts['_fetchtimeout'],
headers=headers,
useragent=self.opts['_useragent']
)

if res['code'] == "404":
self.sf.error(f"No record for LEI: {lei}")
return None

if res['code'] == "429":
self.sf.error("You are being rate-limited by GLEIF.")
return None

try:
results = json.loads(res['content'])
except Exception as e:
self.sf.debug(f"Error processing JSON response: {e}")
return None

data = results.get('data')

if not len(data):
return None
if not data:
return None

return data

def handleEvent(self, event):
eventName = event.eventType
srcModuleName = event.module
eventData = event.data

if eventData in self.results:
self.sf.debug(f"Skipping {eventData}, already checked.")
return

self.results[eventData] = True

self.sf.debug(f"Received event, {eventName}, from {srcModuleName}")

leis = list()

if eventName == "LEI":
leis.append(eventData)
elif eventName == "COMPANY_NAME":
res = self.searchAutocompletions(eventData)

if res is None:
self.sf.debug(f"Found no results for {eventData}")
return

e = SpiderFootEvent("RAW_RIR_DATA", str(res), self.__name__, event)
self.notifyListeners(e)

for record in res:
relationships = record.get('relationships')
if not relationships:
continue

lei_records = relationships.get('lei-records')
if not lei_records:
continue

data = lei_records.get('data')
if not data:
continue

lei = data.get('id')
if not SpiderFootHelpers.validLEI(lei):
continue

leis.append(lei)

self.sf.info(f"Found {len(leis)} LEIs matching {eventData}")

for lei in set(leis):
if lei in self.results:
continue

if not SpiderFootHelpers.validLEI(lei):
continue

e = SpiderFootEvent("LEI", lei, self.__name__, event)
self.notifyListeners(e)

self.results[lei] = True

res = self.retrieveRecord(lei)
if not res:
self.sf.debug(f"Found no results for {eventData}")
return

attributes = res.get('attributes')
if not attributes:
continue

entity = attributes.get('entity')
if not entity:
continue

legal_name = entity.get('legalName')
if legal_name:
entity_name = legal_name.get('value')
if entity_name:
e = SpiderFootEvent("COMPANY_NAME", entity_name, self.__name__, event)
self.notifyListeners(e)

addresses = list()

address = entity.get('legalAddress')
if address.get('addressLines'):
address_lines = ', '.join(filter(None, address.get('addressLines')))
location = ', '.join(
filter(
None,
[
address_lines,
address.get('city'),
address.get('region'),
address.get('country'),
address.get('postalCode')
]
)
)

if location:
addresses.append(location)

address = entity.get('headquartersAddress')
if address.get('addressLines'):
address_lines = ', '.join(filter(None, address.get('addressLines')))
location = ', '.join(
filter(
None,
[
address_lines,
address.get('city'),
address.get('region'),
address.get('country'),
address.get('postalCode')
]
)
)

if location:
addresses.append(location)

for address in set(addresses):
e = SpiderFootEvent("PHYSICAL_ADDRESS", address, self.__name__, event)
self.notifyListeners(e)

# End of sfp_gleif class
60 changes: 60 additions & 0 deletions test/unit/modules/test_sfp_gleif.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# test_sfp_gleif.py
import pytest
import unittest

from modules.sfp_gleif import sfp_gleif
from sflib import SpiderFoot
from spiderfoot import SpiderFootEvent, SpiderFootTarget


@pytest.mark.usefixtures
class TestModuleGleif(unittest.TestCase):
"""
Test modules.sfp_gleif
"""

def test_opts(self):
module = sfp_gleif()
self.assertEqual(len(module.opts), len(module.optdescs))

def test_setup(self):
"""
Test setup(self, sfc, userOpts=dict())
"""
sf = SpiderFoot(self.default_options)

module = sfp_gleif()
module.setup(sf, dict())

def test_watchedEvents_should_return_list(self):
module = sfp_gleif()
self.assertIsInstance(module.watchedEvents(), list)

def test_producedEvents_should_return_list(self):
module = sfp_gleif()
self.assertIsInstance(module.producedEvents(), list)

@unittest.skip("todo")
def test_handleEvent(self):
"""
Test handleEvent(self, event)
"""
sf = SpiderFoot(self.default_options)

module = sfp_gleif()
module.setup(sf, dict())

target_value = '7ZW8QJWVPR4P1J1KQY45'
target_type = 'LEI'
target = SpiderFootTarget(target_value, target_type)
module.setTarget(target)

event_type = 'ROOT'
event_data = 'example data'
event_module = ''
source_event = ''
evt = SpiderFootEvent(event_type, event_data, event_module, source_event)

result = module.handleEvent(evt)

self.assertIsNone(result)

0 comments on commit 4161d72

Please sign in to comment.