# -*- coding: utf-8 -*-
import re
from burp import IBurpExtender, IHttpListener, IScanIssue
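def validate_cpf(cpf):
    """Return True if *cpf* is a syntactically valid Brazilian CPF number.

    ``cpf`` is expected to be a string of exactly 11 digits with no
    separators (the shape the caller's regex produces). Any other input
    (wrong length, non-digit characters, empty string) returns False
    instead of raising, so raw regex matches can be passed straight in.
    The two trailing check digits are recomputed with the standard
    mod-11 weights and compared with the ones provided.
    """
    # The arithmetic below indexes cpf[0..10]; reject malformed input up front
    # rather than failing with IndexError/ValueError.
    if len(cpf) != 11 or not cpf.isdigit():
        return False
    # A single repeated digit (e.g. 11111111111) satisfies the mod-11 math
    # but is not a real CPF, so it is excluded explicitly.
    if cpf == cpf[0] * 11:
        return False
    digits = [int(ch) for ch in cpf]

    def check_digit(prefix_len):
        # Weights run from prefix_len + 1 down to 2 over the first prefix_len
        # digits; a remainder of 0 or 1 maps to a check digit of 0.
        total = sum(d * (prefix_len + 1 - i) for i, d in enumerate(digits[:prefix_len]))
        digit = 11 - (total % 11)
        return 0 if digit >= 10 else digit

    # First check digit covers the leading 9 digits, the second the leading 10.
    return digits[9] == check_digit(9) and digits[10] == check_digit(10)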
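class BurpExtender(IBurpExtender, IHttpListener):
    """Burp extension that flags Brazilian CPF numbers found in HTTP response bodies.

    Every response that passes through Burp is scanned for runs of exactly
    11 digits; those that pass the CPF check-digit test are reported as one
    scan issue on the request's URL.
    """

    # Compiled once for the extension instead of once per response.
    # \b\d{11}\b only matches eleven consecutive digits, i.e. an unformatted
    # CPF; values written with separators are not matched by this pattern.
    CPF_PATTERN = re.compile(r'\b\d{11}\b')

    def registerExtenderCallbacks(self, callbacks):
        """Burp entry point: keep the callbacks/helpers and register as an HTTP listener."""
        self._callbacks = callbacks
        self._helpers = callbacks.getHelpers()
        callbacks.setExtensionName("PII Scanner")
        callbacks.registerHttpListener(self)
        print("PII Scanner, Installation OK!!!")

    def _find_cpfs(self, messageInfo):
        """Return the sorted, de-duplicated list of valid CPFs in the response body of *messageInfo*.

        Only the body (everything after the header block) is scanned; an
        absent response yields an empty list.
        """
        response = messageInfo.getResponse()
        if response is None:
            return []
        response_info = self._helpers.analyzeResponse(response)
        body_offset = response_info.getBodyOffset()
        body = self._helpers.bytesToString(response[body_offset:])
        candidates = set(self.CPF_PATTERN.findall(body))
        # sorted() gives a stable issue text; set iteration order is not deterministic.
        return sorted(cpf for cpf in candidates if validate_cpf(cpf))

    def processHttpMessage(self, toolFlag, messageIsRequest, messageInfo):
        """IHttpListener hook: raise one issue per response that contains at least one valid CPF.

        Requests are ignored. All validated CPFs in the response are listed
        in the issue detail, not only the first one found.
        """
        if messageIsRequest:
            return
        cpf_ok = self._find_cpfs(messageInfo)
        if not cpf_ok:
            return
        cpf_list = ", ".join(cpf_ok)
        # NOTE(review): the console line and the issue detail below carry the
        # full CPF values, so Burp's project file and logs will hold the PII
        # this scanner is meant to flag; masking them (as the remediation text
        # itself recommends) would limit that exposure.
        print("CPF: %s" % cpf_list)
        http_service = messageInfo.getHttpService()
        url = self._helpers.analyzeRequest(messageInfo).getUrl()
        issue = CustomScanIssue(
            http_service,
            url,
            [messageInfo],
            "PII data detect",
            "Was found a PII data - CPF: %s " % cpf_list,
            "High",
            "Certain",
            "Mask the first 6 number and show just the last 5 numbers."
        )
        self._callbacks.addScanIssue(issue)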
class CustomScanIssue(IScanIssue):
    """Plain data holder implementing Burp's IScanIssue for a PII finding.

    Every getter simply hands back the value given to the constructor;
    the issue type is fixed at 0 and no background text is supplied.
    """

    def __init__(self, http_service, url, http_messages, name, detail, severity, confidence, remediation):
        """Store the issue fields; the getters below expose them unchanged."""
        self._service = http_service
        self._location = url
        self._messages = http_messages
        self._title = name
        self._description = detail
        self._level = severity
        self._certainty = confidence
        self._fix = remediation

    def getUrl(self):
        """URL the finding is attached to."""
        return self._location

    def getIssueName(self):
        """Short issue title shown in the Burp issue list."""
        return self._title

    def getIssueType(self):
        """Custom issues use type 0."""
        return 0

    def getIssueBackground(self):
        """No generic background text is provided."""
        return None

    def getRemediationBackground(self):
        """No generic remediation background is provided."""
        return None

    def getSeverity(self):
        """Severity string as given to the constructor."""
        return self._level

    def getConfidence(self):
        """Confidence string as given to the constructor."""
        return self._certainty

    def getIssueDetail(self):
        """Finding-specific detail text."""
        return self._description

    def getRemediationDetail(self):
        """Finding-specific remediation advice."""
        return self._fix

    def getHttpMessages(self):
        """Request/response pairs backing the finding."""
        return self._messages

    def getHttpService(self):
        """Burp HTTP service the messages were exchanged with."""
        return self._service