forked from feixiaobo/go-xxl-job-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgetty_client.go
98 lines (87 loc) · 2.38 KB
/
getty_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package xxl
import (
"fmt"
"github.com/dubbogo/getty"
"github.com/dubbogo/getty/demo/util"
"github.com/dubbogo/gost/sync"
"github.com/tangjun2012/go-xxl-job-client/handler"
"github.com/tangjun2012/go-xxl-job-client/transport"
"net"
"strconv"
"time"
)
const (
cronPeriod = 20e9 / 1e6
queueLen = 4
queueNum = 4
queuePool = 16
maxMsgLen = 102400
wqLen = 512
keepAliveTime = 3 * time.Minute
writeTimeout = 5 * time.Second
ReadBufferSize = 262144
writeBufferSize = 65536
)
type GettyClient struct {
PkgHandler *handler.PackageHandler
EventListener *handler.MessageHandler
}
func NewGettyClient() *GettyClient {
return &GettyClient{
PkgHandler: handler.NewPackageHandler(),
EventListener: &handler.MessageHandler{
GettyClient: &transport.GettyRPCClient{},
},
}
}
func (c *GettyClient) Run(port, taskSize int) {
portStr := ":" + strconv.Itoa(port)
server := getty.NewTCPServer(
getty.WithLocalAddress(portStr),
)
server.RunEventLoop(func(session getty.Session) error {
taskPool := gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(taskSize*queueLen),
gxsync.WithTaskPoolTaskQueueNumber(taskSize+queueNum),
gxsync.WithTaskPoolTaskPoolSize(taskSize*queuePool),
)
err := c.initialSession(session)
if err != nil {
return err
}
session.SetTaskPool(taskPool)
return err
})
util.WaitCloseSignals(server)
}
func (c *GettyClient) initialSession(session getty.Session) (err error) {
tcpConn, ok := session.Conn().(*net.TCPConn)
if !ok {
panic(fmt.Sprintf("newSession: %s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn()))
}
if err = tcpConn.SetNoDelay(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err = tcpConn.SetKeepAlivePeriod(keepAliveTime); err != nil {
return err
}
if err = tcpConn.SetReadBuffer(ReadBufferSize); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(writeBufferSize); err != nil { //考虑查看日志时候返回数据可能会多,会不会太小?
return err
}
session.SetName("tcp")
session.SetMaxMsgLen(maxMsgLen)
session.SetWQLen(wqLen)
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(writeTimeout)
session.SetCronPeriod(int(cronPeriod))
session.SetWaitTime(time.Second)
session.SetPkgHandler(c.PkgHandler)
session.SetEventListener(c.EventListener)
return err
}