-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathCTPMsgQueue.h
318 lines (282 loc) · 13.9 KB
/
CTPMsgQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
#pragma once
#include "CTPStruct.h"
#include "LockFreeQ.h"
#include <iostream>
#include "FunctionCallBackSet.h"
using namespace std;
class CTPMsgQueue
{
//响应队列中可能出现的消息类型(按字母排序)
enum EnumMsgType
{
E_fnOnConnect,
E_fnOnDisconnect,
E_fnOnErrRtnOrderAction,
E_fnOnErrRtnOrderInsert,
E_fnOnRspError,
E_fnOnRspOrderAction,
E_fnOnRspOrderInsert,
E_fnOnRspQryDepthMarketData,
E_fnOnRspQryInstrument,
E_fnOnRspQryInstrumentCommissionRate,
E_fnOnRspQryInstrumentMarginRate,
E_fnOnRspQryInvestorPosition,
E_fnOnRspQryInvestorPositionDetail,
E_fnOnRspQryOrder,
E_fnOnRspQryTrade,
E_fnOnRspQryTradingAccount,
E_fnOnRtnDepthMarketData,
E_fnOnRtnInstrumentStatus,
E_fnOnRtnOrder,
E_fnOnRtnTrade,
};
struct SMsgItem
{
EnumMsgType type; //消息类型
void* pApi; //指向类对象的指针
CThostFtdcRspInfoField RspInfo; //响应信息
bool bIsLast; //是否最后一个包
union{
int nRequestID;
ConnectionStatus Status;
};
union{
CThostFtdcDepthMarketDataField DepthMarketData; //E_fnOnRspQryDepthMarketData与E_fnOnRtnDepthMarketData返回的数据格式相同
CThostFtdcInputOrderField InputOrder;
CThostFtdcInputOrderActionField InputOrderAction;
CThostFtdcInstrumentField Instrument;
CThostFtdcInstrumentCommissionRateField InstrumentCommissionRate;
CThostFtdcInstrumentMarginRateField InstrumentMarginRate;
CThostFtdcInstrumentStatusField InstrumentStatus;
CThostFtdcInvestorPositionField InvestorPosition;
CThostFtdcInvestorPositionDetailField InvestorPositionDetail;
CThostFtdcOrderField Order;
CThostFtdcOrderActionField OrderAction;
CThostFtdcRspUserLoginField RspUserLogin;
CThostFtdcTradeField Trade;
CThostFtdcTradingAccountField TradingAccount;
};
};
public:
CTPMsgQueue(void)
{
m_hThread = NULL;
m_bRunning = false;
//回调函数地址指针
m_fnOnConnect = NULL;
m_fnOnDisconnect = NULL;
m_fnOnErrRtnOrderAction = NULL;
m_fnOnErrRtnOrderInsert = NULL;
m_fnOnRspError = NULL;
m_fnOnRspOrderAction = NULL;
m_fnOnRspOrderInsert = NULL;
m_fnOnRspQryDepthMarketData = NULL;
m_fnOnRspQryInstrument = NULL;
m_fnOnRspQryInstrumentCommissionRate = NULL;
m_fnOnRspQryInstrumentMarginRate = NULL;
m_fnOnRspQryInvestorPosition = NULL;
m_fnOnRspQryInvestorPositionDetail = NULL;
m_fnOnRspQryOrder = NULL;
m_fnOnRspQryTrade = NULL;
m_fnOnRspQryTradingAccount = NULL;
m_fnOnRtnDepthMarketData = NULL;
m_fnOnRtnInstrumentStatus = NULL;
m_fnOnRtnOrder = NULL;
m_fnOnRtnTrade = NULL;
m_hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
}
virtual ~CTPMsgQueue(void)
{
StopThread();
Clear();
CloseHandle(m_hEvent);
}
public:
//清空队列
void Clear();
//可以由外部发起,顺序处理队列触发回调函数
bool Process();
//由内部启动线程,内部主动调用Process触发回调
void StartThread();
void StopThread();
//将外部的函数地址注册到队列(按字母排序)
void RegisterCallback(fnOnConnect pCallback){ m_fnOnConnect = pCallback; }
void RegisterCallback(fnOnDisconnect pCallback){ m_fnOnDisconnect = pCallback; }
void RegisterCallback(fnOnErrRtnOrderAction pCallback){ m_fnOnErrRtnOrderAction = pCallback; }
void RegisterCallback(fnOnErrRtnOrderInsert pCallback){ m_fnOnErrRtnOrderInsert = pCallback; }
void RegisterCallback(fnOnRspError pCallback){ m_fnOnRspError = pCallback; }
void RegisterCallback(fnOnRspOrderAction pCallback){ m_fnOnRspOrderAction = pCallback; }
void RegisterCallback(fnOnRspOrderInsert pCallback){ m_fnOnRspOrderInsert = pCallback; }
void RegisterCallback(fnOnRspQryDepthMarketData pCallback){ m_fnOnRspQryDepthMarketData = pCallback; }
void RegisterCallback(fnOnRspQryInstrument pCallback){ m_fnOnRspQryInstrument = pCallback; }
void RegisterCallback(fnOnRspQryInstrumentCommissionRate pCallback){ m_fnOnRspQryInstrumentCommissionRate = pCallback; }
void RegisterCallback(fnOnRspQryInstrumentMarginRate pCallback){ m_fnOnRspQryInstrumentMarginRate = pCallback; }
void RegisterCallback(fnOnRspQryInvestorPosition pCallback){ m_fnOnRspQryInvestorPosition = pCallback; }
void RegisterCallback(fnOnRspQryInvestorPositionDetail pCallback){ m_fnOnRspQryInvestorPositionDetail = pCallback; }
void RegisterCallback(fnOnRspQryOrder pCallback){ m_fnOnRspQryOrder = pCallback; }
void RegisterCallback(fnOnRspQryTrade pCallback){ m_fnOnRspQryTrade = pCallback; }
void RegisterCallback(fnOnRspQryTradingAccount pCallback){ m_fnOnRspQryTradingAccount = pCallback; }
void RegisterCallback(fnOnRtnDepthMarketData pCallback){ m_fnOnRtnDepthMarketData = pCallback; }
void RegisterCallback(fnOnRtnInstrumentStatus pCallback){ m_fnOnRtnInstrumentStatus = pCallback; }
void RegisterCallback(fnOnRtnOrder pCallback){ m_fnOnRtnOrder = pCallback; }
void RegisterCallback(fnOnRtnTrade pCallback){ m_fnOnRtnTrade = pCallback; }
void RegisterCallback(FunctionCallBackSet *CallbackSet);//注册所有函数
//响应结果处理后入队列(按字母排序)
void Input_OnConnect(void* pApi, CThostFtdcRspUserLoginField *pRspUserLogin, ConnectionStatus result);
void Input_OnDisconnect(void* pApi, CThostFtdcRspInfoField *pRspInfo, ConnectionStatus step);
void Input_OnErrRtnOrderAction(void* pTraderApi, CThostFtdcOrderActionField *pOrderAction, CThostFtdcRspInfoField *pRspInfo);
void Input_OnErrRtnOrderInsert(void* pTraderApi, CThostFtdcInputOrderField *pInputOrder, CThostFtdcRspInfoField *pRspInfo);
void Input_OnRspError(void* pApi, CThostFtdcRspInfoField* pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspOrderAction(void* pTraderApi, CThostFtdcInputOrderActionField *pInputOrderAction, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspOrderInsert(void* pTraderApi, CThostFtdcInputOrderField *pInputOrder, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspQryDepthMarketData(void* pTraderApi, CThostFtdcDepthMarketDataField *pDepthMarketData, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspQryInvestorPosition(void* pTraderApi, CThostFtdcInvestorPositionField *pInvestorPosition, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspQryInvestorPositionDetail(void* pTraderApi, CThostFtdcInvestorPositionDetailField *pInvestorPositionDetail, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspQryInstrument(void* pTraderApi, CThostFtdcInstrumentField *pInstrument, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspQryInstrumentCommissionRate(void* pTraderApi, CThostFtdcInstrumentCommissionRateField *pInstrumentCommissionRate, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspQryInstrumentMarginRate(void* pTraderApi, CThostFtdcInstrumentMarginRateField *pInstrumentMarginRate, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspQryOrder(void* pTraderApi, CThostFtdcOrderField *pOrder, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspQryTrade(void* pTraderApi, CThostFtdcTradeField *pTrade, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRspQryTradingAccount(void* pTraderApi, CThostFtdcTradingAccountField *pTradingAccount, CThostFtdcRspInfoField *pRspInfo, int nRequestID, bool bIsLast);
void Input_OnRtnDepthMarketData(void* pMdApi, CThostFtdcDepthMarketDataField *pDepthMarketData);
void Input_OnRtnInstrumentStatus(void* pTraderApi, CThostFtdcInstrumentStatusField *pInstrumentStatus);
void Input_OnRtnOrder(void* pTraderApi, CThostFtdcOrderField *pOrder);
void Input_OnRtnTrade(void* pTraderApi, CThostFtdcTradeField *pTrade);
private:
friend DWORD WINAPI ProcessThread(LPVOID lpParam);
void RunInThread();
//响应结果直接入队列
void _Input_MD(SMsgItem* pMsgItem);
void _Input_TD(SMsgItem* pMsgItem);
//队列中的消息判断分发
void _Output_MD(SMsgItem* pMsgItem);
void _Output_TD(SMsgItem* pMsgItem);
//响应输出
void Output_OnConnect(SMsgItem* pItem)
{
if (m_fnOnConnect)
(*m_fnOnConnect)(pItem->pApi, &pItem->RspUserLogin, pItem->Status);
}
void Output_OnDisconnect(SMsgItem* pItem)
{
if (m_fnOnDisconnect)
(*m_fnOnDisconnect)(pItem->pApi, &pItem->RspInfo, pItem->Status);
}
void Output_OnErrRtnOrderAction(SMsgItem* pItem)
{
if (m_fnOnErrRtnOrderAction)
(*m_fnOnErrRtnOrderAction)(pItem->pApi, &pItem->OrderAction, &pItem->RspInfo);
}
void Output_OnErrRtnOrderInsert(SMsgItem* pItem)
{
if (m_fnOnErrRtnOrderInsert)
(*m_fnOnErrRtnOrderInsert)(pItem->pApi, &pItem->InputOrder, &pItem->RspInfo);
}
void Output_OnRspError(SMsgItem* pItem)
{
if (m_fnOnRspError)
(*m_fnOnRspError)(pItem->pApi, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspOrderAction(SMsgItem* pItem)
{
if (m_fnOnRspOrderAction)
(*m_fnOnRspOrderAction)(pItem->pApi, &pItem->InputOrderAction, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspOrderInsert(SMsgItem* pItem)
{
if (m_fnOnRspOrderInsert)
(*m_fnOnRspOrderInsert)(pItem->pApi, &pItem->InputOrder, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspQryInvestorPosition(SMsgItem* pItem)
{
if (m_fnOnRspQryInvestorPosition)
(*m_fnOnRspQryInvestorPosition)(pItem->pApi, &pItem->InvestorPosition, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspQryInvestorPositionDetail(SMsgItem* pItem)
{
if (m_fnOnRspQryInvestorPositionDetail)
(*m_fnOnRspQryInvestorPositionDetail)(pItem->pApi, &pItem->InvestorPositionDetail, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspQryDepthMarketData(SMsgItem* pItem)
{
if (m_fnOnRspQryDepthMarketData)
(*m_fnOnRspQryDepthMarketData)(pItem->pApi, &pItem->DepthMarketData, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspQryInstrument(SMsgItem* pItem)
{
if (m_fnOnRspQryInstrument)
(*m_fnOnRspQryInstrument)(pItem->pApi, &pItem->Instrument, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspQryInstrumentCommissionRate(SMsgItem* pItem)
{
if (m_fnOnRspQryInstrumentCommissionRate)
(*m_fnOnRspQryInstrumentCommissionRate)(pItem->pApi, &pItem->InstrumentCommissionRate, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspQryInstrumentMarginRate(SMsgItem* pItem)
{
if (m_fnOnRspQryInstrumentMarginRate)
(*m_fnOnRspQryInstrumentMarginRate)(pItem->pApi, &pItem->InstrumentMarginRate, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspQryOrder(SMsgItem* pItem)
{
if (m_fnOnRspQryOrder)
(*m_fnOnRspQryOrder)(pItem->pApi, &pItem->Order, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspQryTrade(SMsgItem* pItem)
{
if (m_fnOnRspQryTrade)
(*m_fnOnRspQryTrade)(pItem->pApi, &pItem->Trade, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRspQryTradingAccount(SMsgItem* pItem)
{
if (m_fnOnRspQryTradingAccount)
(*m_fnOnRspQryTradingAccount)(pItem->pApi, &pItem->TradingAccount, &pItem->RspInfo, pItem->nRequestID, pItem->bIsLast);
}
void Output_OnRtnDepthMarketData(SMsgItem* pItem)
{
if (m_fnOnRtnDepthMarketData)
(*m_fnOnRtnDepthMarketData)(pItem->pApi, &pItem->DepthMarketData);
}
void Output_OnRtnInstrumentStatus(SMsgItem* pItem)
{
if (m_fnOnRtnInstrumentStatus)
(*m_fnOnRtnInstrumentStatus)(pItem->pApi, &pItem->InstrumentStatus);
}
void Output_OnRtnOrder(SMsgItem* pItem)
{
if (m_fnOnRtnOrder)
(*m_fnOnRtnOrder)(pItem->pApi, &pItem->Order);
}
void Output_OnRtnTrade(SMsgItem* pItem)
{
if (m_fnOnRtnTrade)
(*m_fnOnRtnTrade)(pItem->pApi, &pItem->Trade);
}
private:
volatile bool m_bRunning;
HANDLE m_hEvent;
HANDLE m_hThread;
MSQueue<SMsgItem*> m_queue_MD; //响应队列
MSQueue<SMsgItem*> m_queue_TD; //响应队列
//回调函数指针(按字母排序)
fnOnConnect m_fnOnConnect;
fnOnDisconnect m_fnOnDisconnect;
fnOnErrRtnOrderAction m_fnOnErrRtnOrderAction;
fnOnErrRtnOrderInsert m_fnOnErrRtnOrderInsert;
fnOnRspError m_fnOnRspError;
fnOnRspOrderAction m_fnOnRspOrderAction;
fnOnRspOrderInsert m_fnOnRspOrderInsert;
fnOnRspQryDepthMarketData m_fnOnRspQryDepthMarketData;
fnOnRspQryInstrument m_fnOnRspQryInstrument;
fnOnRspQryInstrumentCommissionRate m_fnOnRspQryInstrumentCommissionRate;
fnOnRspQryInstrumentMarginRate m_fnOnRspQryInstrumentMarginRate;
fnOnRspQryInvestorPosition m_fnOnRspQryInvestorPosition;
fnOnRspQryInvestorPositionDetail m_fnOnRspQryInvestorPositionDetail;
fnOnRspQryOrder m_fnOnRspQryOrder;
fnOnRspQryTrade m_fnOnRspQryTrade;
fnOnRspQryTradingAccount m_fnOnRspQryTradingAccount;
fnOnRtnDepthMarketData m_fnOnRtnDepthMarketData;
fnOnRtnInstrumentStatus m_fnOnRtnInstrumentStatus;
fnOnRtnOrder m_fnOnRtnOrder;
fnOnRtnTrade m_fnOnRtnTrade;
};