Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

在common中新增加了监控channel长度的功能,并且把inspector的channel监控起来 #23

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (d *db) GetAlarmRecords(eventID int64, order, limit string) (records []*Ala
// GetTriggersRecords 获取报警事件下的表达式组
func (d *db) GetTriggersRecords(eventID int64, count int) []*TriggerEventRecord {
triggers := []*TriggerEventRecord{}
rawSQL := "SELECT * FROM trigger_event_record WHERE strategy_event_id = ? AND count = ?"
rawSQL := "SELECT * FROM trigger_event_record WHERE strategy_event_id = ? AND count = ? AND triggered=TRUE"
if err := d.Select(&triggers, rawSQL, eventID, count); err != nil {
log.Println(err)
return nil
Expand Down
80 changes: 80 additions & 0 deletions common/chanMonitor/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package chanMonitor

import (
"fmt"
"reflect"
"sync"
)

var chans = make(map[key]interface{})
var chmu sync.RWMutex

// AddNamed adds a channel to be monitor and associates the channel
// with this name and, optionally, the instance of this named channel (there may be many)
func AddNamed(name, instance string, channel interface{}) error {

//reflect on the input to get the correct channel type.
if reflect.TypeOf(channel).Kind() != reflect.Chan {
return fmt.Errorf("invalid input type %v for input param channel, must be of type chan", channel)
}

chmu.Lock()
defer chmu.Unlock()

k := key{name: name, instance: instance}

if _, found := chans[k]; found {
return fmt.Errorf("channel with name: %s already being monitored.", name)
}
chans[k] = channel

return nil
}

// ChanState struct holding Length and Capacity.
type ChanState struct {
Len int `json:"length"`
Cap int `json:"capacity"`
Instance string `json:"instance"`
}

type key struct {
name string
instance string
}

// Get returns the channel state for a give channel name.
func Get(name, instance string) *ChanState {

chmu.RLock()
defer chmu.RUnlock()

k := key{name: name, instance: instance}

ch, found := chans[k]
if !found {
return nil
}

return &ChanState{
Len: reflect.ValueOf(ch).Len(),
Cap: reflect.ValueOf(ch).Cap(),
Instance: k.instance,
}

}

// Get the channel states map[string]*ChanState of all the monitored channels. Keyed by channel name.
func GetAll() map[string]*ChanState {

results := make(map[string]*ChanState)

chmu.RLock()
defer chmu.RUnlock()
for k, _ := range chans {
results[k.name] = Get(k.name, k.instance)
}

return results

}
62 changes: 62 additions & 0 deletions common/chanMonitor/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package chanMonitor

import (
"encoding/json"
"log"
"net/http"
)

type Service struct {
url string
name string
}

func New(serviceName string, url string) *Service {

return &Service{
url: url,
name: serviceName,
}
}

func (this *Service) Start() {
http.HandleFunc("/channels", this.chanHandler)
go func() {
if err := this.start(); err != nil {
panic(err)
}
}()
}

func (this *Service) start() error {
return http.ListenAndServe(this.url, nil)
}

func (this *Service) chanHandler(w http.ResponseWriter, r *http.Request) {
chStats := GetAll()

resp := &ServiceChannelsStatus{
Service: this.name,
Channels: chStats,
}

jsonResp, err := json.Marshal(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write(nil)
log.Printf("Error: %#v", err)
}

w.Header().Add("Content-Type", "application/json")
w.Write(jsonResp)
}

type ServiceChannelsStatus struct {
Service string `json:"service"`
Channels map[string]*ChanState `json:"channels"`
}

type Config struct {
Name string
Url string
}
5 changes: 5 additions & 0 deletions inspector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"fmt"
"os"
chm "owl/common/chanMonitor"
"path/filepath"
"runtime"
)
Expand Down Expand Up @@ -33,5 +34,9 @@ func main() {
fmt.Println("failed to init inspector:", err)
return
}

chm.AddNamed("inspector.resultPool.results", "owl-inspector", inspector.resultPool.results)
chm.AddNamed("inspector.taskPool.tasks", "owl-inspector", inspector.taskPool.tasks)
chm.New("owl-inspector", ":20001").Start()
select {}
}