forked from evankozliner/merkle-tree
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MerkleTree.py
205 lines (172 loc) · 7.41 KB
/
MerkleTree.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
""" Evan Kozliner """
import os
import math
import hashlib
import copy
from typing import *
from Node import Node
class MerkleTree:
    """ A Merkle (hash) tree built over a list of items using MD5 digests.

    Each item is hashed into a leaf; leaves are combined pairwise into
    `Node` objects until a single root hash remains. Every node created
    while building is registered in `node_table` under its hash so that
    membership proofs can be checked later.

    NOTE(review): nodes are keyed by hash, so two items with identical
    content share one `node_table` entry (the later one wins).
    NOTE(review): MD5 is not collision resistant; it is kept here because
    changing the digest would change every hash callers already hold.

    Attributes:
        is_built: True once `tree_hash` has finished.
        root_hash: hex digest of the root, or None before building.
        node_table: dict mapping hex digest -> node.
        max_height: ceil(log2(number of items)); 0 for a one-item tree.
        leaves: pristine leaf nodes (copies not linked into the tree).
    """

    def __init__(self, items: List[str]):
        """ Builds the tree immediately from `items`.

        Items are some list of data or relative paths you would like to
        act as leaves in the tree.

        Raises:
            Exception: if `items` is empty.
        """
        if len(items) <= 0:
            raise Exception("items must contain at least 1 "
                            "element for a valid merkle tree.")
        self.is_built = False
        self.root_hash = None
        self.node_table = {}
        # ceil(log2(n)) using integer arithmetic only: float log division
        # can land just above an exact integer for some powers of two,
        # which would overstate the height and keep the build loop from
        # ever recognising the root.
        self.max_height = (len(items) - 1).bit_length()
        self.leaves = [self._leafify(self._md5sum(item)) for item in items]
        self.tree_hash()

    def tree_hash(self) -> None:
        """ Builds a merkle tree by adding leaves one at a time to a stack,
        and combining nodes in the stack when they are of the same height.

        Also constructs node_table, a dict containing hashes that map to
        individual nodes for auditing purposes.
        Pseudo code can be found here:
        http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.84.9700&rep=rep1&type=pdf

        Leaves are consumed from the end of `self.leaves`; once the build
        finishes, `self.leaves` is restored from a deep copy taken up front.
        """
        # Snapshot BEFORE anything is popped (including the solo-leaf
        # shortcut below) so that a one-item tree keeps its single leaf.
        leaves_cp = copy.deepcopy(self.leaves)
        stack = []
        self._handle_solo_node_case()
        while self.root_hash is None:
            if len(stack) >= 2 and stack[-1].height == stack[-2].height:
                mom = stack.pop()
                dad = stack.pop()
                # The digest is always built as mother + father.
                child_hash = self._md5sum(mom.hash + dad.hash)
                # presumably Node derives the child's height from its
                # parents; Node lives in another module - TODO confirm
                child = Node(mom, dad, child_hash)
                self.node_table[child_hash] = child
                mom.child = child
                dad.child = child
                if child.height == self.max_height:
                    self.root_hash = child.hash
                stack.append(child)
            elif self.leaves:
                leaf = self.leaves.pop()
                self.node_table[leaf.hash] = leaf
                stack.append(leaf)
            else:
                # No leaves left and the top two nodes differ in height:
                # "graduate" the top node one level so it can eventually
                # be paired with the node beneath it.
                stack[-1].height += 1
        self.is_built = True
        self.leaves = leaves_cp

    def audit(self, data: str, proof_hashes: List[str]) -> bool:
        """ Returns a boolean testing if data (a file or object) is
        contained in the merkle tree.

        proof_hashes are the sibling hashes to combine with the hash of
        data, one per layer. They are consumed from the END of the list,
        so the sibling of the leaf must come last - the same top-to-bottom
        order that get_branch returns. A leading root hash is tolerated
        because checking stops as soon as the root is reproduced.

        If the tree has not been built, returns False for any data.
        Expects a relative path for data if data is a file.
        The caller's proof_hashes sequence is not modified.
        """
        if self.root_hash is None:
            return False
        hash_ = self._md5sum(data)
        # A one element tree does not make much sense, but if one exists
        # we simply need to check if the item's hash is the root.
        if self.max_height == 0:
            return hash_ == self.root_hash
        # Work on a private list: _audit pops from it, and list() also
        # accepts tuples or other iterables of hashes.
        return self._audit(hash_, list(proof_hashes))

    def get_branch(self, item: str) -> List[str]:
        """ Returns an authentication path for an item (not hashed) in
        the Merkle tree as a list in order from the top of the tree
        to the bottom. The first element is the root hash.

        Raises:
            Exception: if the tree is not built or the item is not a leaf.
        """
        if not self.is_built:
            raise Exception("The Merkle Tree must be built before an "
                            "authentication path is found.")
        hash_ = self._md5sum(item)
        if hash_ not in self._get_leaf_hashes():
            raise Exception("The requested item is not in the merkle tree.")
        return self._get_branch_by_hash(hash_)

    def traverse(self) -> List[List[str]]:
        """ Provides the authentication path for all the leaves in the
        tree e.g.
        [[hash_leaf_node_1, [auth_hash_1, auth_hash_2...]],
         ...
        ]
        """
        return [[leaf_hash, self._get_branch_by_hash(leaf_hash)]
                for leaf_hash in self._get_leaf_hashes()]

    def _leafify(self, data: str) -> "Node":
        """ Wraps an already-hashed value in a parentless Node of height 0. """
        leaf = Node(None, None, data)
        leaf.height = 0
        return leaf

    def _get_branch_by_hash(self, hash_: str) -> List[str]:
        """ Returns an authentication path as a list in order from the top
        to the bottom of the tree (assumes preconditions have been checked).
        """
        path = []
        while hash_ != self.root_hash:
            node = self.node_table[hash_]
            child = node.child
            # The other parent of `child` is the sibling needed to
            # recompute the child's hash.
            spouse = child.get_parent_by_spouse(hash_)
            path.append(spouse.hash)
            hash_ = child.hash
        # Keep the root hash on the auth path, even though clients should
        # already have it.
        path.append(hash_)
        path.reverse()
        return path

    def _md5sum(self, data: str) -> str:
        """ Returns the md5 hex digest of data.

        If str(data) names an existing file, the file's contents are
        hashed; otherwise the UTF-8 encoding of the string itself is.

        Raises:
            Exception: if the file exists but cannot be opened.
        """
        data = str(data)
        m = hashlib.md5()
        if os.path.isfile(data):
            try:
                f = open(data, 'rb')
            except OSError as err:
                raise Exception('ERROR: unable to open %s' % data) from err
            # `with` guarantees the handle is closed even if a read fails.
            with f:
                # Avoid reading the whole file at once in case it is large.
                for chunk in iter(lambda: f.read(8096), b''):
                    m.update(chunk)
        # Otherwise it could be either 1) a directory 2) miscellaneous
        # data (like json)
        else:
            m.update(data.encode('utf-8'))
        return m.hexdigest()

    def _audit(self, questioned_hash: str, proof_hashes: List[str]) -> bool:
        """ Tests if questioned_hash is a member of the merkle tree by
        hashing it with its sibling until the root hash is reached.

        Pops sibling hashes from the end of proof_hashes (mutating it).
        Returns False if the proof runs out before the root is reproduced
        or if any step does not match the recorded tree.
        """
        while proof_hashes:
            proof_hash = proof_hashes.pop()
            sibling = self.node_table.get(proof_hash)
            if sibling is None:
                return False
            # A node with no child (e.g. the root offered as a sibling)
            # cannot be combined with anything.
            child = getattr(sibling, "child", None)
            if child is None:
                return False
            # Because the order in which the hashes are concatenated
            # matters, we must test to see if questioned_hash is the
            # "mother" or "father" of its child (the hash is always built
            # as mother + father).
            if child.mom.hash == questioned_hash:
                actual_hash = self._md5sum(questioned_hash + sibling.hash)
            elif child.dad.hash == questioned_hash:
                actual_hash = self._md5sum(sibling.hash + questioned_hash)
            else:
                return False
            if actual_hash != child.hash:
                return False
            if actual_hash == self.root_hash:
                return True
            questioned_hash = actual_hash
        return False

    def _handle_solo_node_case(self) -> None:
        """ Makes the only leaf the root when the tree has a single leaf.

        The stack-based build in tree_hash needs at least two nodes to
        produce a root, so the one-leaf case is resolved here instead.
        """
        if len(self.leaves) == 1:
            solo_node = self.leaves.pop()
            self.root_hash = solo_node.hash
            self.node_table[solo_node.hash] = solo_node

    def _get_leaf_hashes(self) -> List[str]:
        """ Returns the hash of every leaf, in `self.leaves` order. """
        return [node.hash for node in self.leaves]