forked from csmith-project/csmith
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hypothesiscsmith.py
172 lines (145 loc) · 5.24 KB
/
hypothesiscsmith.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import os
import subprocess
import tempfile
import sys
import shutil
import struct
import click
from hypothesis.strategies._internal.strategies import SearchStrategy
from hypothesis.strategies._internal.core import defines_strategy
from hypothesis.internal.conjecture.data import ConjectureData
from hypothesis.errors import InvalidArgument
from threading import RLock
# Directory containing this module; also used as the working directory when
# Csmith has to be built (see the ``cwd=HERE`` calls in ``csmith``).
HERE = os.path.dirname(__file__)
# Location where the Csmith executable is expected, relative to this module.
CSMITH = os.path.join(HERE, "src", "csmith")
class CsmithState(object):
    """Drive one Csmith subprocess, answering its requests from Hypothesis.

    The subprocess is started with two named pipes whose paths are passed in
    the ``HYPOTHESISFIFOCOMMANDS`` and ``HYPOTHESISFIFORESULTS`` environment
    variables.  Commands are read from the first pipe as a length byte
    followed by that many ASCII bytes; every command is answered on the
    second pipe with a 4-byte big-endian unsigned integer.

    An instance is single-use: ``gen`` asserts that it has not already been
    called on the same object.
    """

    def __init__(self, data):
        """Remember *data*, the object that random choices are drawn from.

        ``data`` must provide ``draw_bits``, ``start_example`` and
        ``stop_example``.  No subprocess or pipe is created until ``gen``.
        """
        self.__data = data
        # Scratch buffer reused for every 4-byte reply.
        self.__write_buffer = bytearray(4)
        self.__tempdir = None
        self.__proc = self.__pipeout = self.__pipein = None

    def write_result(self, n):
        """Send *n* to the subprocess as a 4-byte big-endian unsigned int.

        The result pipe is opened on first use and kept open afterwards.
        """
        if self.__pipeout is None:
            self.__pipeout = open(self.__result_channel, "wb")
        struct.pack_into(">L", self.__write_buffer, 0, n)
        self.__pipeout.write(self.__write_buffer)
        # Flush straight away so the reply is not left in Python's buffer.
        self.__pipeout.flush()

    def ack(self):
        """Acknowledge a command that carries no result by replying ``0``."""
        self.write_result(0)

    def read_command(self):
        """Return the next command sent by the subprocess.

        The command pipe is opened on first use.  Each command is one length
        byte followed by that many ASCII bytes.

        Returns:
            The decoded command, or ``""`` when the pipe is at end-of-file
            and there is no running subprocess left that could write to it.
        """
        if self.__pipein is None:
            self.__pipein = open(self.__command_channel, "rb")
        pin = self.__pipein
        while True:
            prefix = pin.read(1)
            if prefix:
                break
            # End-of-file.  While the subprocess is alive keep waiting, but
            # once it is gone nothing more can arrive; looping forever here
            # would turn a crash of the subprocess into a hang of the caller.
            if self.__proc is None or self.__proc.poll() is not None:
                # Pick up anything written just before the process exited.
                prefix = pin.read(1)
                if not prefix:
                    return ""
                break
        return pin.read(prefix[0]).decode("ascii")

    def cleanup_process(self):
        """Wait for the subprocess to exit, killing it after one second.

        Does nothing when no subprocess has been started.

        Raises:
            Exception: if the subprocess ended with a non-zero return code
                (which includes having been killed here); the message
                contains everything it wrote to stdout/stderr.
        """
        proc = self.__proc
        if proc is None:
            return
        try:
            proc.wait(timeout=1)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait(timeout=1)
        if proc.returncode != 0:
            with open(self.__errfile) as i:
                raise Exception(
                    f"Subprocess call terminated abnormally with return "
                    f"code {proc.returncode} and output {repr(i.read())}"
                )

    def gen(self):
        """Run Csmith once and return the C source it produced.

        Commands understood from the subprocess:

        * ``RAND`` - reply with 31 bits drawn from the data object.
        * ``START <label>`` - call ``start_example(label)``, then ack.
        * ``END`` - call ``stop_example()``, then ack.
        * ``TERMINATE`` - ack, wait for the process to exit, stop serving.

        Returns:
            The text of the ``gen.c`` file written by the subprocess.

        Raises:
            Exception: on an unknown command, or when the subprocess exits
                with a non-zero return code.
            AssertionError: when an empty command is received although the
                subprocess exited with return code 0.
        """
        assert self.__tempdir is None
        try:
            self.__tempdir = tempfile.mkdtemp()
            env = dict(os.environ)
            self.__command_channel = os.path.join(
                self.__tempdir, "hypothesisfifo.commands"
            )
            self.__result_channel = os.path.join(
                self.__tempdir, "hypothesisfifo.results"
            )
            env["HYPOTHESISFIFOCOMMANDS"] = self.__command_channel
            env["HYPOTHESISFIFORESULTS"] = self.__result_channel
            os.mkfifo(self.__result_channel)
            os.mkfifo(self.__command_channel)

            output_name = os.path.join(self.__tempdir, "gen.c")
            # Both output streams of the subprocess go to one file so that
            # cleanup_process can quote them in its error message.
            self.__errfile = os.path.join(self.__tempdir, "stderr")
            with open(self.__errfile, "w") as o:
                self.__proc = subprocess.Popen(
                    [CSMITH, "-o", output_name],
                    env=env,
                    stdout=o,
                    stderr=o,
                )

            while True:
                line = self.read_command()
                if line == "TERMINATE":
                    self.ack()
                    self.cleanup_process()
                    break
                elif line == "RAND":
                    self.write_result(self.__data.draw_bits(31))
                elif line.startswith("START "):
                    # split() already drops surrounding whitespace.
                    _, label = line.split()
                    self.__data.start_example(label)
                    self.ack()
                elif line == "END":
                    self.__data.stop_example()
                    self.ack()
                elif not line:
                    # Terminated improperly: cleanup_process raises with the
                    # captured output if the exit status was non-zero.
                    self.cleanup_process()
                    # Raised explicitly so the check survives ``python -O``.
                    raise AssertionError(
                        "Improper response from subprocess that terminated normally"
                    )
                else:
                    raise Exception(f"Unknown command {line!r}")

            with open(output_name) as i:
                return i.read()
        finally:
            for pipe in (self.__pipeout, self.__pipein):
                if pipe is not None:
                    pipe.close()
            self.__pipeout = self.__pipein = None
            if self.__proc is not None:
                # kill() does nothing for a process that was already reaped;
                # wait() reaps it otherwise so no zombie is left behind.
                self.__proc.kill()
                self.__proc.wait()
            if self.__tempdir is not None:
                shutil.rmtree(self.__tempdir)
class CsmithStrategy(SearchStrategy):
    """Search strategy whose drawn values come from ``CsmithState.gen``."""

    def do_draw(self, data):
        """Create a fresh ``CsmithState`` around *data* and return its result."""
        state = CsmithState(data)
        return state.gen()
# Held while the Csmith binary is being built, so that threads of this
# process do not run the build concurrently.
INSTALL_LOCK = RLock()


@defines_strategy
def csmith():
    """A strategy for generating C programs, using Csmith.

    If the Csmith executable is not found at ``CSMITH`` it is built first by
    running ``./configure`` and ``make`` in ``HERE``.

    Returns:
        A new ``CsmithStrategy``.

    Raises:
        subprocess.CalledProcessError: if ``./configure`` or ``make`` fails.
        AssertionError: if the executable is still missing after the build.
    """
    if not os.path.exists(CSMITH):
        # ``with`` releases the lock only if it was actually acquired, unlike
        # an acquire() placed inside a try/finally.
        with INSTALL_LOCK:
            # Check again under the lock: another thread may have completed
            # the build while this one was waiting.
            if not os.path.exists(CSMITH):
                subprocess.check_call(["./configure"], cwd=HERE)
                subprocess.check_call(["make"], cwd=HERE)
        # Raised explicitly so the check is not stripped by ``python -O``.
        if not os.path.exists(CSMITH):
            raise AssertionError(
                f"Csmith executable not found at {CSMITH!r} after building"
            )
    return CsmithStrategy()
@click.group()
def main():
    """Commands for working with Hypothesis generated Csmith
    programs."""
    # Intentionally empty: this function only exists as the click group that
    # the subcommands below attach themselves to.
@main.command()
@click.argument("filename")
def show(filename):
    """Show the generated program associated with a previous Hypothesis
    representation of it."""
    # Read the stored buffer in full before replaying it.
    with open(filename, "rb") as source:
        buffer = source.read()
    replay = ConjectureData.for_buffer(buffer)
    # Drawing from the csmith() strategy regenerates the program text.
    program = replay.draw(csmith())
    print(program)
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()