-
Notifications
You must be signed in to change notification settings - Fork 20
/
common_methods.py
402 lines (364 loc) · 13.2 KB
/
common_methods.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# -*- coding: utf-8 -*-
"""
Created on Fri Jul 27 10:59:17 2018
@author: antonio constandinou
"""
# COMMON METHODS
import calendar
import functools
import os
from itertools import combinations

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.tsa.stattools as ts
def build_dict_of_arrays(list_of_tups):
    """
    create a dictionary from list of tuples. key = sector, values = array of tickers
    pertaining to a given sector
    args:
        list_of_tups: list of (ticker, sector) tuples
    returns:
        dictionary mapping sector (str) -> list of tickers, in input order
    """
    sector_dict = {}
    for ticker, sector in list_of_tups:
        # setdefault replaces the explicit membership check / two-branch insert
        sector_dict.setdefault(sector, []).append(ticker)
    return sector_dict
def data_array_merge(data_array):
    """
    merge all dfs into one dfs
    args:
        data_array: array of pandas df, each containing a 'Date' column
    returns:
        merged_df, single pandas dataframe indexed by 'Date'
    """
    def _merge_on_date(left_df, right_df):
        # pairwise inner merge on the shared 'Date' column
        return pd.merge(left_df, right_df, on='Date')

    merged_df = functools.reduce(_merge_on_date, data_array)
    merged_df.set_index('Date', inplace=True)
    return merged_df
def fetch_last_day_mth(year_, conn):
    """
    return day-of-month of the last trading day of December we have data for
    in our Postgres DB for the given year.
    args:
        year_: year, type int
        conn: a Postgres DB connection object
    return:
        integer, last trading day of December that we have data for
    """
    cur = conn.cursor()
    # NOTE: the original embedded %s inside quoted literals ('%s-12-01'),
    # which only works because of psycopg2's client-side interpolation.
    # Build the date bounds in Python and bind them as real parameters.
    SQL = """
          SELECT MAX(date_part('day', date_price)) FROM daily_data
          WHERE date_price BETWEEN %s AND %s
          """
    start_date = "{0}-12-01".format(year_)
    end_date = "{0}-12-31".format(year_)
    cur.execute(SQL, [start_date, end_date])
    data = cur.fetchall()
    cur.close()
    # MAX(...) comes back as a float (date_part returns double precision)
    last_day = int(data[0][0])
    return last_day
def fetch_last_day_any_mth(year_, mth_, conn):
    """
    return day-of-month of the last trading day we have data for in our
    Postgres DB for the given month and year.
    args:
        year_: year, type int
        mth_: month, type int
        conn: a Postgres DB connection object
    return:
        integer, last trading day of the month that we have data for
    """
    # BUG FIX: the original hard-coded day 30 as the upper bound, which
    # dropped data on the 31st of 31-day months and produced the invalid
    # date 'YYYY-02-30' for February. monthrange gives the true month end.
    month_end = calendar.monthrange(year_, mth_)[1]
    cur = conn.cursor()
    SQL = """
          SELECT MAX(date_part('day', date_price)) FROM daily_data
          WHERE date_price BETWEEN %s AND %s
          """
    start_date = "{0}-{1:02d}-01".format(year_, mth_)
    end_date = "{0}-{1:02d}-{2:02d}".format(year_, mth_, month_end)
    cur.execute(SQL, [start_date, end_date])
    data = cur.fetchall()
    cur.close()
    last_day = int(data[0][0])
    return last_day
def find_cointegrated_pairs(data, p_value=0.01):
    """
    statsmodels.tsa.stattools coint method for identifying pairs
    args:
        data: needs to be pd_df where each column = individual ticker Adj_Close
        p_value: threshold for accepting a pairs model (float), default 0.01
    returns:
        score_matrix (np.array), pvalue_matrix (np.array), pairs (array)
    """
    n_cols = data.shape[1]
    keys = data.keys()
    score_matrix = np.zeros((n_cols, n_cols))
    pvalue_matrix = np.ones((n_cols, n_cols))
    pairs = []
    # test every unordered column pair exactly once (upper triangle only)
    for i, j in combinations(range(n_cols), 2):
        score, pvalue, _ = ts.coint(data[keys[i]], data[keys[j]])
        score_matrix[i, j] = score
        pvalue_matrix[i, j] = pvalue
        # keep the pair only when the cointegration p-value beats the threshold
        if pvalue < p_value:
            pairs.append((keys[i], keys[j]))
    return score_matrix, pvalue_matrix, pairs
def load_db_credential_info(f_name_path):
    """
    load text file holding our database credential info and the database name
    args:
        f_name_path: name of file preceded with a path separator (e.g. "\\file.txt"),
                     type string; resolved relative to the current working directory
    returns:
        array of 4 values that should match text file info
        (last element keeps its trailing newline, as in the original format)
    """
    cur_path = os.getcwd()
    # context manager guarantees the handle is closed (the original leaked it)
    with open(cur_path + f_name_path, 'r') as cred_file:
        # skip the header line; credentials live on the first data line
        lines = cred_file.readlines()[1:]
    return lines[0].split(',')
def load_db_tickers_start_date(start_date, conn):
    """
    return a list of stock tickers that have data on the start_date arg provided
    args:
        start_date: datetime object to be used to query our PostgreSQL database
        conn: a Postgres DB connection object
    returns:
        list of tuples, each a single-element (ticker,) row
    """
    # convert start_date to string for our SQL query
    date_string = start_date.strftime("%Y-%m-%d")
    cur = conn.cursor()
    SQL = """
          SELECT ticker FROM symbol
          WHERE id IN
            (SELECT DISTINCT(stock_id)
             FROM daily_data
             WHERE date_price = %s)
          """
    cur.execute(SQL, (date_string,))
    data = cur.fetchall()
    # close the cursor like the other fetch helpers in this module
    # (the original leaked it)
    cur.close()
    return data
def load_db_tickers_sectors(start_date, conn):
    """
    return a list of tuples. each tuple is a ticker paired with its sector
    args:
        start_date: datetime object to be used to query our PostgreSQL database
        conn: a Postgres DB connection object
    returns:
        list of (ticker, sector) tuples
    """
    # convert start_date to string for our SQL query
    date_string = start_date.strftime("%Y-%m-%d")
    cur = conn.cursor()
    SQL = """
          SELECT ticker, sector FROM symbol
          WHERE id IN
            (SELECT DISTINCT(stock_id)
             FROM daily_data
             WHERE date_price = %s)
          """
    cur.execute(SQL, (date_string,))
    data = cur.fetchall()
    # close the cursor like the other fetch helpers in this module
    # (the original leaked it)
    cur.close()
    return data
def load_df_stock_data_array(stocks, start_date, end_date, conn):
    """
    return an array where each element is a dataframe of loaded data
    args:
        stocks: tuple of strings, each string is ticker
        start_date: datetime object to filter our pandas dataframe (exclusive)
        end_date: datetime object to filter our pandas dataframe (inclusive)
        conn: a Postgres DB connection object
    returns:
        array of pandas dataframes, one per ticker, each with
        columns ['Date', <ticker>] sorted by date and re-indexed from 0
    """
    array_pd_dfs = []
    cur = conn.cursor()
    SQL = """
          SELECT date_price, adj_close_price
          FROM daily_data
          INNER JOIN symbol ON symbol.id = daily_data.stock_id
          WHERE symbol.ticker LIKE %s
          """
    # for each ticker in our list
    for ticker in stocks:
        # fetch our stock data from our Postgres DB
        cur.execute(SQL, (ticker,))
        results = cur.fetchall()
        # create a pandas dataframe of our results; price column named after the ticker
        stock_data = pd.DataFrame(results, columns=['Date', ticker])
        # ensure our data is in order of date
        stock_data = stock_data.sort_values(by=['Date'], ascending = True)
        # convert our column to float (DB may hand back Decimal)
        stock_data[ticker] = stock_data[ticker].astype(float)
        # keep rows strictly after start_date, up to and including end_date
        mask = (stock_data['Date'] > start_date) & (stock_data['Date'] <= end_date)
        stock_data = stock_data.loc[mask]
        # re-index the data
        stock_data = stock_data.reset_index(drop=True)
        # append our df to our array
        array_pd_dfs.append(stock_data)
    # close the cursor like the other fetch helpers in this module (original leaked it)
    cur.close()
    return array_pd_dfs
def load_pairs_stock_data(pair, start_date, end_date, conn):
    """
    return an array of two price dataframes, one per ticker in the pair.
    (original docstring summary was a copy-paste from load_db_tickers_sectors)
    args:
        pair: tuple of two strings, each string is ticker
        start_date: datetime object to filter our pandas dataframe (exclusive)
        end_date: datetime object to filter our pandas dataframe (inclusive)
        conn: a Postgres DB connection object
    returns:
        array of pandas dataframes, each with columns ['Date', 'Adj_Close']
        sorted by date and re-indexed from 0
    """
    array_pd_dfs = []
    cur = conn.cursor()
    SQL = """
          SELECT date_price, adj_close_price
          FROM daily_data
          INNER JOIN symbol ON symbol.id = daily_data.stock_id
          WHERE symbol.ticker LIKE %s
          """
    # for each ticker in our pair
    for ticker in pair:
        # fetch our stock data from our Postgres DB
        cur.execute(SQL, (ticker,))
        results = cur.fetchall()
        # create a pandas dataframe of our results
        stock_data = pd.DataFrame(results, columns=['Date', 'Adj_Close'])
        # ensure our data is in order of date
        stock_data = stock_data.sort_values(by=['Date'], ascending = True)
        # convert our column to float (DB may hand back Decimal)
        stock_data['Adj_Close'] = stock_data['Adj_Close'].astype(float)
        # keep rows strictly after start_date, up to and including end_date
        mask = (stock_data['Date'] > start_date) & (stock_data['Date'] <= end_date)
        stock_data = stock_data.loc[mask]
        # re-index the data
        stock_data = stock_data.reset_index(drop=True)
        # append our df to our array
        array_pd_dfs.append(stock_data)
    # close the cursor like the other fetch helpers in this module (original leaked it)
    cur.close()
    return array_pd_dfs
def pair_data_verifier(array_df_data, pair_tickers, threshold=10):
    """
    merge two dataframes, verify if we still have the same number of data we originally had.
    use an inputted threshold that tells us whether we've lost too much data in our merge or not.
    args:
        array_df_data: array of two pandas dataframes, each with columns ['Date', <price>]
        pair_tickers: tuple of both tickers
        threshold: integer, max number of days of data we can be missing after merging two
                   dataframes of data.
                   default = 10 to represent 10 days.
    returns:
        boolean False or new merged pandas dataframe
        False: if our new merged dataframe is missing too much data (> threshold)
        merged pandas dataframe: if rows lost in the merge are within threshold
    """
    stock_1, stock_2 = pair_tickers
    # inner join keeps only dates present in BOTH frames
    df_merged = pd.merge(array_df_data[0], array_df_data[1],
                         left_on=['Date'], right_on=['Date'], how='inner')
    df_merged.columns = ['Date', stock_1, stock_2]
    # round price columns to cents
    df_merged[stock_1] = df_merged[stock_1].round(decimals = 2)
    df_merged[stock_2] = df_merged[stock_2].round(decimals = 2)
    new_size = len(df_merged.index)
    old_size_1 = len(array_df_data[0].index)
    old_size_2 = len(array_df_data[1].index)
    # reject the pair when EITHER input lost more than `threshold` rows in the join
    if (old_size_1 - new_size) > threshold or (old_size_2 - new_size) > threshold:
        print("This pair {0} and {1} were missing data.".format(stock_1, stock_2))
        return False
    else:
        return df_merged
"""PLOT METHODS"""
def plot_price_series(df, ts1, ts2, start_date, end_date):
    """
    plot the daily price series of two tickers on a single set of axes.
    args:
        df: pandas dataframe indexed by date, containing columns ts1 and ts2
        ts1: first ticker / column name, type string
        ts2: second ticker / column name, type string
        start_date: unused in this body — presumably df is pre-filtered by the
                    caller; TODO confirm against call sites
        end_date: unused in this body — same note as start_date
    returns:
        NoneType (displays the figure via plt.show())
    """
    months = mdates.MonthLocator() # every month
    fig, ax = plt.subplots()
    ax.plot(df.index, df[ts1], label=ts1)
    ax.plot(df.index, df[ts2], label=ts2)
    # place a tick at every month, formatted as e.g. 'Jan 2018'
    ax.xaxis.set_major_locator(months)
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y'))
    ax.grid(True)
    # rotate/align the date labels so they don't overlap
    fig.autofmt_xdate()
    plt.xlabel('Month/Year')
    plt.ylabel('Price ($)')
    plt.title('%s and %s Daily Prices' % (ts1, ts2))
    plt.legend()
    plt.show()
def plot_scatter_series(df, ts1, ts2):
    """
    scatter-plot the prices of one ticker against the other.
    args:
        df: pandas dataframe containing columns ts1 and ts2
        ts1: first ticker / column name (x axis), type string
        ts2: second ticker / column name (y axis), type string
    returns:
        NoneType (displays the figure via plt.show())
    """
    plt.xlabel('%s Price ($)' % ts1)
    plt.ylabel('%s Price ($)' % ts2)
    plt.title('%s and %s Price Scatterplot' % (ts1, ts2))
    plt.scatter(df[ts1], df[ts2])
    plt.show()
def plot_residuals(df):
    """
    plot the residual series stored in df["res"] over time.
    args:
        df: pandas dataframe indexed by date with a "res" column
    returns:
        NoneType (displays the figure via plt.show())
    """
    months = mdates.MonthLocator() # every month
    fig, ax = plt.subplots()
    ax.plot(df.index, df["res"], label="Residuals")
    # place a tick at every month, formatted as e.g. 'Jan 2018'
    ax.xaxis.set_major_locator(months)
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%b %Y'))
    ax.grid(True)
    fig.autofmt_xdate()
    plt.xlabel('Month/Year')
    plt.ylabel('Price ($)')
    plt.title('Residual Plot')
    plt.legend()
    # NOTE(review): this draws the residuals a second time on the current
    # axes (already plotted on ax above) — looks redundant; confirm intent
    plt.plot(df["res"])
    plt.show()
def remove_ticker(ticker, array_pairs_to_clean):
    """
    output a new array of tuples with specific ticker removed
    args:
        ticker: ticker to remove, type string
        array_pairs_to_clean: array of tuples
    returns:
        array of tuples (every pair that does not mention the ticker)
    """
    # keep a pair only when neither of its members is the unwanted ticker
    return [kept_pair for kept_pair in array_pairs_to_clean
            if ticker not in kept_pair]
def write_dict_text(f_name, dict_):
    """
    write dictionary info to text file.
    each line in text file = key, value
    value is stripped of brackets "(" and ")" and single quotes "'"
    args:
        f_name: file_name to create (".txt" is appended), type string
        dict_: python dictionary mapping sector -> iterable of values
    returns:
        NoneType
    """
    f_name = f_name + ".txt"
    # context manager ensures the file is flushed and closed (original leaked it)
    with open(f_name, 'w') as file_to_write:
        for sector, ticker_arr in dict_.items():
            for ele in ticker_arr:
                # strip tuple-repr characters and spaces so each line is plain CSV
                new_str = (sector + "," + str(ele)).replace("(","").replace(")","").replace("'","").replace(" ","")
                file_to_write.write("%s\n" % (new_str,))
    print("{0} file created.".format(f_name))
def write_results_text_file(f_name, sub_array):
"""
write an array to text file. This python script will write data into script directory.
args:
f_name: name of our file to be written, type string
sub_array: array of our data
returns:
None
"""
# lets write elements of array to a file
f_name = f_name + ".txt"
file_to_write = open(f_name, 'w')
for ele in sub_array:
file_to_write.write("%s\n" % (ele,))