From 7db0f2e8666abc9932d41cd88c9e57be87d5273b Mon Sep 17 00:00:00 2001 From: David Pomerenke <46022183+davidpomerenke@users.noreply.github.com> Date: Fri, 25 Oct 2024 13:26:57 +0200 Subject: [PATCH] refactor(impact): use all available data as controls --- .vscode/launch.json | 4 +- backend-python/media_impact_monitor/api.py | 2 +- backend-python/media_impact_monitor/impact.py | 102 +++++------------- .../time_series_regression.py | 96 ++++++++++------- 4 files changed, 85 insertions(+), 119 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index df6730de..72576544 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -19,8 +19,8 @@ "media_impact_monitor.api:app", "--host=0.0.0.0", "--port=8000", - "--reload", - "--reload-dir=backend-python/media_impact_monitor" + // "--reload", + // "--reload-dir=backend-python/media_impact_monitor" ], "jinja": true } diff --git a/backend-python/media_impact_monitor/api.py b/backend-python/media_impact_monitor/api.py index 7235ed34..842444b6 100644 --- a/backend-python/media_impact_monitor/api.py +++ b/backend-python/media_impact_monitor/api.py @@ -69,7 +69,7 @@ async def app_lifespan(app: FastAPI): yield -sentry_sdk.init(dsn=SENTRY_DSN, traces_sample_rate=1.0, profiles_sample_rate=1.0) +# sentry_sdk.init(dsn=SENTRY_DSN, traces_sample_rate=1.0, profiles_sample_rate=1.0) app = FastAPI(**metadata, lifespan=app_lifespan) diff --git a/backend-python/media_impact_monitor/impact.py b/backend-python/media_impact_monitor/impact.py index 6ec95bd5..3637f867 100644 --- a/backend-python/media_impact_monitor/impact.py +++ b/backend-python/media_impact_monitor/impact.py @@ -26,12 +26,13 @@ def get_impact(q: ImpactSearch) -> Impact: events = get_events( EventSearch( source="acled", - organizers=[q.organizer], + topic="climate_change", start_date=q.start_date, end_date=q.end_date, ) ) - n_event_days = events["date"].nunique() + events_from_org = events[events["organizers"].apply(lambda x: q.organizer in x)] + n_event_days = events_from_org["date"].nunique() if n_event_days < 5: return Impact( method_applicability=False, @@ -50,85 +51,32 @@ def get_impact(q: ImpactSearch) -> Impact: ], impact_estimates=None, ) - applicabilities = [] - lims_list = [] - dfs = dict() - for topic in trends.columns: - trend = trends[topic] - if trend.empty: - applicabilities.append(False) - lims_list.append([f"No media data available for {q.impacted_trend}."]) - continue - trend.name = "count" - appl, limitations, impact = get_impact_for_single_trend( - events=events, - trend=trend, - method=q.method, - aggregation=q.impacted_trend.aggregation, - ) - dfs[topic] = impact - applicabilities.append(appl) - lims_list.append(limitations) - assert len(set(applicabilities)) == 1, "All topics should have same applicability." - assert ( - len(set([str(lims) for lims in lims_list])) == 1 - ), "All topics should have same limitations." + match q.method: + case "interrupted_time_series": + raise NotImplementedError("Interrupted time series is not up to date.") + case "synthetic_control": + raise NotImplementedError("Synthetic control is not yet implemented.") + case "time_series_regression": + mean_impacts, limitations = time_series_regression( + events=events, article_counts=trends + ) + case _: + raise ValueError(f"Unsupported method: {q.method}") + org = q.organizer n_days = 7 - 1 return Impact( - method_applicability=applicabilities[0], - method_limitations=lims_list[0], + method_applicability=True, + method_limitations=[], impact_estimates={ - topic: ImpactEstimate( + topic_name: ImpactEstimate( absolute_impact=MeanWithUncertainty( - mean=impact["mean"].loc[n_days], - ci_upper=impact["ci_upper"].loc[n_days], - ci_lower=impact["ci_lower"].loc[n_days], - p_value=impact["p_value"].loc[n_days], + mean=topic_impact[org]["mean"], + ci_upper=topic_impact[org]["ci_upper"], + ci_lower=topic_impact[org]["ci_lower"], + p_value=topic_impact[org]["p_value"], ), - absolute_impact_time_series=[ - DatedMeanWithUncertainty(**d) - for d in dfs[topic].reset_index().to_dict(orient="records") - ], - # relative_impact=MeanWithUncertainty( # TODO: calculate relative impact - # mean=1.0, - # ci_upper=1.0, - # ci_lower=1.0, - # ), - # relative_impact_time_series=[], + absolute_impact_time_series=[], ) - for topic, impact in dfs.items() - } - if applicabilities[0] - else None, + for topic_name, topic_impact in mean_impacts.items() + }, ) - - -def get_impact_for_single_trend( - events: pd.DataFrame, - trend: pd.DataFrame, - method: Method, - aggregation="daily", -) -> tuple[bool, list[str], pd.DataFrame]: - hidden_days_before_protest = 7 - horizon = 28 - match method: - case "interrupted_time_series": - mean_impact, limitations = interrupted_time_series( - events=events, - article_counts=trend, - horizon=horizon, - hidden_days_before_protest=hidden_days_before_protest, - aggregation=aggregation, - ) - case "synthetic_control": - raise NotImplementedError("Synthetic control is not yet implemented.") - case "time_series_regression": - mean_impact, limitations = time_series_regression( - events=events, - article_counts=trend, - aggregation=aggregation, - ) - case _: - raise ValueError(f"Unsupported method: {method}") - return True, limitations, mean_impact - # TODO: divide impact by number of events on that day (by the same org) diff --git a/backend-python/media_impact_monitor/impact_estimators/time_series_regression.py b/backend-python/media_impact_monitor/impact_estimators/time_series_regression.py index 9f2c54c5..5abdfe2c 100644 --- a/backend-python/media_impact_monitor/impact_estimators/time_series_regression.py +++ b/backend-python/media_impact_monitor/impact_estimators/time_series_regression.py @@ -1,7 +1,8 @@ import pandas as pd import statsmodels.api as sm -from media_impact_monitor.types_ import Aggregation +from collections import Counter +from itertools import chain def add_lags(df: pd.DataFrame, lags: list[int]): @@ -39,61 +40,77 @@ def regress( ): """Get regression result where the outcome is `day` days after the treatment.""" lags = range(1, lags + 1) - media_df = pd.DataFrame(media_df, columns=["count"]) - # protest_df = add_lags(protest_df, lags=[]) - media_df = add_lags(media_df, lags=[4, 5, 6, 7, 8]) + outcomes = media_df.columns + + placebo = False + if placebo: + protest_df = protest_df.copy().sample(frac=1) + results = {} + protest_df = add_lags(protest_df, lags=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]) + media_df = add_lags(media_df, lags=[4, 5, 6, 7, 8, 9, 10, 11, 12, 13]) # protest_df = add_emws(protest_df) # media_df = add_emws(media_df, spans=[14]) df = pd.concat([protest_df, media_df], axis=1) df = add_weekday_dummies(df) - treatment = "protest" - outcome = "count" - df[outcome] = df[outcome].shift(-day) - if cumulative: - # TODO write tests for this - if day < 0: - indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=-day) - df[outcome] = -df[outcome].rolling(window=indexer).sum() - else: - df[outcome] = df[outcome].rolling(day + 1).sum() - df = df.dropna() - placebo = False - if placebo: - df[treatment] = df.sample(frac=1)[treatment].to_list() - X = df.drop(columns=[outcome]) - y = df[outcome] - model = sm.OLS(y, sm.add_constant(X)) - model = model.fit(cov_type="HC3") - alpha = 0.1 - return { - "date": day, - "mean": model.params[treatment], - "p_value": model.pvalues[treatment], - "ci_lower": model.conf_int(alpha=alpha)[0][treatment], - "ci_upper": model.conf_int(alpha=alpha)[1][treatment], - } + for outcome in outcomes: + df[outcome] = df[outcome].shift(-day) + if cumulative: + # TODO write tests for this + if day < 0: + indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=-day) + df[outcome] = -df[outcome].rolling(window=indexer).sum() + else: + df[outcome] = df[outcome].rolling(day + 1).sum() + df = df.dropna() + X = df.drop(columns=[outcome]) + y = df[outcome] + model = sm.OLS(y, sm.add_constant(X)) + model = model.fit(cov_type="HC3") + alpha = 0.1 + results[outcome] = {} + for org in protest_df.columns: + results[outcome][org] = { + "date": day, + "mean": model.params[org], + "p_value": model.pvalues[org], + "ci_lower": model.conf_int(alpha=alpha)[0][org], + "ci_upper": model.conf_int(alpha=alpha)[1][org], + } + return results -def agg_protests(df: pd.DataFrame): - start = df["date"].min() # HACK - end = df["date"].max() # HACK - df = df.groupby("date")["date"].count().to_frame("protest") +def agg_protests(events: pd.DataFrame): + start = events["date"].min() # HACK + end = events["date"].max() # HACK + # we want to reduce the amount of organizers for our simple regression + # (when using multilevel models later, we can relax this) + # only use primary organizers for each event + primary_orgs = events["organizers"].apply(lambda x: x[0] if x else None) + # only keep most frequent organizers + orgs = [k for k, v in Counter(primary_orgs).items() if k and v > 10] + # create dummy columns for each organizer + orgs_df = pd.DataFrame({org: primary_orgs == org for org in orgs}) + orgs_df["date"] = events["date"] + orgs_df = orgs_df.set_index("date") + # create time series by counting protests per day for each organizer + ts = orgs_df.groupby("date").agg({org: "any" for org in orgs}).astype(int) idx = pd.date_range(start=start, end=end) - df = df.reindex(idx).fillna(0) - return df + ts = ts.reindex(idx).fillna(0) + return ts def estimate_impact( events: pd.DataFrame, article_counts: pd.Series, - aggregation: Aggregation, cumulative: bool = True, lags: int = 14, outcome_days: list[int] = range(-14, 14), ): protest_df = agg_protests(events) - n = protest_df["protest"].sum() - limitations = [f"There have only been {n} protests."] + return regress( + protest_df, article_counts, day=7, lags=lags, cumulative=cumulative + ), [] + # TODO: do this per day: impacts = pd.DataFrame( [ regress( @@ -103,4 +120,5 @@ def estimate_impact( ] ) impacts = impacts.set_index("date")[["mean", "ci_lower", "ci_upper", "p_value"]] + limitations = [] return impacts, limitations