This repository has been archived by the owner on Aug 9, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmove_labels_to_datalake.py
121 lines (95 loc) · 4.65 KB
/
move_labels_to_datalake.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from concurrent.futures import ThreadPoolExecutor
import time
import itertools
import mwapi
import pyarrow as pa
import pandas as pd
import pyarrow.parquet as pq
from get_labels import load_labels
import json
import pickle
# Lookup table keyed by wiki name; get_editor_traits uses siteList[context]
# as the API host for that wiki.
# NOTE(review): pickle.load can execute arbitrary code -- only safe as long
# as data/wikimedia_sites.pickle is a trusted, locally generated file.
with open("data/wikimedia_sites.pickle", 'rb') as sites_file:
    # the context manager closes the handle, which the bare open() never did
    siteList = dict(pickle.load(sites_file))
def grouper(iterable, n, fillvalue=None):
    """Split *iterable* into consecutive tuples of exactly *n* items.

    When the input runs out part-way through a tuple, the remaining slots
    are padded with *fillvalue*, e.g. ``grouper('ABCDEFG', 3, 'x')`` gives
    ABC, DEF, Gxx.
    """
    source = iter(iterable)
    # n references to the *same* iterator: zip_longest draws from it n times
    # for every tuple it emits, which is what produces the chunking.
    slots = (source,) * n
    return itertools.zip_longest(*slots, fillvalue=fillvalue)
def get_editor_traits(labels, context, out_schema):
    """Yield page/revision/editor metadata for every labeled revision.

    Each label is a JSON string with a ``rev_id`` field. The revision ids
    are sent to the MediaWiki API of the wiki named by *context* in batches
    of 50, and every returned revision that exposes its user is turned into
    one dict with the keys ``wiki``, ``ns``, ``title``, ``pageid``,
    ``revid``, ``parentid``, ``user`` and ``userid``.

    Args:
        labels: iterable of JSON-encoded labels, each carrying ``rev_id``.
        context: wiki name; used to pick the API host (from ``siteList``
            unless it is one of the hard-coded special cases) and stored in
            every row under ``wiki``.
        out_schema: not used by this function; kept so existing callers
            keep working.

    Yields:
        dict: a fresh dict per revision, so rows stay distinct when the
        caller collects them into a list or DataFrame.

    Raises:
        KeyError: if *context* is neither a special case nor in ``siteList``.
    """
    rev_ids = [json.loads(label)['rev_id'] for label in labels]

    # special case for wikidata, esbooks
    # Overrides are consulted first so these wikis work even when they are
    # absent from siteList.
    host_overrides = {
        "wikidatawiki": "https://wikidata.org",
        "eswikibooks": "https://es.wikibooks.org",
        "eswikiquote": "https://es.wikiquote.org",
    }
    host = host_overrides.get(context)
    if host is None:
        host = siteList[context]

    ##simple wiki uses the same model as enwiki

    user_agent="Ores bias analysis project by Nate TeBlunthuis <groceryheist@uw.edu>"
    session = mwapi.Session(host,user_agent)

    # 50 revids per request -- presumably the API's per-query limit; confirm.
    batches = grouper(rev_ids, 50)

    def table_results(batch, context):
        """Flatten one API response into per-revision row dicts."""
        resultset = batch['query'].get('pages', {})
        if not resultset:
            # Previously this dropped into pdb, which hangs unattended runs.
            print("no pages found in batch for {0}".format(context))
            return

        for result in resultset.values():
            page_fields = {
                'wiki': context,
                'ns': result['ns'],
                'title': result['title'],
                'pageid': int(result['pageid']),
            }
            for rev in result['revisions']:
                # there are some deleted revisions where we don't get to know the user, let's just exclude them.
                if 'user' not in rev:
                    continue
                # Copy per revision: re-yielding one shared dict made every
                # collected row of a page show the last revision's values.
                row = dict(page_fields)
                row['revid'] = int(rev['revid'])
                row['parentid'] = int(rev['parentid'])
                row['user'] = rev['user']
                row['userid'] = int(rev['userid'])
                yield row

    def keep_trying(call, *args, **kwargs):
        """Call ``call`` until it succeeds, sleeping 1s after each failure."""
        # A loop rather than recursion, so a long outage cannot end in
        # RecursionError.
        while True:
            try:
                return call(*args, **kwargs)
            except Exception as e:
                print(e)
                time.sleep(1)

    def fetch_batch(batch):
        """Request revision metadata for one batch of revision ids."""
        # grouper pads the final batch with None; do not send the padding.
        revids = [rev_id for rev_id in batch if rev_id is not None]
        return keep_trying(call=session.get, action="query", prop='revisions', rvprop=['ids','user','userid'], revids=revids)

    with ThreadPoolExecutor(100) as executor:
        # get revision metadata
        # Revision ids the API reports under 'badrevids' produce no rows.
        for batch in executor.map(fetch_batch, batches):
            yield from table_results(batch, context)
def move_labels_to_datalake(label_files, wikis):
    """Rebuild the ores_label_editors parquet dataset on HDFS.

    Any existing dataset at the target path is removed first. Then, for
    every (label file, wiki) pair whose label file is not None, the labels
    are loaded, editor metadata is collected with get_editor_traits, and
    the resulting table is written into the dataset partitioned by ``wiki``.

    Args:
        label_files: label file references, parallel to *wikis*; None
            entries are skipped.
        wikis: wiki names, one per entry of *label_files*.
    """
    hdfs = pa.hdfs.connect(host='an-coord1001.eqiad.wmnet', port=10000)
    # NOTE(review): second connect() on the returned filesystem object is
    # kept as in the original -- confirm it is needed / supported.
    hdfs = hdfs.connect()

    dataset_root = "/user/nathante/ores_bias_data/ores_label_editors"

    # start from a clean slate so stale partitions do not survive
    if fs_has_dataset := hdfs.exists(dataset_root):
        hdfs.rm(dataset_root, recursive=True)

    column_names = ['wiki', 'ns','pageid','title','revid','parentid','user','userid']

    print("collecting userids")

    labeled_pairs = (
        (path, wiki) for path, wiki in zip(label_files, wikis) if path is not None
    )
    for label_file, context in labeled_pairs:
        loaded_labels = load_labels(label_file)
        editor_rows = get_editor_traits(loaded_labels, context, column_names)
        frame = pd.DataFrame(editor_rows)

        # local snapshot; same filename every time, so only the most recent
        # wiki's frame is left on disk
        frame.to_pickle("ores_label_editors.pickle")

        arrow_table = pa.Table.from_pandas(frame)
        pq.write_to_dataset(
            arrow_table,
            root_path=dataset_root,
            partition_cols=['wiki'],
            filesystem=hdfs,
            flavor='spark',
        )
        print ("pushed labels for {0}".format(context))
## conn.close()
# query = """INSERT INTO nathante.ores_label_editors PARTITION (wiki='{0}') """.format(context)
# query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
# cursor.executemany(query, list(tuple(r) for r in rows))
# conn = connect(host='an-coord1001.eqiad.wmnet', port=10000, auth_mechanism='PLAIN')
# cursor = conn.cursor()
# cursor.execute("DROP TABLE nathante.ores_label_editors")
# cursor.execute("CREATE EXTERNAL TABLE nathante.ores_label_editors(ns string, pageid bigint, title string, revid bigint, parentid bigint, user string, userid bigint) PARTITIONED BY (wiki string) STORED AS PARQUET LOCATION '/user/nathante/ores_bias/nathante.ores_label_editors' ")
# cursor.execute("SET hive.exec.dynamic.partition.mode=nonstrict")