-
Notifications
You must be signed in to change notification settings - Fork 0
/
install_to_asa.py
executable file
·285 lines (199 loc) · 9 KB
/
install_to_asa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
#!/usr/bin/env python3
import sys
import os
import requests
import requests.exceptions
import logging
import json
import base64
import re
from pprint import pprint
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.hazmat.backends import default_backend
# Script expects its parameters as environment variables exported from getssl.cfg.
# All values below are read once, at import time.
# Hostname of the ASA. Required.
hostname = os.getenv('ASA_SSL_HOSTNAME')
# Username, requires privilege 15. Required.
username = os.getenv('ASA_SSL_USERNAME')
# Password. Required.
password = os.getenv('ASA_SSL_PASSWORD')
# Prefix of the trustpoints to create. Should not match any existing trustpoints as this
# script will remove them. Optional. Default: TrustPoint_LE_
trustpoint_prefix = os.getenv('ASA_SSL_TRUSTPOINT_PREFIX', default='TrustPoint_LE_')
# File containing the certificate to deploy. Required.
cert_file = os.getenv('ASA_SSL_CERT')
# File containing the chain up to and including the root, but NOT including our certificate. Required.
chain_file = os.getenv('ASA_SSL_CHAIN')
# File containing the private key. Required.
key_file = os.getenv('ASA_SSL_KEY')
# Whether to disable SSL validation for the API calls. Needed if deploying for the first time
# or using staging certificates. Not recommended otherwise. Optional. Default: 0.
# NOTE(review): int() raises ValueError if the variable is set to a non-numeric string.
insecure = int(os.getenv('ASA_SSL_INSECURE', 0))
# Whether to enable debugging output. Usually exported from getssl so getssl can be run with -d.
# Only the value 1 enables it. Optional. Default: 0
debug = int(os.getenv('_USE_DEBUG', 0)) == 1
# Which interfaces to apply the trustpoint to (ssl trust-point <tp> <interface>). Comma-separated.
# Optional. Default: outside. Can be blank.
# NOTE(review): ''.split(',') returns [''], so a blank value still yields one (empty)
# interface name rather than an empty list - confirm that is the intended meaning of "blank".
interfaces = os.getenv('ASA_SSL_INTERFACES', default='outside').split(',')
# Whether to also set the ikev2 remote-access trustpoint for IKEv2. Set it to 'only' to make
# this the only trustpoint (normal case), to 'add' to just add it to the existing list, or to
# a number to insert it at that line (usually 1). Leave it empty to not set it at all.
# Optional. Default: empty
ikev2 = os.getenv('ASA_SSL_IKEV2', default='')
# End of environment variable parameters
# Enable full HTTP debugging
HTTP_DEBUG = False
# Base URL of the REST API. Raises TypeError here if ASA_SSL_HOSTNAME is not set,
# because os.getenv() then returns None.
api_base_url = 'https://' + hostname + '/api'
# Passed as verify= to requests: certificate validation stays on unless insecure is non-zero.
ssl_verify = True if insecure == 0 else False
# Passphrase used to encrypt the pkcs12 file while it is delivered to the ASA.
# NOTE(review): this is a fixed, well-known value, so it adds no secrecy of its own.
pass_phrase = 'cisco123'
# Authentication
#import base64
#base64string = base64.b64encode(('%s:%s' % (username, password)).encode('utf-8'))
#pprint(base64string)
#req.add_header("Authorization", "Basic %s" % base64string)
# HTTP debugging: when HTTP_DEBUG is set, make http.client print the raw
# request/response traffic and lower the root and urllib3 loggers to DEBUG.
if HTTP_DEBUG:
    import http.client as http_client

    http_client.HTTPConnection.debuglevel = 1

    logging.basicConfig()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True
def load_certificate(cert_file):
    """Parse the PEM certificate stored in *cert_file*.

    The file is read in binary mode and its contents are handed to
    x509.load_pem_x509_certificate; the resulting certificate object is
    returned.
    """
    with open(cert_file, 'rb') as pem_file:
        pem_bytes = pem_file.read()
    return x509.load_pem_x509_certificate(pem_bytes, default_backend())
# The chain file can contain multiple certificates and cryptography has no way
# to load them all. Separate them and load them into a list.
def load_chain(chain_file):
    """Return a list with every certificate found in the PEM file *chain_file*.

    The file is scanned line by line. Each run of lines from a
    '-----BEGIN CERTIFICATE-----' marker up to and including the next
    '-----END CERTIFICATE-----' marker is parsed as one certificate.
    Lines outside such a run are ignored, and a run that is never closed by
    an END marker is dropped.
    """
    begin_marker = '-----BEGIN CERTIFICATE-----'
    end_marker = '-----END CERTIFICATE-----'

    certificates = []
    pem_lines = []
    inside_cert = False
    with open(chain_file, 'r') as chain:
        for raw_line in chain:
            marker = raw_line.strip()
            if not inside_cert:
                # Skip everything until the next certificate starts.
                if marker == begin_marker:
                    pem_lines = [raw_line]
                    inside_cert = True
                continue
            pem_lines.append(raw_line)
            if marker == end_marker:
                inside_cert = False
                pem_text = ''.join(pem_lines)
                certificates.append(
                    x509.load_pem_x509_certificate(pem_text.encode('UTF-8'), default_backend())
                )
    return certificates
def load_private_key(key_file):
    """Parse the PEM private key stored in *key_file*.

    The file is read in binary mode and loaded with no password (None), so
    the key must not be passphrase-protected.
    """
    with open(key_file, 'rb') as pem_file:
        pem_bytes = pem_file.read()
    return serialization.load_pem_private_key(pem_bytes, None, default_backend())
def api_call(url, http_method, data, timeout=60):
    """Send an authenticated JSON request to the REST API and return the response.

    Parameters:
        url: full URL to call.
        http_method: HTTP verb, e.g. 'GET', 'POST' or 'DELETE'.
        data: object that is JSON-encoded and sent as the request body.
        timeout: seconds to wait for the server before giving up. Without a
            timeout requests waits indefinitely, so an unresponsive device
            would hang the whole script.

    Returns:
        The requests.Response object when the status code is below 300.

    Raises:
        requests.exceptions.HTTPError: on an error status. When the body is
            JSON with a 'messages' or 'response' entry, the message is taken
            from there; otherwise the standard raise_for_status() error is
            raised.
        requests.exceptions.RequestException: on connection problems or timeout.
    """
    headers = {'Content-Type': 'application/json', 'User-Agent': 'REST API Agent'}
    response = requests.request(
        http_method,
        url,
        data=json.dumps(data),
        auth=(username, password),
        headers=headers,
        verify=ssl_verify,
        timeout=timeout,
    )
    if response.status_code <= 299:
        return response

    # Try to extract a human-readable error from the JSON body.
    try:
        json_error = json.loads(response.text)
    except ValueError:
        json_error = None

    message = None
    if isinstance(json_error, dict):
        try:
            if 'messages' in json_error:
                first = json_error['messages'][0]
                message = '%s: %s' % (first['code'], first['details'])
            elif 'response' in json_error:
                # cli errors are just in the response message
                message = ' '.join(json_error['response'])
        except (KeyError, IndexError, TypeError):
            # Unexpected body layout: fall back to the generic HTTP error below
            # rather than masking it with an unrelated exception.
            message = None

    if message is not None:
        raise requests.exceptions.HTTPError(message, response=response)

    # Raises for 4xx/5xx; any other status falls through and is returned as is.
    response.raise_for_status()
    return response
def list_trustpoints():
    """Return the names of the identity certificates reported by the API.

    Issues a GET on <api_base_url>/certificate/identity and collects the
    'name' of every entry in the reply's 'items' list.
    """
    reply = api_call(api_base_url + '/certificate/identity', 'GET', {})
    payload = json.loads(reply.text)
    return [entry['name'] for entry in payload['items']]
def delete_trustpoint(trustpoint):
    """Delete the identity certificate named *trustpoint* through the API.

    Sends a DELETE to <api_base_url>/certificate/identity/<trustpoint>;
    errors are raised by api_call.
    """
    identity_url = api_base_url + '/certificate/identity/'
    api_call(identity_url + trustpoint, 'DELETE', {})
def exec(cmds):
    """Run one or more CLI commands through the API's /cli endpoint.

    *cmds* may be a single command or a list of commands; a non-list value
    is wrapped in a one-element list. When debug is enabled the commands are
    pretty-printed first. Returns the decoded JSON reply.

    Note: this function deliberately keeps its name even though it shadows
    the builtin exec() within this module.
    """
    command_list = cmds if isinstance(cmds, list) else [cmds]
    if debug:
        pprint(command_list)
    reply = api_call(api_base_url + '/cli', 'POST', {'commands': command_list})
    return json.loads(reply.text)
def get_ikev2_trustpoints():
    """Return the trustpoint names currently configured for IKEv2 remote access.

    Runs a 'sh run crypto ikev2' CLI command and picks the trustpoint name
    out of every output line of the form
    'crypto ikev2 remote-access trustpoint <name>'.
    """
    output = exec('sh run crypto ikev2 | begin crypto ikev2 remote-access trustpoint')
    pattern = re.compile(r"^\s*crypto ikev2 remote-access trustpoint ([^ ]+)$")
    config_lines = output['response'][0].strip().split("\n")
    hits = (pattern.match(config_line.strip()) for config_line in config_lines)
    return [hit.group(1) for hit in hits if hit]
# 1. Load the certificate, its chain and the private key, then convert to pkcs12 format
cert = load_certificate(cert_file)
chain_certs = load_chain(chain_file)
key = load_private_key(key_file)

# Bundle key, certificate and chain into a single pkcs12 blob encrypted with pass_phrase
p12 = pkcs12.serialize_key_and_certificates(
    'mycert'.encode('UTF-8'),
    key,
    cert,
    chain_certs,
    serialization.BestAvailableEncryption(pass_phrase.encode('UTF-8')),
)

# Convert to cisco-format base64: the encoded blob wrapped in PKCS12 BEGIN/END marker lines
p12_base64 = ''.join([
    "-----BEGIN PKCS12-----\n",
    base64.encodebytes(p12).decode('utf-8'),
    "-----END PKCS12-----\n",
])

# Create trustpoint name based on prefix and serial number in hex
# (everything after the 'x' of the hex() representation)
serial_hex = hex(cert.serial_number).rpartition('x')[2]
trustpoint = trustpoint_prefix + serial_hex
if debug:
    print("Trustpoint name is %s" % (trustpoint,))
# 2. Upload the certificate to a new trustpoint, unless one with that name already exists.
# The list is fetched before uploading, so it never contains a trustpoint created just now.
existing_trustpoints = list_trustpoints()
if trustpoint in existing_trustpoints:
    if debug:
        print("Trustpoint already exists")
else:
    if debug:
        print("Trustpoint does not exist, adding")
    url = api_base_url + '/certificate/identity'
    data = {
        'kind': 'object#IdentityCertificate',
        'name': trustpoint,
        # The API receives the armoured pkcs12 text as a list of lines
        'certText': p12_base64.splitlines(),
        'certPass': pass_phrase,
    }
    response = api_call(url, 'POST', data)
    if debug:
        print("Added trustpoint")
# 3. Assign the trustpoint to the requested interfaces: one
# 'ssl trust-point <trustpoint> <interface>' command per configured interface,
# sent to the device in a single call.
cmds = ['ssl trust-point %s %s' % (trustpoint, interface) for interface in interfaces]
if cmds:
    if debug:
        print("Assigning cert to interfaces")
    exec(cmds)

# TODO: Add the ability to assign certificates to SNI hostnames instead, using
# ssl trust-point <trustpoint> domain <sni-hostname>
# 4. Assign the trustpoint to IKEv2
# ikev2 is 'only' (make ours the sole trustpoint), 'add' (append ours), a number
# (insert ours at that line) or empty (leave IKEv2 untouched).
if ikev2 != '':
    if ikev2 in ('only', 'add') or ikev2.isdigit():
        if debug:
            print("Assigning ikev2 remote-access trustpoint")
        ikev2_trustpoints = get_ikev2_trustpoints()
        cmds = []

        if trustpoint not in ikev2_trustpoints:
            # add it
            if ikev2.isdigit():
                cmds.append('crypto ikev2 remote-access trustpoint %s line %s' % (trustpoint, ikev2))
            else:
                cmds.append('crypto ikev2 remote-access trustpoint %s' % (trustpoint))

        if ikev2 == 'only':
            # remove anything apart from ourselves
            cmds.extend(
                'no crypto ikev2 remote-access trustpoint %s' % (old_trustpoint)
                for old_trustpoint in ikev2_trustpoints
                if old_trustpoint != trustpoint
            )

        if cmds:
            exec(cmds)
    else:
        # Previously an unrecognised value (e.g. a typo such as 'Only') was silently
        # ignored; say so explicitly so the misconfiguration is visible.
        print(
            "WARNING: ignoring unrecognised ASA_SSL_IKEV2 value %r "
            "(expected 'only', 'add', a line number or empty)" % (ikev2,),
            file=sys.stderr,
        )
# 5. Remove any old certificates created by this script, i.e. trustpoints:
#    - that were installed by this script (so begin with trustpoint_prefix)
#    - that don't begin with trustpoint_prefix + current_serial (because multiple can be
#      installed for the CA, e.g. TP_MYSERIAL and TP_MYSERIAL-1)
# NB. A unique prefix is required if the device has more than one LE certificate, or this
# script will remove the others!
for old_trustpoint in existing_trustpoints:
    if not old_trustpoint.startswith(trustpoint_prefix):
        # Not ours: never touch trustpoints outside the prefix
        continue
    if old_trustpoint.startswith(trustpoint):
        # Belongs to the certificate that was just deployed
        continue
    if debug:
        print('Removing old trustpoint: %s' % (old_trustpoint))
    delete_trustpoint(old_trustpoint)