-
Notifications
You must be signed in to change notification settings - Fork 1
/
php-analyser.py
535 lines (426 loc) · 24.8 KB
/
php-analyser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
import sys, json
from AST.stmt_expr import Stmt_Expression
from AST.expr_assign import Expr_Assign
from AST.expr_variable import Expr_Variable
from AST.expr_const_fetch import Expr_Const_Fetch
from AST.stmt_if import Stmt_If
from AST.stmt_else import Stmt_Else
from AST.stmt_elseif import Stmt_ElseIf
from AST.stmt_switch import Stmt_Switch
from AST.stmt_case import Stmt_Case
from AST.stmt_nop import Stmt_Nop
from AST.stmt_while import Stmt_While
from AST.stmt_for import Stmt_For
from AST.stmt_break import Stmt_Break
from AST.name import Name
from AST.arg import Arg
from AST.expr_funccall import Expr_FuncCall
from AST.symbol_table import Symbol_Table
from AST.binopexpr import Binop_Expr
from AST.implicit_checker import Implicit_Checker
from AST.expr_not import Expr_Not
from AST.inc_dec import Inc_Dec
from AST.expr_array_dim_fetch import Expr_Array_Dim_Fetch
from policy import Policy
from vulnerability import Vulnerability
from copy import deepcopy
# for coloured output
class bcolors:
    """ANSI escape sequences used to colour and style terminal output.

    Each attribute is a plain string; wrap text as
    ``bcolors.<STYLE> + text + bcolors.ENDC`` so the style is reset afterwards.
    """

    # reset sequence: ends any colour/style started before it
    ENDC = '\033[0m'

    # text attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # foreground colours
    FAIL = '\033[91m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    OKBLUE = '\033[94m'
    HEADER = '\033[95m'
    OKCYAN = '\033[96m'
def main(argv, arg):
    """Analyse one AST slice against every vulnerability pattern and write the report.

    Parameters:
        argv: the command-line vector; ``argv[1]`` is the path of the JSON AST
            slice and ``argv[2]`` the path of the JSON vulnerability patterns.
        arg: the number of entries in ``argv``; must be 3.

    Side effects:
        Prints progress to stdout and writes the collected vulnerability
        instances to ``output/<slice name>.output.json``.

    Exits with status 1 when the argument count is wrong or when either input
    file does not exist.
    """
    # local import: path handling is only needed here
    import os

    # check the number of arguments received
    if arg != 3:
        print(bcolors.FAIL + "Usage: python php-analyser.py <code_slice>.json <vuln_pattern>.json" + bcolors.ENDC)
        sys.exit(1)

    # get ast_slice content (JSON is UTF-8, so do not rely on the locale default)
    try:
        with open(argv[1], 'r', encoding='utf-8') as file:
            json_ast = file.read()
    except FileNotFoundError:  # file doesn't exist or wrong path
        print(bcolors.FAIL + "File containing AST slice not found." + bcolors.ENDC)
        sys.exit(1)

    # get pattern content
    try:
        with open(argv[2], 'r', encoding='utf-8') as file:
            json_pattern = file.read()
    except FileNotFoundError:
        print(bcolors.FAIL + "File containing vulnerability pattern not found." + bcolors.ENDC)
        sys.exit(1)

    parsed_patterns = json.loads(json_pattern)
    parsed_ast = json.loads(json_ast)

    # get output filename
    # slices_ast/1a-basic-flow.json -> output/1a-basic-flow.output.json
    # Use the final path component so that a bare filename or a deeper
    # directory tree works too (indexing the second '/'-separated piece
    # only worked for paths of the exact form "<dir>/<file>").
    slice_filename = os.path.basename(argv[1])
    slice_stem = next(
        (part for part in slice_filename.split('.') if part.strip()),
        slice_filename,
    )
    output_file = 'output/' + slice_stem + '.output.json'
    # make sure the report directory exists before anything writes to it
    os.makedirs('output', exist_ok=True)

    # create vulnerabilities from pattern
    vulnerabilities = [
        Vulnerability(
            pattern['vulnerability'],
            pattern['sources'],
            pattern['sanitizers'],
            pattern['sinks'],
            pattern['implicit'],
            output_file,
        )
        for pattern in parsed_patterns
    ]

    # create policies for each vulnerability
    policies = []
    for vulnerability in vulnerabilities:
        policy = Policy(vulnerability.get_sources(), vulnerability)
        policies.append(policy)
        print(policy)

    output = []
    for policy in policies:
        print(bcolors.OKBLUE + "POLICY: " + bcolors.ENDC + str(policy))
        # every policy is analysed against a fresh symbol table
        symbol_table = Symbol_Table()
        # an implicit checker is only needed when the pattern tracks implicit flows
        if policy.get_vulnerability().is_implicit():
            implicit_checker = Implicit_Checker()
        else:
            implicit_checker = None
        # create the AST nodes for the corresponding json
        create_nodes(parsed_ast, symbol_table, policy, implicit_checker)
        output += policy.get_vulnerability().output

    # ensure_ascii=False may emit non-ASCII characters, so write UTF-8 explicitly
    with open(output_file, 'w', encoding='utf-8') as outfile:
        json.dump(output, outfile, ensure_ascii=False, indent=4)
def create_nodes(parsed_ast, symbol_table=None, policy=None, implicit_checker=None):
    """
    Given a json, parse it and create the corresponding AST nodes.

    While the nodes are built, sources, sanitized sources and sanitizers are
    propagated through the symbol table and every flow that reaches a sink is
    recorded on the policy's vulnerability via ``add_instance``.

    Parameters:
        parsed_ast: a list of node dictionaries, a single node dictionary
            (it must carry a 'nodeType' key), or anything else (e.g. None).
        symbol_table: the symbol table of the current context; branches and
            loop bodies are analysed on deep copies of it.
        policy: the policy being checked; supplies ``lub``, ``get_vultype``
            and the vulnerability that collects the reported instances.
        implicit_checker: the stack of enclosing-condition contexts; only
            used when the vulnerability is implicit.

    Returns:
        A list of nodes for a list input, the created node for a recognised
        dictionary, and None for discarded node types or non list/dict input.
    """
    # <--- LIST OF INSTRUCTIONS --->
    if isinstance(parsed_ast, list):
        # create the nodes for each instruction
        instructions = [
            create_nodes(instruction, symbol_table, policy, implicit_checker)
            for instruction in parsed_ast
        ]
        for instruction in instructions:
            print(bcolors.HEADER + "Instruction: " + bcolors.ENDC + str(instruction))
        return instructions

    # anything that is not a node dictionary (e.g. a missing else clause) is discarded
    if not isinstance(parsed_ast, dict):
        return None

    # get the type of the node we're analyzing
    node_type = parsed_ast['nodeType']

    # <--- EXPRESSION --->
    if node_type == "Stmt_Expression":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        return Stmt_Expression(create_nodes(parsed_ast['expr'], symbol_table, policy, implicit_checker))

    # <--- ASSIGNMENT (substring match: also covers the compound assignment node types) --->
    if "Expr_Assign" in node_type:
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        vulnerability = policy.get_vulnerability()
        implicit = vulnerability.is_implicit()

        # the right-hand side is visited before the left-hand side
        rval = create_nodes(parsed_ast['expr'], symbol_table, policy, implicit_checker)
        lval = create_nodes(parsed_ast['var'], symbol_table, policy, implicit_checker)

        # Only remove variable from being a self source on assigns
        # i.e. not on += operations
        if node_type == "Expr_Assign":
            # initialized variables: remove from source
            if not lval.is_source():
                lval.del_source(lval.name)

        # Join sources of rval and lval
        sources = policy.lub(deepcopy(lval.get_sources()), deepcopy(rval.get_sources()))
        # Join sanitized sources of rval and lval
        sanitized_sources = policy.lub(deepcopy(lval.get_sanitized_sources()), deepcopy(rval.get_sanitized_sources()))

        # Propagate sanitizers to lval
        for sanitizer in rval.get_sanitizers():
            lval.add_sanitizer(sanitizer)

        # add implicit sources, sanitized sources and sanitizers to variable
        if implicit:
            sources = policy.lub(sources, deepcopy(implicit_checker.get_flat_sources()))
            sanitized_sources = policy.lub(sanitized_sources, deepcopy(implicit_checker.get_flat_sanitized_sources()))
            for implicit_sanitizer in implicit_checker.get_flat_sanitizers():
                lval.add_sanitizer(implicit_sanitizer)

        # Propagate sources to lval
        lval.set_sources(sources)
        print("SOURCES: " + str(lval.get_sources()))
        # Propagate sanitized sources to lval
        lval.set_sanitized_sources(sanitized_sources)
        print("SANITIZED SOURCES: " + str(sanitized_sources))

        # Output vulnerabilities
        if lval.is_sink():
            if implicit:  # output implicit vulnerabilities
                for implicit_source in implicit_checker.get_flat_sources():
                    vulnerability.add_instance(implicit_source, lval.get_name(), True, [])
                for implicit_sanitized_source in implicit_checker.get_flat_sanitized_sources():
                    implicit_sanitizers_list_copy = deepcopy(implicit_checker.get_flat_sanitizers())
                    vulnerability.add_instance(implicit_sanitized_source, lval.get_name(), False, implicit_sanitizers_list_copy)
            # output explicit vulnerabilities
            for source in policy.lub(rval.get_sources(), lval.get_sources()):
                source_copy = deepcopy(source)
                vulnerability.add_instance(source_copy, lval.get_name(), True, [])
            for sanitized_source in rval.get_sanitized_sources():
                sanitized_source_copy = deepcopy(sanitized_source)
                sanitizers_list_copy = deepcopy(rval.get_sanitizers())
                vulnerability.add_instance(sanitized_source_copy, lval.get_name(), False, sanitizers_list_copy)
        return Expr_Assign(lval, rval)

    # <--- VARIABLE --->
    if node_type == "Expr_Variable":
        name = "$" + parsed_ast['name']
        print(bcolors.OKGREEN + node_type + " -> " + name + bcolors.ENDC)
        variable = symbol_table.get_variable(name)
        if variable is None:
            # first time the variable is seen: register it in the symbol table
            print('variable is not in symtable')
            variable = Expr_Variable(name, policy.get_vultype(name))
            symbol_table.add_variable(variable)
        return variable

    # <--- BINARY EXPRESSIONS --->
    if "Expr_BinaryOp" in node_type:
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        left = create_nodes(parsed_ast['left'], symbol_table, policy, implicit_checker)
        right = create_nodes(parsed_ast['right'], symbol_table, policy, implicit_checker)
        expr = Binop_Expr(left, right)
        # set the expression's sources, sanitized_sources and sanitizers
        expr.set_sources(policy.lub(left.get_sources(), right.get_sources()))
        expr.set_sanitized_sources(policy.lub(left.get_sanitized_sources(), right.get_sanitized_sources()))
        # union of both operands' sanitizers, first occurrence kept
        sanitizers = []
        for sanitizer in left.get_sanitizers() + right.get_sanitizers():
            if sanitizer not in sanitizers:
                sanitizers.append(sanitizer)
        expr.set_sanitizers(sanitizers)
        return expr

    # <--- SCALARS --->
    if "Scalar_" in node_type:
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        return Stmt_Expression(parsed_ast['value'])

    # <--- IF --->
    if node_type == "Stmt_If":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        implicit = policy.get_vulnerability().is_implicit()
        cond = create_nodes(parsed_ast['cond'], symbol_table, policy, implicit_checker)
        # push context into implicit_checker stacks
        if implicit:
            implicit_checker.push(cond.get_sources(), cond.get_sanitizers(), cond.get_sanitized_sources())

        # create copies of the original symbol_table
        symbol_table_if = deepcopy(symbol_table)
        symbol_table_else = deepcopy(symbol_table)
        symbol_table_initial = deepcopy(symbol_table)

        # visit statements in if and else bodies with a copy of the original context symbol_table
        stmts = create_nodes(parsed_ast['stmts'], symbol_table_if, policy, implicit_checker)
        else_clause = create_nodes(parsed_ast['else'], symbol_table_else, policy, implicit_checker)

        # merge resulting symbol_table from if branch with resulting symbol_table from else branch
        merged_symbol_table, common_variables = symbol_table_if.merge_symbols(symbol_table_else, policy)
        # add variables that had not been initialized before the if
        symbol_table.add_missing_variables(merged_symbol_table, common_variables)

        # elseifs
        elseifs = []
        for elseif in parsed_ast['elseifs']:
            # for each elseif use the initial symbol_table and propagate the changes
            symbol_table_elseif = deepcopy(symbol_table_initial)
            elseifs.append(create_nodes(elseif, symbol_table_elseif, policy, implicit_checker))
            # propagate changed symbol table
            merged_symbol_table, common_variables = symbol_table_elseif.merge_symbols(symbol_table, policy)
            symbol_table.add_missing_variables(merged_symbol_table, common_variables)

        # pop context out of implicit_checker stacks
        if implicit:
            implicit_checker.pop()
        return Stmt_If(cond, stmts, elseifs, else_clause)

    # <--- STMT ELSE --->
    if node_type == "Stmt_Else":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        stmts = create_nodes(parsed_ast['stmts'], symbol_table, policy, implicit_checker)
        return Stmt_Else(stmts)

    # <--- STMT ELSEIF OR CASE --->
    if node_type == "Stmt_ElseIf" or node_type == "Stmt_Case":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        implicit = policy.get_vulnerability().is_implicit()
        cond = create_nodes(parsed_ast['cond'], symbol_table, policy, implicit_checker)
        # push context into implicit_checker stacks
        if implicit:
            if cond is None:
                # a condition that produced no node (e.g. a null 'cond')
                # contributes nothing; push an empty context so the pop
                # below stays balanced instead of dereferencing None
                implicit_checker.push([], [], [])
            else:
                implicit_checker.push(cond.get_sources(), cond.get_sanitizers(), cond.get_sanitized_sources())

        # the body runs on a copy; only what it adds is reported back
        symbol_table_stmts = deepcopy(symbol_table)
        stmts = create_nodes(parsed_ast['stmts'], symbol_table_stmts, policy, implicit_checker)
        symbol_table.add_missing_variables(symbol_table_stmts, symbol_table_stmts.get_variables())

        # pop context out of implicit_checker stacks
        if implicit:
            implicit_checker.pop()

        if node_type == "Stmt_Case":
            return Stmt_Case(cond, stmts)
        return Stmt_ElseIf(cond, stmts)

    # <--- FUNCTION CALL, ECHO --->
    if node_type == "Expr_FuncCall" or node_type == "Stmt_Echo":
        if node_type == "Expr_FuncCall":
            name = parsed_ast['name']['parts'][0]
            args_list = parsed_ast['args']
        else:
            # echo is treated as a call named "echo" over its expressions
            name = "echo"
            args_list = parsed_ast['exprs']
        print(bcolors.OKGREEN + node_type + " -> " + name + bcolors.ENDC)

        vulnerability = policy.get_vulnerability()
        implicit = vulnerability.is_implicit()

        args = [create_nodes(arg, symbol_table, policy, implicit_checker) for arg in args_list]
        funcall = Expr_FuncCall(name, args, policy.get_vultype(name))

        if funcall.is_sanitizer():
            funcall.add_sanitizer([funcall.get_name()])

        if implicit:
            sources = policy.lub(deepcopy(funcall.get_sources()), deepcopy(implicit_checker.get_flat_sources()))
            sanitized_sources = policy.lub(deepcopy(funcall.get_sanitized_sources()), deepcopy(implicit_checker.get_flat_sanitized_sources()))
            for implicit_sanitizer in implicit_checker.get_flat_sanitizers():
                funcall.add_sanitizer(implicit_sanitizer)

        # sources contain sources from unsanitized flows
        # sanitized_sources contain sources from sanitized flows
        for arg in args:
            # function sources: l.u.b. with the arg's
            if implicit:
                sources = policy.lub(sources, deepcopy(arg.get_sources()))
                sanitized_sources = policy.lub(sanitized_sources, deepcopy(arg.get_sanitized_sources()))
            else:
                sources = policy.lub(deepcopy(funcall.get_sources()), deepcopy(arg.get_sources()))
                sanitized_sources = policy.lub(deepcopy(funcall.get_sanitized_sources()), deepcopy(arg.get_sanitized_sources()))

            # function sanitizers: union with the arg's
            for sanitizer in arg.get_sanitizers():
                if funcall.is_sanitizer() and sanitizer not in funcall.get_sanitizers():
                    sanitizer = [funcall.get_name()] + sanitizer  # add funcall name to beginning of list
                funcall.add_sanitizer(sanitizer)

            # function sanitized sources: l.u.b. with the arg's
            funcall.set_sanitized_sources(sanitized_sources)
            funcall.set_sources(sources)

            # explicit leaks
            # sensitive function: report this argument's sources and sanitized sources
            if funcall.is_sink():
                for source in arg.get_sources():
                    # the arg's (unsanitized) sources are the function's
                    vulnerability.add_instance(source, funcall.get_name(), True, [])
                for sanitized_source in arg.get_sanitized_sources():
                    # the arg's sanitized sources are the function's
                    sanitizers_list_copy = deepcopy(arg.get_sanitizers())
                    vulnerability.add_instance(sanitized_source, funcall.get_name(), False, sanitizers_list_copy)

        # if the function is a sanitizer: all of its sources are now sanitized
        if funcall.is_sanitizer():
            funcall.add_sanitized_sources(funcall.get_sources())
            funcall.sources = []

        # implicit leaks
        if funcall.is_sink() and implicit:
            for implicit_source in implicit_checker.get_flat_sources():
                vulnerability.add_instance(implicit_source, funcall.get_name(), True, [])
            for implicit_sanitized_source in implicit_checker.get_flat_sanitized_sources():
                # the fourth argument is the list of sanitizers applied to the
                # flow (as in the assignment branch), not a copy of the source
                implicit_sanitizers_list_copy = deepcopy(implicit_checker.get_flat_sanitizers())
                vulnerability.add_instance(implicit_sanitized_source, funcall.get_name(), False, implicit_sanitizers_list_copy)
        return funcall

    # <--- STMT SWITCH --->
    if node_type == "Stmt_Switch":
        implicit = policy.get_vulnerability().is_implicit()
        cond = create_nodes(parsed_ast['cond'], symbol_table, policy, implicit_checker)
        # push context into implicit_checker stacks
        if implicit:
            implicit_checker.push(cond.get_sources(), cond.get_sanitizers(), cond.get_sanitized_sources())

        symbol_table_switch = deepcopy(symbol_table)
        cases = create_nodes(parsed_ast['cases'], symbol_table_switch, policy, implicit_checker)
        # add variables that had not been initialized before the switch
        symbol_table.add_missing_variables(symbol_table_switch, symbol_table_switch.get_variables())

        # pop context out of implicit_checker stacks
        if implicit:
            implicit_checker.pop()
        return Stmt_Switch(cond, cases)

    # <--- NAME --->
    if node_type == "Name":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        return Name(parsed_ast['parts'])

    # <--- ARG --->
    if node_type == "Arg":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        value = create_nodes(parsed_ast['value'], symbol_table, policy, implicit_checker)
        return Arg(value)

    # <--- BREAK --->
    if node_type == "Stmt_Break":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        return Stmt_Break(parsed_ast['num'])

    # <--- STMT NOP --->
    if node_type == "Stmt_Nop":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        return Stmt_Nop()

    # <--- STMT WHILE --->
    if node_type == "Stmt_While":
        implicit = policy.get_vulnerability().is_implicit()
        symtable_body = deepcopy(symbol_table)
        last_symtable = None
        # we iterate the node several times, because taintedness can spread
        while True:
            condition = create_nodes(parsed_ast['cond'], symtable_body, policy, implicit_checker)
            # implicit leaks: push the condition's sources, sanitizers and sanitized_sources
            if implicit:
                implicit_checker.push(condition.get_sources(), condition.get_sanitizers(), condition.get_sanitized_sources())
            stmts = create_nodes(parsed_ast['stmts'], symtable_body, policy, implicit_checker)
            # the body has been visited: pop the context pushed for this pass
            if implicit:
                implicit_checker.pop()

            if last_symtable is None:
                # first iteration: nothing to compare against yet
                last_symtable = deepcopy(symtable_body)
                continue

            previous_symtable = deepcopy(last_symtable)
            last_symtable, _ = last_symtable.merge_symbols(symtable_body, policy)
            # if merging the current symtable with the previous changed nothing, we can leave
            if previous_symtable == last_symtable:
                break

        # add what's new from the execution of the loop
        symbol_table.add_missing_variables(last_symtable, last_symtable.get_variables())
        return Stmt_While(condition, stmts)

    # <--- FOR --->
    if node_type == "Stmt_For":
        implicit = policy.get_vulnerability().is_implicit()
        # the init expressions run once, in the enclosing context
        init = create_nodes(parsed_ast['init'], symbol_table, policy, implicit_checker)
        symtable_body = deepcopy(symbol_table)
        last_symtable = None
        # same fixpoint iteration as for while loops
        while True:
            sources, sanitizers, sanitized_sources = [], [], []
            conditions = []
            for cond in parsed_ast['cond']:
                condition = create_nodes(cond, symtable_body, policy, implicit_checker)
                conditions.append(condition)
                # gather sources, sanitizers and sanitized sources
                if implicit:
                    # sources
                    sources = policy.lub(sources, condition.get_sources())
                    # sanitized_sources
                    sanitized_sources = policy.lub(sanitized_sources, condition.get_sanitized_sources())
                    # sanitizers
                    for sanitizer in condition.get_sanitizers():
                        if sanitizer not in sanitizers:
                            sanitizers.append(sanitizer)

            # push context into implicit_checker stack: use what was gathered
            # from ALL conditions (pushing only the last condition dropped the
            # others and failed when the loop had no condition at all)
            if implicit:
                implicit_checker.push(sources, sanitizers, sanitized_sources)
            stmts = create_nodes(parsed_ast['stmts'], symtable_body, policy, implicit_checker)
            loop = create_nodes(parsed_ast['loop'], symtable_body, policy, implicit_checker)
            # pop once per pass so every push above is matched, not just
            # the one made on the converging iteration
            if implicit:
                implicit_checker.pop()

            if last_symtable is None:
                # first iteration: nothing to compare against yet
                last_symtable = deepcopy(symtable_body)
                continue

            previous_symtable = deepcopy(last_symtable)
            last_symtable, _ = last_symtable.merge_symbols(symtable_body, policy)
            # stop once another pass over the body changes nothing
            if previous_symtable == last_symtable:
                break

        # add what's new from the execution of the loop
        symbol_table.add_missing_variables(last_symtable, last_symtable.get_variables())
        return Stmt_For(init, conditions, loop, stmts)

    # <--- CONST FETCH --->
    if node_type == "Expr_ConstFetch":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        name = create_nodes(parsed_ast['name'], symbol_table, policy, implicit_checker)
        return Expr_Const_Fetch(name)

    # <--- BITWISE/BOOLEAN NOT --->
    if node_type == "Expr_BitwiseNot" or node_type == "Expr_BooleanNot":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        expr = create_nodes(parsed_ast['expr'], symbol_table, policy, implicit_checker)
        return Expr_Not(expr)

    # <--- POST/PRE-DEC/INC --->
    if node_type in ("Expr_PostInc", "Expr_PostDec", "Expr_PreDec", "Expr_PreInc"):
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        var = create_nodes(parsed_ast['var'], symbol_table, policy, implicit_checker)
        return Inc_Dec(var)

    # <--- ARRAYDIMFETCH --->
    if node_type == "Expr_ArrayDimFetch":
        print(bcolors.OKGREEN + node_type + bcolors.ENDC)
        var = create_nodes(parsed_ast['var'], symbol_table, policy, implicit_checker)
        dim = create_nodes(parsed_ast['dim'], symbol_table, policy, implicit_checker)
        return Expr_Array_Dim_Fetch(var, dim)

    # <--- CONTINUE, COMMENTS, ... EVERYTHING ELSE --->
    # discard the node
    return None
# Script entry point: hand main() the raw argument vector together with its length.
if __name__ == "__main__":
    cli_arguments = sys.argv
    main(cli_arguments, len(cli_arguments))