diff --git a/codis/pkg/proxy/config.go b/codis/pkg/proxy/config.go index abb0c6c32e..c475443dc0 100644 --- a/codis/pkg/proxy/config.go +++ b/codis/pkg/proxy/config.go @@ -139,8 +139,6 @@ metrics_report_statsd_prefix = "" ` type Config struct { - ConfigFileName string `toml:"-" json:"config_file_name"` - ProtoType string `toml:"proto_type" json:"proto_type"` ProxyAddr string `toml:"proxy_addr" json:"proxy_addr"` AdminAddr string `toml:"admin_addr" json:"admin_addr"` @@ -197,6 +195,7 @@ type Config struct { MetricsReportStatsdServer string `toml:"metrics_report_statsd_server" json:"metrics_report_statsd_server"` MetricsReportStatsdPeriod timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"` MetricsReportStatsdPrefix string `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"` + ConfigFileName string `toml:"-" json:"config_file_name"` } func NewDefaultConfig() *Config { diff --git a/codis/pkg/proxy/mapper.go b/codis/pkg/proxy/mapper.go index 528c101a6a..3095c5548a 100644 --- a/codis/pkg/proxy/mapper.go +++ b/codis/pkg/proxy/mapper.go @@ -73,7 +73,6 @@ func init() { {"BLPOP", FlagWrite | FlagNotAllow}, {"BRPOP", FlagWrite | FlagNotAllow}, {"BRPOPLPUSH", FlagWrite | FlagNotAllow}, - {"CCONFIG", FlagWrite}, {"CLIENT", FlagNotAllow}, {"CLUSTER", FlagNotAllow}, {"COMMAND", 0}, @@ -216,7 +215,7 @@ func init() { {"SLOTSRESTORE-ASYNC-AUTH", FlagWrite | FlagNotAllow}, {"SLOTSRESTORE-ASYNC-ACK", FlagWrite | FlagNotAllow}, {"SLOTSSCAN", FlagMasterOnly}, - {"SLOWLOG", 0}, + {"SLOWLOG", FlagNotAllow}, {"SMEMBERS", 0}, {"SMOVE", FlagWrite}, {"SORT", FlagWrite}, @@ -230,6 +229,8 @@ func init() { {"SUNION", 0}, {"SUNIONSTORE", FlagWrite}, {"SYNC", FlagNotAllow}, + {"PCONFIG", 0}, + {"PSLOWLOG", 0}, {"TIME", FlagNotAllow}, {"TOUCH", FlagWrite}, {"TTL", 0}, @@ -322,9 +323,10 @@ func getHashKey(multi []*redis.Resp, opstr string) []byte { } func getWholeCmd(multi []*redis.Resp, cmd []byte) int { - var index = 0 - var bytes = 0 - + var ( + index = 0 + bytes = 0 + ) for i := 0; i < len(multi); i++ { if index < len(cmd) { index += copy(cmd[index:], multi[i].Value) @@ -334,7 +336,6 @@ func getWholeCmd(multi []*redis.Resp, cmd []byte) int { } bytes += len(multi[i].Value) - // 如果cmd已经满了,那么最后腾出来一个more长度的位置添加more信息 if i == len(multi)-1 && index == len(cmd) { more := []byte("... " + strconv.Itoa(len(multi)) + " elements " + strconv.Itoa(bytes) + " bytes.") index = len(cmd) - len(more) diff --git a/codis/pkg/proxy/proxy.go b/codis/pkg/proxy/proxy.go index 899eafa607..c951694bfb 100644 --- a/codis/pkg/proxy/proxy.go +++ b/codis/pkg/proxy/proxy.go @@ -10,7 +10,6 @@ import ( "os" "os/exec" "path/filepath" - "pika/codis/v2/pkg/proxy/redis" "runtime" "strconv" "strings" @@ -18,6 +17,7 @@ import ( "time" "pika/codis/v2/pkg/models" + "pika/codis/v2/pkg/proxy/redis" "pika/codis/v2/pkg/utils" "pika/codis/v2/pkg/utils/errors" "pika/codis/v2/pkg/utils/log" @@ -62,82 +62,82 @@ func New(config *Config) (*Proxy, error) { return nil, errors.Trace(err) } - s := &Proxy{} - s.config = config - s.exit.C = make(chan struct{}) - s.router = NewRouter(config) - s.ignore = make([]byte, config.ProxyHeapPlaceholder.Int64()) + p := &Proxy{} + p.config = config + p.exit.C = make(chan struct{}) + p.router = NewRouter(config) + p.ignore = make([]byte, config.ProxyHeapPlaceholder.Int64()) - s.model = &models.Proxy{ + p.model = &models.Proxy{ StartTime: time.Now().String(), } - s.model.ProductName = config.ProductName - s.model.DataCenter = config.ProxyDataCenter - s.model.Pid = os.Getpid() - s.model.Pwd, _ = os.Getwd() + p.model.ProductName = config.ProductName + p.model.DataCenter = config.ProxyDataCenter + p.model.Pid = os.Getpid() + p.model.Pwd, _ = os.Getwd() if b, err := exec.Command("uname", "-a").Output(); err != nil { log.WarnErrorf(err, "run command uname failed") } else { - s.model.Sys = strings.TrimSpace(string(b)) + p.model.Sys = strings.TrimSpace(string(b)) } - s.model.Hostname = utils.Hostname + p.model.Hostname = utils.Hostname - if err := s.setup(config); err != nil { - s.Close() + if err := p.setup(config); err != nil { + p.Close() return nil, err } - log.Warnf("[%p] create new proxy:\n%s", s, s.model.Encode()) + log.Warnf("[%p] create new proxy:\n%s", p, p.model.Encode()) unsafe2.SetMaxOffheapBytes(config.ProxyMaxOffheapBytes.Int64()) - go s.serveAdmin() - go s.serveProxy() + go p.serveAdmin() + go p.serveProxy() - s.startMetricsJson() - s.startMetricsInfluxdb() - s.startMetricsStatsd() + p.startMetricsJson() + p.startMetricsInfluxdb() + p.startMetricsStatsd() - return s, nil + return p, nil } -func (s *Proxy) setup(config *Config) error { +func (p *Proxy) setup(config *Config) error { proto := config.ProtoType if l, err := net.Listen(proto, config.ProxyAddr); err != nil { return errors.Trace(err) } else { - s.lproxy = l + p.lproxy = l x, err := utils.ReplaceUnspecifiedIP(proto, l.Addr().String(), config.HostProxy) if err != nil { return err } - s.model.ProtoType = proto - s.model.ProxyAddr = x + p.model.ProtoType = proto + p.model.ProxyAddr = x } proto = "tcp" if l, err := net.Listen(proto, config.AdminAddr); err != nil { return errors.Trace(err) } else { - s.ladmin = l + p.ladmin = l x, err := utils.ReplaceUnspecifiedIP(proto, l.Addr().String(), config.HostAdmin) if err != nil { return err } - s.model.AdminAddr = x + p.model.AdminAddr = x } - s.model.Token = rpc.NewToken( + p.model.Token = rpc.NewToken( config.ProductName, - s.lproxy.Addr().String(), - s.ladmin.Addr().String(), + p.lproxy.Addr().String(), + p.ladmin.Addr().String(), ) - s.xauth = rpc.NewXAuth( + p.xauth = rpc.NewXAuth( config.ProductName, config.ProductAuth, - s.model.Token, + p.model.Token, ) if config.JodisAddr != "" { @@ -146,92 +146,92 @@ func (s *Proxy) setup(config *Config) error { return err } if config.JodisCompatible { - s.model.JodisPath = filepath.Join("/zk/codis", fmt.Sprintf("db_%s", config.ProductName), "proxy", s.model.Token) + p.model.JodisPath = filepath.Join("/zk/codis", fmt.Sprintf("db_%s", config.ProductName), "proxy", p.model.Token) } else { - s.model.JodisPath = models.JodisPath(config.ProductName, s.model.Token) + p.model.JodisPath = models.JodisPath(config.ProductName, p.model.Token) } - s.jodis = NewJodis(c, s.model) + p.jodis = NewJodis(c, p.model) } - s.model.MaxSlotNum = config.MaxSlotNum + p.model.MaxSlotNum = config.MaxSlotNum return nil } -func (s *Proxy) Start() error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) Start() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } - if s.online { + if p.online { return nil } - s.online = true - s.router.Start() - if s.jodis != nil { - s.jodis.Start() + p.online = true + p.router.Start() + if p.jodis != nil { + p.jodis.Start() } return nil } -func (s *Proxy) Close() error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return nil } - s.closed = true - close(s.exit.C) + p.closed = true + close(p.exit.C) - if s.jodis != nil { - s.jodis.Close() + if p.jodis != nil { + p.jodis.Close() } - if s.ladmin != nil { - s.ladmin.Close() + if p.ladmin != nil { + p.ladmin.Close() } - if s.lproxy != nil { - s.lproxy.Close() + if p.lproxy != nil { + p.lproxy.Close() } - if s.router != nil { - s.router.Close() + if p.router != nil { + p.router.Close() } return nil } -func (s *Proxy) XAuth() string { - return s.xauth +func (p *Proxy) XAuth() string { + return p.xauth } -func (s *Proxy) Model() *models.Proxy { - return s.model +func (p *Proxy) Model() *models.Proxy { + return p.model } -func (s *Proxy) Config() *Config { - s.mu.Lock() - defer s.mu.Unlock() - return s.config +func (p *Proxy) Config() *Config { + p.mu.Lock() + defer p.mu.Unlock() + return p.config } -func (s *Proxy) ConfigGet(key string) *redis.Resp { - s.mu.Lock() - defer s.mu.Unlock() +func (p *Proxy) ConfigGet(key string) *redis.Resp { + p.mu.Lock() + defer p.mu.Unlock() switch key { case "proxy_max_clients": - return redis.NewBulkBytes([]byte(strconv.Itoa(s.config.ProxyMaxClients))) + return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.ProxyMaxClients))) case "backend_primary_only": - return redis.NewBulkBytes([]byte(strconv.FormatBool(s.config.BackendPrimaryOnly))) + return redis.NewBulkBytes([]byte(strconv.FormatBool(p.config.BackendPrimaryOnly))) case "slowlog_log_slower_than": - return redis.NewBulkBytes([]byte(strconv.FormatInt(s.config.SlowlogLogSlowerThan, 10))) + return redis.NewBulkBytes([]byte(strconv.FormatInt(p.config.SlowlogLogSlowerThan, 10))) case "slowlog_max_len": - return redis.NewBulkBytes([]byte(strconv.FormatInt(s.config.SlowlogMaxLen, 10))) + return redis.NewBulkBytes([]byte(strconv.FormatInt(p.config.SlowlogMaxLen, 10))) default: return redis.NewErrorf("unsupported key: %s", key) } } -func (s *Proxy) ConfigSet(key, value string) *redis.Resp { - s.mu.Lock() - defer s.mu.Unlock() +func (p *Proxy) ConfigSet(key, value string) *redis.Resp { + p.mu.Lock() + defer p.mu.Unlock() switch key { case "proxy_max_clients": n, err := strconv.Atoi(value) @@ -241,7 +241,7 @@ func (s *Proxy) ConfigSet(key, value string) *redis.Resp { if n <= 0 { return redis.NewErrorf("invalid proxy_max_clients") } - s.config.ProxyMaxClients = n + p.config.ProxyMaxClients = n return redis.NewString([]byte("OK")) case "backend_primary_only": return redis.NewErrorf("not currently supported") @@ -253,7 +253,7 @@ func (s *Proxy) ConfigSet(key, value string) *redis.Resp { if n < 0 { return redis.NewErrorf("invalid slowlog_log_slower_than") } - s.config.SlowlogLogSlowerThan = n + p.config.SlowlogLogSlowerThan = n return redis.NewString([]byte("OK")) case "slowlog_max_len": n, err := strconv.ParseInt(value, 10, 64) @@ -264,9 +264,9 @@ func (s *Proxy) ConfigSet(key, value string) *redis.Resp { if n < 0 { return redis.NewErrorf("invalid slowlog_max_len") } - s.config.SlowlogMaxLen = n - if s.config.SlowlogMaxLen > 0 { - SlowLogSetMaxLen(s.config.SlowlogMaxLen) + p.config.SlowlogMaxLen = n + if p.config.SlowlogMaxLen > 0 { + SlowLogSetMaxLen(p.config.SlowlogMaxLen) } return redis.NewString([]byte("OK")) default: @@ -274,114 +274,114 @@ func (s *Proxy) ConfigSet(key, value string) *redis.Resp { } } -func (s *Proxy) ConfigRewrite() *redis.Resp { - s.mu.Lock() - defer s.mu.Unlock() - utils.RewriteConfig(*(s.config), s.config.ConfigFileName, "=", true) +func (p *Proxy) ConfigRewrite() *redis.Resp { + p.mu.Lock() + defer p.mu.Unlock() + utils.RewriteConfig(*(p.config), p.config.ConfigFileName, "=", true) return redis.NewString([]byte("OK")) } -func (s *Proxy) IsOnline() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.online && !s.closed +func (p *Proxy) IsOnline() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.online && !p.closed } -func (s *Proxy) IsClosed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.closed +func (p *Proxy) IsClosed() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.closed } -func (s *Proxy) HasSwitched() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.router.HasSwitched() +func (p *Proxy) HasSwitched() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.router.HasSwitched() } -func (s *Proxy) Slots() []*models.Slot { - s.mu.Lock() - defer s.mu.Unlock() - return s.router.GetSlots() +func (p *Proxy) Slots() []*models.Slot { + p.mu.Lock() + defer p.mu.Unlock() + return p.router.GetSlots() } -func (s *Proxy) FillSlot(m *models.Slot) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) FillSlot(m *models.Slot) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } - return s.router.FillSlot(m) + return p.router.FillSlot(m) } -func (s *Proxy) FillSlots(slots []*models.Slot) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) FillSlots(slots []*models.Slot) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } for _, m := range slots { - if err := s.router.FillSlot(m); err != nil { + if err := p.router.FillSlot(m); err != nil { return err } } return nil } -func (s *Proxy) SwitchMasters(masters map[int]string) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) SwitchMasters(masters map[int]string) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } - s.ha.masters = masters + p.ha.masters = masters if len(masters) != 0 { - s.router.SwitchMasters(masters) + p.router.SwitchMasters(masters) } return nil } -func (s *Proxy) GetSentinels() ([]string, map[int]string) { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) GetSentinels() ([]string, map[int]string) { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return nil, nil } - return s.ha.servers, s.ha.masters + return p.ha.servers, p.ha.masters } -func (s *Proxy) serveAdmin() { - if s.IsClosed() { +func (p *Proxy) serveAdmin() { + if p.IsClosed() { return } - defer s.Close() + defer p.Close() - log.Warnf("[%p] admin start service on %s", s, s.ladmin.Addr()) + log.Warnf("[%p] admin start service on %s", p, p.ladmin.Addr()) eh := make(chan error, 1) go func(l net.Listener) { h := http.NewServeMux() - h.Handle("/", newApiServer(s)) + h.Handle("/", newApiServer(p)) hs := &http.Server{Handler: h} eh <- hs.Serve(l) - }(s.ladmin) + }(p.ladmin) select { - case <-s.exit.C: - log.Warnf("[%p] admin shutdown", s) + case <-p.exit.C: + log.Warnf("[%p] admin shutdown", p) case err := <-eh: - log.ErrorErrorf(err, "[%p] admin exit on error", s) + log.ErrorErrorf(err, "[%p] admin exit on error", p) } } -func (s *Proxy) serveProxy() { - if s.IsClosed() { +func (p *Proxy) serveProxy() { + if p.IsClosed() { return } - defer s.Close() + defer p.Close() - log.Warnf("[%p] proxy start service on %s", s, s.lproxy.Addr()) + log.Warnf("[%p] proxy start service on %s", p, p.lproxy.Addr()) eh := make(chan error, 1) go func(l net.Listener) (err error) { @@ -389,40 +389,40 @@ func (s *Proxy) serveProxy() { eh <- err }() for { - c, err := s.acceptConn(l) + c, err := p.acceptConn(l) if err != nil { return err } - NewSession(c, s.config, s).Start(s.router) + NewSession(c, p.config, p).Start(p.router) } - }(s.lproxy) + }(p.lproxy) - if d := s.config.BackendPingPeriod.Duration(); d != 0 { - go s.keepAlive(d) + if d := p.config.BackendPingPeriod.Duration(); d != 0 { + go p.keepAlive(d) } select { - case <-s.exit.C: - log.Warnf("[%p] proxy shutdown", s) + case <-p.exit.C: + log.Warnf("[%p] proxy shutdown", p) case err := <-eh: - log.ErrorErrorf(err, "[%p] proxy exit on error", s) + log.ErrorErrorf(err, "[%p] proxy exit on error", p) } } -func (s *Proxy) keepAlive(d time.Duration) { +func (p *Proxy) keepAlive(d time.Duration) { var ticker = time.NewTicker(math2.MaxDuration(d, time.Second)) defer ticker.Stop() for { select { - case <-s.exit.C: + case <-p.exit.C: return case <-ticker.C: - s.router.KeepAlive() + p.router.KeepAlive() } } } -func (s *Proxy) acceptConn(l net.Listener) (net.Conn, error) { +func (p *Proxy) acceptConn(l net.Listener) (net.Conn, error) { var delay = &DelayExp2{ Min: 10, Max: 500, Unit: time.Millisecond, @@ -431,7 +431,7 @@ func (s *Proxy) acceptConn(l net.Listener) (net.Conn, error) { c, err := l.Accept() if err != nil { if e, ok := err.(net.Error); ok && e.Temporary() { - log.WarnErrorf(err, "[%p] proxy accept new connection failed", s) + log.WarnErrorf(err, "[%p] proxy accept new connection failed", p) delay.Sleep() continue } @@ -531,24 +531,24 @@ const ( StatsFull = StatsFlags(^uint32(0)) ) -func (s *Proxy) Overview(flags StatsFlags) *Overview { +func (p *Proxy) Overview(flags StatsFlags) *Overview { o := &Overview{ Version: utils.Version, Compile: utils.Compile, - Config: s.Config(), - Model: s.Model(), - Stats: s.Stats(flags), + Config: p.Config(), + Model: p.Model(), + Stats: p.Stats(flags), } if flags.HasBit(StatsSlots) { - o.Slots = s.Slots() + o.Slots = p.Slots() } return o } -func (s *Proxy) Stats(flags StatsFlags) *Stats { +func (p *Proxy) Stats(flags StatsFlags) *Stats { stats := &Stats{} - stats.Online = s.IsOnline() - stats.Closed = s.IsClosed() + stats.Online = p.IsOnline() + stats.Closed = p.IsClosed() stats.Ops.Total = OpTotal() stats.Ops.Fails = OpFails() @@ -569,7 +569,7 @@ func (s *Proxy) Stats(flags StatsFlags) *Stats { stats.Rusage.Raw = u.Usage } - stats.Backend.PrimaryOnly = s.Config().BackendPrimaryOnly + stats.Backend.PrimaryOnly = p.Config().BackendPrimaryOnly if flags.HasBit(StatsRuntime) { var r runtime.MemStats diff --git a/codis/pkg/proxy/session.go b/codis/pkg/proxy/session.go index 4679b60035..188a62ae8c 100644 --- a/codis/pkg/proxy/session.go +++ b/codis/pkg/proxy/session.go @@ -212,6 +212,7 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { breakOnFailure = s.config.SessionBreakOnFailure maxPipelineLen = s.config.SessionMaxPipeline ) + var cmd = make([]byte, 128) p := s.Conn.FlushEncoder() p.MaxInterval = time.Millisecond @@ -240,7 +241,7 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { } nowTime := time.Now().UnixNano() duration := int64((nowTime - r.ReceiveTime) / 1e3) - if duration >= 50000 { + if duration >= s.config.SlowlogLogSlowerThan { //client -> proxy -> server -> porxy -> client //Record the waiting time from receiving the request from the client to sending it to the backend server //the waiting time from sending the request to the backend server to receiving the response from the server @@ -255,8 +256,13 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { if r.ReceiveFromServerTime > 0 { d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3) } - log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d", - time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen) + index := getWholeCmd(r.Multi, cmd) + cmdLog := fmt.Sprintf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].", + time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index])) + log.Warnf("%s", cmdLog) + if s.config.SlowlogMaxLen > 0 { + SlowLogPush(&SlowLogEntry{SlowLogGetCurLogId(), r.ReceiveTime / 1e3, duration, cmdLog}) + } } return nil }) @@ -320,10 +326,10 @@ func (s *Session) handleRequest(r *Request, d *Router) error { return s.handleRequestDel(r, d) case "EXISTS": return s.handleRequestExists(r, d) - case "CCONFIG": - return s.handleCConfig(r) - case "SLOWLOG": - return s.handleSlowLog(r, d) + case "PCONFIG": + return s.handlePConfig(r) + case "PSLOWLOG": + return s.handlePSlowLog(r) case "SLOTSINFO": return s.handleRequestSlotsInfo(r, d) case "SLOTSSCAN": @@ -720,12 +726,11 @@ func (s *Session) flushOpStats(force bool) { } } -func (s *Session) handleSlowLog(r *Request, d *Router) error { +func (s *Session) handlePSlowLog(r *Request) error { if len(r.Multi) < 2 || len(r.Multi) > 4 { r.Resp = redis.NewErrorf("ERR slowLog parameters") return nil } - var subCmd = strings.ToUpper(string(r.Multi[1].Value)) switch subCmd { case "GET": @@ -738,9 +743,11 @@ func (s *Session) handleSlowLog(r *Request, d *Router) error { r.Resp = SlowLogGetByNum(num) } else if len(r.Multi) == 4 { - var id int64 - var num int64 - var err error + var ( + id int64 + num int64 + err error + ) id, err = strconv.ParseInt(string(r.Multi[2].Value), 10, 64) if err != nil { r.Resp = redis.NewErrorf("ERR invalid slowLog start logId") @@ -774,7 +781,7 @@ func (s *Session) handleSlowLog(r *Request, d *Router) error { return nil } -func (s *Session) handleCConfig(r *Request) error { +func (s *Session) handlePConfig(r *Request) error { if len(r.Multi) < 2 || len(r.Multi) > 4 { r.Resp = redis.NewErrorf("ERR config parameters") return nil diff --git a/codis/pkg/proxy/slowlog.go b/codis/pkg/proxy/slowlog.go index e91113e584..79c131c546 100644 --- a/codis/pkg/proxy/slowlog.go +++ b/codis/pkg/proxy/slowlog.go @@ -15,10 +15,6 @@ const ( PIKA_SLOWLOG_LENGTH_MAX = 10000000 ) -type Mutex struct { - sync.Mutex -} - type SlowLogEntry struct { id int64 time int64 @@ -27,7 +23,7 @@ type SlowLogEntry struct { } type SlowLog struct { - Mutex + sync.Mutex logList *list.List logId atomic2.Int64 maxLen atomic2.Int64 @@ -107,7 +103,7 @@ func SlowLogGetByNum(num int64) *redis.Resp { num = int64(PSlowLog.logList.Len()) } var res = make([]*redis.Resp, 0, num) - var iter = PSlowLog.logList.Front() // 从最新的数据开始 + var iter = PSlowLog.logList.Front() for i := int64(0); i < num; i++ { if iter == nil || iter.Value == nil { break diff --git a/codis/pkg/utils/configAux.go b/codis/pkg/utils/configAux.go index 5dfbac18bc..7dc68286ad 100644 --- a/codis/pkg/utils/configAux.go +++ b/codis/pkg/utils/configAux.go @@ -29,7 +29,7 @@ type ConfItem struct { type DeployConfig struct { items []*ConfItem confMap map[string]*ConfItem - sep string //配置项中key、value分隔符 + sep string } func (c *DeployConfig) Init(path string, sep string) error { @@ -185,10 +185,11 @@ func (c *DeployConfig) Set(key string, value string) error { if found { item.value = value } else { - item := &ConfItem{} - item.confType = TypeConf - item.name = key - item.value = value + item := &ConfItem{ + confType: TypeConf, + name: key, + value: value, + } c.items = append(c.items, item) c.confMap[item.name] = item }