From 8ed1123843f0d973af41da04a300fc8aee40cb53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=81=E5=B0=8F=E5=B8=85?= <56024577+dingxiaoshuai123@users.noreply.github.com> Date: Wed, 8 Nov 2023 17:56:49 +0800 Subject: [PATCH] Dynamic modification of configuration files&&Slow log (#2103) * Dynamic modification of configuration files.Check the correctness of the functionality. * Modified some specification issues. * Dynamically modify Codis configuration file and add slow query logs. --- codis/cmd/proxy/main.go | 2 + codis/config/proxy.toml | 6 + codis/pkg/proxy/config.go | 18 ++ codis/pkg/proxy/mapper.go | 30 +++ codis/pkg/proxy/proxy.go | 347 +++++++++++++++++++++-------------- codis/pkg/proxy/session.go | 117 +++++++++++- codis/pkg/proxy/slowlog.go | 173 +++++++++++++++++ codis/pkg/utils/configAux.go | 271 +++++++++++++++++++++++++++ 8 files changed, 821 insertions(+), 143 deletions(-) create mode 100644 codis/pkg/proxy/slowlog.go create mode 100644 codis/pkg/utils/configAux.go diff --git a/codis/cmd/proxy/main.go b/codis/cmd/proxy/main.go index 92ebe47e87..8d7da2381f 100644 --- a/codis/cmd/proxy/main.go +++ b/codis/cmd/proxy/main.go @@ -112,6 +112,8 @@ Options: if err := config.LoadFromFile(s); err != nil { log.PanicErrorf(err, "load config %s failed", s) } + config.ConfigFileName = s + log.Warnf("option --config = %s", s) } models.SetMaxSlotNum(config.MaxSlotNum) if s, ok := utils.Argument(d, "--host-admin"); ok { diff --git a/codis/config/proxy.toml b/codis/config/proxy.toml index 5aa6e18a73..e69bc02f4a 100644 --- a/codis/config/proxy.toml +++ b/codis/config/proxy.toml @@ -99,6 +99,12 @@ session_keepalive_period = "75s" # Set session to be sensitive to failures. Default is false, instead of closing socket, proxy will send an error response to client. session_break_on_failure = false +# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log +slowlog_log_slower_than = 100000 + +# set the number of slowlog in memory, max len is 10000000. (0 to disable) +slowlog_max_len = 128000 + # Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period. metrics_report_server = "" metrics_report_period = "1s" diff --git a/codis/pkg/proxy/config.go b/codis/pkg/proxy/config.go index e101df44b6..c475443dc0 100644 --- a/codis/pkg/proxy/config.go +++ b/codis/pkg/proxy/config.go @@ -115,6 +115,12 @@ session_keepalive_period = "75s" # Set session to be sensitive to failures. Default is false, instead of closing socket, proxy will send an error response to client. session_break_on_failure = false +# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log +slowlog_log_slower_than = 100000 + +# set the number of slowlog in memory, max len is 10000000. (0 to disable) +slowlog_max_len = 128000 + # Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period. metrics_report_server = "" metrics_report_period = "1s" @@ -176,6 +182,9 @@ type Config struct { SessionKeepAlivePeriod timesize.Duration `toml:"session_keepalive_period" json:"session_keepalive_period"` SessionBreakOnFailure bool `toml:"session_break_on_failure" json:"session_break_on_failure"` + SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"` + SlowlogMaxLen int64 `toml:"slowlog_max_len" json:"slowlog_max_len"` + MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"` MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"` MetricsReportInfluxdbServer string `toml:"metrics_report_influxdb_server" json:"metrics_report_influxdb_server"` @@ -186,6 +195,7 @@ type Config struct { MetricsReportStatsdServer string `toml:"metrics_report_statsd_server" json:"metrics_report_statsd_server"` MetricsReportStatsdPeriod timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"` MetricsReportStatsdPrefix string `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"` + ConfigFileName string `toml:"-" json:"config_file_name"` } func NewDefaultConfig() *Config { @@ -302,6 +312,13 @@ func (c *Config) Validate() error { return errors.New("invalid session_keepalive_period") } + if c.SlowlogLogSlowerThan < 0 { + return errors.New("invalid slowlog_log_slower_than") + } + if c.SlowlogMaxLen < 0 { + return errors.New("invalid slowlog_max_len") + } + if c.MetricsReportPeriod < 0 { return errors.New("invalid metrics_report_period") } @@ -311,5 +328,6 @@ func (c *Config) Validate() error { if c.MetricsReportStatsdPeriod < 0 { return errors.New("invalid metrics_report_statsd_period") } + return nil } diff --git a/codis/pkg/proxy/mapper.go b/codis/pkg/proxy/mapper.go index 365ff642d3..3095c5548a 100644 --- a/codis/pkg/proxy/mapper.go +++ b/codis/pkg/proxy/mapper.go @@ -6,6 +6,7 @@ package proxy import ( "bytes" "hash/crc32" + "strconv" "strings" "pika/codis/v2/pkg/proxy/redis" @@ -228,6 +229,8 @@ func init() { {"SUNION", 0}, {"SUNIONSTORE", FlagWrite}, {"SYNC", FlagNotAllow}, + {"PCONFIG", 0}, + {"PSLOWLOG", 0}, {"TIME", FlagNotAllow}, {"TOUCH", FlagWrite}, {"TTL", 0}, @@ -318,3 +321,30 @@ func getHashKey(multi []*redis.Resp, opstr string) []byte { } return nil } + +func getWholeCmd(multi []*redis.Resp, cmd []byte) int { + var ( + index = 0 + bytes = 0 + ) + for i := 0; i < len(multi); i++ { + if index < len(cmd) { + index += copy(cmd[index:], multi[i].Value) + if i < len(multi)-i { + index += copy(cmd[index:], []byte(" ")) + } + } + bytes += len(multi[i].Value) + + if i == len(multi)-1 && index == len(cmd) { + more := []byte("... " + strconv.Itoa(len(multi)) + " elements " + strconv.Itoa(bytes) + " bytes.") + index = len(cmd) - len(more) + if index < 0 { + index = 0 + } + index += copy(cmd[index:], more) + break + } + } + return index +} diff --git a/codis/pkg/proxy/proxy.go b/codis/pkg/proxy/proxy.go index d830182286..c951694bfb 100644 --- a/codis/pkg/proxy/proxy.go +++ b/codis/pkg/proxy/proxy.go @@ -11,11 +11,13 @@ import ( "os/exec" "path/filepath" "runtime" + "strconv" "strings" "sync" "time" "pika/codis/v2/pkg/models" + "pika/codis/v2/pkg/proxy/redis" "pika/codis/v2/pkg/utils" "pika/codis/v2/pkg/utils/errors" "pika/codis/v2/pkg/utils/log" @@ -60,82 +62,82 @@ func New(config *Config) (*Proxy, error) { return nil, errors.Trace(err) } - s := &Proxy{} - s.config = config - s.exit.C = make(chan struct{}) - s.router = NewRouter(config) - s.ignore = make([]byte, config.ProxyHeapPlaceholder.Int64()) + p := &Proxy{} + p.config = config + p.exit.C = make(chan struct{}) + p.router = NewRouter(config) + p.ignore = make([]byte, config.ProxyHeapPlaceholder.Int64()) - s.model = &models.Proxy{ + p.model = &models.Proxy{ StartTime: time.Now().String(), } - s.model.ProductName = config.ProductName - s.model.DataCenter = config.ProxyDataCenter - s.model.Pid = os.Getpid() - s.model.Pwd, _ = os.Getwd() + p.model.ProductName = config.ProductName + p.model.DataCenter = config.ProxyDataCenter + p.model.Pid = os.Getpid() + p.model.Pwd, _ = os.Getwd() if b, err := exec.Command("uname", "-a").Output(); err != nil { log.WarnErrorf(err, "run command uname failed") } else { - s.model.Sys = strings.TrimSpace(string(b)) + p.model.Sys = strings.TrimSpace(string(b)) } - s.model.Hostname = utils.Hostname + p.model.Hostname = utils.Hostname - if err := s.setup(config); err != nil { - s.Close() + if err := p.setup(config); err != nil { + p.Close() return nil, err } - log.Warnf("[%p] create new proxy:\n%s", s, s.model.Encode()) + log.Warnf("[%p] create new proxy:\n%s", p, p.model.Encode()) unsafe2.SetMaxOffheapBytes(config.ProxyMaxOffheapBytes.Int64()) - go s.serveAdmin() - go s.serveProxy() + go p.serveAdmin() + go p.serveProxy() - s.startMetricsJson() - s.startMetricsInfluxdb() - s.startMetricsStatsd() + p.startMetricsJson() + p.startMetricsInfluxdb() + p.startMetricsStatsd() - return s, nil + return p, nil } -func (s *Proxy) setup(config *Config) error { +func (p *Proxy) setup(config *Config) error { proto := config.ProtoType if l, err := net.Listen(proto, config.ProxyAddr); err != nil { return errors.Trace(err) } else { - s.lproxy = l + p.lproxy = l x, err := utils.ReplaceUnspecifiedIP(proto, l.Addr().String(), config.HostProxy) if err != nil { return err } - s.model.ProtoType = proto - s.model.ProxyAddr = x + p.model.ProtoType = proto + p.model.ProxyAddr = x } proto = "tcp" if l, err := net.Listen(proto, config.AdminAddr); err != nil { return errors.Trace(err) } else { - s.ladmin = l + p.ladmin = l x, err := utils.ReplaceUnspecifiedIP(proto, l.Addr().String(), config.HostAdmin) if err != nil { return err } - s.model.AdminAddr = x + p.model.AdminAddr = x } - s.model.Token = rpc.NewToken( + p.model.Token = rpc.NewToken( config.ProductName, - s.lproxy.Addr().String(), - s.ladmin.Addr().String(), + p.lproxy.Addr().String(), + p.ladmin.Addr().String(), ) - s.xauth = rpc.NewXAuth( + p.xauth = rpc.NewXAuth( config.ProductName, config.ProductAuth, - s.model.Token, + p.model.Token, ) if config.JodisAddr != "" { @@ -144,171 +146,242 @@ func (s *Proxy) setup(config *Config) error { return err } if config.JodisCompatible { - s.model.JodisPath = filepath.Join("/zk/codis", fmt.Sprintf("db_%s", config.ProductName), "proxy", s.model.Token) + p.model.JodisPath = filepath.Join("/zk/codis", fmt.Sprintf("db_%s", config.ProductName), "proxy", p.model.Token) } else { - s.model.JodisPath = models.JodisPath(config.ProductName, s.model.Token) + p.model.JodisPath = models.JodisPath(config.ProductName, p.model.Token) } - s.jodis = NewJodis(c, s.model) + p.jodis = NewJodis(c, p.model) } - s.model.MaxSlotNum = config.MaxSlotNum + p.model.MaxSlotNum = config.MaxSlotNum return nil } -func (s *Proxy) Start() error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) Start() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } - if s.online { + if p.online { return nil } - s.online = true - s.router.Start() - if s.jodis != nil { - s.jodis.Start() + p.online = true + p.router.Start() + if p.jodis != nil { + p.jodis.Start() } return nil } -func (s *Proxy) Close() error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return nil } - s.closed = true - close(s.exit.C) + p.closed = true + close(p.exit.C) - if s.jodis != nil { - s.jodis.Close() + if p.jodis != nil { + p.jodis.Close() } - if s.ladmin != nil { - s.ladmin.Close() + if p.ladmin != nil { + p.ladmin.Close() } - if s.lproxy != nil { - s.lproxy.Close() + if p.lproxy != nil { + p.lproxy.Close() } - if s.router != nil { - s.router.Close() + if p.router != nil { + p.router.Close() } return nil } -func (s *Proxy) XAuth() string { - return s.xauth +func (p *Proxy) XAuth() string { + return p.xauth } -func (s *Proxy) Model() *models.Proxy { - return s.model +func (p *Proxy) Model() *models.Proxy { + return p.model } -func (s *Proxy) Config() *Config { - return s.config +func (p *Proxy) Config() *Config { + p.mu.Lock() + defer p.mu.Unlock() + return p.config } -func (s *Proxy) IsOnline() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.online && !s.closed +func (p *Proxy) ConfigGet(key string) *redis.Resp { + p.mu.Lock() + defer p.mu.Unlock() + switch key { + case "proxy_max_clients": + return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.ProxyMaxClients))) + case "backend_primary_only": + return redis.NewBulkBytes([]byte(strconv.FormatBool(p.config.BackendPrimaryOnly))) + case "slowlog_log_slower_than": + return redis.NewBulkBytes([]byte(strconv.FormatInt(p.config.SlowlogLogSlowerThan, 10))) + case "slowlog_max_len": + return redis.NewBulkBytes([]byte(strconv.FormatInt(p.config.SlowlogMaxLen, 10))) + default: + return redis.NewErrorf("unsupported key: %s", key) + } +} + +func (p *Proxy) ConfigSet(key, value string) *redis.Resp { + p.mu.Lock() + defer p.mu.Unlock() + switch key { + case "proxy_max_clients": + n, err := strconv.Atoi(value) + if err != nil { + return redis.NewErrorf("err:%s", err) + } + if n <= 0 { + return redis.NewErrorf("invalid proxy_max_clients") + } + p.config.ProxyMaxClients = n + return redis.NewString([]byte("OK")) + case "backend_primary_only": + return redis.NewErrorf("not currently supported") + case "slowlog_log_slower_than": + n, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return redis.NewErrorf("err:%s", err) + } + if n < 0 { + return redis.NewErrorf("invalid slowlog_log_slower_than") + } + p.config.SlowlogLogSlowerThan = n + return redis.NewString([]byte("OK")) + case "slowlog_max_len": + n, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return redis.NewErrorf("err:%s", err) + } + + if n < 0 { + return redis.NewErrorf("invalid slowlog_max_len") + } + p.config.SlowlogMaxLen = n + if p.config.SlowlogMaxLen > 0 { + SlowLogSetMaxLen(p.config.SlowlogMaxLen) + } + return redis.NewString([]byte("OK")) + default: + return redis.NewErrorf("unsupported key: %s", key) + } +} + +func (p *Proxy) ConfigRewrite() *redis.Resp { + p.mu.Lock() + defer p.mu.Unlock() + utils.RewriteConfig(*(p.config), p.config.ConfigFileName, "=", true) + return redis.NewString([]byte("OK")) +} + +func (p *Proxy) IsOnline() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.online && !p.closed } -func (s *Proxy) IsClosed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.closed +func (p *Proxy) IsClosed() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.closed } -func (s *Proxy) HasSwitched() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.router.HasSwitched() +func (p *Proxy) HasSwitched() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.router.HasSwitched() } -func (s *Proxy) Slots() []*models.Slot { - s.mu.Lock() - defer s.mu.Unlock() - return s.router.GetSlots() +func (p *Proxy) Slots() []*models.Slot { + p.mu.Lock() + defer p.mu.Unlock() + return p.router.GetSlots() } -func (s *Proxy) FillSlot(m *models.Slot) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) FillSlot(m *models.Slot) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } - return s.router.FillSlot(m) + return p.router.FillSlot(m) } -func (s *Proxy) FillSlots(slots []*models.Slot) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) FillSlots(slots []*models.Slot) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } for _, m := range slots { - if err := s.router.FillSlot(m); err != nil { + if err := p.router.FillSlot(m); err != nil { return err } } return nil } -func (s *Proxy) SwitchMasters(masters map[int]string) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) SwitchMasters(masters map[int]string) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } - s.ha.masters = masters + p.ha.masters = masters if len(masters) != 0 { - s.router.SwitchMasters(masters) + p.router.SwitchMasters(masters) } return nil } -func (s *Proxy) GetSentinels() ([]string, map[int]string) { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) GetSentinels() ([]string, map[int]string) { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return nil, nil } - return s.ha.servers, s.ha.masters + return p.ha.servers, p.ha.masters } -func (s *Proxy) serveAdmin() { - if s.IsClosed() { +func (p *Proxy) serveAdmin() { + if p.IsClosed() { return } - defer s.Close() + defer p.Close() - log.Warnf("[%p] admin start service on %s", s, s.ladmin.Addr()) + log.Warnf("[%p] admin start service on %s", p, p.ladmin.Addr()) eh := make(chan error, 1) go func(l net.Listener) { h := http.NewServeMux() - h.Handle("/", newApiServer(s)) + h.Handle("/", newApiServer(p)) hs := &http.Server{Handler: h} eh <- hs.Serve(l) - }(s.ladmin) + }(p.ladmin) select { - case <-s.exit.C: - log.Warnf("[%p] admin shutdown", s) + case <-p.exit.C: + log.Warnf("[%p] admin shutdown", p) case err := <-eh: - log.ErrorErrorf(err, "[%p] admin exit on error", s) + log.ErrorErrorf(err, "[%p] admin exit on error", p) } } -func (s *Proxy) serveProxy() { - if s.IsClosed() { +func (p *Proxy) serveProxy() { + if p.IsClosed() { return } - defer s.Close() + defer p.Close() - log.Warnf("[%p] proxy start service on %s", s, s.lproxy.Addr()) + log.Warnf("[%p] proxy start service on %s", p, p.lproxy.Addr()) eh := make(chan error, 1) go func(l net.Listener) (err error) { @@ -316,40 +389,40 @@ func (s *Proxy) serveProxy() { eh <- err }() for { - c, err := s.acceptConn(l) + c, err := p.acceptConn(l) if err != nil { return err } - NewSession(c, s.config).Start(s.router) + NewSession(c, p.config, p).Start(p.router) } - }(s.lproxy) + }(p.lproxy) - if d := s.config.BackendPingPeriod.Duration(); d != 0 { - go s.keepAlive(d) + if d := p.config.BackendPingPeriod.Duration(); d != 0 { + go p.keepAlive(d) } select { - case <-s.exit.C: - log.Warnf("[%p] proxy shutdown", s) + case <-p.exit.C: + log.Warnf("[%p] proxy shutdown", p) case err := <-eh: - log.ErrorErrorf(err, "[%p] proxy exit on error", s) + log.ErrorErrorf(err, "[%p] proxy exit on error", p) } } -func (s *Proxy) keepAlive(d time.Duration) { +func (p *Proxy) keepAlive(d time.Duration) { var ticker = time.NewTicker(math2.MaxDuration(d, time.Second)) defer ticker.Stop() for { select { - case <-s.exit.C: + case <-p.exit.C: return case <-ticker.C: - s.router.KeepAlive() + p.router.KeepAlive() } } } -func (s *Proxy) acceptConn(l net.Listener) (net.Conn, error) { +func (p *Proxy) acceptConn(l net.Listener) (net.Conn, error) { var delay = &DelayExp2{ Min: 10, Max: 500, Unit: time.Millisecond, @@ -358,7 +431,7 @@ func (s *Proxy) acceptConn(l net.Listener) (net.Conn, error) { c, err := l.Accept() if err != nil { if e, ok := err.(net.Error); ok && e.Temporary() { - log.WarnErrorf(err, "[%p] proxy accept new connection failed", s) + log.WarnErrorf(err, "[%p] proxy accept new connection failed", p) delay.Sleep() continue } @@ -458,24 +531,24 @@ const ( StatsFull = StatsFlags(^uint32(0)) ) -func (s *Proxy) Overview(flags StatsFlags) *Overview { +func (p *Proxy) Overview(flags StatsFlags) *Overview { o := &Overview{ Version: utils.Version, Compile: utils.Compile, - Config: s.Config(), - Model: s.Model(), - Stats: s.Stats(flags), + Config: p.Config(), + Model: p.Model(), + Stats: p.Stats(flags), } if flags.HasBit(StatsSlots) { - o.Slots = s.Slots() + o.Slots = p.Slots() } return o } -func (s *Proxy) Stats(flags StatsFlags) *Stats { +func (p *Proxy) Stats(flags StatsFlags) *Stats { stats := &Stats{} - stats.Online = s.IsOnline() - stats.Closed = s.IsClosed() + stats.Online = p.IsOnline() + stats.Closed = p.IsClosed() stats.Ops.Total = OpTotal() stats.Ops.Fails = OpFails() @@ -496,7 +569,7 @@ func (s *Proxy) Stats(flags StatsFlags) *Stats { stats.Rusage.Raw = u.Usage } - stats.Backend.PrimaryOnly = s.Config().BackendPrimaryOnly + stats.Backend.PrimaryOnly = p.Config().BackendPrimaryOnly if flags.HasBit(StatsRuntime) { var r runtime.MemStats diff --git a/codis/pkg/proxy/session.go b/codis/pkg/proxy/session.go index 547a13d21f..188a62ae8c 100644 --- a/codis/pkg/proxy/session.go +++ b/codis/pkg/proxy/session.go @@ -8,6 +8,7 @@ import ( "fmt" "net" "strconv" + "strings" "sync" "time" @@ -44,6 +45,7 @@ type Session struct { broken atomic2.Bool config *Config + proxy *Proxy authorized bool } @@ -62,7 +64,7 @@ func (s *Session) String() string { return string(b) } -func NewSession(sock net.Conn, config *Config) *Session { +func NewSession(sock net.Conn, config *Config, proxy *Proxy) *Session { c := redis.NewConn(sock, config.SessionRecvBufsize.AsInt(), config.SessionSendBufsize.AsInt(), @@ -72,7 +74,7 @@ func NewSession(sock net.Conn, config *Config) *Session { c.SetKeepAlivePeriod(config.SessionKeepAlivePeriod.Duration()) s := &Session{ - Conn: c, config: config, + Conn: c, config: config, proxy: proxy, CreateUnix: time.Now().Unix(), } s.stats.opmap = make(map[string]*opStats, 16) @@ -206,11 +208,11 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { }) s.flushOpStats(true) }() - var ( breakOnFailure = s.config.SessionBreakOnFailure maxPipelineLen = s.config.SessionMaxPipeline ) + var cmd = make([]byte, 128) p := s.Conn.FlushEncoder() p.MaxInterval = time.Millisecond @@ -239,7 +241,7 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { } nowTime := time.Now().UnixNano() duration := int64((nowTime - r.ReceiveTime) / 1e3) - if duration >= 50000 { + if duration >= s.config.SlowlogLogSlowerThan { //client -> proxy -> server -> porxy -> client //Record the waiting time from receiving the request from the client to sending it to the backend server //the waiting time from sending the request to the backend server to receiving the response from the server @@ -254,8 +256,13 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { if r.ReceiveFromServerTime > 0 { d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3) } - log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d", - time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen) + index := getWholeCmd(r.Multi, cmd) + cmdLog := fmt.Sprintf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].", + time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index])) + log.Warnf("%s", cmdLog) + if s.config.SlowlogMaxLen > 0 { + SlowLogPush(&SlowLogEntry{SlowLogGetCurLogId(), r.ReceiveTime / 1e3, duration, cmdLog}) + } } return nil }) @@ -319,6 +326,10 @@ func (s *Session) handleRequest(r *Request, d *Router) error { return s.handleRequestDel(r, d) case "EXISTS": return s.handleRequestExists(r, d) + case "PCONFIG": + return s.handlePConfig(r) + case "PSLOWLOG": + return s.handlePSlowLog(r) case "SLOTSINFO": return s.handleRequestSlotsInfo(r, d) case "SLOTSSCAN": @@ -714,3 +725,97 @@ func (s *Session) flushOpStats(force bool) { s.stats.opmap = make(map[string]*opStats, 32) } } + +func (s *Session) handlePSlowLog(r *Request) error { + if len(r.Multi) < 2 || len(r.Multi) > 4 { + r.Resp = redis.NewErrorf("ERR slowLog parameters") + return nil + } + var subCmd = strings.ToUpper(string(r.Multi[1].Value)) + switch subCmd { + case "GET": + if len(r.Multi) == 3 { + num, err := strconv.ParseInt(string(r.Multi[2].Value), 10, 64) + if err != nil { + r.Resp = redis.NewErrorf("ERR invalid slowLog number") + break + } + + r.Resp = SlowLogGetByNum(num) + } else if len(r.Multi) == 4 { + var ( + id int64 + num int64 + err error + ) + id, err = strconv.ParseInt(string(r.Multi[2].Value), 10, 64) + if err != nil { + r.Resp = redis.NewErrorf("ERR invalid slowLog start logId") + break + } + num, err = strconv.ParseInt(string(r.Multi[3].Value), 10, 64) + if err != nil { + r.Resp = redis.NewErrorf("ERR invalid slowLog number") + break + } + + r.Resp = SlowLogGetByIdAndNUm(id, num) + } else { + r.Resp = SlowLogGetByNum(10) + } + case "LEN": + if len(r.Multi) == 2 { + r.Resp = SlowLogGetLen() + } else { + r.Resp = redis.NewErrorf("ERR slowLog parameters") + } + case "RESET": + if len(r.Multi) == 2 { + r.Resp = SlowLogReset() + } else { + r.Resp = redis.NewErrorf("ERR slowLog parameters") + } + default: + r.Resp = redis.NewErrorf("ERR Unknown SLOWLOG subcommand or wrong args. Try GET, RESET, LEN.") + } + return nil +} + +func (s *Session) handlePConfig(r *Request) error { + if len(r.Multi) < 2 || len(r.Multi) > 4 { + r.Resp = redis.NewErrorf("ERR config parameters") + return nil + } + + var subCmd = strings.ToUpper(string(r.Multi[1].Value)) + switch subCmd { + case "GET": + if len(r.Multi) == 3 { + key := strings.ToLower(string(r.Multi[2].Value)) + r.Resp = s.proxy.ConfigGet(key) + } else { + r.Resp = redis.NewErrorf("ERR config get parameters.") + } + case "SET": + if len(r.Multi) == 3 { + key := strings.ToLower(string(r.Multi[2].Value)) + value := "" + r.Resp = s.proxy.ConfigSet(key, value) + } else if len(r.Multi) == 4 { + key := strings.ToLower(string(r.Multi[2].Value)) + value := string(r.Multi[3].Value) + r.Resp = s.proxy.ConfigSet(key, value) + } else { + r.Resp = redis.NewErrorf("ERR config set parameters.") + } + case "REWRITE": + if len(r.Multi) == 2 { + r.Resp = s.proxy.ConfigRewrite() + } else { + r.Resp = redis.NewErrorf("ERR config rewrite parameters") + } + default: + r.Resp = redis.NewErrorf("ERR Unknown CONFIG subcommand or wrong args. Try GET, SET, REWRITE.") + } + return nil +} diff --git a/codis/pkg/proxy/slowlog.go b/codis/pkg/proxy/slowlog.go new file mode 100644 index 0000000000..79c131c546 --- /dev/null +++ b/codis/pkg/proxy/slowlog.go @@ -0,0 +1,173 @@ +package proxy + +import ( + "container/list" + "strconv" + "sync" + + "pika/codis/v2/pkg/proxy/redis" + "pika/codis/v2/pkg/utils/log" + "pika/codis/v2/pkg/utils/sync2/atomic2" +) + +const ( + PIKA_SLOWLOG_LENGTH_DEFAULT = 128000 + PIKA_SLOWLOG_LENGTH_MAX = 10000000 +) + +type SlowLogEntry struct { + id int64 + time int64 + duration int64 + cmd string +} + +type SlowLog struct { + sync.Mutex + logList *list.List + logId atomic2.Int64 + maxLen atomic2.Int64 +} + +var PSlowLog = &SlowLog{} + +func init() { + PSlowLog.logList = list.New() + PSlowLog.logId.Swap(0) + PSlowLog.maxLen.Swap(PIKA_SLOWLOG_LENGTH_DEFAULT) +} + +func SlowLogSetMaxLen(len int64) { + if len < 0 { + PSlowLog.maxLen.Swap(PIKA_SLOWLOG_LENGTH_DEFAULT) + } else if len > PIKA_SLOWLOG_LENGTH_MAX { + PSlowLog.maxLen.Swap(PIKA_SLOWLOG_LENGTH_MAX) + } else { + PSlowLog.maxLen.Swap(len) + } +} + +func SlowLogGetCurLogId() int64 { + return PSlowLog.logId.Incr() +} + +func SlowLogPush(entry *SlowLogEntry) { + if entry == nil || PSlowLog.maxLen <= 0 { + return + } + if PSlowLog.TryLock() { + defer PSlowLog.Unlock() + PSlowLog.logList.PushFront(entry) // push a ptr + for int64(PSlowLog.logList.Len()) > PSlowLog.maxLen.Int64() { + PSlowLog.logList.Remove(PSlowLog.logList.Back()) + } + } else { + log.Warnf("cant get slowlog lock, logid: %d, cmd: %s", entry.id, entry.cmd) + } +} + +func SlowLogGetLen() *redis.Resp { + PSlowLog.Lock() + defer PSlowLog.Unlock() + return redis.NewString([]byte(strconv.Itoa(PSlowLog.logList.Len()))) +} + +func SlowLogReset() *redis.Resp { + PSlowLog.Lock() + defer PSlowLog.Unlock() + PSlowLog.logId.Swap(0) + PSlowLog.logList.Init() + return redis.NewString([]byte("OK")) +} + +func SlowLogToResp(entry *SlowLogEntry) *redis.Resp { + if entry == nil { + return redis.NewArray(make([]*redis.Resp, 0)) + } + return redis.NewArray([]*redis.Resp{ + redis.NewInt([]byte(strconv.FormatInt(entry.id, 10))), + redis.NewInt([]byte(strconv.FormatInt(entry.time, 10))), + redis.NewInt([]byte(strconv.FormatInt(entry.duration, 10))), + redis.NewArray([]*redis.Resp{ + redis.NewBulkBytes([]byte(entry.cmd)), + }), + }) +} + +func SlowLogGetByNum(num int64) *redis.Resp { + PSlowLog.Lock() + defer PSlowLog.Unlock() + if num <= 0 { + return redis.NewArray(make([]*redis.Resp, 0)) + } else if num > int64(PSlowLog.logList.Len()) { + num = int64(PSlowLog.logList.Len()) + } + var res = make([]*redis.Resp, 0, num) + var iter = PSlowLog.logList.Front() + for i := int64(0); i < num; i++ { + if iter == nil || iter.Value == nil { + break + } + if entry, ok := iter.Value.(*SlowLogEntry); ok { + res = append(res, SlowLogToResp(entry)) + } else { + log.Warnf("slowLogGet cont parse iter.Value[%v] to slowLogEntry.", iter.Value) + } + iter = iter.Next() + } + return redis.NewArray(res) +} + +func SlowLogGetByIdAndNUm(id, num int64) *redis.Resp { + PSlowLog.Lock() + defer PSlowLog.Unlock() + + var smallestID int64 + var oldestNode = PSlowLog.logList.Back() + if oldestNode == nil || oldestNode.Value == nil { + log.Warnf("slowlogGet oldestNode or oldestNode.Value == nil, oldestNode: %v", oldestNode) + return redis.NewArray(make([]*redis.Resp, 0)) + } + + if entry, ok := oldestNode.Value.(*SlowLogEntry); ok { + smallestID = entry.id + } else { + log.Warnf("slowlogGet cont parse oldestNode.Value[%v] to slowlogEntry.", oldestNode.Value) + } + if id < smallestID || num < 0 { + return redis.NewArray(make([]*redis.Resp, 0)) + } + if num > id-smallestID+1 { + num = id - smallestID + 1 + } + + if num > int64(PSlowLog.logList.Len()) { + num = int64(PSlowLog.logList.Len()) + } + + var res = make([]*redis.Resp, num) + var iter = PSlowLog.logList.Front() + + for ; iter != nil && iter.Value != nil; iter = iter.Next() { + if entry, ok := iter.Value.(*SlowLogEntry); ok { + if id >= entry.id { + break + } + } else { + log.Warnf("slowlogGet cont parse iter.Value[%v] to slowlogEntry.", iter.Value) + } + } + + for i := int64(0); i < num; i++ { + if iter == nil || iter.Value == nil { + break + } + if entry, ok := iter.Value.(*SlowLogEntry); ok { + res = append(res, SlowLogToResp(entry)) + } else { + log.Warnf("slowlogGet cont parse iter.Value[%v] to slowlogEntry.", iter.Value) + } + iter = iter.Next() + } + return redis.NewArray(res) +} diff --git a/codis/pkg/utils/configAux.go b/codis/pkg/utils/configAux.go new file mode 100644 index 0000000000..7dc68286ad --- /dev/null +++ b/codis/pkg/utils/configAux.go @@ -0,0 +1,271 @@ +package utils + +import ( + "bufio" + "fmt" + "io" + "os" + "reflect" + "strconv" + "strings" + + "pika/codis/v2/pkg/utils/bytesize" + "pika/codis/v2/pkg/utils/errors" + "pika/codis/v2/pkg/utils/log" + "pika/codis/v2/pkg/utils/timesize" +) + +const ( + TypeConf = iota + TypeComment +) + +type ConfItem struct { + confType int // 0 means conf, 1 means comment + name string + value string +} + +type DeployConfig struct { + items []*ConfItem + confMap map[string]*ConfItem + sep string +} + +func (c *DeployConfig) Init(path string, sep string) error { + c.confMap = make(map[string]*ConfItem) + c.sep = sep + + f, err := os.Open(path) + if err != nil { + return err + } + defer func(f *os.File) { + err := f.Close() + if err != nil { + log.WarnErrorf(err, "Close %s failed.\n", path) + } + }(f) + + r := bufio.NewReader(f) + for { + b, _, err := r.ReadLine() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + + line := strings.TrimSpace(string(b)) + + item := &ConfItem{} + if strings.Index(line, "#") == 0 || len(line) == 0 { + item.confType = TypeComment + item.name = line + c.items = append(c.items, item) + continue + } + index := strings.Index(line, sep) + if index <= 0 { + continue + } + key := strings.TrimSpace(line[:index]) + value := strings.TrimSpace(line[index+1:]) + if len(key) == 0 { + continue + } + item.confType = TypeConf + item.name = key + item.value = value + c.items = append(c.items, item) + c.confMap[item.name] = item + } +} + +func (c *DeployConfig) Reset(conf interface{}, isWrap bool) { + obj := reflect.ValueOf(conf) + + for i := 0; i < obj.NumField(); i++ { + fieldInfo := obj.Type().Field(i) + name := fieldInfo.Tag.Get("toml") + if name == "" || name == "-" { + continue + } + var value string + switch v := obj.Field(i).Interface().(type) { + case string: + value = strings.Trim(strings.TrimSpace(v), "\"") + if value == "" { + continue + } + if isWrap { + err := c.Set(name, "\""+value+"\"") + if err != nil { + log.WarnErrorf(err, "Set string with wrap failed!") + } + } else { + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set string without wrap failed!") + } + } + case int: + value = strconv.Itoa(v) + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set int failed!") + } + case int32: + value = strconv.FormatInt(int64(v), 10) + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set int32 failed!") + } + + case int64: + value = strconv.FormatInt(v, 10) + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set int64 failed!") + } + case bool: + if v { + err := c.Set(name, "true") + if err != nil { + log.WarnErrorf(err, "Set bool value failed!") + } + } else { + err := c.Set(name, "false") + if err != nil { + log.WarnErrorf(err, "Set bool value failed!") + } + } + case timesize.Duration: + if ret, err := v.MarshalText(); err != nil { + log.WarnErrorf(err, "config set %s failed.\n", name) + } else { + value = string(ret[:]) + err := c.Set(name, "\""+value+"\"") + if err != nil { + log.WarnErrorf(err, "Set timesize failed!") + } + } + + case bytesize.Int64: + if ret, err := v.MarshalText(); err != nil { + log.WarnErrorf(err, "config set %s failed.\n", name) + } else { + value = string(ret[:]) + err := c.Set(name, "\""+value+"\"") + if err != nil { + log.WarnErrorf(err, "Set bytesize failed!") + } + } + + default: + log.Warnf("value error: %v\n", v) + continue + } + } + +} + +func (c *DeployConfig) Set(key string, value string) error { + key = strings.TrimSpace(key) + value = strings.TrimSpace(value) + + log.Infof("Set key : %s, value: %s\n", key, value) + + if len(key) == 0 || len(value) == 0 { + return errors.New("key or value is null") + } + + item, found := c.confMap[key] + if found { + item.value = value + } else { + item := &ConfItem{ + confType: TypeConf, + name: key, + value: value, + } + c.items = append(c.items, item) + c.confMap[item.name] = item + } + return nil +} + +func (c *DeployConfig) Get(key string) string { + item, found := c.confMap[key] + if !found { + return "" + } + return item.value +} + +func (c *DeployConfig) Show() { + log.Infof("Show config, len = %d\n", len(c.items)) + for index, item := range c.items { + if item.confType == TypeComment { + // Comment format: id: context + log.Infof("%d: %s\n", index, item.name) + } else { + // Configuration format: id: key = value or id: key value + if len(strings.TrimSpace(c.sep)) > 0 { + log.Infof("%d: %s %s %s\n", index, item.name, c.sep, item.value) + } else { + log.Infof("%d: %s%s%s\n", index, item.name, c.sep, item.value) + } + } + } +} + +func (c *DeployConfig) ReWrite(confName string) error { + f, err := os.Create(confName) + if err != nil { + log.WarnErrorf(err, "create %s failed.\n", confName) + return err + } + defer func(f *os.File) { + err := f.Close() + if err != nil { + log.WarnErrorf(err, "Close %s failed.\n", confName) + } + }(f) + + w := bufio.NewWriter(f) + var lineStr string + for _, item := range c.items { + if item.confType == TypeComment { + lineStr = fmt.Sprintf("%s", item.name) + } else { + if len(strings.TrimSpace(c.sep)) > 0 { + lineStr = fmt.Sprintf("%s %s %s", item.name, c.sep, item.value) + } else { + lineStr = fmt.Sprintf("%s%s%s", item.name, c.sep, item.value) + } + } + fmt.Fprintln(w, lineStr) + } + return w.Flush() +} + +func RewriteConfig(postConf interface{}, defaultConf string, sep string, isWrap bool) error { + conf := &DeployConfig{} + err := conf.Init(defaultConf, sep) + if err != nil { + log.WarnErrorf(err, "open %s file failed.\n", defaultConf) + return err + } + conf.Reset(postConf, isWrap) + conf.Show() + var newConf = defaultConf + ".tmp" + if err = conf.ReWrite(newConf); err != nil { + return err + } + if err = os.Remove(defaultConf); err != nil { + return err + } + return os.Rename(newConf, defaultConf) +}