import boto3
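# Keep the boto3 module name intact: the original rebound `boto3` to a Session
# object, shadowing the library for everything below it in this file.
session = boto3.session.Session(profile_name="default")
# Redshift client for the "default" profile; credentials and region are
# resolved from that profile's configuration.
redshift_client = session.client('redshift')
# Running total of findings written by redshift_checks(); module-global so the
# report's summary line can include it.
count = 0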
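def _iter_clusters():
    """Yield every cluster in the account/region, following API pagination."""
    paginator = redshift_client.get_paginator('describe_clusters')
    for page in paginator.paginate():
        yield from page['Clusters']


def _iter_snapshots(**filters):
    """Yield cluster snapshots matching *filters*, following API pagination.

    Args:
        **filters: keyword arguments passed through to DescribeClusterSnapshots
            (e.g. ``ClusterIdentifier=...`` or ``SnapshotType='manual'``).
    """
    paginator = redshift_client.get_paginator('describe_cluster_snapshots')
    for page in paginator.paginate(**filters):
        yield from page['Snapshots']


def _require_ssl_enabled(cluster):
    """Return True if every parameter group on *cluster* sets ``require_ssl`` to true.

    Encryption in transit for Redshift is governed by the ``require_ssl``
    cluster parameter; it is independent of at-rest encryption and of HSM
    status. (The original check compared the ``HsmStatus`` dict to the string
    ``'active'``, which never matched, so every cluster was reported as
    unencrypted in transit.) A cluster with no parameter groups listed is
    treated as not enforcing SSL, so the scanner fails closed.
    """
    groups = cluster.get('ClusterParameterGroups') or []
    if not groups:
        return False
    paginator = redshift_client.get_paginator('describe_cluster_parameters')
    for group in groups:
        values = {}
        for page in paginator.paginate(ParameterGroupName=group['ParameterGroupName']):
            for param in page['Parameters']:
                values[param['ParameterName']] = param.get('ParameterValue', '')
        if str(values.get('require_ssl', 'false')).lower() != 'true':
            return False
    return True


def _check_cluster(cluster, text_file, deprecated_versions):
    """Write the findings for one cluster to *text_file*.

    Args:
        cluster: one ``Clusters`` entry from DescribeClusters.
        text_file: open, writable text file the report lines are appended to.
        deprecated_versions: container of ``ClusterVersion`` strings to flag.

    Returns:
        The number of ``[Vulnerability]`` lines written for this cluster.
    """
    findings = 0
    cluster_id = cluster['ClusterIdentifier']
    text_file.write(f'ClusterIdentifier: {cluster_id}\n')

    # A publicly accessible cluster has an internet-reachable endpoint guarded
    # only by its security group rules.
    if cluster.get('PubliclyAccessible'):
        text_file.write('[Vulnerability] Publicly Accessible: True\n')
        findings += 1
    else:
        text_file.write('Publicly Accessible: False\n')

    # Snapshots of this cluster that are stored unencrypted.
    unencrypted = [
        snapshot['SnapshotIdentifier']
        for snapshot in _iter_snapshots(ClusterIdentifier=cluster_id)
        if not snapshot.get('Encrypted')
    ]
    if unencrypted:
        text_file.write(f'[Vulnerability] UnencryptedSnapshots: {", ".join(unencrypted)}\n')
        findings += 1

    # Encryption at rest (cluster-level) is reported separately from transit;
    # the original folded both into one misnamed check.
    if cluster.get('Encrypted'):
        text_file.write('EncryptionAtRest: Enabled\n')
    else:
        text_file.write('[Vulnerability] EncryptionAtRest: Disabled\n')
        findings += 1

    # Encryption in transit: require_ssl on every attached parameter group.
    if _require_ssl_enabled(cluster):
        text_file.write('DataInTransitEncryption: Enabled\n')
    else:
        text_file.write('[Vulnerability] DataInTransitEncryption: Disabled\n')
        findings += 1

    # Well-known default admin name, a sign that credentials were never customised.
    if cluster.get('MasterUsername') == 'masteruser':
        text_file.write('[Vulnerability] Use of Master User Credentials: Yes\n')
        findings += 1
    else:
        text_file.write('Use of Master User Credentials: No\n')

    # NOTE(review): an 'available' cluster appears to always report at least one
    # node, so this condition is unlikely to ever fire; a real idleness check
    # would need usage metrics, which this file does not query. Kept as-is.
    if cluster.get('ClusterStatus') == 'available' and cluster.get('NumberOfNodes') == 0:
        text_file.write('UnusedIdleCluster: Yes\n')

    # Versions the caller has declared deprecated/vulnerable.
    if cluster.get('ClusterVersion') in deprecated_versions:
        text_file.write('[Vulnerability] DeprecatedVulnerableVersion: Yes\n')
        findings += 1

    return findings


def _check_shared_snapshots(text_file):
    """Report manual snapshots that other AWS accounts are authorised to restore.

    The original filter flagged every manual snapshot as publicly accessible
    without inspecting any sharing attribute. Redshift exposes sharing through
    the snapshot's ``AccountsWithRestoreAccess`` list; any entry there means the
    snapshot can be restored outside this account.

    Args:
        text_file: open, writable text file the report lines are appended to.

    Returns:
        The number of ``[Vulnerability]`` lines written.
    """
    findings = 0
    for snapshot in _iter_snapshots(SnapshotType='manual'):
        # 'your-identifier' is the template's placeholder for a snapshot to skip.
        if snapshot['SnapshotIdentifier'] == 'your-identifier':
            continue
        accounts = [
            entry.get('AccountId', '')
            for entry in snapshot.get('AccountsWithRestoreAccess') or []
        ]
        if accounts:
            text_file.write(
                f'SnapshotIdentifier: {snapshot["SnapshotIdentifier"]}\n'
                f'[Vulnerability] SharedWithAccounts: {", ".join(accounts)}\n'
            )
            findings += 1
    return findings


def redshift_checks(*, deprecated_versions=('your-deprecated-version',)):
    """Audit every Redshift cluster visible to the session and append a report.

    Findings are appended to ``Vulnerability_Check_Results.txt`` in the current
    working directory. Each finding line is prefixed with ``[Vulnerability]``
    and increments the module-level ``count``, whose running value is written
    as the summary line. A completion message is printed to stdout.

    Args:
        deprecated_versions: ``ClusterVersion`` strings to report as deprecated
            or vulnerable. The default is the original placeholder and matches
            nothing real; pass the versions your organisation has retired.

    Raises:
        Any botocore/boto3 exception from the Redshift API calls propagates.
    """
    global count
    with open('Vulnerability_Check_Results.txt', 'a') as text_file:
        text_file.write('Redshift Vulnerability Check Results:\n\n')
        for cluster in _iter_clusters():
            count += _check_cluster(cluster, text_file, deprecated_versions)
        # The snapshot-sharing check is account-wide, so it runs once; the
        # original ran it inside the cluster loop and counted each shared
        # snapshot once per cluster.
        count += _check_shared_snapshots(text_file)
        text_file.write(f'{count} vulnerabilities found\n\n')
    print('Redshift Checks Completed')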
def run() -> None:
    """Entry point used by the driver: run the Redshift checks once."""
    redshift_checks()