From 3794ce2bbc0ec4e1390d5b9a12efe5d680e65660 Mon Sep 17 00:00:00 2001
From: Evan Griffiths <56087052+evangriffiths@users.noreply.github.com>
Date: Thu, 19 Sep 2024 11:42:54 +0100
Subject: [PATCH] Improve matching of langfuse traces with resolved bets. Add example (#418)

---
 .../match_bets_with_langfuse_traces.py        |  59 +++++++
 .../tools/langfuse_client_utils.py            | 142 +++++++++++++++---
 2 files changed, 176 insertions(+), 25 deletions(-)
 create mode 100644 examples/monitor/match_bets_with_langfuse_traces.py

diff --git a/examples/monitor/match_bets_with_langfuse_traces.py b/examples/monitor/match_bets_with_langfuse_traces.py
new file mode 100644
index 00000000..07c07390
--- /dev/null
+++ b/examples/monitor/match_bets_with_langfuse_traces.py
@@ -0,0 +1,59 @@
+"""Example: match an agent's resolved Omen bets with its Langfuse traces."""
+
+from datetime import datetime
+
+from langfuse import Langfuse
+from web3 import Web3
+
+from prediction_market_agent_tooling.config import APIKeys
+from prediction_market_agent_tooling.markets.data_models import ResolvedBet
+from prediction_market_agent_tooling.markets.omen.omen import OmenAgentMarket
+from prediction_market_agent_tooling.tools.langfuse_client_utils import (
+    ProcessMarketTrace,
+    ResolvedBetWithTrace,
+    get_trace_for_bet,
+    get_traces_for_agent,
+)
+
+if __name__ == "__main__":
+    api_keys = APIKeys()
+    assert api_keys.bet_from_address == Web3.to_checksum_address(
+        "0xe7aa88a1d044e5c987ecce55ae8d2b562a41b72d"  # prophetgpt4
+    )
+    start_time = datetime(2024, 9, 13)
+    langfuse = Langfuse(
+        secret_key=api_keys.langfuse_secret_key.get_secret_value(),
+        public_key=api_keys.langfuse_public_key,
+        host=api_keys.langfuse_host,
+    )
+
+    traces = get_traces_for_agent(
+        agent_name="DeployablePredictionProphetGPT4TurboFinalAgent",
+        trace_name="process_market",
+        from_timestamp=start_time,
+        has_output=True,
+        client=langfuse,
+    )
+    print(f"All traces: {len(traces)}")
+    process_market_traces = []
+    for trace in traces:
+        if process_market_trace := ProcessMarketTrace.from_langfuse_trace(trace):
+            process_market_traces.append(process_market_trace)
+    print(f"All process_market_traces: {len(process_market_traces)}")
+
+    bets: list[ResolvedBet] = OmenAgentMarket.get_resolved_bets_made_since(
+        better_address=api_keys.bet_from_address,
+        start_time=start_time,
+        end_time=None,
+    )
+    print(f"All bets: {len(bets)}")
+
+    # All bets should have a trace, but not all traces should have a bet
+    # (e.g. if all markets are deemed unpredictable), so iterate over bets
+    bets_with_traces: list[ResolvedBetWithTrace] = []
+    for bet in bets:
+        trace = get_trace_for_bet(bet, process_market_traces)
+        if trace:
+            bets_with_traces.append(ResolvedBetWithTrace(bet=bet, trace=trace))
+
+    print(f"Matched bets with traces: {len(bets_with_traces)}")
diff --git a/prediction_market_agent_tooling/tools/langfuse_client_utils.py b/prediction_market_agent_tooling/tools/langfuse_client_utils.py
index 1d132424..0f99d75b 100644
--- a/prediction_market_agent_tooling/tools/langfuse_client_utils.py
+++ b/prediction_market_agent_tooling/tools/langfuse_client_utils.py
@@ -1,18 +1,75 @@
+import typing as t
 from datetime import datetime
 
+import numpy as np
 from langfuse import Langfuse
 from langfuse.client import TraceWithDetails
+from pydantic import BaseModel
 
-from prediction_market_agent_tooling.loggers import logger
 from prediction_market_agent_tooling.markets.data_models import (
     ProbabilisticAnswer,
     ResolvedBet,
     Trade,
+    TradeType,
 )
 from prediction_market_agent_tooling.markets.omen.omen import OmenAgentMarket
 from prediction_market_agent_tooling.tools.utils import add_utc_timezone_validator
 
 
+class ProcessMarketTrace(BaseModel):
+    """Market, answer and trades parsed out of a single Langfuse trace."""
+
+    timestamp: datetime
+    market: OmenAgentMarket
+    answer: ProbabilisticAnswer
+    trades: list[Trade]
+
+    @property
+    def buy_trade(self) -> Trade:
+        """Return the only BUY trade recorded in this trace.
+
+        Raises:
+            ValueError: If there is not exactly one BUY trade.
+        """
+        buy_trades = [
+            trade for trade in self.trades if trade.trade_type == TradeType.BUY
+        ]
+        if len(buy_trades) == 1:
+            return buy_trades[0]
+        # Covers both "none" and "more than one", so report the actual count
+        raise ValueError(f"Expected exactly one buy trade, found {len(buy_trades)}")
+
+    @staticmethod
+    def from_langfuse_trace(
+        trace: TraceWithDetails,
+    ) -> t.Optional["ProcessMarketTrace"]:
+        """Parse a Langfuse trace, returning None if any part is missing.
+
+        None is returned when the extracted market, answer or trades is falsy
+        (e.g. the market stored in the trace could not be parsed).
+        """
+        market = trace_to_omen_agent_market(trace)
+        answer = trace_to_answer(trace)
+        trades = trace_to_trades(trace)
+
+        if not market or not answer or not trades:
+            return None
+
+        return ProcessMarketTrace(
+            market=market,
+            answer=answer,
+            trades=trades,
+            timestamp=trace.timestamp,
+        )
+
+
+class ResolvedBetWithTrace(BaseModel):
+    """A resolved bet paired with the trace matched to it."""
+
+    bet: ResolvedBet
+    trace: ProcessMarketTrace
+
+
 def get_traces_for_agent(
     agent_name: str,
     trace_name: str,
@@ -47,11 +104,23 @@ def get_traces_for_agent(
     return all_agent_traces
 
 
-def trace_to_omen_agent_market(trace: TraceWithDetails) -> OmenAgentMarket:
-    assert trace.input is not None, "Trace input is None"
-    assert trace.input["args"] is not None, "Trace input args is None"
+def trace_to_omen_agent_market(trace: TraceWithDetails) -> OmenAgentMarket | None:
+    """Parse the OmenAgentMarket stored in a trace's input args, or return None.
+
+    None is returned when the trace has no input or no args, or when the
+    stored market fails validation against the current OmenAgentMarket model.
+    """
+    if not trace.input:
+        return None
+    if not trace.input["args"]:
+        return None
     assert len(trace.input["args"]) == 2 and trace.input["args"][0] == "omen"
-    return OmenAgentMarket.model_validate(trace.input["args"][1])
+    try:
+        # If the market model is invalid (e.g. outdated), it will raise an exception
+        market = OmenAgentMarket.model_validate(trace.input["args"][1])
+        return market
+    except Exception:
+        return None
 
 
 def trace_to_answer(trace: TraceWithDetails) -> ProbabilisticAnswer:
@@ -78,25 +147,48 @@ def get_closest_datetime_from_list(
 
 
 def get_trace_for_bet(
-    bet: ResolvedBet, traces: list[TraceWithDetails]
-) -> TraceWithDetails | None:
-    # Get traces with the same market id
-    traces_for_bet = [
-        t for t in traces if trace_to_omen_agent_market(t).id == bet.market_id
-    ]
-
-    # In-case there are multiple traces for the same market, get the closest trace to the bet
-    closest_trace_index = get_closest_datetime_from_list(
-        add_utc_timezone_validator(bet.created_time),
-        [t.timestamp for t in traces_for_bet],
-    )
-    # Sanity check - the trace should be after the bet
-    if traces_for_bet[closest_trace_index].timestamp < add_utc_timezone_validator(
-        bet.created_time
-    ):
-        logger.warning(
-            f"No trace for bet on market {bet.market_id} at time {bet.created_time} found"
-        )
+    bet: ResolvedBet, traces: list[ProcessMarketTrace]
+) -> ProcessMarketTrace | None:
+    """Find the trace that corresponds to a resolved bet, if any.
+
+    A trace is a candidate when it is for the bet's market and its single buy
+    trade has the bet's outcome and (approximately) the bet's amount. With
+    several candidates, the one closest in time to the bet is considered.
+    """
+    # Filter for traces with the same market id
+    traces = [trace for trace in traces if trace.market.id == bet.market_id]
+
+    # Filter for traces with the same bet outcome and amount
+    traces_for_bet: list[ProcessMarketTrace] = []
+    for trace in traces:
+        try:
+            buy_trade = trace.buy_trade
+        except ValueError:
+            # No unambiguous buy trade, so this trace cannot match a bet
+            continue
+        # Cannot use exact comparison due to gas fees
+        if buy_trade.outcome == bet.outcome and np.isclose(
+            buy_trade.amount.amount, bet.amount.amount
+        ):
+            traces_for_bet.append(trace)
+
+    if not traces_for_bet:
         return None
+    elif len(traces_for_bet) == 1:
+        return traces_for_bet[0]
+    else:
+        # In case there are multiple traces for the same market, get the closest
+        # trace to the bet
+        closest_trace_index = get_closest_datetime_from_list(
+            add_utc_timezone_validator(bet.created_time),
+            [trace.timestamp for trace in traces_for_bet],
+        )
+        # Sanity check - the trace should be after the bet
+        # NOTE(review): this discards a candidate timestamped before the bet;
+        # confirm trace timestamps are really expected to follow bet creation.
+        if traces_for_bet[closest_trace_index].timestamp < add_utc_timezone_validator(
+            bet.created_time
+        ):
+            return None
 
-    return traces_for_bet[closest_trace_index]
+        return traces_for_bet[closest_trace_index]