forked from kubadlugosz/algorithmic-trading
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfx_macd_renko.py
180 lines (160 loc) · 8.08 KB
/
fx_macd_renko.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# =============================================================================
# Automated trading script I - MACD
# Author : Mayank Rasu
# Please report bug/issues in the Q&A section
# =============================================================================
import fxcmpy
import numpy as np
from stocktrends import Renko
import statsmodels.api as sm
import time
import copy
#initiating API connection and defining trade parameters
token_path = "D:\\Udemy\\Quantitative Investing Using Python\\7_API Trading\\key.txt"
con = fxcmpy.fxcmpy(access_token = open(token_path,'r').read(), log_level = 'error', server='demo')
#defining strategy parameters
pairs = ['EUR/USD','GBP/USD','USD/CHF','AUD/USD','USD/CAD'] #currency pairs to be included in the strategy
#pairs = ['EUR/JPY','USD/JPY','AUD/JPY','AUD/NZD','NZD/USD']
pos_size = 10 #max capital allocated/position size for any currency pair
def MACD(DF,a,b,c):
"""function to calculate MACD
typical values a = 12; b =26, c =9"""
df = DF.copy()
df["MA_Fast"]=df["Close"].ewm(span=a,min_periods=a).mean()
df["MA_Slow"]=df["Close"].ewm(span=b,min_periods=b).mean()
df["MACD"]=df["MA_Fast"]-df["MA_Slow"]
df["Signal"]=df["MACD"].ewm(span=c,min_periods=c).mean()
df.dropna(inplace=True)
return (df["MACD"],df["Signal"])
def ATR(DF,n):
"function to calculate True Range and Average True Range"
df = DF.copy()
df['H-L']=abs(df['High']-df['Low'])
df['H-PC']=abs(df['High']-df['Close'].shift(1))
df['L-PC']=abs(df['Low']-df['Close'].shift(1))
df['TR']=df[['H-L','H-PC','L-PC']].max(axis=1,skipna=False)
df['ATR'] = df['TR'].rolling(n).mean()
#df['ATR'] = df['TR'].ewm(span=n,adjust=False,min_periods=n).mean()
df2 = df.drop(['H-L','H-PC','L-PC'],axis=1)
return df2
def slope(ser,n):
"function to calculate the slope of n consecutive points on a plot"
slopes = [i*0 for i in range(n-1)]
for i in range(n,len(ser)+1):
y = ser[i-n:i]
x = np.array(range(n))
y_scaled = (y - y.min())/(y.max() - y.min())
x_scaled = (x - x.min())/(x.max() - x.min())
x_scaled = sm.add_constant(x_scaled)
model = sm.OLS(y_scaled,x_scaled)
results = model.fit()
slopes.append(results.params[-1])
slope_angle = (np.rad2deg(np.arctan(np.array(slopes))))
return np.array(slope_angle)
def renko_DF(DF):
"function to convert ohlc data into renko bricks"
df = DF.copy()
df.reset_index(inplace=True)
df = df.iloc[:,[0,1,2,3,4,5]]
df.columns = ["date","open","close","high","low","volume"]
df2 = Renko(df)
df2.brick_size = round(ATR(DF,120)["ATR"][-1],4)
renko_df = df2.get_bricks()
renko_df["bar_num"] = np.where(renko_df["uptrend"]==True,1,np.where(renko_df["uptrend"]==False,-1,0))
for i in range(1,len(renko_df["bar_num"])):
if renko_df["bar_num"][i]>0 and renko_df["bar_num"][i-1]>0:
renko_df["bar_num"][i]+=renko_df["bar_num"][i-1]
elif renko_df["bar_num"][i]<0 and renko_df["bar_num"][i-1]<0:
renko_df["bar_num"][i]+=renko_df["bar_num"][i-1]
renko_df.drop_duplicates(subset="date",keep="last",inplace=True)
return renko_df
def renko_merge(DF):
"function to merging renko df with original ohlc df"
df = copy.deepcopy(DF)
df["Date"] = df.index
renko = renko_DF(df)
renko.columns = ["Date","open","high","low","close","uptrend","bar_num"]
merged_df = df.merge(renko.loc[:,["Date","bar_num"]],how="outer",on="Date")
merged_df["bar_num"].fillna(method='ffill',inplace=True)
merged_df["macd"]= MACD(merged_df,12,26,9)[0]
merged_df["macd_sig"]= MACD(merged_df,12,26,9)[1]
merged_df["macd_slope"] = slope(merged_df["macd"],5)
merged_df["macd_sig_slope"] = slope(merged_df["macd_sig"],5)
return merged_df
def trade_signal(MERGED_DF,l_s):
"function to generate signal"
signal = ""
df = copy.deepcopy(MERGED_DF)
if l_s == "":
if df["bar_num"].tolist()[-1]>=2 and df["macd"].tolist()[-1]>df["macd_sig"].tolist()[-1] and df["macd_slope"].tolist()[-1]>df["macd_sig_slope"].tolist()[-1]:
signal = "Buy"
elif df["bar_num"].tolist()[-1]<=-2 and df["macd"].tolist()[-1]<df["macd_sig"].tolist()[-1] and df["macd_slope"].tolist()[-1]<df["macd_sig_slope"].tolist()[-1]:
signal = "Sell"
elif l_s == "long":
if df["bar_num"].tolist()[-1]<=-2 and df["macd"].tolist()[-1]<df["macd_sig"].tolist()[-1] and df["macd_slope"].tolist()[-1]<df["macd_sig_slope"].tolist()[-1]:
signal = "Close_Sell"
elif df["macd"].tolist()[-1]<df["macd_sig"].tolist()[-1] and df["macd_slope"].tolist()[-1]<df["macd_sig_slope"].tolist()[-1]:
signal = "Close"
elif l_s == "short":
if df["bar_num"].tolist()[-1]>=2 and df["macd"].tolist()[-1]>df["macd_sig"].tolist()[-1] and df["macd_slope"].tolist()[-1]>df["macd_sig_slope"].tolist()[-1]:
signal = "Close_Buy"
elif df["macd"].tolist()[-1]>df["macd_sig"].tolist()[-1] and df["macd_slope"].tolist()[-1]>df["macd_sig_slope"].tolist()[-1]:
signal = "Close"
return signal
def main():
try:
open_pos = con.get_open_positions()
for currency in pairs:
long_short = ""
if len(open_pos)>0:
open_pos_cur = open_pos[open_pos["currency"]==currency]
if len(open_pos_cur)>0:
if open_pos_cur["isBuy"].tolist()[0]==True:
long_short = "long"
elif open_pos_cur["isBuy"].tolist()[0]==False:
long_short = "short"
data = con.get_candles(currency, period='m5', number=250)
ohlc = data.iloc[:,[0,1,2,3,8]]
ohlc.columns = ["Open","Close","High","Low","Volume"]
signal = trade_signal(renko_merge(ohlc),long_short)
if signal == "Buy":
con.open_trade(symbol=currency, is_buy=True, is_in_pips=True, amount=pos_size,
time_in_force='GTC', stop=-8, trailing_step =True, order_type='AtMarket')
print("New long position initiated for ", currency)
elif signal == "Sell":
con.open_trade(symbol=currency, is_buy=False, is_in_pips=True, amount=pos_size,
time_in_force='GTC', stop=-8, trailing_step =True, order_type='AtMarket')
print("New short position initiated for ", currency)
elif signal == "Close":
con.close_all_for_symbol(currency)
print("All positions closed for ", currency)
elif signal == "Close_Buy":
con.close_all_for_symbol(currency)
print("Existing Short position closed for ", currency)
con.open_trade(symbol=currency, is_buy=True, is_in_pips=True, amount=pos_size,
time_in_force='GTC', stop=-8, trailing_step =True, order_type='AtMarket')
print("New long position initiated for ", currency)
elif signal == "Close_Sell":
con.close_all_for_symbol(currency)
print("Existing long position closed for ", currency)
con.open_trade(symbol=currency, is_buy=False, is_in_pips=True, amount=pos_size,
time_in_force='GTC', stop=-8, trailing_step =True, order_type='AtMarket')
print("New short position initiated for ", currency)
except:
print("error encountered....skipping this iteration")
# Continuous execution
starttime=time.time()
timeout = time.time() + 60*60*1 # 60 seconds times 60 meaning the script will run for 1 hr
while time.time() <= timeout:
try:
print("passthrough at ",time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
main()
time.sleep(300 - ((time.time() - starttime) % 300.0)) # 5 minute interval between each new execution
except KeyboardInterrupt:
print('\n\nKeyboard exception received. Exiting.')
exit()
# Close all positions and exit
for currency in pairs:
print("closing all positions for ",currency)
con.close_all_for_symbol(currency)
con.close()