-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhtcshard
executable file
·110 lines (96 loc) · 2.77 KB
/
htcshard
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/python
import argparse
import sys
import os
import os.path
import re
import StringIO
import tarfile
import warnings
import time
import imp
import tempfile
from dask.distributed import Client
from contextlib import closing
import htpar
import hashlib
def default_hash(obj, nbuckets, seed=0):
    """Map `obj` to a bucket index in ``range(nbuckets)`` with a seeded MD5 hash.

    The digest is computed over ``str(seed)`` followed by `obj`; its first
    seven hex digits (28 bits) are reduced modulo `nbuckets`.

    Args:
        obj: Byte string (or other buffer) to hash. Text is encoded as UTF-8
            first, so ASCII text hashes the same as the equivalent bytes.
        nbuckets: Number of buckets; the result is in ``[0, nbuckets)``.
        seed: Value mixed into the digest to obtain independent hashings.

    Returns:
        The integer bucket index.

    Raises:
        ZeroDivisionError: If `nbuckets` is 0.
    """
    text_type = type(u"")

    def as_bytes(value):
        # hashlib digests bytes only; encode text explicitly instead of
        # relying on an implicit ASCII conversion that fails on non-ASCII
        # input (and is not available at all on Python 3).
        if isinstance(value, text_type):
            return value.encode("utf-8")
        return value

    digest = hashlib.md5()
    digest.update(as_bytes(str(seed)))
    digest.update(as_bytes(obj))
    return int(digest.hexdigest()[:7], 16) % nbuckets
def load_mod(modname, source):
    """Build a module named `modname` from `source` and register it for import.

    The source text is compiled and executed in memory inside a fresh module
    object, which replaces any module previously registered under the same
    name in ``sys.modules``. No temporary file is written, so nothing has to
    be reopened by name (which is platform-dependent for named temporary
    files) and no bytecode file is left behind.

    Imports are done locally because this function is also shipped to
    remote workers and must not rely on the caller's module-level imports
    beyond ``sys``.

    Args:
        modname: Name under which the module becomes importable.
        source: Python source text of the module.

    Returns:
        ``str()`` of the newly created module object.

    Raises:
        Exception: Whatever compiling or executing `source` raises; in that
            case no module is left registered under `modname`.
    """
    import types

    filename = "<{}>".format(modname)
    try:
        # dont_inherit=True: compile the source on its own terms, without
        # picking up compiler flags from this file.
        code = compile(source, filename, "exec", 0, True)
        module = types.ModuleType(modname)
        module.__file__ = filename
        # Register before executing, like the regular import machinery, so
        # code that looks itself up in sys.modules while loading finds it.
        sys.modules[modname] = module
        exec(code, module.__dict__)
    except BaseException:
        # Never leave a stale or half-initialised module behind.
        sys.modules.pop(modname, None)
        raise
    return str(module)
def read_file(fname):
    """Return the full contents of the file `fname`, opened in the default mode."""
    with open(fname) as handle:
        contents = handle.read()
    return contents
def input_records(url):
    """Lazily yield every record of every shard that `url` expands to.

    Shards are enumerated with ``htpar.path_shards`` and each one is read
    with ``htpar.tarrecords``; records are yielded in that order.
    """
    shard_paths = htpar.path_shards(url)
    for shard_path in shard_paths:
        for rec in htpar.tarrecords(shard_path):
            yield rec
def process(record):
    """Run the user-supplied ``htcshard_mod.process`` on one record.

    A failure in the user function is returned rather than raised, so a
    single bad record does not abort the whole run: the exception instance
    is handed back with the offending input attached as ``.record``.

    Args:
        record: The input record passed through to ``htcshard_mod.process``.

    Returns:
        On success, a list of results (a single non-list result is wrapped
        in a list). Each result must support item assignment and is tagged
        with ``"__time__"`` (seconds spent in the user function) and
        ``"__worker__"`` (``"<hostname>:<pid>"`` of the executing process).
        On failure, the caught exception instance.
    """
    import platform
    start = time.time()
    import htcshard_mod
    try:
        results = htcshard_mod.process(record)
    except Exception as e:
        e.record = record
        return e
    elapsed = time.time() - start
    if not isinstance(results, list):
        results = [results]
    # The worker label is the same for every result; build it once.
    worker = "{}:{}".format(platform.node(), os.getpid())
    for result in results:
        result["__time__"] = elapsed
        result["__worker__"] = worker
    return results
# Source of the fallback worker module used when no --process script is
# given: an identity map that returns every record unchanged. The function
# body must stay indented, because this text is loaded as a Python module.
default_source = """
def process(record):
    return record
"""
if __name__ == "__main__":
    # Map a per-record function over sharded tar inputs on a dask cluster and
    # write the results to sharded tar outputs.
    # The text is passed as `description`; as the first positional argument
    # it would become the program name shown in the usage line.
    parser = argparse.ArgumentParser(description="Mapping sharded tar files.")
    parser.add_argument("-p", "--process", default=None)
    parser.add_argument("-d", "--dask", default="localhost:8786")
    parser.add_argument("inputs")
    parser.add_argument("outputs")
    args = parser.parse_args()

    client = Client(args.dask)
    print(client)
    # NOTE(review): output_shards is never explicitly closed or finalised
    # here; confirm whether htpar.TarShards needs that to flush its last shard.
    output_shards = htpar.TarShards(args.outputs)

    if args.process is not None:
        source = read_file(args.process)
    else:
        source = default_source

    # Install the module locally first, so a broken script fails before any
    # work is scheduled, then install it on the workers.
    load_mod("htcshard_mod", source)
    import htcshard_mod
    print(client.run(load_mod, "htcshard_mod", source))

    # Only count failures: each returned exception carries its whole input
    # record, so keeping them all alive could hold a lot of memory.
    nfailed = 0
    count = 0
    for fresult in client.map(process, input_records(args.inputs)):
        outputs = fresult.result()
        if isinstance(outputs, Exception):
            # process() returns (rather than raises) per-record failures.
            print(repr(outputs))
            nfailed += 1
            continue
        if not isinstance(outputs, list):
            outputs = [outputs]
        for record in outputs:
            size, shard = output_shards.write(record)
            count += 1
            print("%s %s %s %s %s" % (
                record.get("__key__"), size, shard,
                record["__time__"], record["__worker__"]))

    # Summary goes to stderr so the per-record lines on stdout stay unchanged.
    sys.stderr.write("%d records written, %d failed\n" % (count, nfailed))