Skip to content

Commit

Permalink
Merge pull request #3 from barryz/fix_code
Browse files Browse the repository at this point in the history
optimize how mutexes are used
  • Loading branch information
barryz authored May 15, 2018
2 parents 4161d12 + 752f3fe commit 7c97e19
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 42 deletions.
40 changes: 17 additions & 23 deletions falcon/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
exchangePrefix = "rabbitmq.exchange."
)

// MetaData ...
// MetaData meta data
type MetaData struct {
Endpoint string `json:"endpoint"`
Metric string `json:"metric"`
Expand All @@ -32,7 +32,7 @@ type MetaData struct {
Step int64 `json:"step"`
}

// NewMetric ...
// NewMetric create an new metric
func NewMetric(name string, value interface{}, tags string) *MetaData {
host := g.GetHost()
return &MetaData{
Expand All @@ -52,7 +52,7 @@ func (m *MetaData) String() string {
return s
}

// SetValue ...
// SetValue value setter
func (m *MetaData) SetValue(v interface{}) {
m.Value = v
}
Expand All @@ -64,7 +64,7 @@ func trimFloat(s float64) float64 {
return s
}

func calcpct(l, t int64) (pct float64) {
func calcPercentage(l, t int64) (pct float64) {
if t == 0 {
return
}
Expand All @@ -73,9 +73,9 @@ func calcpct(l, t int64) (pct float64) {
return
}

func qstats(s string) int64 {
var alivequeue = g.Config().Qrunning
for _, i := range alivequeue {
func qStats(s string) int64 {
var aliveQueue = g.Config().Qrunning
for _, i := range aliveQueue {
if strings.Contains(strings.ToLower(s), i) {
return 1
}
Expand All @@ -98,7 +98,6 @@ func partitions(s []string) int64 {
return 1
default:
return 0

}
}

Expand All @@ -122,7 +121,7 @@ func GetCurrentStatsDB() *g.StatsDB {
return statsDB
}

// handleJudge 根据节点的角色判断push哪些指标
// handleJudge
func handleJudge() (data []*MetaData) {
data = make([]*MetaData, 0)
nd, err := funcs.GetNode()
Expand All @@ -131,7 +130,6 @@ func handleJudge() (data []*MetaData) {
return
}

// 设置节点指标, 集群中所有的节点都需要采集的指标
data = append(data, NewMetric(overviewPrefix+"ioReadawait", nd.Rawait, "")) // io_read_avg_wait_time
data = append(data, NewMetric(overviewPrefix+"ioWriteawait", nd.Wawait, "")) // io_write_avg_wait_time
data = append(data, NewMetric(overviewPrefix+"ioSyncawait", nd.Syncawait, "")) // io_sync_avg_wait_time
Expand All @@ -145,10 +143,10 @@ func handleJudge() (data []*MetaData) {
data = append(data, NewMetric(overviewPrefix+"memBinary", nd.Binary, ""))
data = append(data, NewMetric(overviewPrefix+"memAlarm", nd.MemAlarmStatus(), ""))
data = append(data, NewMetric(overviewPrefix+"diskAlarm", nd.DiskAlarmStatus(), ""))
data = append(data, NewMetric(overviewPrefix+"fdUsedPct", calcpct(nd.FdUsed, nd.FdTotal), ""))
data = append(data, NewMetric(overviewPrefix+"memUsedPct", calcpct(nd.MemUsed, nd.MemLimit), ""))
data = append(data, NewMetric(overviewPrefix+"socketUsedPct", calcpct(nd.SocketsUsed, nd.SocketsTotal), ""))
data = append(data, NewMetric(overviewPrefix+"erlProcsUsedPct", calcpct(nd.ErlProcUsed, nd.ErlProcTotal), "")) //消费生产比
data = append(data, NewMetric(overviewPrefix+"fdUsedPct", calcPercentage(nd.FdUsed, nd.FdTotal), ""))
data = append(data, NewMetric(overviewPrefix+"memUsedPct", calcPercentage(nd.MemUsed, nd.MemLimit), ""))
data = append(data, NewMetric(overviewPrefix+"socketUsedPct", calcPercentage(nd.SocketsUsed, nd.SocketsTotal), ""))
data = append(data, NewMetric(overviewPrefix+"erlProcsUsedPct", calcPercentage(nd.ErlProcUsed, nd.ErlProcTotal), "")) //消费生产比
data = append(data, NewMetric(overviewPrefix+"runQueue", nd.RunQueues, ""))
data = append(data, NewMetric(overviewPrefix+"isPartition", partitions(nd.Partitions), "")) // 是否发生网络分区

Expand All @@ -161,30 +159,26 @@ func handleJudge() (data []*MetaData) {

updateCurrentStatsDB(ov.StatisticsDbNode)

// 判断是否为统计节点,如果为统计节点,则push所有数据; 反之,则push节点数据
// RabbitMQ Version Compatibility: (<= 3.6.x)
if ov.StatisticsDbNode == currentNode || len(ov.StatisticsDbNode) == 0 {
// 获取channel耗时
channelCost, err := funcs.GetChannelCost()
if err != nil {
log.Println(err.Error())
return
}

// 获取aliveness接口数据
aliveness, err := funcs.GetAlive()
if err != nil {
log.Printf("get aliveness api failed due to %s", err.Error())
return
}

// 获取queue监控数据
queues, err := funcs.GetQueues()
if err != nil {
log.Printf("get queue api failed due to %s", err.Error())
return
}

// 获取exchange监控数据
exchs, err := funcs.GetExchanges()
if err != nil {
log.Printf("get exchange api failed due to %s", err.Error())
Expand All @@ -209,7 +203,7 @@ func handleJudge() (data []*MetaData) {
data = append(data, NewMetric(overviewPrefix+"redeliverRate", ov.RedeliverRates.Rate, ""))
data = append(data, NewMetric(overviewPrefix+"ackRate", ov.AckRates.Rate, ""))
data = append(data, NewMetric(overviewPrefix+"getChannelCost", channelCost, "")) // 获取channel耗时
data = append(data, NewMetric(overviewPrefix+"dpRatio", calcpct(int64(ov.DeliverGetRates.Rate), int64(ov.PublishRates.Rate)), ""))
data = append(data, NewMetric(overviewPrefix+"dpRatio", calcPercentage(int64(ov.DeliverGetRates.Rate), int64(ov.PublishRates.Rate)), ""))
data = append(data, NewMetric(overviewPrefix+"isAlive", isAliveness(aliveness.Status), "")) // 读写判断
data = append(data, NewMetric(overviewPrefix+"isUp", 1, ""))

Expand All @@ -225,8 +219,8 @@ func handleJudge() (data []*MetaData) {
data = append(data, NewMetric(queuePrefix+"memory", q.Memory, tags))
data = append(data, NewMetric(queuePrefix+"consumers", q.Consumers, tags))
data = append(data, NewMetric(queuePrefix+"consumer_utilisation", consumerUtil(q.ConsumerUtil), tags))
data = append(data, NewMetric(queuePrefix+"status", qstats(q.Status), tags))
data = append(data, NewMetric(queuePrefix+"dpratio", calcpct(int64(q.DeliverGet.Rate), int64(q.Publish.Rate)), tags))
data = append(data, NewMetric(queuePrefix+"status", qStats(q.Status), tags))
data = append(data, NewMetric(queuePrefix+"dpratio", calcPercentage(int64(q.DeliverGet.Rate), int64(q.Publish.Rate)), tags))
}

for _, e := range exchs {
Expand All @@ -246,7 +240,7 @@ func handleSickRabbit() (data []*MetaData) {
return
}

// Collector ...
// Collector collect metrics
func Collector() {
var m []*MetaData

Expand Down
2 changes: 1 addition & 1 deletion falcon/senddata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func sendData(data []*MetaData) (resp []byte, err error) {
}

func sendDatas(m []*MetaData) {
// 根据batchsize发送metrics
// batch-size specified.
limit, lens := g.Config().Batchsize, len(m)
if lens >= limit {
offset := lens % limit
Expand Down
6 changes: 0 additions & 6 deletions funcs/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func GetExchanges() (exchs []*ExchangeInfo, err error) {
res, err1 := g.RabbitAPI(service)
if err1 != nil {
err = err1
// 获取exchange无论发生什么错误 ,直接返回
return
}

Expand All @@ -86,20 +85,15 @@ func GetExchanges() (exchs []*ExchangeInfo, err error) {
}

for _, e := range es {
// 若exchange为默认, 则重命名为"DEFAULT_EXCHANGE"
if e.Name == "" {
e.Name = "DEFAULT_EXCHANGE"
}

// 不处理amq.开头的内置exchange
if strings.Contains(e.Name, "amq.") {
continue
}
exchs = append(exchs, e)
}

// add one slice to another
// exchs = append(exchs, es...)
}

return
Expand Down
4 changes: 2 additions & 2 deletions funcs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ type NodeStats struct {
DiskAlarm bool `json:"disk_free_alarm"`
}

// MemAlarmStatus 内存告警指标
// MemAlarmStatus memory alarm status
func (n *NodeStats) MemAlarmStatus() int {
if n.MemAlarm {
return 0
}
return 1
}

// DiskAlarmStatus 磁盘告警指标
// DiskAlarmStatus disc alarm status
func (n *NodeStats) DiskAlarmStatus() int {
if n.DiskAlarm {
return 0
Expand Down
2 changes: 1 addition & 1 deletion witch/system/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *Launcher) readPid() (int, bool) {
}

func (s *Launcher) pidAlive(pid int) bool {
return (syscall.Kill(pid, 0) == nil)
return syscall.Kill(pid, 0) == nil
}

// IsAlive check if the process alive.
Expand Down
6 changes: 3 additions & 3 deletions witch/system/statsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ func NewStatsDBCtl() *StatsDBCtl {
// Reset reset the RabbitMQ statsdb
func (s *StatsDBCtl) Reset() (bool, string, error) {
output, err := ExecCommand(s.name, []string{"eval", "application:stop(rabbitmq_management), application:start(rabbitmq_management)."})
return (err == nil), output, err
return err == nil, output, err
}

// Terminate terminate the RabbitMQ statsdb
func (s *StatsDBCtl) Terminate() (bool, string, error) {
output, err := ExecCommand(s.name, []string{"eval", "exit(erlang:whereis(rabbit_mgmt_db), please_terminate)."})
return (err == nil), output, err
return err == nil, output, err
}

// Crash crash the RabbitMQ statsdb
func (s *StatsDBCtl) Crash() (bool, string, error) {
output, err := ExecCommand(s.name, []string{"eval", "exit(erlang:whereis(rabbit_mgmt_db), please_crash)."})
return (err == nil), output, err
return err == nil, output, err
}
6 changes: 3 additions & 3 deletions witch/system/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ func (s *Supervisor) IsAlive() (int, bool) {
// Start executes `supervisorctl start [service]`
func (s *Supervisor) Start() (bool, error) {
_, err := ExecCommand(s.name, []string{"start", s.service})
return (err == nil), err
return err == nil, err
}

// Restart executes `supervisorctl restart [service]`
func (s *Supervisor) Restart() (bool, error) {
_, err := ExecCommand(s.name, []string{"restart", s.service})
return (err == nil), err
return err == nil, err
}

// Stop executes `supervisorctl stop [service]`
func (s *Supervisor) Stop() bool {
_, err := ExecCommand(s.name, []string{"stop", s.service})
return (err == nil)
return err == nil
}
6 changes: 3 additions & 3 deletions witch/system/systemd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ func (s *Systemd) IsAlive() (int, bool) {
// Start executes `systemctl start [service]`
func (s *Systemd) Start() (bool, error) {
_, err := ExecCommand(s.name, []string{"start", s.service})
return (err == nil), err
return err == nil, err
}

// Restart executes `systemctl restart [service]`
func (s *Systemd) Restart() (bool, error) {
_, err := ExecCommand(s.name, []string{"restart", s.service})
return (err == nil), err
return err == nil, err
}

// Stop executes `systemctl stop [service]`
func (s *Systemd) Stop() bool {
_, err := ExecCommand(s.name, []string{"stop", s.service})
return (err == nil)
return err == nil
}

0 comments on commit 7c97e19

Please sign in to comment.