Merge pull request #227 from SocialChangeLab/feature-improve-stats
refactor(impact): use all available data as controls
davidpomerenke authored Oct 25, 2024
2 parents 53f2000 + 7db0f2e commit 7baf7b6
Showing 4 changed files with 85 additions and 119 deletions.
4 changes: 2 additions & 2 deletions .vscode/launch.json
@@ -19,8 +19,8 @@
"media_impact_monitor.api:app",
"--host=0.0.0.0",
"--port=8000",
"--reload",
"--reload-dir=backend-python/media_impact_monitor"
// "--reload",
// "--reload-dir=backend-python/media_impact_monitor"
],
"jinja": true
}
2 changes: 1 addition & 1 deletion backend-python/media_impact_monitor/api.py
@@ -69,7 +69,7 @@ async def app_lifespan(app: FastAPI):
     yield


-sentry_sdk.init(dsn=SENTRY_DSN, traces_sample_rate=1.0, profiles_sample_rate=1.0)
+# sentry_sdk.init(dsn=SENTRY_DSN, traces_sample_rate=1.0, profiles_sample_rate=1.0)


 app = FastAPI(**metadata, lifespan=app_lifespan)
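Note on the api.py change: commenting out `sentry_sdk.init` disables error and performance monitoring in every environment, production included. A gentler alternative — a sketch only, not part of this commit — is to guard the call on the DSN, so local runs (where no DSN is configured) skip initialization automatically:

```python
import os

import sentry_sdk

# Sketch, not part of this commit: only initialize Sentry when a DSN is set.
# The repo imports SENTRY_DSN from its own settings module; os.environ is
# used here only to keep the example self-contained.
SENTRY_DSN = os.environ.get("SENTRY_DSN")
if SENTRY_DSN:
    sentry_sdk.init(dsn=SENTRY_DSN, traces_sample_rate=1.0, profiles_sample_rate=1.0)
```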
102 changes: 25 additions & 77 deletions backend-python/media_impact_monitor/impact.py
@@ -26,12 +26,13 @@ def get_impact(q: ImpactSearch) -> Impact:
     events = get_events(
         EventSearch(
             source="acled",
-            organizers=[q.organizer],
+            topic="climate_change",
             start_date=q.start_date,
             end_date=q.end_date,
         )
     )
-    n_event_days = events["date"].nunique()
+    events_from_org = events[events["organizers"].apply(lambda x: q.organizer in x)]
+    n_event_days = events_from_org["date"].nunique()
     if n_event_days < 5:
         return Impact(
             method_applicability=False,
@@ -50,85 +51,32 @@ def get_impact(q: ImpactSearch) -> Impact:
             ],
             impact_estimates=None,
         )
-    applicabilities = []
-    lims_list = []
-    dfs = dict()
-    for topic in trends.columns:
-        trend = trends[topic]
-        if trend.empty:
-            applicabilities.append(False)
-            lims_list.append([f"No media data available for {q.impacted_trend}."])
-            continue
-        trend.name = "count"
-        appl, limitations, impact = get_impact_for_single_trend(
-            events=events,
-            trend=trend,
-            method=q.method,
-            aggregation=q.impacted_trend.aggregation,
-        )
-        dfs[topic] = impact
-        applicabilities.append(appl)
-        lims_list.append(limitations)
-    assert len(set(applicabilities)) == 1, "All topics should have same applicability."
-    assert (
-        len(set([str(lims) for lims in lims_list])) == 1
-    ), "All topics should have same limitations."
+    match q.method:
+        case "interrupted_time_series":
+            raise NotImplementedError("Interrupted time series is not up to date.")
+        case "synthetic_control":
+            raise NotImplementedError("Synthetic control is not yet implemented.")
+        case "time_series_regression":
+            mean_impacts, limitations = time_series_regression(
+                events=events, article_counts=trends
+            )
+        case _:
+            raise ValueError(f"Unsupported method: {q.method}")
+    org = q.organizer
     n_days = 7 - 1
     return Impact(
-        method_applicability=applicabilities[0],
-        method_limitations=lims_list[0],
+        method_applicability=True,
+        method_limitations=[],
         impact_estimates={
-            topic: ImpactEstimate(
+            topic_name: ImpactEstimate(
                 absolute_impact=MeanWithUncertainty(
-                    mean=impact["mean"].loc[n_days],
-                    ci_upper=impact["ci_upper"].loc[n_days],
-                    ci_lower=impact["ci_lower"].loc[n_days],
-                    p_value=impact["p_value"].loc[n_days],
+                    mean=topic_impact[org]["mean"],
+                    ci_upper=topic_impact[org]["ci_upper"],
+                    ci_lower=topic_impact[org]["ci_lower"],
+                    p_value=topic_impact[org]["p_value"],
                 ),
-                absolute_impact_time_series=[
-                    DatedMeanWithUncertainty(**d)
-                    for d in dfs[topic].reset_index().to_dict(orient="records")
-                ],
-                # relative_impact=MeanWithUncertainty(  # TODO: calculate relative impact
-                #     mean=1.0,
-                #     ci_upper=1.0,
-                #     ci_lower=1.0,
-                # ),
-                # relative_impact_time_series=[],
+                absolute_impact_time_series=[],
             )
-            for topic, impact in dfs.items()
-        }
-        if applicabilities[0]
-        else None,
+            for topic_name, topic_impact in mean_impacts.items()
+        },
     )


-def get_impact_for_single_trend(
-    events: pd.DataFrame,
-    trend: pd.DataFrame,
-    method: Method,
-    aggregation="daily",
-) -> tuple[bool, list[str], pd.DataFrame]:
-    hidden_days_before_protest = 7
-    horizon = 28
-    match method:
-        case "interrupted_time_series":
-            mean_impact, limitations = interrupted_time_series(
-                events=events,
-                article_counts=trend,
-                horizon=horizon,
-                hidden_days_before_protest=hidden_days_before_protest,
-                aggregation=aggregation,
-            )
-        case "synthetic_control":
-            raise NotImplementedError("Synthetic control is not yet implemented.")
-        case "time_series_regression":
-            mean_impact, limitations = time_series_regression(
-                events=events,
-                article_counts=trend,
-                aggregation=aggregation,
-            )
-        case _:
-            raise ValueError(f"Unsupported method: {method}")
-    return True, limitations, mean_impact
+# TODO: divide impact by number of events on that day (by the same org)
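The core of the refactor is visible in the first hunk: events are now fetched for the whole climate_change topic rather than for a single organizer, and the organizer of interest is then selected by a membership test on the list-valued `organizers` column. A minimal, self-contained sketch of that pandas pattern (toy data, illustrative names):

```python
import pandas as pd

# Toy events table; "organizers" holds a list of names per event.
events = pd.DataFrame(
    {
        "date": pd.to_datetime(["2024-01-01", "2024-01-01", "2024-01-02"]),
        "organizers": [
            ["Last Generation"],
            ["Fridays for Future"],
            ["Last Generation", "Greenpeace"],
        ],
    }
)

organizer = "Last Generation"
# Membership test inside each list-valued cell yields a boolean mask.
events_from_org = events[events["organizers"].apply(lambda x: organizer in x)]
n_event_days = events_from_org["date"].nunique()  # 2 distinct protest days
```

Keeping all events rather than only the queried organizer's is what lets the regression below use every other organizer's protests as controls.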
96 changes: 57 additions & 39 deletions
@@ -1,7 +1,8 @@
 import pandas as pd
 import statsmodels.api as sm

-from media_impact_monitor.types_ import Aggregation
+from collections import Counter
+from itertools import chain


 def add_lags(df: pd.DataFrame, lags: list[int]):
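This fourth file's name is cut off in this view; the functions involved (regress, agg_protests, estimate_impact) suggest it is the time-series-regression module. The body of add_lags is also collapsed; from its call sites below, which expect extra lagged regressor columns per input column, a plausible reconstruction — an assumption, the actual implementation may differ — is:

```python
import pandas as pd

def add_lags(df: pd.DataFrame, lags: list[int]) -> pd.DataFrame:
    # Assumed behavior (body collapsed above): append one shifted copy of
    # every column per lag, e.g. lags=[1, 2] adds shift(1) and shift(2).
    lagged = {
        f"{col}_lag{lag}": df[col].shift(lag) for col in df.columns for lag in lags
    }
    return pd.concat([df, pd.DataFrame(lagged)], axis=1)
```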
@@ -39,61 +40,77 @@ def regress(
 ):
     """Get regression result where the outcome is `day` days after the treatment."""
     lags = range(1, lags + 1)
-    media_df = pd.DataFrame(media_df, columns=["count"])
-    # protest_df = add_lags(protest_df, lags=[])
-    media_df = add_lags(media_df, lags=[4, 5, 6, 7, 8])
+    outcomes = media_df.columns

+    placebo = False
+    if placebo:
+        protest_df = protest_df.copy().sample(frac=1)
+    results = {}
+    protest_df = add_lags(protest_df, lags=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13])
+    media_df = add_lags(media_df, lags=[4, 5, 6, 7, 8, 9, 10, 11, 12, 13])
+    # protest_df = add_emws(protest_df)
+    # media_df = add_emws(media_df, spans=[14])
     df = pd.concat([protest_df, media_df], axis=1)
     df = add_weekday_dummies(df)
-    treatment = "protest"
-    outcome = "count"
-    df[outcome] = df[outcome].shift(-day)
-    if cumulative:
-        # TODO write tests for this
-        if day < 0:
-            indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=-day)
-            df[outcome] = -df[outcome].rolling(window=indexer).sum()
-        else:
-            df[outcome] = df[outcome].rolling(day + 1).sum()
-    df = df.dropna()
-    placebo = False
-    if placebo:
-        df[treatment] = df.sample(frac=1)[treatment].to_list()
-    X = df.drop(columns=[outcome])
-    y = df[outcome]
-    model = sm.OLS(y, sm.add_constant(X))
-    model = model.fit(cov_type="HC3")
-    alpha = 0.1
-    return {
-        "date": day,
-        "mean": model.params[treatment],
-        "p_value": model.pvalues[treatment],
-        "ci_lower": model.conf_int(alpha=alpha)[0][treatment],
-        "ci_upper": model.conf_int(alpha=alpha)[1][treatment],
-    }
+    for outcome in outcomes:
+        df[outcome] = df[outcome].shift(-day)
+        if cumulative:
+            # TODO write tests for this
+            if day < 0:
+                indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=-day)
+                df[outcome] = -df[outcome].rolling(window=indexer).sum()
+            else:
+                df[outcome] = df[outcome].rolling(day + 1).sum()
+        df = df.dropna()
+        X = df.drop(columns=[outcome])
+        y = df[outcome]
+        model = sm.OLS(y, sm.add_constant(X))
+        model = model.fit(cov_type="HC3")
+        alpha = 0.1
+        results[outcome] = {}
+        for org in protest_df.columns:
+            results[outcome][org] = {
+                "date": day,
+                "mean": model.params[org],
+                "p_value": model.pvalues[org],
+                "ci_lower": model.conf_int(alpha=alpha)[0][org],
+                "ci_upper": model.conf_int(alpha=alpha)[1][org],
+            }
+    return results
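The rewritten regress fits one OLS per media topic, with every frequent organizer's protest dummy (plus lags and weekday dummies) in the same design matrix, so each organizer's coefficient is estimated while controlling for all the others — the "use all available data as controls" of the commit title. A self-contained sketch of the estimation core on toy data (names are illustrative):

```python
import numpy as np
import pandas as pd
import statsmodels.api as sm

rng = np.random.default_rng(seed=0)
n = 200
X = pd.DataFrame(
    {
        "org_a": rng.integers(0, 2, n),  # daily protest dummy, organizer A
        "org_b": rng.integers(0, 2, n),  # daily protest dummy, organizer B
    }
)
y = 3 * X["org_a"] + 1 * X["org_b"] + rng.normal(size=n)  # toy article counts

# Heteroskedasticity-robust (HC3) standard errors, as in the commit.
model = sm.OLS(y, sm.add_constant(X)).fit(cov_type="HC3")
alpha = 0.1  # 90% confidence intervals
ci = model.conf_int(alpha=alpha)
for org in X.columns:
    print(org, model.params[org], model.pvalues[org], ci[0][org], ci[1][org])
```

With HC3 the coefficient estimates are identical to plain OLS; only the standard errors — and hence the confidence intervals and p-values — are made robust to heteroskedasticity.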


-def agg_protests(df: pd.DataFrame):
-    start = df["date"].min()  # HACK
-    end = df["date"].max()  # HACK
-    df = df.groupby("date")["date"].count().to_frame("protest")
+def agg_protests(events: pd.DataFrame):
+    start = events["date"].min()  # HACK
+    end = events["date"].max()  # HACK
+    # we want to reduce the amount of organizers for our simple regression
+    # (when using multilevel models later, we can relax this)
+    # only use primary organizers for each event
+    primary_orgs = events["organizers"].apply(lambda x: x[0] if x else None)
+    # only keep most frequent organizers
+    orgs = [k for k, v in Counter(primary_orgs).items() if k and v > 10]
+    # create dummy columns for each organizer
+    orgs_df = pd.DataFrame({org: primary_orgs == org for org in orgs})
+    orgs_df["date"] = events["date"]
+    orgs_df = orgs_df.set_index("date")
+    # create time series by counting protests per day for each organizer
+    ts = orgs_df.groupby("date").agg({org: "any" for org in orgs}).astype(int)
     idx = pd.date_range(start=start, end=end)
-    df = df.reindex(idx).fillna(0)
-    return df
+    ts = ts.reindex(idx).fillna(0)
+    return ts
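For intuition: agg_protests now turns the raw event list into one 0/1 column per frequent primary organizer on a continuous daily index, instead of a single aggregate "protest" count. A usage sketch with toy data, assuming the Counter import and the agg_protests definition from this diff are in scope:

```python
import pandas as pd

# Toy input: 12 events per organizer, so both clear the "> 10" cutoff;
# organizer A always protests on Jan 1, organizer B on Jan 2.
events = pd.DataFrame(
    {
        "date": pd.to_datetime(["2024-01-01", "2024-01-02"] * 12),
        "organizers": [["Org A"], ["Org B"]] * 12,
    }
)

ts = agg_protests(events)
# ts has a daily DatetimeIndex covering 2024-01-01 to 2024-01-02 and one
# dummy column per organizer: "Org A" = [1, 0], "Org B" = [0, 1].
```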


 def estimate_impact(
     events: pd.DataFrame,
     article_counts: pd.Series,
-    aggregation: Aggregation,
     cumulative: bool = True,
     lags: int = 14,
     outcome_days: list[int] = range(-14, 14),
 ):
     protest_df = agg_protests(events)
-    n = protest_df["protest"].sum()
-    limitations = [f"There have only been {n} protests."]
+    return regress(
+        protest_df, article_counts, day=7, lags=lags, cumulative=cumulative
+    ), []
+    # TODO: do this per day:
     impacts = pd.DataFrame(
         [
             regress(
@@ -103,4 +120,5 @@ def estimate_impact(
         ]
     )
     impacts = impacts.set_index("date")[["mean", "ci_lower", "ci_upper", "p_value"]]
+    limitations = []
     return impacts, limitations
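As refactored, estimate_impact returns the nested dict from regress plus an empty limitations list; everything after the early return is unreachable until the "do this per day" TODO is tackled (and the old per-day code would first need to flatten the new nested result before set_index("date") could work). How a caller consumes the result — matching `topic_impact[org]["mean"]` in impact.py — under assumed inputs:

```python
# Usage sketch; inputs are assumptions: `events` is an ACLED-style events
# DataFrame and `article_counts` a DataFrame with one column per media topic.
mean_impacts, limitations = estimate_impact(events, article_counts)
# Nested result: topic -> regressor (organizer or lagged dummy) -> statistics.
estimate = mean_impacts["climate_policy"]["Org A"]  # illustrative keys
print(estimate["mean"], estimate["ci_lower"], estimate["ci_upper"], estimate["p_value"])
```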
