diff --git a/common/options_analysis.py b/common/options_analysis.py index a647e7e..a88a4ff 100644 --- a/common/options_analysis.py +++ b/common/options_analysis.py @@ -1,15 +1,116 @@ import logging import sqlite3 +from dataclasses import dataclass, field +from datetime import date +from enum import Enum +from typing import List, Optional import pandas as pd +class ContractType(Enum): + CALL = "Call" + PUT = "Put" + + +class PositionType(Enum): + LONG = "Long" + SHORT = "Short" + + +class LegType(Enum): + TRADE_OPEN = "TradeOpen" + TRADE_AUDIT = "TradeAudit" + + +@dataclass +class Leg: + """Represents a single leg of a trade (call or put).""" + + contract_type: ContractType + position_type: PositionType + strike_price: float + underlying_price_open: float + premium_open: float = field(init=True) + underlying_price_current: Optional[float] = None + premium_current: Optional[float] = field(default=None) + leg_type: LegType = LegType.TRADE_OPEN + + def __post_init__(self): + # Convert premiums after initialization + self.premium_open = ( + -abs(self.premium_open) + if self.position_type == PositionType.LONG + else abs(self.premium_open) + ) + if self.premium_current is not None: + self.premium_current = ( + -abs(self.premium_current) + if self.position_type == PositionType.LONG + else abs(self.premium_current) + ) + + def __str__(self): + leg_str = ( + f"\n {self.position_type.value} {self.contract_type.value}" + f"\n Strike: ${self.strike_price:,.2f}" + f"\n Underlying Open: ${self.underlying_price_open:,.2f}" + f"\n Premium Open: ${self.premium_open:,.2f}" + ) + if self.underlying_price_current is not None: + leg_str += ( + f"\n Underlying Current: ${self.underlying_price_current:,.2f}" + ) + if self.premium_current is not None: + leg_str += f"\n Premium Current: ${self.premium_current:,.2f}" + return leg_str + + +@dataclass +class Trade: + """Represents a trade.""" + + trade_date: date + expire_date: date + dte: int + status: str + premium_captured: float + closing_premium: Optional[float] = None + closed_trade_at: Optional[date] = None + close_reason: Optional[str] = None + legs: List[Leg] = field(default_factory=list) + + def __str__(self): + trade_str = ( + f"Trade Details:" + f"\n Open Date: {self.trade_date}" + f"\n Expire Date: {self.expire_date}" + f"\n DTE: {self.dte}" + f"\n Status: {self.status}" + f"\n Premium Captured: ${self.premium_captured:,.2f}" + ) + + if self.closing_premium is not None: + trade_str += f"\n Closing Premium: ${self.closing_premium:,.2f}" + if self.closed_trade_at is not None: + trade_str += f"\n Closed At: {self.closed_trade_at}" + if self.close_reason is not None: + trade_str += f"\n Close Reason: {self.close_reason}" + + trade_str += "\n Legs:" + for leg in self.legs: + trade_str += str(leg) + + return trade_str + + class OptionsDatabase: def __init__(self, db_path, table_tag): self.db_path = db_path self.conn = None self.cursor = None self.trades_table = f"trades_dte_{table_tag}" + self.trade_legs_table = f"trade_legs_dte_{table_tag}" self.trade_history_table = f"trade_history_dte_{table_tag}" def connect(self): @@ -23,6 +124,7 @@ def setup_trades_table(self): # Drop existing tables (trade_history first due to foreign key constraint) drop_tables_sql = [ f"DROP TABLE IF EXISTS {self.trade_history_table}", + f"DROP TABLE IF EXISTS {self.trade_legs_table}", f"DROP TABLE IF EXISTS {self.trades_table}", ] @@ -51,6 +153,23 @@ def setup_trades_table(self): CloseReason TEXT ) """ + # Create trade legs table + create_trade_legs_table_sql = f""" + CREATE TABLE IF NOT EXISTS {self.trade_legs_table} ( + HistoryId INTEGER PRIMARY KEY, + TradeId INTEGER, + Date DATE, + StrikePrice REAL, + ContractType TEXT, + PositionType TEXT, + LegType TEXT, + PremiumOpen REAL, + PremiumCurrent REAL, + UnderlyingPriceOpen REAL, + UnderlyingPriceCurrent REAL, + FOREIGN KEY(TradeId) REFERENCES {self.trades_table}(TradeId) + ) + """ # Create trade_history table to track daily prices create_history_table_sql = f""" CREATE TABLE IF NOT EXISTS {self.trade_history_table} ( @@ -64,6 +183,7 @@ def setup_trades_table(self): ) """ self.cursor.execute(create_table_sql) + self.cursor.execute(create_trade_legs_table_sql) self.cursor.execute(create_history_table_sql) logging.info("Tables dropped and recreated successfully") @@ -81,6 +201,157 @@ def setup_trades_table(self): self.conn.commit() + def update_trade_leg(self, existing_trade_id, quote_date, updated_leg: Leg): + update_leg_sql = f""" + INSERT INTO {self.trade_legs_table} ( + TradeId, Date, StrikePrice, ContractType, PositionType, LegType, + PremiumOpen, PremiumCurrent, UnderlyingPriceOpen, UnderlyingPriceCurrent + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """ + + params = ( + existing_trade_id, + quote_date, + updated_leg.strike_price, + updated_leg.contract_type.value, + updated_leg.position_type.value, + updated_leg.leg_type.value, + updated_leg.premium_open, + updated_leg.premium_current, + updated_leg.underlying_price_open, + updated_leg.underlying_price_current, + ) + + self.cursor.execute(update_leg_sql, params) + self.conn.commit() + + def create_trade_with_multiple_legs(self, trade): + trade_sql = f""" + INSERT INTO {self.trades_table} ( + Date, ExpireDate, DTE, Status, PremiumCaptured, + ClosingPremium, ClosedTradeAt, CloseReason + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """ + trade_params = ( + trade.trade_date, + trade.expire_date, + trade.dte, + trade.status, + trade.premium_captured, + trade.closing_premium, + trade.closed_trade_at, + trade.close_reason, + ) + + self.cursor.execute(trade_sql, trade_params) + trade_id = self.cursor.lastrowid + + leg_sql = f""" + INSERT INTO {self.trade_legs_table} ( + TradeId, Date, StrikePrice, ContractType, PositionType, LegType, + PremiumOpen, PremiumCurrent, UnderlyingPriceOpen, UnderlyingPriceCurrent + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """ + + for leg in trade.legs: + leg_params = ( + trade_id, + trade.trade_date, + leg.strike_price, + leg.contract_type.value, + leg.position_type.value, + leg.leg_type.value, + leg.premium_open, + leg.premium_current, + leg.underlying_price_open, + leg.underlying_price_current, + ) + self.cursor.execute(leg_sql, leg_params) + + self.conn.commit() + return trade_id + + def load_trade_with_multiple_legs( + self, trade_id: int, leg_type: LegType = LegType.TRADE_OPEN + ) -> Trade: + # First get the trade + trade_sql = f""" + SELECT Date, ExpireDate, DTE, Status, PremiumCaptured, + ClosingPremium, ClosedTradeAt, CloseReason + FROM {self.trades_table} WHERE TradeId = ? + """ + self.cursor.execute(trade_sql, (trade_id,)) + columns = [description[0] for description in self.cursor.description] + trade_row = dict(zip(columns, self.cursor.fetchone())) + + if not trade_row: + raise ValueError(f"Trade with id {trade_id} not found") + + # Then get all legs for this trade + legs_sql = f""" + SELECT StrikePrice, ContractType, PositionType, PremiumOpen, + PremiumCurrent, UnderlyingPriceOpen, UnderlyingPriceCurrent + FROM {self.trade_legs_table} WHERE TradeId = ? AND LegType = ? + """ + self.cursor.execute( + legs_sql, + ( + trade_id, + leg_type.value, + ), + ) + columns = [description[0] for description in self.cursor.description] + leg_rows = [dict(zip(columns, row)) for row in self.cursor.fetchall()] + + # Create legs + legs = [] + for leg_row in leg_rows: + leg = Leg( + contract_type=ContractType(leg_row["ContractType"]), + position_type=PositionType(leg_row["PositionType"]), + strike_price=leg_row["StrikePrice"], + underlying_price_open=leg_row["UnderlyingPriceOpen"], + premium_open=leg_row["PremiumOpen"], + underlying_price_current=leg_row["UnderlyingPriceCurrent"], + premium_current=leg_row["PremiumCurrent"], + ) + legs.append(leg) + + # Create and return trade + return Trade( + trade_date=trade_row["Date"], + expire_date=trade_row["ExpireDate"], + dte=trade_row["DTE"], + status=trade_row["Status"], + premium_captured=trade_row["PremiumCaptured"], + closing_premium=trade_row["ClosingPremium"], + closed_trade_at=trade_row["ClosedTradeAt"], + close_reason=trade_row["CloseReason"], + legs=legs, + ) + + def close_trade(self, existing_trade_id, existing_trade: Trade): + # Update the trade record + update_trade_sql = f""" + UPDATE {self.trades_table} + SET Status = ?, + ClosingPremium = ?, + ClosedTradeAt = ?, + CloseReason = ? + WHERE TradeId = ? + """ + + trade_params = ( + "CLOSED", + existing_trade.closing_premium, + existing_trade.closed_trade_at, + existing_trade.close_reason, + existing_trade_id, + ) + + self.cursor.execute(update_trade_sql, trade_params) + self.conn.commit() + def create_trade( self, date, diff --git a/options-calendar-simple.py b/options-calendar-simple.py index 4d4ac88..33c7a6c 100755 --- a/options-calendar-simple.py +++ b/options-calendar-simple.py @@ -24,7 +24,14 @@ import pandas as pd from common.logger import setup_logging -from common.options_analysis import OptionsDatabase +from common.options_analysis import ( + ContractType, + Leg, + LegType, + OptionsDatabase, + PositionType, + Trade, +) pd.set_option("display.float_format", lambda x: "%.4f" % x) @@ -34,37 +41,40 @@ def update_open_trades(db, quote_date): open_trades = db.get_open_trades() for _, trade in open_trades.iterrows(): - # Get current prices - underlying_price, call_price, put_price = db.get_current_prices( - quote_date, trade["StrikePrice"], trade["ExpireDate"] - ) - - # Add to trade history - db.add_trade_history( - trade["TradeId"], quote_date, underlying_price, call_price, put_price - ) - - # Only if unable to find the latest prices in the data - if underlying_price == 0: - close_reason = "Invalid Close" - else: - close_reason = "Option Expired" + existing_trade_id = trade["TradeId"] + existing_trade = db.load_trade_with_multiple_legs(existing_trade_id) + updated_legs = [] + for leg in existing_trade.legs: + underlying_price, call_price, put_price = db.get_current_prices( + quote_date, leg.strike_price, trade["ExpireDate"] + ) + updated_leg = Leg( + contract_type=leg.contract_type, + position_type=leg.position_type, + strike_price=leg.strike_price, + underlying_price_open=leg.underlying_price_open, + premium_open=leg.premium_open, + underlying_price_current=underlying_price, + premium_current=put_price, + leg_type=LegType.TRADE_AUDIT, + ) + updated_legs.append(updated_leg) + db.update_trade_leg(existing_trade_id, quote_date, updated_leg) # If trade has reached expiry date, close it if quote_date >= trade["ExpireDate"]: logging.info( f"Trying to close trade {trade['TradeId']} at expiry {quote_date}" ) - db.update_trade_status( - trade["TradeId"], - underlying_price, - call_price, - put_price, - quote_date, - "EXPIRED", - close_reason=close_reason, + existing_trade.closing_premium = sum( + l.premium_current for l in updated_legs + ) + existing_trade.closed_trade_at = quote_date + existing_trade.close_reason = "EXPIRED" + db.close_trade(existing_trade_id, existing_trade) + logging.info( + f"Closed trade {trade['TradeId']} with {existing_trade.closing_premium} at expiry" ) - logging.info(f"Closed trade {trade['TradeId']} at expiry") else: logging.info( f"Trade {trade['TradeId']} still open as {quote_date} < {trade['ExpireDate']}" @@ -146,7 +156,9 @@ def parse_args(): def main(args): - table_tag = f"{args.front_dte}_{args.back_dte}" + front_dte = args.front_dte + back_dte = args.back_dte + table_tag = f"{front_dte}_{back_dte}" db = OptionsDatabase(args.db_path, table_tag) db.connect() @@ -157,6 +169,99 @@ def main(args): for quote_date in quote_dates: logging.info(f"Processing {quote_date}") + update_open_trades(db, quote_date) + + # Check if maximum number of open trades has been reached + open_trades = db.get_open_trades() + if len(open_trades) >= args.max_open_trades: + logging.debug( + f"Maximum number of open trades ({args.max_open_trades}) reached. Skipping new trade creation." + ) + continue + + expiry_front_dte, front_dte_found = db.get_next_expiry_by_dte( + quote_date, front_dte + ) + expiry_back_dte, back_dte_found = db.get_next_expiry_by_dte( + quote_date, back_dte + ) + if not expiry_front_dte or not expiry_back_dte: + logging.warning( + f"⚠️ Unable to find front {front_dte} or back {back_dte} expiry. {expiry_front_dte=}, {expiry_back_dte=} " + ) + continue + + logging.info( + f"Quote date: {quote_date} -> {expiry_front_dte=} ({front_dte_found=:.1f}), " + f"{expiry_back_dte=} ({back_dte_found=:.1f})" + ) + front_call_df, front_put_df = db.get_options_by_delta( + quote_date, expiry_front_dte + ) + back_call_df, back_put_df = db.get_options_by_delta( + quote_date, expiry_back_dte + ) + + # Only look at PUTs For now. We are only looking at Calendar PUT Spread + + logging.debug("Front Option") + logging.debug(f"=> PUT OPTION: \n {front_put_df.to_string(index=False)}") + + logging.debug("Back Option") + logging.debug(f"=> PUT OPTION: \n {back_put_df.to_string(index=False)}") + + if front_put_df.empty or back_put_df.empty: + logging.warning( + "⚠️ One or more options are not valid. Re-run with debug to see options found for selected DTEs" + ) + continue + + front_underlying_price = front_call_df["UNDERLYING_LAST"].iloc[0] + front_strike_price = front_call_df["CALL_STRIKE"].iloc[0] + front_call_price = front_call_df["CALL_C_LAST"].iloc[0] + front_put_price = front_put_df["PUT_P_LAST"].iloc[0] + + back_underlying_price = back_call_df["UNDERLYING_LAST"].iloc[0] + back_strike_price = back_call_df["CALL_STRIKE"].iloc[0] + back_call_price = back_call_df["CALL_C_LAST"].iloc[0] + back_put_price = back_put_df["PUT_P_LAST"].iloc[0] + + logging.info( + f"Front Contract: Underlying Price={front_underlying_price:.2f}, Strike Price={front_strike_price:.2f}, Call Price={front_call_price:.2f}, Put Price={front_put_price:.2f}" + ) + logging.info( + f"Back Contract: Underlying Price={back_underlying_price:.2f}, Strike Price={back_strike_price:.2f}, Call Price={back_call_price:.2f}, Put Price={back_put_price:.2f}" + ) + + # create a multi leg trade in database + trade_legs = [ + Leg( + position_type=PositionType.SHORT, + contract_type=ContractType.PUT, + strike_price=front_strike_price, + underlying_price_open=front_underlying_price, + premium_open=front_put_price, + ), + Leg( + position_type=PositionType.LONG, + contract_type=ContractType.PUT, + strike_price=back_strike_price, + underlying_price_open=back_underlying_price, + premium_open=back_put_price, + ), + ] + premium_captured_calculated = sum(leg.premium_open for leg in trade_legs) + trade = Trade( + trade_date=quote_date, + expire_date=expiry_front_dte, + dte=front_dte, + status="OPEN", + premium_captured=premium_captured_calculated, + legs=trade_legs, + ) + trade_id = db.create_trade_with_multiple_legs(trade) + logging.info(f"Trade {trade_id} created in database") + finally: db.disconnect()