htextract
#!/usr/bin/python
import argparse
import sys
import os
import os.path
import re
import StringIO
import tarfile
import warnings
import time
import imp
import tempfile
from dask.distributed import Client
from contextlib import closing
from collections import Counter
import simplejson
import htpar

def make_extractor(key):
    """Return a function that extracts a single field from a record dict."""
    return lambda x: x.get(key, None)

def process_paths(iname, opts):
    """Extract fields (or the set of keys) from every record in one tar shard."""
    storage = htpar.GenericStorage()
    keys = htpar.get_keyfun(opts["keys"])
    assert callable(keys)
    if opts.get("getkeys"):
        # Report the sorted list of keys present in each record.
        extractor = lambda x: tuple(sorted(x.keys()))
    elif "fields" in opts:
        # Extract the requested comma-separated fields from each record.
        extractor = lambda x: tuple(x.get(f) for f in opts["fields"].split(","))
    istream = storage.open_read(iname)
    assert istream is not None, iname
    reader = htpar.tarrecords(istream, keys=keys)
    result = []
    for record in reader:
        try:
            value = extractor(record)
        except Exception:
            value = None
        result.append(value)
    if opts["counter"] > 0:
        # Summarize the extracted values instead of returning them verbatim.
        return Counter(result)
    else:
        return result
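
# Minimal local sketch (assumption: a shard file "sample.tar" readable by
# htpar.GenericStorage exists; the file name and option values are
# illustrative only). process_paths can be called directly, without a dask
# cluster, to extract the default __key__ field from one shard:
#
#     opts = dict(keys="base_plus_ext", fields="__key__",
#                 getkeys=False, counter=-1)
#     values = process_paths("sample.tar", opts)
#     # with counter <= 0 this is a list of one-element tuples, one per record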

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Mapping sharded tar files.")
    parser.add_argument("-c", "--counter", type=int, default=-1,
                        help="produce summary statistics for the fields")
    parser.add_argument("-K", "--getkeys", action="store_true",
                        help="get list of keys instead of fields")
    parser.add_argument("-f", "--fields", default="__key__",
                        help="select fields to extract (default: __key__)")
    parser.add_argument("-d", "--dask", default=None,
                        help="dask-scheduler address")
    parser.add_argument("-k", "--keys", default="base_plus_ext",
                        help="sample key extraction function")
    parser.add_argument("-F", "--format", default=None,
                        help="output format")
    parser.add_argument("inputs",
                        help="input tar path (possibly sharded)")
    args = parser.parse_args()
    ## Compute the set of paths to iterate over.
    ifmt, ninputs = htpar.split_sharded_path(args.inputs)
    if ninputs is None:
        inputs = [ifmt]
    else:
        inputs = [ifmt % i for i in range(ninputs)]
    ## Map over the shards in parallel.
    client = Client(args.dask)
    print client
    opts = dict(keys=args.keys, fields=args.fields, getkeys=args.getkeys,
                counter=args.counter)
    results = client.map(process_paths, inputs, [opts] * len(inputs))
    if args.counter > 0:
        ## Merge the per-shard counters and print the most common values.
        summary = Counter()
        for result in client.gather(results):
            summary.update(result)
        for k, v in summary.most_common(args.counter):
            print k, v
        sys.exit(0)
    if args.format is None:
        ## Default output: tab-separated fields, one record per line.
        for result in client.gather(results):
            for item in result:
                if isinstance(item, (list, tuple)):
                    item = "\t".join(str(x) for x in item)
                print item
        sys.exit(0)
    if args.format == "repr":
        for result in client.gather(results):
            for item in result:
                print repr(item)
        sys.exit(0)
    if args.format == "json":
        for result in client.gather(results):
            for item in result:
                print simplejson.dumps(item, indent=4)
        sys.exit(0)
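
# Usage sketch (the scheduler address and field names are placeholders, and
# SHARDED-PATH stands for whatever path syntax htpar.split_sharded_path
# accepts; omitting -d makes dask start a local cluster):
#
#     htextract -d scheduler-host:8786 -f __key__,cls 'SHARDED-PATH'
#     htextract -K -c 20 'SHARDED-PATH'
#
# The first form prints the selected fields, tab-separated, for every record
# in every shard; the second counts the distinct key sets across all shards
# and prints the 20 most common ones.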