forked from wmbusmeters/wmbusmeters
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkem-import.py
229 lines (205 loc) · 10.3 KB
/
kem-import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# A Python script that decrypts the content of Kamstrup KEM file and imports meter files
# to wmbusmeters' config folder.
#
# The KEM file is a (sometimes zipped) xml file that contains xml-encrypted data using
# the xml-enc standard (http://www.w3.org/2001/04/xmlenc). The password needed to decrypt
# the xml-encrypted data can either be the CustomerId or something else selected by the
# person that has created the KEM file using Kamstrup software.
#
# This script takes the encrypted KEM file and decrypts its content (it automaticly detects
# the zip archive and extracts the kem file from it). The result is a XML with a list of meters
# with their types, serial numbers, keys, etc. The script prints the information about each meter
# to the console and populates wmbusmeters' config folder with corresponding meter files.
# Optionally, decrypted KEM file content can be saved to a file (option -o).
#
# Usage: python kem-import.py [options] <kem_file> <password>
# kem_file ... the name of the KEM file to decrypt or a zip archive having the KEM file in it
# password ... original password used to encrypt the content (16 characters maximum)
# options ... use -h switch to get a list of options with their descriptions
#
# The script is Python2/Python3 compatible.
#
from __future__ import print_function
import os
import sys
import re
import errno
import argparse
import base64
import zipfile
from xml.dom import minidom
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
help_prog = "Decrypts Kamstrup KEM file and imports meter information into wmbusmeters' config folder."
help_epilog = ""
help_kem = "The name of the KEM file to decrypt or a name of a zip archive that contains the encrypted KEM file."
help_pwd = "The original password used to encrypt the KEM file content (16 characters maximum)."
help_cfg = """Location of config files for wmbusmeters (default location is the current working directory).
This option has the same meaning as --useconfig option of the wmbusmeters daemon, i.e. --useconfig=/ will
populate /etc/wmbusmeters.d folder (must be run with sudo) and --useconfig=. (the default) populates
./etc/wmbusmeters.d folder. If the destination folder does not exist, it will be created; existing meter
files will be overwritten."""
help_dry = "No meter files will be created, only the info will be printed on the console."
help_out = "Save the decrypted KEM file content into a given file."
# define command line arguments
argparser = argparse.ArgumentParser(description=help_prog, epilog=help_epilog)
argparser.add_argument('kem_file', help=help_kem)
argparser.add_argument('password', help=help_pwd)
argparser.add_argument("-c", "--useconfig", type=str, action='store', dest='config', default=os.getcwd(), help=help_cfg)
argparser.add_argument("-n", "--dryrun", action='store_true', dest='dryrun', help=help_dry)
argparser.add_argument("-o", "--output", type=str, action='store', dest='output', help=help_out)
# parse command line arguments
args = argparser.parse_args()
# adjust the config folder location to full target path
args.config = args.config.strip(os.path.sep) + '/etc/wmbusmeters.d/'
# test if input file exists
if (not os.path.isfile(args.kem_file)):
print('ERROR: The input KEM file does not exist.')
sys.exit(1)
#end if
kem_file_content = None
# test if input file is a zip file
# yes: find a file with .kem extension in the zip and extract content of that file
# no : load the content of the input file directly
if (zipfile.is_zipfile(args.kem_file)):
print("Detected a zip file on input ... extracting")
with zipfile.ZipFile(args.kem_file,'r') as zipobj:
file_list = zipobj.namelist()
for file_name in file_list:
if file_name.endswith('.kem') or file_name.endswith('.kem2'):
kem_file_content = zipobj.read(file_name)
break
#end if
#end for
if (not kem_file_content):
print("ERROR: The zip file '%s' does not seem to contain any '.kem' file." % (args.kem_file))
sys.exit(1)
#end if
else:
# read content of the kem file
with open(args.kem_file,'r') as f:
kem_file_content = f.read()
#end if
# read and parse KEM file content and extract its encrypted part
# (KEM file is a a XML file formated according to http://www.w3.org/2001/04/xmlenc)
try:
xmldoc = minidom.parseString(kem_file_content)
encrypedtext = xmldoc.getElementsByTagName('CipherValue')[0].firstChild.nodeValue
encrypeddata = base64.b64decode(encrypedtext)
except:
print('ERROR: The KEM file does not seem to contain encrypted data.')
sys.exit(1)
# KEM file data is encrypted with AES-128-CBC cipher and decryption
# requires a 128bit key and 128bit initialization vector. In the case of KEM file,
# the initialization vector is the same as the key. If password length is less
# than 16 characters (128 bits), we pad the key with zeros up to its full length.
key = bytes(str(args.password).encode("utf-8"))
if (len(key) < 16): key += (16-len(key)) * b"\0"
# content decryption
backend = default_backend()
cipher = Cipher(algorithms.AES(key), modes.CBC(key), backend=backend)
decryptor = cipher.decryptor()
decryptedtext = decryptor.update(encrypeddata) + decryptor.finalize()
try:
decodedtext = decryptedtext.decode('utf-8')
except UnicodeDecodeError:
print("ERROR: Looks like password is wrong - decryption failed!")
sys.exit(1)
# save decrypted XML file if requested
if (args.output):
f = open(args.output, 'wb')
f.write(decryptedtext)
f.close()
#end if
# create the folder for meter files if it does not exists
if (not args.dryrun) and (not os.path.exists(args.config)):
print("Creating target folder:", args.config)
try:
os.makedirs(args.config)
except OSError as e:
if (e.errno in [errno.EPERM, errno.EACCES]):
print(e)
print("You may need to use 'sudo' to access the target config folder.")
sys.exit(1)
else:
raise
#end if
def print_meter(meterName,meterType,meterNum,meterSerial,meterVendor,meterConfig,meterModel,meterKey):
# meter model identification
# CONTRIBUTING NOTE: additional meter types supported by wmbusmeters can be put here
# if their identification in KEM file is known
if (meterName == 'MC302') and (meterModel.startswith('302T')):
wmbusmeters_driver = 'multical302'
elif (meterName == 'MC303') and (meterModel.startswith('303')):
wmbusmeters_driver = 'multical303'
elif (meterName == 'MC403') and (meterModel.startswith('403')):
wmbusmeters_driver = 'multical403'
elif (meterName == 'MC21') and (meterModel.startswith('021')):
wmbusmeters_driver = 'multical21'
elif (meterName == 'MC603') and (meterModel.startswith('603')):
wmbusmeters_driver = 'multical603'
elif (meterName == 'KWM2210'):
wmbusmeters_driver = 'flowiq2200'
else:
wmbusmeters_driver = None
# print info to console
print('Found meter', meterName, '('+meterModel+')')
print(' number :', meterNum)
print(' serial :', meterSerial)
print(' type :', meterType)
print(' driver :', wmbusmeters_driver)
print(' config :', meterConfig)
print(' key :', meterKey)
if (not args.dryrun):
# save meter file in config folder if meter is supported by wmbusmeters
if (wmbusmeters_driver is not None):
try:
f = open(args.config+meterSerial, 'w')
f.write("name=%s\n" % (meterNum))
f.write("driver=%s\n" % (wmbusmeters_driver))
f.write("id=%s\n" % (meterSerial))
f.write("key=%s\n" % (meterKey))
f.close()
print(' meter file:', args.config+meterSerial)
except (IOError, OSError) as e:
if (e.errno in [errno.EPERM, errno.EACCES]):
print(e)
print("You may need to use 'sudo' to access the target config folder.")
sys.exit(1)
else:
raise
else:
print(' !! unknow meter type, meter file has not been created')
#end if
#end for
# parse the decrypted KEM file content and create corresponding meter files in
# wmbusmeters' config folder for each meter found in the XML file
trimmedText = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\xff]', '', decryptedtext.decode('utf-8'))
xmldoc = minidom.parseString(trimmedText)
if xmldoc.documentElement.tagName == "MetersInOrder":
for e in xmldoc.getElementsByTagName('Meter'):
# read information from source XML file
meterName = e.getElementsByTagName('MeterName')[0].firstChild.nodeValue
meterType = e.getElementsByTagName('ConsumptionType')[0].firstChild.nodeValue
meterNum = e.getElementsByTagName('MeterNo')[0].firstChild.nodeValue
meterSerial = e.getElementsByTagName('SerialNo')[0].firstChild.nodeValue
meterVendor = e.getElementsByTagName('VendorId')[0].firstChild.nodeValue
meterConfig = e.getElementsByTagName('ConfigNo')[0].firstChild.nodeValue
meterModel = e.getElementsByTagName('TypeNo')[0].firstChild.nodeValue
meterKey = e.getElementsByTagName('DEK')[0].firstChild.nodeValue
print_meter(meterName,meterType,meterNum,meterSerial,meterVendor,meterConfig,meterModel,meterKey)
elif xmldoc.documentElement.tagName == "Devices":
for e in xmldoc.getElementsByTagName('Device'):
# read information from source XML file
meterName = e.getElementsByTagName('ShortName')[0].firstChild.nodeValue
meterType = e.getElementsByTagName('ConsumptionTypeName')[0].firstChild.nodeValue
meterNum = e.getElementsByTagName('CustomerDeviceNumber')[0].firstChild.nodeValue
meterSerial = e.getElementsByTagName('SerialNumber')[0].firstChild.nodeValue
meterVendor = e.getElementsByTagName('ManufacturerId')[0].firstChild.nodeValue
meterConfig = e.getElementsByTagName('ConfigNumber')[0].firstChild.nodeValue
meterModel = e.getElementsByTagName('TypeNumber')[0].firstChild.nodeValue
meterKey = e.getElementsByTagName('Value')[0].firstChild.nodeValue
print_meter(meterName,meterType,meterNum,meterSerial,meterVendor,meterConfig,meterModel,meterKey)
else:
print("ERROR: Unable to extract details from file")
sys.exit(1)