This repository has been archived by the owner on Aug 9, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_label_user_histories.py
executable file
·76 lines (58 loc) · 3.02 KB
/
get_label_user_histories.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# run through spark-submit
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Create the Spark entry points. The SparkConf is built with no explicit
# settings here; the script is meant to be launched through spark-submit.
spark_conf = SparkConf()
sc = SparkContext(conf=spark_conf)
spark = SparkSession(sc)
import pyspark.sql.functions as f
# One DataFrameReader is shared by both inputs.
reader = spark.read

# The ORES label-editors table, read from parquet.
label_editors = reader.parquet(
    "/user/nathante/ores_bias_data/ores_label_editors")

# Edit history: revision events from the 2019-03 snapshot of
# wmf.mediawiki_history, reduced to the columns used below, plus a
# `timestamp` column derived from `event_timestamp`.
mw_hist = (
    reader.table("wmf.mediawiki_history")
    .filter(f.col("snapshot") == "2019-03")
    .filter(f.col("event_entity") == "revision")
    .select("revision_id", "event_timestamp", "event_user_id", "wiki_db")
    .withColumn(
        "timestamp",
        f.from_utc_timestamp(f.col("event_timestamp"), tz="utc"))
)
# We want to identify newcomers and anons.
# Anons are the editors without a user id (userid == 0).
label_editors = label_editors.withColumn("is_anon", f.col("userid") == 0)

# Newcomers are never anons, so only registered editors are kept here.
non_anons = label_editors.filter(~f.col("is_anon"))

# Pair each registered editor's label row with that editor's revisions on
# the same wiki whose revision id does not exceed the labeled revision id.
same_editor = non_anons.userid == mw_hist.event_user_id
same_wiki = non_anons.wiki == mw_hist.wiki_db
not_after_labeled_rev = non_anons.revid >= mw_hist.revision_id
edit_histories = non_anons.join(
    mw_hist,
    on=same_editor & same_wiki & not_after_labeled_rev)
# Summarise every editor's history per (wiki, user): time of the first and
# last edit and the number of edits.
#
# `edit_histories` holds one row per (label row, earlier revision) pair, so
# an editor who appears in several label rows contributes the same
# revision_id more than once. Distinct revision ids are counted so that
# N_edits is the number of edits rather than the number of joined rows.
gb = edit_histories.groupBy("wiki", "user").agg(
    f.min("timestamp").alias("time_first_edit"),
    f.max("timestamp").alias("time_last_edit"),
    f.countDistinct("revision_id").alias("N_edits"))
gb = gb.cache()

# Span between the first and the last edit, in seconds and in days.
gb = gb.withColumn(
    "time_since_first_edit_s",
    f.unix_timestamp(f.col("time_last_edit"))
    - f.unix_timestamp(f.col("time_first_edit")))
gb = gb.withColumn(
    "days_since_first_edit",
    f.col("time_since_first_edit_s") / 60 / 60 / 24)

# An editor counts as a newcomer with at most 5 edits or with fewer than
# 30 days between their first and last edit.
gb = gb.withColumn(
    "is_newcomer",
    (f.col("N_edits") <= 5) | (f.col("days_since_first_edit") < 30))
# Attach the per-editor statistics to every label row.
# Both join keys of `gb` are renamed before the join and dropped after it,
# so the result keeps exactly one `wiki` and one `user` column; leaving
# `gb.user` in place would put two columns named `user` into the export.
gb = gb.withColumnRenamed("wiki", "lwiki").withColumnRenamed("user", "luser")
label_editors = label_editors.join(
    gb,
    on=[label_editors.user == gb.luser,
        label_editors.wiki == gb.lwiki],
    how='left_outer')
label_editors = label_editors.drop("lwiki", "luser")

# Rows without a match in `gb` (the left outer join leaves them null, e.g.
# anonymous editors, who were excluded from the history join) are treated
# as non-newcomers.
label_editors = label_editors.withColumn(
    "is_newcomer",
    f.coalesce(f.col("is_newcomer"), f.lit(False)))
# Collect the labeled editors to the driver and export them as a pickle and
# as a tab-separated file under the relative "data" directory.
import os

# Make sure the output directory exists before the (expensive) collect, so
# the export cannot fail on a missing directory after the Spark work is done.
if not os.path.isdir("data"):
    os.makedirs("data")

pddf = label_editors.toPandas()
pddf.to_pickle("data/labeled_newcomers_anons.pickle")
pddf.to_csv("data/labeled_newcomers_anons.tsv", sep='\t')