# matrix_maker.R
#Packages used throughout (the cluster workers load their own copies below)
library(dplyr)
library(Matrix)
library(tictoc)
library(parallel)
data_dir="your data dir here"
#Setting up the billion-item matrices to be used in regression
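#Illustrative sketch of the input computeResidualReturn() below expects: a
#dataframe with (at least) a bond identifier cusip, a trade_date, and a yield
#column y. The values here are made up.
example_dt=tibble(
cusip=c("TOY000001","TOY000001"),
trade_date=as.Date(c("2020-01-06","2020-01-10")),
y=c(0.021,0.023)
)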
computeResidualReturn = function(dt, values,dir,trimOutliers = NULL,sparse =TRUE) {
# Create date index mapping for simplicity
df_dates = tibble(
trans_date = seq(min(dt$trade_date),
max(dt$trade_date),
by='days')) %>%
# Drop weekends to save on dummies (discuss!). Note weekdays() is
# locale-dependent; English day names are assumed here
filter(!weekdays(trans_date) %in% c('Saturday', 'Sunday')) %>%
mutate(trans_period = row_number())
dt = dt %>%
inner_join(df_dates, by = c('trade_date' = 'trans_date')) %>%
arrange(cusip, trade_date) %>%
group_by(cusip) %>%
mutate(t1 = trans_period,
t0 = lag(trans_period, n = 1)) %>%
ungroup() %>%
# Drop rows with missing values (this also drops each bond's first trade, which has no lagged period)
filter(complete.cases(.))
# trim outliers
if (!is.null(trimOutliers)) {
tmp1 = quantile(dt$y, trimOutliers, na.rm = TRUE)
tmp2 = quantile(dt$y, 1-trimOutliers, na.rm = TRUE)
dt = dt %>%
filter(y >= tmp1 & y <=tmp2)
}
time_start = min(dt$t0)
time_end = max(dt$t1)
time_diff = time_end - time_start
timeIdx = seq(time_start, time_end)
# Fill in time matrix as sparse matrix: the first indicator column is built
# here; MatrixMaker2 captures sparse_new from this enclosing environment and
# appends the remaining columns to it
idx = dt$t0 < timeIdx[1] & dt$t1 >= timeIdx[1]
col=as.numeric(idx)
sparse_new=as(col, "sparseMatrix")
MatrixMaker2 = function(timeIdx,dt){
for (tm in 2:length(timeIdx)) {
# If column value is between start and end, set to 1 otherwise 0
idx = dt$t0 < timeIdx[tm] & dt$t1 >= timeIdx[tm]
col=as.numeric(idx)
sparse_new=cbind(sparse_new, as(col, "sparseMatrix"))
}
return(sparse_new)
}
# OR Fill in time matrix as matrix
MatrixMaker = function(timeIdx,dt){
time_matrix = array(0, dim = c(nrow(dt), time_diff + 1))
# colnames(time_matrix) = paste0("time_", seq(time_start, time_end))
for (tm in 1:length(timeIdx)) {
# If column value is between start and end, set to 1 otherwise 0
idx = dt$t0 < timeIdx[tm] & dt$t1 >= timeIdx[tm]
time_matrix[idx, tm] = 1
#This is an alternate methodology for a repeat sales model
# time_matrix[df_tmp$t0 == tm, tm - time_start] <- -1
# time_matrix[df_tmp$t1 == tm, tm - time_start] <- 1
}
return(time_matrix)
}
if (sparse == FALSE){
print("matrix nonsparse")
tic()
time_matrix=MatrixMaker(timeIdx,dt)
toc()
return(time_matrix)
} else {
print("matrix sparse")
tic()
time_matrix=MatrixMaker2(timeIdx,dt)
toc()
id=Sys.getpid() #get session id so it doesn't get lost which dataframe is which in parallelization
saveRDS(time_matrix,paste0(dir,values,'__',id,".RDS"))
#save y-val
saveRDS(dt$y,paste0(dir,"y_vals/",values,'__',id,"_y.RDS"))
return()
}
}
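#Minimal usage sketch (toy data, hypothetical values): three trades of one
#bond over a week yield a dense indicator matrix with one column per business
#day and a 1 wherever a row's holding period covers that day.
toy_dt=tibble(
cusip="TOY000001",
trade_date=as.Date(c("2020-01-06","2020-01-08","2020-01-10")),
y=c(0.020,0.021,0.022)
)
print(computeResidualReturn(toy_dt,values=1,dir=tempdir(),sparse=FALSE))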
#Parallel call
main =function(dataframes,values,dir){
iterations=length(dataframes)
id=Sys.getpid()
saveRDS(dataframes,paste0(dir,id,".RDS"))
for (x in 1:iterations){
computeResidualReturn(dataframes[[x]],values=x,dir)
}
}
#####################################################################################
#Running function parallelized vs non-parallelized
#non-cluster version on a single chunk of dataframes
print("Non cluster run")
tic()
q=main(sorted_dfs[[1]],dir=data_dir)
toc()
#Cluster run on all dataframes
print("Cluster run")
value=1
tic()
print("setting Up cluster")#executes in 15 min
cl=makeCluster(spec=5)
clusterEvalQ(cl,{library(dplyr)
library(SparseM)
library(Matrix)
library(tictoc)})
clusterExport(cl,c("computeResidualReturn",'main','value','data_dir'))
print("Done setting Up cluster")
vals=clusterApply(cl,sorted_dfs,main,values=value,dir=data_dir)
stopCluster(cl)
print("done Executing")
toc()
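#Small sketch of why outputs are tagged with Sys.getpid(): each cluster worker
#is a separate R process with its own pid, so files written by different
#workers never collide. (Throwaway two-worker demo.)
demo_cl=makeCluster(2)
print(unlist(clusterApply(demo_cl,1:2,function(i) Sys.getpid())))
stopCluster(demo_cl)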
#####################################################################################
#Data checks, confirming which rating/liquidity/maturity category went to which file
#Find which chunk of the larger list-of-lists of dataframes a written file came from
group_Finder = function(all_chunks,chunk){
for (x in 1:length(all_chunks)){
if (identical(all_chunks[[x]][[1]],chunk[[1]])){
return(x)
}
}
}
#For an individual file named via the session id it was run under and its chunk index, create a non-sparse matrix and compare the results
compareMatricesIndiv= function(all_dfs,df_to_check,session_id){
sparse_to_full=data.frame(as.matrix(readRDS(paste0(data_dir,df_to_check,"__",session_id,".RDS"))))
group=readRDS(paste0(data_dir,session_id,".RDS"))
group_number=group_Finder(all_dfs,group)
trad_matrix=data.frame(computeResidualReturn(dt=all_dfs[[group_number]][[df_to_check]],values=1,dir=data_dir,sparse=FALSE))
if (identical(sparse_to_full,trad_matrix)){
print("pass")
} else {
print("fail")
}
}
#Run the comparison across all the dfs assigned to that session id
compareMatricesAll = function(all_dfs,session_id,dir){
files=grep(as.character(session_id),list.files(dir),value=TRUE)
files=gsub(paste0(as.character(session_id),".RDS"),"",files)
number_files_on_core=max(as.numeric(gsub("__","",files)),na.rm=TRUE)
for (x in 1:number_files_on_core){
compareMatricesIndiv(all_dfs,x,session_id)
}
}
#Finding session id numbers
files=grep("__",list.files(data_dir),value=TRUE)
files=setdiff(list.files(data_dir),files)
files=gsub(".RDS","",files)
sessions=as.numeric(files)
sessions=sessions[!is.na(sessions)]
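#Sketch of the naming scheme the parsing above relies on: matrices are saved
#as "<chunk index>__<pid>.RDS" (see computeResidualReturn) while each group
#dump is "<pid>.RDS", so names containing "__" are matrices and bare numbers
#are session ids. Hypothetical file name:
demo_file="3__12345.RDS"
print(as.numeric(gsub("__","",gsub("12345.RDS","",demo_file)))) #chunk index 3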
compareMatricesIndiv(sorted_dfs,10,sessions[[1]])
compareMatricesIndiv(sorted_dfs,10,sessions[[4]])
compareMatricesAll(sorted_dfs,sessions[[2]],data_dir)
compareMatricesAll(sorted_dfs,sessions[[3]],data_dir)
#Rename the y-value files (bond returns) according to which rating/liquidity/maturity category each dataframe belongs to
saveNames_y= function(all_dfs,session_id){
group=readRDS(paste0(data_dir,session_id,".RDS"))
group_number=group_Finder(all_dfs,group)
#df_combo (built upstream in the data prep) holds the category labels for each chunk
names=df_combo[[group_number]]
data_dir_orig=data_dir
data_dir=paste0(data_dir,"y_vals/")
files=grep(as.character(session_id),list.files(data_dir),value=TRUE)
files=gsub(paste0(as.character(session_id),"_y.RDS"),"",files)
number_files_on_core=max(as.numeric(gsub("__","",files)),na.rm=TRUE)
for (x in 1:number_files_on_core){
file=readRDS(paste0(data_dir,x,'__',session_id,"_y.RDS"))
saveRDS(file,paste0(data_dir,paste(names[[x]],collapse="___"),".RDS"))
session_name = paste0(data_dir,x,'__',session_id,"_y.RDS")
new_dir= paste0(data_dir_orig,"sessions/")
system(paste0('mv ',session_name," ",new_dir))
}
}
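#Sketch of the renaming step: a hypothetical label vector for one chunk, say
#c("AAA","liquid","0-5y"), collapses into the file name printed below.
print(paste0(paste(c("AAA","liquid","0-5y"),collapse="___"),".RDS")) #"AAA___liquid___0-5y.RDS"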
for (x in sessions){
saveNames_y(sorted_dfs,x)
}
#Rename the matrix files according to which category each dataframe belongs to
saveNames= function(all_dfs,session_id){
group=readRDS(paste0(data_dir,session_id,".RDS"))
group_number=group_Finder(all_dfs,group)
names=df_combo[[group_number]]
files=grep(as.character(session_id),list.files(data_dir),value=TRUE)
files=gsub(paste0(as.character(session_id),".RDS"),"",files)
number_files_on_core=max(as.numeric(gsub("__","",files)),na.rm=TRUE)
for (x in 1:number_files_on_core){
file=readRDS(paste0(data_dir,x,'__',session_id,".RDS"))
saveRDS(file,paste0(data_dir,paste(names[[x]],collapse="___"),".RDS"))
session_name = paste0(data_dir,x,'__',session_id,".RDS")
new_dir= paste0(data_dir,"sessions/")
system(paste0('mv ',session_name," ",new_dir))
}
}
for (x in sessions){
saveNames(sorted_dfs,x)
}
#Moving session id files to a new folder called sessions
for (x in sessions){
session_name=paste0(data_dir,x,".RDS")
new_dir= paste0(data_dir,"sessions/")
system(paste0('mv ',session_name," ",new_dir))
}
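#Design note: system("mv ...") assumes a Unix-like shell. A portable
#alternative (sketch, not run here) would be R's own file.rename():
#file.rename(from=session_name,to=file.path(new_dir,basename(session_name)))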