Skip to content

Commit

Permalink
Switched exam result auditing encryption to NaCl
Browse files Browse the repository at this point in the history
  • Loading branch information
rhysyngsun authored and Tasawer Nawaz committed Apr 18, 2019
1 parent 2444119 commit 7096c12
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 255 deletions.
24 changes: 24 additions & 0 deletions exams/management/commands/create_nacl_keypair.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Command to generate a NaCl private and public key"""

from django.core.management.base import BaseCommand

from nacl.public import PrivateKey
from nacl.encoding import Base64Encoder


class Command(BaseCommand):
"""Creates a NaCl private and public key"""
help = 'Creates a NaCl private and public key'

def handle(self, *args, **options):
"""Handle the command"""
private_key = PrivateKey.generate()
public_key = private_key.public_key

self.stdout.write('--------------------------------------------------------------')
self.stdout.write('Private Key Base64 Encoded:')
self.stdout.write(Base64Encoder.encode(bytes(private_key)).decode("utf-8"))
self.stdout.write('--------------------------------------------------------------')
self.stdout.write('Public Key Base64 Encoded:')
self.stdout.write(Base64Encoder.encode(bytes(public_key)).decode("utf-8"))
self.stdout.write('--------------------------------------------------------------')
136 changes: 44 additions & 92 deletions exams/pearson/audit.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,27 @@
"""Exam auditing"""
import base64
import logging
import os
import re

from boto.s3 import (
connection,
key,
)
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from pretty_bad_protocol import gnupg
import pretty_bad_protocol._util
from nacl.encoding import Base64Encoder
from nacl.public import PublicKey, SealedBox

from micromasters import utils

SSE_ENCRYPTION_ALGORITHM = 'AES256'
REQUIRED_SETTINGS = [
'EXAMS_AUDIT_ENCRYPTION_FINGERPRINT',
'EXAMS_AUDIT_ENCRYPTION_PUBLIC_KEY',
]

log = logging.getLogger(__name__)

# monkey patch gnupg so it doesn't utf-8 encode our files
# see: https://github.com/isislovecruft/python-gnupg/issues/169
pretty_bad_protocol._util.binary = lambda data: data # pylint: disable=protected-access


class S3AuditStorage:
"""Audit storage mechanism for S3"""

def configure(self):
def _validate(self):
"""
Configures / validates the storage configuration
Expand All @@ -56,6 +46,7 @@ def get_connection(self):
Returns:
boto.s3.connection.S3Connection: a connection to S3
"""
self._validate()
return connection.S3Connection(
settings.EXAMS_AUDIT_AWS_ACCESS_KEY_ID,
settings.EXAMS_AUDIT_AWS_SECRET_ACCESS_KEY
Expand Down Expand Up @@ -93,118 +84,81 @@ def get_s3_key(self, filename, file_type):
)
)

def upload(self, filename, file_type):
def upload(self, filename, data, file_type):
"""
Uploads the file to S3
Args:
filename (str): filename of the encrypted file
data (str): the encrypted data
file_type (str): type of the encrypted file
Returns:
str: the key path in S3 where the file was stored
"""
s3_key = self.get_s3_key(filename, file_type)
s3_key.set_contents_from_filename(
filename,
s3_key.set_contents_from_string(
data,
headers={
'x-amz-server-side-encryption': SSE_ENCRYPTION_ALGORITHM,
}
)
return s3_key.key


class ExamDataAuditor:
def _get_public_key():
"""
Encrypted file auditor for exam requests/responses
"""
REQUEST = 'request'
RESPONSE = 'response'

def __init__(self, gpg=None, store=None):
self.gpg = gpg or gnupg.GPG()
self.store = store or S3AuditStorage()
self.configured = False

@property
def fingerprint(self):
"""The fingerprint for the public GPG key"""
return re.sub(r'\s+', '', settings.EXAMS_AUDIT_ENCRYPTION_FINGERPRINT)

@property
def public_key(self):
"""The public GPG key"""
return base64.b64decode(settings.EXAMS_AUDIT_ENCRYPTION_PUBLIC_KEY).decode('utf-8')
Get the configured PublicKey instance
def get_fingerprints(self):
"""
Get the fingerprints of keys currently loaded on the system
Returns:
list: list of keys
"""
return self.gpg.list_keys().fingerprints

def configure(self):
"""
Performs a one-time configuration of GPG keys
Raises:
ImproperlyConfigured: settings are missing or invalid
Returns:
bool: True if configuration successful
"""
if self.configured:
return
Returns:
PublicKey:
the public key as configured in settings
"""
if not settings.EXAMS_AUDIT_NACL_PUBLIC_KEY:
raise ImproperlyConfigured(
"EXAMS_AUDIT_NACL_PUBLIC_KEY is required but not set"
)

# if any of the settings are not set, we consider this feature not enabled
if not all(getattr(settings, key) for key in REQUIRED_SETTINGS):
raise ImproperlyConfigured(
"One of EXAMS_AUDIT_ENCRYPTION_PUBLIC_KEY or "
"EXAMS_AUDIT_ENCRYPTION_FINGERPRINT is required but not set"
)
return PublicKey(settings.EXAMS_AUDIT_NACL_PUBLIC_KEY, encoder=Base64Encoder)

if self.fingerprint in self.get_fingerprints():
self.configured = True
return

result = self.gpg.import_keys(self.public_key)
def _get_sealed_box():
"""
Get a NaCl SealedBox configured with the public key
if result.counts['not_imported'] != 0:
raise ImproperlyConfigured('Error importing GPG keys for exam auditing: {}'.format(result.summary()))
Returns:
SealedBox:
the configured SealedBox
"""
return SealedBox(_get_public_key())

if self.fingerprint not in result.fingerprints:
raise ImproperlyConfigured(
"Exam auditing GPG key successfully imported but {actual} "
"does not contain fingerprint: {expected}".format(
actual=result.fingerprints,
expected=self.fingerprint,
)
)

self.store.configure()
class ExamDataAuditor:
"""
Encrypted file auditor for exam requests/responses
"""
REQUEST = 'request'
RESPONSE = 'response'

self.configured = True
def __init__(self, store=None):
self.store = store or S3AuditStorage()

def encrypt(self, filename, encrypted_filename):
"""
Encrypts the local file with GPG
Encrypts the local file
Args:
filename (str): absolute path to the local file
encrypted_filename (str): absolute path to the local encrypted file
Returns:
str:
the encrypted data
"""
log.debug('Encrypting file %s to %s', filename, encrypted_filename)

with open(filename, 'rb') as source_file:
self.gpg.encrypt(
source_file,
self.fingerprint,
armor=False,
symmetric=False,
output=encrypted_filename
)
return _get_sealed_box().encrypt(source_file.read())

def upload_encrypted_file(self, filename, file_type):
"""
Expand All @@ -217,11 +171,11 @@ def upload_encrypted_file(self, filename, file_type):
Returns:
str: path to the stored file
"""
encrypted_filename = '{}.gpg'.format(filename)
encrypted_filename = '{}.nacl'.format(filename)
try:
self.encrypt(filename, encrypted_filename)
encrypted_data = self.encrypt(filename, encrypted_filename)

return self.store.upload(encrypted_filename, file_type)
return self.store.upload(encrypted_filename, encrypted_data, file_type)
finally:
utils.safely_remove_file(encrypted_filename)

Expand All @@ -239,8 +193,6 @@ def audit_file(self, filename, file_type):
if not settings.EXAMS_AUDIT_ENABLED:
return None

self.configure()

return self.upload_encrypted_file(filename, file_type)

def audit_response_file(self, filename):
Expand Down
Loading

0 comments on commit 7096c12

Please sign in to comment.