-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
letsencrypt_cert.py
162 lines (141 loc) · 6.84 KB
/
letsencrypt_cert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
Manage certificates from Let's Encrypt.
LetsEncryptCert will create a certificate from letsencrypt to be used by the
associated webserver. LetsEncryptCert uses the http challenge method and it
assumes that there is a web server available for the challenge. It shares a
volume for the certbot data as well as the letsencrypt certificates.
"""
import os
from pathlib import Path
import time
from typing import List
from base_cert import BaseCert
import requests
from self_signed_cert import SelfSignedCert
from utils import get_setting, update_link
class LetsEncryptCert(BaseCert):
"""SSL Certificate class to create and renew certs from Let's Encrypt."""
def __init__(self) -> None:
"""Initialize class from environment variables."""
# pylint: disable=too-many-instance-attributes
# Ten are required in this case.
self.cert_store: str = get_setting("CERT_STORE")
self.server_name: str = get_setting("SERVER_NAME")
self.email: str = get_setting("CERT_EMAIL")
self.max_connect_tries: int = get_setting("MAX_CONNECT_TRIES")
self.staging = get_setting("CERT_STAGING") != "0"
self.le_dir = Path("/etc/letsencrypt")
self.cert_dir = Path(f"{self.le_dir}/live/{self.server_name}")
self.nginx_cert_dir = Path(f"{self.cert_store}/nginx/{self.server_name}")
self.cert = Path(f"{self.cert_dir}/fullchain.pem")
self.renew_before_expiry: int = get_setting("CERT_SELF_RENEWAL")
def create(self) -> None:
"""
Create an SSL Certificate from Let's Encrypt.
The method first creates a self-signed certificate and sets up a symbolic
link from the Nginx configured location to the self-signed certificate.
This allows the Nginx webserver to start. Once Nginx is up, then the
certificate can be requested using the webroot authentication method. If
this is successful, then the symbolic link is moved to point to the
new certificate.
NOTE:
Nginx needs to be restarted/reloaded for it to use the new certificate.
"""
if not self.cert.exists():
# Create a self-signed certificate so that the Nginx webserver can
# come up and be available for the HTTP challenges from letsencrypt
temp_cert = SelfSignedCert(expire=1, renew_before_expiry=0)
temp_cert.create()
# Check to see if we have a certificate from Let's Encrypt by seeing
# the the Nginx webserver is configured to use a certificate in
# the folder where the letsencrypt certificates are stored.
# It does NOT verify that the certificate exists.
is_letsencrypt_cert = False
if self.nginx_cert_dir.is_symlink():
link_target = os.readlink(self.nginx_cert_dir)
if link_target == self.cert_dir:
is_letsencrypt_cert = True
# if we do not have a certificate from letsncrypt, then we:
# 1. wait for the webserver to come up since we use the webroot
# challenge method;
# 2. request a certificate from Let's Encrypt using certbot
# 3. update the Nginx configuration to use the new certificate.
if not is_letsencrypt_cert:
if not self.wait_for_webserver():
print("Could not connect to webserver")
return
# Get additional variables for certbot
domain_list: List[str] = [self.server_name]
domain_list.extend(get_setting("CERT_ADDL_DOMAINS").split())
if self.get_cert(domain_list):
# update the certificate link for the Nginx web server
update_link(self.cert_dir, self.nginx_cert_dir)
self.update_renew_before_expiry(self.server_name, self.renew_before_expiry)
def renew(self) -> None:
"""Renew all letsencrypt certificates that are up for renewal."""
os.system("certbot renew")
def get_le_dir(self) -> Path:
"""Get the directory where Let's Encrypt stores certificate info."""
return self.le_dir
def get_cert(self, domain_list: List[str]) -> bool:
"""
Get a certificate for all the domains in the supplied list.
Get a single certificate that can be used for a list of domains. The
first domain in the list is the
"""
if domain_list is not None:
domain_args = f"-d {' -d '.join(domain_list)}"
if not self.email:
email_arg = "--register-unsafely-without-email"
else:
email_arg = f"--email {self.email}"
staging_arg = "--staging" if self.staging else ""
cert_cmd = (
f"certbot certonly --webroot -w /var/www/certbot "
f"{staging_arg} "
f"{email_arg} "
f"{domain_args} "
"--rsa-key-size 4096 "
"--agree-tos "
"--non-interactive "
)
print(f"Requesting Let's Encrypt Certificate:\n{cert_cmd}")
return os.system(cert_cmd) == 0
return False
def wait_for_webserver(self) -> bool:
"""
Wait until Nginx webserver is up.
wait_for_webserver will wait until the Nginx webserver has started. This
is done by periodically sending an http request to our URL and waiting for
a response of 200 (OK) or 301 (Redirected). We do not allow redirects nor
do we send an https request because the webserver is started up with a
self-signed certificate which will cause an error in these cases.
"""
attempt_count = 0
while attempt_count < self.max_connect_tries:
try:
# Don't allow redirect errors since letsencrypt connects
# on port 80 and since we have a self-signed cert for now
# it will cause errors
resp: requests.Response = requests.get(
f"http://{self.server_name}", allow_redirects=False
)
except requests.ConnectionError:
attempt_count += 1
else:
if resp.status_code in (200, 301):
return True
attempt_count += 1
time.sleep(10)
return False
def update_renew_before_expiry(self, domain: str, renew_before_expiry_period: int) -> None:
"""Update the RENEW_BEFORE_EXPIRY configuration value for 'domain'."""
renew_before_expiry = str(renew_before_expiry_period)
print(f"Setting renew before expiry for {domain} " f"to {renew_before_expiry}")
renew_config = f"{self.le_dir}/renewal/{domain}.conf"
if os.path.exists(renew_config):
os.system(
"sed -i 's/#* *renew_before_expiry = [0-9][0-9]* days/"
f"renew_before_expiry = {renew_before_expiry} days/' "
f" {renew_config}"
)