Skip to content

Commit

Permalink
feat: 引入ants协程池降低开销
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengjiaming1-tal committed Aug 22, 2023
1 parent b9556ad commit 529b46d
Show file tree
Hide file tree
Showing 28 changed files with 937 additions and 472 deletions.
19 changes: 19 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Copyright (c) 2021 revzim

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
18 changes: 17 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,28 @@ build:
cd app/bench && go build
cd app/bench_cli && go build

build_linux:
cd app/bench && GOOS=linux GOARCH=amd64 go build
cd app/bench_cli && GOOS=linux GOARCH=amd64 go build

run_bench:
nohup app/bench/bench -f app/bench/etc/bench.yaml &
nohup app/bench/bench -f app/bench/etc/bench2.yaml &
#nohup app/bench/bench -f app/bench/etc/bench2.yaml &

run_bench_cli:
cd app/bench_cli && ./bench_cli

kill:
pkill bench

docker_build:
docker build -t pomelo_bench:v1.3 .

docker_run:
docker run -d --restart=always --name=pomelo_bench -p 8080:8080 -p 9101:9101 pomelo_bench:v1.3

docker_save:
docker save pomelo_bench:v1.3 -o pomelo_bench_1_3.tar

docker_load:
docker load -i pomelo_bench_1_3.tar
54 changes: 32 additions & 22 deletions app/bench/benchclient/bench.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions app/bench/internal/logic/clearstatisticslogic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package logic

import (
"context"
"pomelo_bench/app/bench/internal/service/planmanager"

"pomelo_bench/app/bench/internal/svc"
"pomelo_bench/pb/bench"

"github.com/zeromicro/go-zero/core/logx"
)

type ClearStatisticsLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}

func NewClearStatisticsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ClearStatisticsLogic {
return &ClearStatisticsLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}

// ClearStatistics 清理任务指标
func (l *ClearStatisticsLogic) ClearStatistics(in *bench.ClearStatisticsRequest) (*bench.ClearStatisticsResponse, error) {

var (
plans []*planmanager.Plan
)

if in.Uuid != nil { // 说明单发

p, err := l.svcCtx.PlanManager.GetPlan(in.GetUuid())
if err != nil {
return nil, err
}

plans = []*planmanager.Plan{p}

} else {
plans = l.svcCtx.PlanManager.GetAllPlan()
}

for _, p := range plans {

p.ClearMetrics(l.ctx)
}

return &bench.ClearStatisticsResponse{}, nil
}
28 changes: 4 additions & 24 deletions app/bench/internal/logic/customsendlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,9 @@ func NewCustomSendLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Custom
// CustomSend 自定义消息发送
func (l *CustomSendLogic) CustomSend(in *bench.CustomSendRequest) (*bench.CustomSendResponse, error) {

var (
plans []*planmanager.Plan
)
err := l.svcCtx.PlanManager.GroupDo(in.Uuid, func(plan *planmanager.Plan) error {
return plan.PlanCustomSend(l.ctx, in.Pool, in.Number, in.Limit, in.Duration)
})

if in.Uuid != nil { // 说明单发

p, err := l.svcCtx.PlanManager.GetPlan(in.GetUuid())
if err != nil {
return nil, err
}

plans = []*planmanager.Plan{p}

} else {
plans = l.svcCtx.PlanManager.GetAllPlan()
}

for _, p := range plans {
err := p.PlanCustomSend(l.ctx, in.Pool, in.Number, in.Limit, in.Duration)
if err != nil {
return nil, err
}
}

return &bench.CustomSendResponse{}, nil
return &bench.CustomSendResponse{}, err
}
28 changes: 4 additions & 24 deletions app/bench/internal/logic/sendchatlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,9 @@ func NewSendChatLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SendChat
// SendChat 快速发送消息
func (l *SendChatLogic) SendChat(in *bench.SendChatRequest) (*bench.SendChatResponse, error) {

var (
plans []*planmanager.Plan
)
err := l.svcCtx.PlanManager.GroupDo(in.Uuid, func(plan *planmanager.Plan) error {
return plan.PlanSendChat(l.ctx, in.Message, in.Number, in.Limit, in.Duration)
})

if in.Uuid != nil { // 说明单发

p, err := l.svcCtx.PlanManager.GetPlan(in.GetUuid())
if err != nil {
return nil, err
}

plans = []*planmanager.Plan{p}

} else {
plans = l.svcCtx.PlanManager.GetAllPlan()
}

for _, p := range plans {
err := p.PlanSendChat(l.ctx, in.Message, in.Number, in.Limit, in.Duration)
if err != nil {
return nil, err
}
}

return &bench.SendChatResponse{}, nil
return &bench.SendChatResponse{}, err
}
15 changes: 4 additions & 11 deletions app/bench/internal/logic/startplanlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"pomelo_bench/app/bench/internal/logic/transform"
"pomelo_bench/app/bench/internal/svc"
"pomelo_bench/pb/bench"
"time"
)

type StartPlanLogic struct {
Expand All @@ -32,18 +33,10 @@ func (l *StartPlanLogic) StartPlan(in *bench.StartPlanRequest) (*bench.StartPlan
return nil, err
}

l.Info("创建任务成功,准备通过网关链接获取chat地址")
l.Info("创建任务成功,准备通过网关链接获取chat地址并进入chat房间")

// 通过网关链接获取chat地址
err = plan.PlanQueryGate(l.ctx)
if err != nil {
return nil, err
}

l.Info("通过网关链接获取chat地址成功,准备进入chat房间")

// 进入chat房间
err = plan.PlanConnectEntry(l.ctx)
// 通过网关链接获取chat地址 and 进入chat房间
err = plan.PlanQueryGateAndEnter(l.ctx, time.Duration(in.Plan.Timeout)*time.Second)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions app/bench/internal/server/benchserver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions app/bench/internal/service/lcpomelo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ func NewClientConnector(addr string, uid int, channelId int, roomId string) *Cli
}

// RunGateConnectorAndWaitConnect 初始化GateConnector握手
func (c *ClientConnector) RunGateConnectorAndWaitConnect(ctx context.Context) error {
func (c *ClientConnector) RunGateConnectorAndWaitConnect(ctx context.Context, timeout time.Duration) error {
if !c.pomeloGateConnectorConnected {

err := c.runAndWaitConnect(ctx, c.pomeloGateConnector, c.pomeloGateAddress, nil)
err := c.runAndWaitConnect(ctx, c.pomeloGateConnector, c.pomeloGateAddress, timeout, nil)
if err != nil {
return err
return errors.New(fmt.Sprintf("GateConnector tailed, %s", err.Error()))
}

c.pomeloGateConnectorConnected = true
Expand All @@ -113,21 +113,21 @@ func (c *ClientConnector) RunGateConnectorAndWaitConnect(ctx context.Context) er
}

// RunChatConnectorAndWaitConnect 初始化GateConnector握手
func (c *ClientConnector) RunChatConnectorAndWaitConnect(ctx context.Context) error {
func (c *ClientConnector) RunChatConnectorAndWaitConnect(ctx context.Context, timeout time.Duration) error {
if c.pomeloChatAddress == "" {
return errors.New("invalid pomelo chat address")
}

if !c.pomeloChatConnectorConnected {

err := c.runAndWaitConnect(ctx, c.pomeloChatConnector, c.pomeloChatAddress, func(message string) {
err := c.runAndWaitConnect(ctx, c.pomeloChatConnector, c.pomeloChatAddress, timeout, func(message string) {

c.pomeloChatConnectorConnected = false

logx.Error("chat connector failed, err:", message)
})
if err != nil {
return err
return errors.New(fmt.Sprintf("ChatConnector tailed, %s", err.Error()))
}

c.pomeloChatConnectorConnected = true
Expand Down
10 changes: 7 additions & 3 deletions app/bench/internal/service/lcpomelo/client_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// runAndWaitConnect Connector 初始化握手信息和保持连接
func (c *ClientConnector) runAndWaitConnect(ctx context.Context, connector *pomelosdk.Connector, address string, failed func(message string)) error {
func (c *ClientConnector) runAndWaitConnect(ctx context.Context, connector *pomelosdk.Connector, address string, timeout time.Duration, failed func(message string)) error {
err := connector.InitReqHandshake("0.6.0", "golang-websocket", nil, map[string]interface{}{"uid": "dude"})
if err != nil {
return err
Expand All @@ -33,7 +33,11 @@ func (c *ClientConnector) runAndWaitConnect(ctx context.Context, connector *pome

go func() {

err = connector.Run(address, 10)
// 增加超时时间
ctx2, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

err = connector.Run(ctx2, address, 10)
if err != nil {

logx.WithContext(ctx).Errorf("[%d] pomelo Connector.Run failed ,err:%s", c.uid, err)
Expand All @@ -45,7 +49,7 @@ func (c *ClientConnector) runAndWaitConnect(ctx context.Context, connector *pome
}
}()

_, ok := cond.WaitWithTimeout(10 * time.Second)
_, ok := cond.WaitWithTimeout(timeout + 5*time.Second)
if !ok {
return errors.New("run timeout")
}
Expand Down
Loading

0 comments on commit 529b46d

Please sign in to comment.