-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathParallelProcessing-rPy.qmd
627 lines (491 loc) · 28.7 KB
/
ParallelProcessing-rPy.qmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
---
title: "Pairs Stock Trading with Parallel Computing with R and Python"
format:
html:
page-layout: full
grid:
body-width: 900px
---
## Introduction
- 이 노트북은 교보 후원으로 진행된 5월 데이터 과학회 MeetUp에서 발표합니다.
- 쿼토 (Quarto)와 reticulate는 R과 Python을 동일한 노트북에서 작업하는 것을 가능하게 해 주는 유용한 도구입니다.
- 이번 발표의 내용은 inflearn에 공개된 Python 기반 퀀트 강의에 기반하고 있습니다.
- Baseline 노트북은 케글 (<https://www.kaggle.com/code/dtmanager1979/stock-trading-eda-scheduled>)과 깃허브 (<https://github.com/SongYouk/PairsTrading/blob/main/src/doParallel/doParallel.qmd>)에 공개되어 있습니다.
- 발표는 크게 두 가지의 내용으로 Python과 R을 동시에 사용하게 해주는 유용한 도구인 reticulate를 소개하는 내용과 빠른 데이터 전처리를 위한 병렬처리를 다루는 내용으로 이루어집니다.
## Python 환경 복원하기
- Python 분석 환경 관리는 mini-conda와 yaml을 사용해서 관리하고 reticulate를 사용해서 quarto에서 python conda 환경에 접근 가능합니다.
```{r}
library(reticulate)
# reticulate::use_condaenv(condaenv = "C:\\ProgramData\\miniconda3\\envs\\myenv-finance")
reticulate::use_condaenv(condaenv = "/opt/miniconda3/envs/myenv-finance-mac")
# set current working directory
setwd("/Users/songyouk/PairsTrading/R-meetup-ParallelProcessing")
```
```{python}
import pandas as pd
import yfinance as yf
import datetime
url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
sp500 = pd.read_html(url)
sp500_list = sp500[0]["Symbol"].tolist()
end_time = datetime.datetime.today()
start_time = end_time - pd.DateOffset(months = 26)
# download data from yahoo finance
data_1d = yf.download(sp500_list, start = start_time, end = end_time, interval = "1d", progress = False)["Adj Close"]
# split data into three parts
# 365 days for correlation matrix
start_time_correlation = start_time
final_time_correlation = start_time_correlation + pd.DateOffset(months = 12)
# 365 days for backtesting
start_time_backtesting = final_time_correlation + pd.DateOffset(days = 1)
final_time_backtesting = end_time - pd.DateOffset(days = 60)
# 60 days for reassurance for backtesting with high frequency (Intraday)
start_time_reassurance = final_time_backtesting + pd.DateOffset(days = 1)
final_time_reassurance = end_time
# check if the duration of reassurance is 60 days which means the difference between final_time_reassurance and start_time_reassurance is 59 days
final_time_reassurance - start_time_reassurance
```
```{python}
# in below, final_time_backtesting was final_time_correlation, but the truth has been found that some of pairs had high correlation for the 1st year, but in the second year, the correlation dropped, so decided to select pairs which have high correlation for 2 years.
data_1d_corr = data_1d.loc[start_time_correlation:final_time_backtesting].copy() #inclusive:inclusive
correlation_matrix = data_1d_corr.corr()
correlation_matrix
```
```{python}
correlation_values = correlation_matrix.values.flatten()
correlation_values = correlation_values[(correlation_values != 1)]
# save correlation_values into data folder
correlation_values = pd.DataFrame(correlation_values, columns=["Correlation"])
correlation_values.to_csv("./data/correlation_values.csv")
```
```{python}
import numpy as np
# Create a boolean mask for the upper triangle of the matrix
mask_upper_triangle = np.triu(np.ones(correlation_matrix.shape), k=1)
# Apply the mask to the correlation matrix, this will remain values only in the upper triangle
upper_corr_matrix = np.multiply(correlation_matrix, mask_upper_triangle)
# Reshape the DataFrame from a 2D table to a 1D series, preserving row and column labels
stacked_corr = upper_corr_matrix.stack()
# Sort the correlation values in descending order
sorted_corr = stacked_corr.sort_values(ascending=False)
# Select the top 3000 pairs
high_correlated_pairs = sorted_corr.index[0:3000].tolist()
# Create a DataFrame to store the high correlated pairs
high_correlated_pairs_pd = pd.DataFrame(high_correlated_pairs, columns=["Stock1", "Stock2"])
# save high_correlated_pairs into data folder
high_correlated_pairs_pd.to_csv("./data/high_correlated_pairs.csv")
data_1d.to_csv("./data/data_1d.csv")
```
## plotly를 사용한 고상관 주식의 시계열 분석
```{python message=FALSE}
from ipywidgets import interact, Output, widgets
from plotly import graph_objs as go
from IPython.display import display, clear_output
# Create dropdown widget for pairs
pair_dropdown = widgets.Dropdown(
options=[(pair, pair) for pair in high_correlated_pairs[10:]],
)
# Create the output widget for displaying the plot
plot_output = Output()
# Display the empty output widget once
display(plot_output)
# Function to update graph
def update_graph(selected_pair):
with plot_output:
# remove the graph in the output cell already
clear_output(wait=True)
# Extract individual stocks from the selected pair
fig = go.Figure()
# Add trace for stock1 using the primary y-axis
fig.add_trace(go.Scatter(x = data_1d.index, y = data_1d[selected_pair[0]], name = selected_pair[0], yaxis = "y1"))
# Add trace for stock2 using the secondary y-axis
fig.add_trace(go.Scatter(x = data_1d.index, y = data_1d[selected_pair[1]], name = selected_pair[1], yaxis = "y2"))
# Update layout to include a second y-axis
fig.update_layout(
yaxis=dict(title=selected_pair[0], titlefont=dict(color="blue"), tickfont=dict(color="blue")),
yaxis2=dict(title=selected_pair[1], titlefont=dict(color="red"), tickfont=dict(color="red"), overlaying="y", side="right"),
title="Stock Prices Over Time",
xaxis_title="Date"
)
fig.show()
# Create interactive widget
interact(update_graph, selected_pair=pair_dropdown);
# Run the app
# shinyApp(ui = ui, server = server)
```
```{python}
import matplotlib.pyplot as plt
# plot histogram
plt.hist(correlation_values, bins = 50, edgecolor = "black")
plt.show()
```
## Python 객체지향 문법의 dplyr 문법으로의 전환
- Python 객체지향 문법을 써서 데이터의 Life Cycle을 만들어 냅니다.
- Piping을 사용하는 R의 dplyr 문법도 데이터의 Life Cycle을 직관적으로 볼 수 있는 훌륭한 도구입니다.
- Python IDE로서는 아직까지는 Visual Studio Code가 좀 더 유리한 면이 있습니다. (ex. Pylance를 이용한 도움말 접근)
- reticulate 사용을 위해서는 RStudio가 좀 더 유리합니다. (ex. rendering)
- 아이러니하지만 함수형 dplyr 문법이 객체지향 Python OOP (Object-Oriented-Programming)를 학습하는데 큰 도움이 되었습니다.
```{python}
class PairTradingFinancialAnalysis:
# attributes (속성)
def __init__(self, pair, df_whole, window, zscore_threshold, margin_init, margin_ratio):
self.stock1, self.stock2 = pair[0], pair[1]
self.window = window
self.zscore_threshold = zscore_threshold
self.margin_init = margin_init
self.margin_ratio = margin_ratio
self.margin = margin_init
self.df_pair = df_whole.loc[:, pair].copy()
self.df_signal_summary = pd.DataFrame()
self.df_margin = pd.DataFrame()
def __repr__(self):
return f"PairTradingFinancialAnalysis(pair = {self.stock1} and {self.stock2}, window = {self.window}, zscore_threshold = {self.zscore_threshold}, margin_init = {self.margin_init}, margin_ratio = {self.margin_ratio})"
# methods (메소드)
def zscore_calculation(self):
"""
주어진 주식 Pair에 윈도우를 기반으로 가격 비율의 이동 평균, 이동 표준 편차를 기반으로 zscore를 계산합니다.
Attributes (속성):
- self.pair (튜플): 분석할 주식 쌍의 주식 심볼을 포함하는 튜플입니다.
- self.df_pair (DataFrame): 분석할 주식에 대한 주식 가격을 포함하는 DataFrame입니다.
- self.window (정수): 이동 평균과 이동 편차을 계산하는 데 사용되는 일수 (days)입니다.
Reseults (결과):
- 현재 가격 비율과 주어진 윈도우의 과거로부터의 이동 평균, 이동 표준 편차를 추가하여 zscore를 계산하여 self.df를 업데이트합니다.
"""
self.df_pair["ratio"] = self.df_pair[self.stock1] / self.df_pair[self.stock2]
self.df_pair["ma"] = self.df_pair["ratio"].rolling(window=self.window).mean().shift(1)
self.df_pair["msd"] = self.df_pair["ratio"].rolling(window=self.window).std().shift(1)
self.df_pair["zscore"] = (self.df_pair["ratio"] - self.df_pair["ma"]) / self.df_pair["msd"]
def signal_calculation(self):
"""
zscore와 zscore_threshold를 비교하여 거래 신호를 계산합니다.
Attributes (속성):
- self.df_pair (DataFrame): zscore_calculation에서 얻은 DataFrame입니다.
- self.zscore_threshold (float): 거래 신호를 결정하는 데 사용되는 기준 값입니다.
Reseults (결과):
- zscore (stock1/stock2에 비례), zscore_threshold, 그리고 신호를 포함하여 self.df_pair를 업데이트합니다.
- zscore > zscore_threshold는 stock1이 stock2에 비해 통계적으로 유의미하게 고평가되었다는 것을 의미하고 이는 stock1을 매도(short)하고 stock2를 매입(Long)해야 함을 의미하고 이를 signal = -1로 표시합니다.
- zscore < -zscore_threshold는 stock1이 stock2에 비해 통계적으로 유의미하게 저평가되었다는 것을 의미하고 이는 stock1을 매입(Long)하고 stock2를 매도(Short)해야 함을 의미하고 이를 signal = 1로 표시합니다.
- zscore가 -1과 1 사이에 있는 경우에는 stock1과 stock2가 통계적으로 유의미하게 평가되지 않았다는 것을 의미하고 이는 stock1과 stock2를 매도하거나 매입하지 않아야 함을 의미하고 이를 signal = 0으로 표시합니다.
- zscore가 5보다 크거나 -5보다 작은 경우에는 거래를 하지 않습니다. 왜냐하면 이러한 경우는 통계의 표준에서 너무 크게 벗어난 경우이기 (예를 들어, 주식 가격의 폭락) 때문에 어떤 결정을 바꾸는 것이 리스크가 될 수 있습니다. 주식 가격의 폭락이나 폭등을 기준으로 거래를 하는 방법론도 있으나 우리의 방법론은 철저히 통계에 기반하기 때문에 통계로 해석이 어려운 특수한 경우는 거래에서 제외합니다.
- 위에 명시된 경우에 해당되지 않는 경우는 signal을 유지함을 의미하므로 기존 signal을 ffill()을 사용하여 앞으로 채우고, 남은 NaN 값을 0으로 채웁니다.
"""
import numpy as np
self.df_pair['signal'] = np.nan
self.df_pair['signal'] = np.where((self.df_pair['zscore'] > self.zscore_threshold) & (self.df_pair['zscore'] < 5), -1, self.df_pair['signal'])
self.df_pair['signal'] = np.where((self.df_pair['zscore'] < -self.zscore_threshold) & (self.df_pair['zscore'] > -5), 1, self.df_pair['signal'])
self.df_pair['signal'] = np.where((self.df_pair['zscore'] > -1) & (self.df_pair['zscore'] < 1), 0, self.df_pair['signal'])
self.df_pair['signal'] = self.df_pair['signal'].ffill()
self.df_pair['signal'] = self.df_pair['signal'].fillna(0)
def signal_summary(self):
"""
self.df_pair를 signal을 바탕으로 그룹화하고, 시작 및 종료 날짜, 시작 및 종료 가격을 계산하여 self.df_signal_summary를 생성합니다.
Attributes (속성):
- self.df_pair (DataFrame): signal_calculation에서 얻은 DataFrame입니다.
Returns (결과):
DataFrame: 시작 및 종료 날짜, 시작 및 종료 가격, 그리고 신호를 포함하는 self.df_signal_summary를 생성합니다.
"""
self.df_pair["signal_group"] = self.df_pair["signal"].diff().ne(0).cumsum()
self.df_pair["time"] = self.df_pair.index
self.df_signal_summary = (self.df_pair
.groupby("signal_group")
.agg({"signal": "first",
"time": "first",
self.stock1: ["first"],
self.stock2: ["first"]})
.reset_index(drop=True)
)
self.df_signal_summary.columns = ["signal", "time_start","stock1_start_price", "stock2_start_price"]
self.df_signal_summary["time_end"] = self.df_signal_summary["time_start"].shift(-1)
self.df_signal_summary["stock1_final_price"] = self.df_signal_summary["stock1_start_price"].shift(-1)
self.df_signal_summary["stock2_final_price"] = self.df_signal_summary["stock2_start_price"].shift(-1)
self.df_signal_summary.loc[self.df_signal_summary.index[-1], "time_end"] = self.df_pair.index[-1]
self.df_signal_summary.loc[self.df_signal_summary.index[-1], "stock1_final_price"] = self.df_pair[self.stock1].iloc[-1]
self.df_signal_summary.loc[self.df_signal_summary.index[-1], "stock2_final_price"] = self.df_pair[self.stock2].iloc[-1]
# reorder columns
self.df_signal_summary = self.df_signal_summary[["signal", "time_start", "time_end", "stock1_start_price", "stock1_final_price", "stock2_start_price", "stock2_final_price"]]
def margin_calculation(self):
'''
주식 쌍을 사고 팔 때의 "수수료"와 "가격 조정"을 고려하여 마진 (자산, 레버리지를 위한 담보금)을 계산합니다.
https://www.interactivebrokers.com/en/pricing/commissions-stocks.php
수수료 세부 정보:
- 매입 (buy) 수수료: 주당 $0.005 (최소 $1, 거래 가치의 최대 1%)
- 매도 (sell) 수수료: 주당 $0.005 (최소 $1, 거래 가치의 최대 1%) + 판매 가치의 0.000008 (SEC Transaction Fee) + 주당 $0.000166 (FINRA Trading Activity Fee)
가격 조정:
- 매입 & 매도 가격: 보수적으로 3 pips 가격 조정을 하도록 하겠습니다. 일반적으로 1.5 pips (1 pip = 0.0001)를 사용합니다.
- 매입 & 매도 가격: 1.0003 (매입) & 0.9997 (매도) --> 결국 가격 조정은 브로커의 이익을 위한 것입니다.
Attributes (속성):
- self.margin_init (float): 초기 담보 금액입니다.
- self.margin_rate (float): 레버리지 계좌의 마진 비율입니다. 예를 들어 margin이 3000이고 마진 비율이 0.25이면, 레버리지 계좌의 총 투자 가능 금액은 12,000입니다.
- self.df_signal_summary (DataFrame): signal_summary에서 생성된 DataFrame입니다.
Returns (결과):
- DataFrame: self.df_signal_summary에서 signal이 있는 행만 추출하여 df_margin을 생성하고, 이를 margin을 계산하여 업데이트합니다.
'''
import math
# Initial buying power and margin setup
margin = self.margin_init
buying_power = margin/ self.margin_ratio
# Calculate margin for each stock pair
df_margin = self.df_signal_summary.copy()
df_margin = df_margin[df_margin['signal'].isin([1, -1])]
for index, row in df_margin.iterrows(): # https://www.w3schools.com/python/pandas/ref_df_iterrows.asp
# Calculate the number of units for each stock pair
stock1_units = math.floor((0.5 * buying_power) / row["stock1_start_price"])
stock2_units = math.floor((0.5 * buying_power) / row["stock2_start_price"])
# Calculate commissions for buying and selling
if row["signal"] == 1:
commision_buy = min(max(stock1_units * 0.005, 1), 0.5 * buying_power * 0.01)
commision_sell = min(max(stock2_units * 0.005, 1), 0.5 * buying_power * 0.01) + 0.000008 * 0.5 * buying_power + 0.000166 * stock2_units
total_commission = commision_buy + commision_sell
else:
commision_buy = min(max(stock2_units * 0.005, 1), 0.5 * buying_power * 0.01)
commision_sell = min(max(stock1_units * 0.005, 1), 0.5 * buying_power * 0.01) + 0.000008 * 0.5 * buying_power + 0.000166 * stock1_units
total_commission = commision_buy + commision_sell
# Calculate margin based on signal
if row["signal"] == 1: # Buy stock1 and sell stock2
margin += ((row["stock1_final_price"] * 0.9997 - row["stock1_start_price"] * 1.0003) * stock1_units -
(row["stock2_final_price"] * 1.0003 - row["stock2_start_price"] * 0.9997) * stock2_units) - total_commission
else:
margin += ((row["stock2_final_price"] * 0.9997 - row["stock2_start_price"] * 1.0003) * stock2_units -
(row["stock1_final_price"] * 1.0003 - row["stock1_start_price"] * 0.9997) * stock1_units) - total_commission
# Update margin and buying power for each iteration
df_margin.loc[index, "margin"] = margin
buying_power = margin / self.margin_ratio
self.margin = margin
self.df_margin = df_margin
def trading_summary(self):
"""
페어 트레이딩 전략에 대한 요약 정보를 제공합니다.
Attributes (속성):
- self.df_summary (DataFrame): margin_calculation에서 얻은 DataFrame입니다.
Returns (결과):
아래 정보를 포함하는 딕셔너리를 반환합니다.
- 'pair': 분석되는 쌍.
- 'window': 이동 평균을 계산하는 데 사용되는 일수.
- 'zscore_threshold': 거래 신호를 결정하는 데 사용되는 기준 값.
- 'margin': 거래 후 마진.
"""
self.zscore_calculation()
self.signal_calculation()
self.signal_summary()
self.margin_calculation()
trading_result = {
'pair': (self.stock1, self.stock2),
'window': self.window,
'zscore_threshold': self.zscore_threshold,
'margin': self.margin
}
return trading_result
```
```{python}
PairTradingFinancialAnalysis_obj = PairTradingFinancialAnalysis(pair = ("MSFT", "AAPL"), df_whole = data_1d[start_time_backtesting:final_time_backtesting], window = 3, zscore_threshold = 2.0, margin_init = 3000, margin_ratio = 2.0)
PairTradingFinancialAnalysis_obj.trading_summary()
```
```{python}
PairTradingFinancialAnalysis_obj = PairTradingFinancialAnalysis(pair = ("MSFT", "AAPL"), df_whole = data_1d[start_time_backtesting:final_time_backtesting], window = 3, zscore_threshold = 2.0, margin_init = 3000, margin_ratio = 2.0)
PairTradingFinancialAnalysis_obj.zscore_calculation()
PairTradingFinancialAnalysis_obj.df_pair
```
```{python}
PairTradingFinancialAnalysis_obj.signal_calculation()
PairTradingFinancialAnalysis_obj.df_pair
```
```{python}
PairTradingFinancialAnalysis_obj.signal_summary()
PairTradingFinancialAnalysis_obj.df_signal_summary
```
```{python}
PairTradingFinancialAnalysis_obj.margin_calculation()
PairTradingFinancialAnalysis_obj.df_margin.head(10)
```
```{r, message=FALSE}
library(data.table)
library(dplyr)
library(zoo)
# Define the trading summary function in R
trading_summary <- function(df_whole, pair, window, zscore_threshold, margin_init, margin_ratio) {
stock1 <- pair[[1]]
stock2 <- pair[[2]]
pair <- c(stock1, stock2)
# Extract the pair data from the whole dataset
df_pair <- df_whole[, c("Date",pair)]
# Calculate the ratio and its rolling statistics
ratio <- df_pair[[2]] / df_pair[[3]]
ma <- frollmean(ratio, window, align = "right", fill = NA)
msd <- frollapply(ratio, window, sd, align = "right", fill = NA)
zscore <- (ratio - dplyr::lag(ma)) / dplyr::lag(msd)
# Calculate signals based on z-score threshold
signal <- ifelse(zscore > zscore_threshold & zscore < 5, -1, NA)
signal <- ifelse(zscore < -zscore_threshold & zscore > -5, 1, signal)
signal <- ifelse(zscore > -1 & zscore < 1, 0, signal)
signal <- na.locf(signal, na.rm = FALSE)
signal[is.na(signal)] <- 0
df_pair <- data.frame(df_pair, ratio = ratio, moving_mean = ma, moving_std = msd, zscore= zscore, signal = signal)
df_pair <- df_pair %>%
mutate(
# Check where signal changes compared to previous value
change = signal != lag(signal, default = first(signal)),
# Group these changes
signal_group = cumsum(change)
) %>%
# Optionally remove the 'change' column if you don't need it
select(-change)
df_signal_summary <- df_pair %>%
group_by(signal_group) %>%
summarize(
signal = first(signal, order_by = Date),
time_start = first(Date, order_by = Date),
stock1_start_price = first(.data[[stock1]], order_by = Date),
stock2_start_price = first(.data[[stock2]], order_by = Date)
) %>%
ungroup()
# add new column time_end, stock1_end_price and stock2_end_price with lead function
df_signal_summary <- df_signal_summary %>%
mutate(
time_end = lead(time_start),
stock1_end_price = lead(stock1_start_price),
stock2_end_price = lead(stock2_start_price)
)
# Fill the last row with the last date and price from df_pair
df_signal_summary[nrow(df_signal_summary), "time_end"] <- df_pair$Date[nrow(df_pair)]
df_signal_summary[nrow(df_signal_summary), "stock1_end_price"] <- df_pair[[stock1]][nrow(df_pair)]
df_signal_summary[nrow(df_signal_summary), "stock2_end_price"] <- df_pair[[stock2]][nrow(df_pair)]
# reorder columns for better readability "signal", "time_start", "time_end", "stock1_start_price", "stock1_final_price", "stock2_start_price", "stock2_final_price"
df_signal_summary <- df_signal_summary[, c("signal", "time_start", "time_end", "stock1_start_price", "stock1_end_price", "stock2_start_price", "stock2_end_price")]
# subset from df_signal_summary for the rows of signal is 1 or -1
df_margin <- df_signal_summary %>% filter(signal %in% c(1, -1))
margin <- margin_init
df_margin$margin <- NA
if (nrow(df_margin) > 0) {
for (i in 1:nrow(df_margin)){
buying_power <- margin / margin_ratio
stock1_units <- floor((0.5 * buying_power) / df_margin$stock1_start_price[i])
stock2_units <- floor((0.5 * buying_power) / df_margin$stock2_start_price[i])
if (df_margin$signal[i] == 1){
commision_buy <- min(max(stock1_units * 0.005, 1), 0.5 * buying_power * 0.01)
commision_sell <- min(max(stock2_units * 0.005, 1), 0.5 * buying_power * 0.01) + 0.000008 * 0.5 * buying_power + 0.000166 * stock2_units
total_commission <- commision_buy + commision_sell
} else {
commision_buy <- min(max(stock2_units * 0.005, 1), 0.5 * buying_power * 0.01)
commision_sell <- min(max(stock1_units * 0.005, 1), 0.5 * buying_power * 0.01) + 0.000008 * 0.5 * buying_power + 0.000166 * stock1_units
total_commission <- commision_buy + commision_sell
}
if (df_margin$signal[i] == 1){
margin <- margin + ((df_margin$stock1_end_price[i] * 0.9997 - df_margin$stock1_start_price[i] * 1.0003) * stock1_units -
(df_margin$stock2_end_price[i] * 1.0003 - df_margin$stock2_start_price[i] * 0.9997) * stock2_units) - total_commission
} else {
margin <- margin + ((df_margin$stock2_end_price[i] * 0.9997 - df_margin$stock2_start_price[i] * 1.0003) * stock2_units -
(df_margin$stock1_end_price[i] * 1.0003 - df_margin$stock1_start_price[i] * 0.9997) * stock1_units) - total_commission
}
df_margin$margin[i] <- margin
buying_power <- margin / margin_ratio
}
}
# Return a dataframe containing the trading results and margin
return(data.frame(
pair1 = pair[1],
pair2 = pair[2],
window = window,
zscore_threshold = zscore_threshold,
margin = margin # Placeholder for the calculated margin
))
}
```
```{python}
data_1d_backtesting = data_1d[start_time_backtesting:final_time_backtesting].copy()
data_1d_backtesting.to_csv("./data/data_1d_backtesting.csv")
# data_1d_backtesting
```
## Nested For Loop: Python의 List Comprehension vs. R의 apply
```{python}
def trading_summary_wt_parameters(df_whole, pair, margin_init, margin_ratio):
pair_trading_summary_ls = [
PairTradingFinancialAnalysis(df_whole = df_whole, pair = pair, margin_init = margin_init, margin_ratio = margin_ratio,
window = window, zscore_threshold = zscore_threshold).trading_summary()
for window in range(3,30,1)
for zscore_threshold in np.linspace(2.0, 4.0, 21)
]
pair_trading_summary_df = pd.DataFrame(pair_trading_summary_ls).sort_values(by='margin', ascending=False)
return pair_trading_summary_df
```
```{r}
data_1d_backtesting <- fread("./data/data_1d_backtesting.csv") %>% as.data.frame()
```
```{r}
trading_summary_wt_parameters <- function(pair, df_whole = data_1d_backtesting, margin_init = 3000, margin_ratio = 0.25) {
param_grid <- expand.grid(
window = 3:29,
zscore_threshold = seq(2.0, 4.0, length.out = 21)
)
results_list <- apply(param_grid, 1, function(params) {
window <- params[1]
zscore_threshold <- params[2]
trading_summary(df_whole, pair, as.numeric(window), as.numeric(zscore_threshold), margin_init, margin_ratio)
})
results_df <- do.call(rbind, results_list) # Convert list of results to a dataframe
results_df <- results_df[order(-results_df$margin), ] # Sort dataframe by 'margin' in descending order
return(results_df)
}
```
## doParallel을 이용한 Parallel Processing
- doParallel을 적용하기 위해서 가장 필요한 작업은 %doPar%에 적용되는 함수가 독립적이어야 하는 것입니다.
- 또한 foreach의 입력 변수는 iterable한 데이터 type이어햐 합니다. (ex. 리스트, 반대예: dataframe)
```{python}
from joblib import Parallel, delayed
import os
num_cores = os.cpu_count()
print(num_cores)
```
```{python}
pairs_trading_summary = Parallel(n_jobs=num_cores)(delayed(trading_summary_wt_parameters)(df_whole = data_1d_backtesting, margin_init = 3000, margin_ratio = 0.25, pair = pair) for pair in high_correlated_pairs[0:30])
```
```{r, message=FALSE}
library(tictoc)
library(doParallel)
high_correlated_pairs <- fread('./data/high_correlated_pairs.csv', header = TRUE) %>% as.data.frame()
# get the first 100 rows from high_correlated_pairs
high_correlated_pairs <- high_correlated_pairs[1:30,]
tic()
high_correlated_pairs_ls <- list()
for (i in 1:nrow(high_correlated_pairs)){
high_correlated_pairs_ls[[i]] <- list(high_correlated_pairs$Stock1[i], high_correlated_pairs$Stock2[i])
}
# write to use all the cores based on the number of cores in the machine
cl <- makeCluster(detectCores())
registerDoParallel(cl)
pairs_trading_summary <- foreach(
pair_input = high_correlated_pairs_ls,
.combine = "rbind",
.packages = c("tidyverse","data.table","zoo")
) %dopar% {
trading_summary_wt_parameters(pair = pair_input)
}
stopCluster(cl)
toc()
```
```{python}
pairs_trading_summary_df = pd.concat(pairs_trading_summary, ignore_index=True)
```
```{python}
top_margin_result = pairs_trading_summary_df.groupby("pair").head(20)
temp_df = (
top_margin_result
.groupby("pair")
.agg(
mean_window=('window', 'mean'),
median_window=('window', 'median'),
std_window=('window', 'std'),
mean_zscore_threshold=('zscore_threshold', 'mean'),
median_zscore_threshold=('zscore_threshold', 'median'),
std_zscore_threshold=('zscore_threshold', 'std'),
mean_margin=('margin', 'mean'),
median_margin=('margin', 'median'),
std_margin=('margin', 'std')
)
.sort_values(by="mean_margin", ascending=False)
.reset_index(inplace=False)
)
temp_df["dist_window"] = temp_df["std_window"] / temp_df["mean_window"]
temp_df["dist_zscore_threshold"] = temp_df["std_zscore_threshold"] / temp_df["mean_zscore_threshold"]
temp_df["dist_margin"] = temp_df["std_margin"] / temp_df["mean_margin"]
pairs_trading_summary_wt_optimum_params_df = temp_df
```