diff --git a/api/mysql.go b/api/mysql.go index a880f1a..654c68b 100644 --- a/api/mysql.go +++ b/api/mysql.go @@ -450,7 +450,7 @@ func (d *db) GetAlarmRecords(eventID int64, order, limit string) (records []*Ala // GetTriggersRecords 获取报警事件下的表达式组 func (d *db) GetTriggersRecords(eventID int64, count int) []*TriggerEventRecord { triggers := []*TriggerEventRecord{} - rawSQL := "SELECT * FROM trigger_event_record WHERE strategy_event_id = ? AND count = ?" + rawSQL := "SELECT * FROM trigger_event_record WHERE strategy_event_id = ? AND count = ? AND triggered=TRUE" if err := d.Select(&triggers, rawSQL, eventID, count); err != nil { log.Println(err) return nil diff --git a/common/chanMonitor/monitor.go b/common/chanMonitor/monitor.go new file mode 100755 index 0000000..b57f1b5 --- /dev/null +++ b/common/chanMonitor/monitor.go @@ -0,0 +1,80 @@ +package chanMonitor + +import ( + "fmt" + "reflect" + "sync" +) + +var chans = make(map[key]interface{}) +var chmu sync.RWMutex + +// AddNamed adds a channel to be monitor and associates the channel +// with this name and, optionally, the instance of this named channel (there may be many) +func AddNamed(name, instance string, channel interface{}) error { + + //reflect on the input to get the correct channel type. + if reflect.TypeOf(channel).Kind() != reflect.Chan { + return fmt.Errorf("invalid input type %v for input param channel, must be of type chan", channel) + } + + chmu.Lock() + defer chmu.Unlock() + + k := key{name: name, instance: instance} + + if _, found := chans[k]; found { + return fmt.Errorf("channel with name: %s already being monitored.", name) + } + chans[k] = channel + + return nil +} + +// ChanState struct holding Length and Capacity. +type ChanState struct { + Len int `json:"length"` + Cap int `json:"capacity"` + Instance string `json:"instance"` +} + +type key struct { + name string + instance string +} + +// Get returns the channel state for a give channel name. +func Get(name, instance string) *ChanState { + + chmu.RLock() + defer chmu.RUnlock() + + k := key{name: name, instance: instance} + + ch, found := chans[k] + if !found { + return nil + } + + return &ChanState{ + Len: reflect.ValueOf(ch).Len(), + Cap: reflect.ValueOf(ch).Cap(), + Instance: k.instance, + } + +} + +// Get the channel states map[string]*ChanState of all the monitored channels. Keyed by channel name. +func GetAll() map[string]*ChanState { + + results := make(map[string]*ChanState) + + chmu.RLock() + defer chmu.RUnlock() + for k, _ := range chans { + results[k.name] = Get(k.name, k.instance) + } + + return results + +} diff --git a/common/chanMonitor/service.go b/common/chanMonitor/service.go new file mode 100755 index 0000000..100ee66 --- /dev/null +++ b/common/chanMonitor/service.go @@ -0,0 +1,62 @@ +package chanMonitor + +import ( + "encoding/json" + "log" + "net/http" +) + +type Service struct { + url string + name string +} + +func New(serviceName string, url string) *Service { + + return &Service{ + url: url, + name: serviceName, + } +} + +func (this *Service) Start() { + http.HandleFunc("/channels", this.chanHandler) + go func() { + if err := this.start(); err != nil { + panic(err) + } + }() +} + +func (this *Service) start() error { + return http.ListenAndServe(this.url, nil) +} + +func (this *Service) chanHandler(w http.ResponseWriter, r *http.Request) { + chStats := GetAll() + + resp := &ServiceChannelsStatus{ + Service: this.name, + Channels: chStats, + } + + jsonResp, err := json.Marshal(resp) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write(nil) + log.Printf("Error: %#v", err) + } + + w.Header().Add("Content-Type", "application/json") + w.Write(jsonResp) +} + +type ServiceChannelsStatus struct { + Service string `json:"service"` + Channels map[string]*ChanState `json:"channels"` +} + +type Config struct { + Name string + Url string +} diff --git a/inspector/main.go b/inspector/main.go index 35b33cc..792be23 100644 --- a/inspector/main.go +++ b/inspector/main.go @@ -6,6 +6,7 @@ package main import ( "fmt" "os" + chm "owl/common/chanMonitor" "path/filepath" "runtime" ) @@ -33,5 +34,9 @@ func main() { fmt.Println("failed to init inspector:", err) return } + + chm.AddNamed("inspector.resultPool.results", "owl-inspector", inspector.resultPool.results) + chm.AddNamed("inspector.taskPool.tasks", "owl-inspector", inspector.taskPool.tasks) + chm.New("owl-inspector", ":20001").Start() select {} }