Skip to content

Commit

Permalink
feat: 完善自定义发送功能
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengjiaming1-tal committed Sep 19, 2023
1 parent 6ff4d51 commit 77b1bad
Show file tree
Hide file tree
Showing 22 changed files with 600 additions and 295 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ kill:
pkill bench

docker_build:
docker build -t pomelo_bench:v1.3 .
docker build -t pomelo_bench:v1.8 .

docker_run:
docker run -d --restart=always --name=pomelo_bench -p 8080:8080 -p 9101:9101 pomelo_bench:v1.3
docker run -d --restart=always --name=pomelo_bench -p 8080:8080 -p 9101:9101 pomelo_bench:v1.8

docker_save:
docker save pomelo_bench:v1.3 -o pomelo_bench_1_3.tar
docker save pomelo_bench:v1.8 -o pomelo_bench_1_8.tar

docker_load:
docker load -i pomelo_bench_1_3.tar
docker load -i pomelo_bench_1_8.tar
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Go Pomelo Bench 压测工具
# Go Pomelo Bench 多节点压测工具

工具分为两部分:

Expand All @@ -7,11 +7,10 @@

结构为:

![](docs/pomelo.jpg)
![](docs/pomelo-bench.jpg)

压测单元实际为rpc服务端,通过grpc接口实现对压测任务的创建,操作,销毁等。

前端命令行控制器提供多压测单元控制,提供建议操作命令,通过grpc客户端连接到后端压测单元,简单实现模拟分布式压测控制。
1. pomelo-bench: 压测单元实际为rpc服务端,通过grpc接口实现对压测任务的创建,操作,销毁等。
2. pomelo-cli: 前端命令行控制器提供多压测单元控制,提供建议操作命令,通过grpc客户端连接到后端压测单元,简单实现模拟分布式压测控制。

## 快速启动

Expand Down
1 change: 1 addition & 0 deletions app/bench/internal/logic/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func Statistics(a lcpomelo.Statistics) *bench.Statistics {
OnAddReceiveCount: a.OnAddReceiveCount,
OnLeaveReceiveCount: a.OnLeaveReceiveCount,
OnChatReceiveCount: a.OnChatReceiveCount,
OnChatDuration: a.OnChatDuration,
OnlineNum: a.OnlineNum,
}
}
Expand Down
52 changes: 52 additions & 0 deletions app/bench/internal/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metrics

import (
"encoding/json"
"sync"
"testing"
"time"
)
Expand All @@ -28,3 +29,54 @@ func TestMetrics_Execute(t *testing.T) {
t.Log(string(d))

}

func TestMetrics_AppendExecute(t *testing.T) {

m := NewMetrics("test")

n := time.Now()

wg := sync.WaitGroup{}

for i := 0; i < 1000; i++ {

wg.Add(1)

go func(index int) {
if index == 99 {

time.Sleep(1 * time.Second)
}

m.Add(time.Now().Sub(n))

wg.Done()

}(i)

}

wg.Wait()

report := m.Execute()

d, _ := json.Marshal(report)
t.Log(string(d))

t.Log(len(m.tasks))

}

func Benchmark_ADD(b *testing.B) {

me := SimpleMetrics{}
n := time.Now()

for i := 0; i < b.N; i++ {

me.Add(time.Now().Sub(n))

}

b.Log(me.Duration())
}
48 changes: 48 additions & 0 deletions app/bench/internal/metrics/simplemetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package metrics

import (
"sync"
"time"
)

type SimpleMetrics struct {
duration int64 // 总耗时
count int64
mu sync.Mutex
}

func (s *SimpleMetrics) Add(duration time.Duration) {

s.mu.Lock()
defer s.mu.Unlock()

s.count++
s.duration += int64(duration)

}
func (s *SimpleMetrics) Duration() (duration time.Duration) {

s.mu.Lock()
defer s.mu.Unlock()

if s.count == 0 {
return 0
}

duration = time.Duration(s.duration / s.count)

return duration
}

func (s *SimpleMetrics) Clear() {

s.mu.Lock()
defer s.mu.Unlock()

if s.count == 0 {
return
}

s.count = 0
s.duration = 0
}
92 changes: 77 additions & 15 deletions app/bench/internal/service/lcpomelo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/zeromicro/go-zero/core/jsonx"
"github.com/zeromicro/go-zero/core/logx"
"math/rand"
"pomelo_bench/app/bench/internal/metrics"
"pomelo_bench/pomelosdk"
"strconv"
"strings"
Expand Down Expand Up @@ -50,11 +51,12 @@ type ClientConnector struct {

sendCount *uint64
customSendCount *uint64
onServerReceiveCount *uint64 // 接收到包的数量
onAddReceiveCount *uint64 // 接收到包的数量
onLeaveReceiveCount *uint64 // 接收到包的数量
onChatReceiveCount *uint64 // 接收到包的数量
onlineNum uint64 // 房间在线人数
onServerReceiveCount *uint64 // 接收到包的数量
onAddReceiveCount *uint64 // 接收到包的数量
onLeaveReceiveCount *uint64 // 接收到包的数量
onChatReceiveCount *uint64 // 接收到包的数量
onlineNum uint64 // 房间在线人数
metrics metrics.SimpleMetrics // 接收到包的耗时统计
}

func NewClientConnector(addr string, uid int, channelId int, roomId string) *ClientConnector {
Expand Down Expand Up @@ -89,11 +91,11 @@ func NewClientConnector(addr string, uid int, channelId int, roomId string) *Cli

sendCount: &sendCount,
customSendCount: &customSendCount,
onServerReceiveCount: &onServerReceiveCount, // 接收到包的数量
onAddReceiveCount: &onAddReceiveCount, // 接收到包的数量
onLeaveReceiveCount: &onLeaveReceiveCount, // 接收到包的数量
onChatReceiveCount: &onChatReceiveCount, // 接收到包的数量

onServerReceiveCount: &onServerReceiveCount, // 接收到包的数量
onAddReceiveCount: &onAddReceiveCount, // 接收到包的数量
onLeaveReceiveCount: &onLeaveReceiveCount, // 接收到包的数量
onChatReceiveCount: &onChatReceiveCount, // 接收到包的数量
metrics: metrics.SimpleMetrics{}, // 接收到包的耗时统计
}
}

Expand Down Expand Up @@ -145,8 +147,8 @@ func (c *ClientConnector) SyncGateRequest(ctx context.Context) error {
type GateRequest struct {
Rid string `json:"rid"` // room id
Uid int `json:"uid"`
RType int `json:"rtype"`
UType int `json:"utype"`
RType int `json:"rtype"` //频道类型,授课频道=1,辅导频道=2,教师组频道=3,rtc房间频道=4
UType int `json:"utype"` // utype=0正常,utype=1旁观者
RetryTime int64 `json:"retrytime"`
}

Expand Down Expand Up @@ -213,7 +215,7 @@ func (c *ClientConnector) SyncChatEnterConnectorRequest(ctx context.Context) err
Username int `json:"username"`
Rtype int `json:"rtype"`
Rid string `json:"rid"`
Role int `json:"role"`
Role int `json:"role"` //1:学生,2:辅导,4:授课,3:旁听用户,5:游客
Ulevel int `json:"ulevel"`
Uname int `json:"uname"`
Classid string `json:"classid"`
Expand All @@ -235,7 +237,7 @@ func (c *ClientConnector) SyncChatEnterConnectorRequest(ctx context.Context) err

Rtype: c.channelId,
Rid: c.roomId,
Role: 1,
Role: 1, //1:学生,2:辅导,4:授课,3:旁听用户,5:游客
Ulevel: 1,
Classid: c.roomId,
Mtcv: "0.0.1",
Expand Down Expand Up @@ -323,13 +325,62 @@ func (c *ClientConnector) onEvent() {
c.pomeloChatConnector.On(Event_OnChat, func(data []byte) {
atomic.AddUint64(c.onChatReceiveCount, 1)

logx.Infof("[%d] onChat,data: %s", c.uid, string(data))
duration, err := getOnChatDurationFromContent(data)
if err != nil {
logx.Infof("[%d] onChat,duration: null ,data: %s", c.uid, string(data))
} else {

c.metrics.Add(duration)

logx.Infof("[%d] onChat,duration: %s , data: %s", c.uid, duration.String(), string(data))
}

ack(data)
})

}

func getOnChatDurationFromContent(data []byte) (time.Duration, error) {
type onChat struct {
//Route string `json:"route"`
Msg string `json:"msg"`
//From string `json:"from"`
//Target string `json:"target"`
//MsgId int `json:"msgId"`
}

type onChatMessage struct {
Ctime string `json:"ctime"`
SendTime int `json:"sendTime"`
}

chat := onChat{}
err := json.Unmarshal(data, &chat)
if err != nil {
return 0, err
}

msg := onChatMessage{}

err = json.Unmarshal([]byte(chat.Msg), &msg)
if err != nil {
return 0, err
}

ctime, err := strconv.Atoi(msg.Ctime)
if err != nil {
return 0, err

}

sendTime := time.UnixMilli(int64(ctime))

duration := time.Since(sendTime)

return duration, nil

}

func (c *ClientConnector) AsyncCustomSend(ctx context.Context, route string, data []byte, cb pomelosdk.Callback) error {
if c.pomeloChatConnectorConnected == false {
return errors.New("ChatConnector not connected")
Expand Down Expand Up @@ -387,3 +438,14 @@ func (c *ClientConnector) AsyncChatSendMessage(ctx context.Context, message stri

return nil
}

func (c *ClientConnector) ClearMetrics() {
c.metrics.Clear()

atomic.SwapUint64(c.sendCount, 0)
atomic.SwapUint64(c.customSendCount, 0)
atomic.SwapUint64(c.onServerReceiveCount, 0)
atomic.SwapUint64(c.onAddReceiveCount, 0)
atomic.SwapUint64(c.onLeaveReceiveCount, 0)
atomic.SwapUint64(c.onChatReceiveCount, 0)
}
2 changes: 1 addition & 1 deletion app/bench/internal/service/lcpomelo/client_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *ClientConnector) runAndWaitConnect(ctx context.Context, connector *pome
err = connector.Run(ctx2, address, 10)
if err != nil {

logx.WithContext(ctx).Errorf("[%d] pomelo Connector.Run failed ,err:%s", c.uid, err)
logx.WithContext(ctx).Infof("[%d] pomelo Connector.Run failed ,err:%s", c.uid, err)

if failed != nil {
failed(err.Error())
Expand Down
3 changes: 3 additions & 0 deletions app/bench/internal/service/lcpomelo/client_detail.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (c *ClientConnector) Detail(ctx context.Context) ClientDetail {
OnAddReceiveCount: atomic.LoadUint64(c.onAddReceiveCount),
OnLeaveReceiveCount: atomic.LoadUint64(c.onLeaveReceiveCount),
OnChatReceiveCount: atomic.LoadUint64(c.onChatReceiveCount),
OnChatDuration: int64(c.metrics.Duration()),
},
}
}
Expand Down Expand Up @@ -58,6 +59,7 @@ type Statistics struct {
OnAddReceiveCount uint64
OnLeaveReceiveCount uint64
OnChatReceiveCount uint64
OnChatDuration int64 // 接收到包的耗时统计
}

func (s *Statistics) Add(b Statistics) {
Expand All @@ -68,6 +70,7 @@ func (s *Statistics) Add(b Statistics) {
s.OnAddReceiveCount += b.OnAddReceiveCount
s.OnLeaveReceiveCount += b.OnLeaveReceiveCount
s.OnChatReceiveCount += b.OnChatReceiveCount
s.OnChatDuration += b.OnChatDuration
}

type ConnectorDetail struct {
Expand Down
13 changes: 13 additions & 0 deletions app/bench/internal/service/planmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func NewManager() *Manager {
func (m *Manager) CreatePlan(cfg *bench.Plan) string {
uid := uuid.NewString()

if len(m.plans) == 0 {
uid = "7f0f850e-2933-4f03-87c0-da1768d51zjm"
}

p := NewPlan(uid, cfg)

m.plans[uid] = p
Expand All @@ -49,12 +53,21 @@ func (m *Manager) ListPlan() (infos []PlanInfo) {
totalStatistics lcpomelo.Statistics
// Connector 客户端链接情况
connector bench.ConnectorStatus

lastOnChat uint64
)

for i := 0; i < len(detail.Connectors); i++ {

totalStatistics.Add(detail.Connectors[i].Statistics)

if lastOnChat != detail.Connectors[i].Statistics.OnChatReceiveCount {
logx.Errorf("not equal onChat,uid:%d ,lastOnChat:%d , OnChatReceiveCount:%d",
detail.Connectors[i].Uid, lastOnChat, detail.Connectors[i].Statistics.OnChatReceiveCount)
}

lastOnChat = detail.Connectors[i].Statistics.OnChatReceiveCount

if detail.Connectors[i].PomeloGate.ConnectorConnected {
connector.GateConnector++
}
Expand Down
Loading

0 comments on commit 77b1bad

Please sign in to comment.