diff --git a/codis/config/proxy.toml b/codis/config/proxy.toml index e69bc02f4a..07f5a3126a 100644 --- a/codis/config/proxy.toml +++ b/codis/config/proxy.toml @@ -102,9 +102,6 @@ session_break_on_failure = false # Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log slowlog_log_slower_than = 100000 -# set the number of slowlog in memory, max len is 10000000. (0 to disable) -slowlog_max_len = 128000 - # Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period. metrics_report_server = "" metrics_report_period = "1s" diff --git a/codis/pkg/proxy/config.go b/codis/pkg/proxy/config.go index c475443dc0..c531d4ab53 100644 --- a/codis/pkg/proxy/config.go +++ b/codis/pkg/proxy/config.go @@ -118,9 +118,6 @@ session_break_on_failure = false # Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log slowlog_log_slower_than = 100000 -# set the number of slowlog in memory, max len is 10000000. (0 to disable) -slowlog_max_len = 128000 - # Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period. metrics_report_server = "" metrics_report_period = "1s" @@ -183,7 +180,6 @@ type Config struct { SessionBreakOnFailure bool `toml:"session_break_on_failure" json:"session_break_on_failure"` SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"` - SlowlogMaxLen int64 `toml:"slowlog_max_len" json:"slowlog_max_len"` MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"` MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"` @@ -195,7 +191,8 @@ type Config struct { MetricsReportStatsdServer string `toml:"metrics_report_statsd_server" json:"metrics_report_statsd_server"` MetricsReportStatsdPeriod timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"` MetricsReportStatsdPrefix string `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"` - ConfigFileName string `toml:"-" json:"config_file_name"` + + ConfigFileName string `toml:"-" json:"config_file_name"` } func NewDefaultConfig() *Config { @@ -315,9 +312,6 @@ func (c *Config) Validate() error { if c.SlowlogLogSlowerThan < 0 { return errors.New("invalid slowlog_log_slower_than") } - if c.SlowlogMaxLen < 0 { - return errors.New("invalid slowlog_max_len") - } if c.MetricsReportPeriod < 0 { return errors.New("invalid metrics_report_period") diff --git a/codis/pkg/proxy/mapper.go b/codis/pkg/proxy/mapper.go index 3095c5548a..bb68e4f3cb 100644 --- a/codis/pkg/proxy/mapper.go +++ b/codis/pkg/proxy/mapper.go @@ -230,7 +230,6 @@ func init() { {"SUNIONSTORE", FlagWrite}, {"SYNC", FlagNotAllow}, {"PCONFIG", 0}, - {"PSLOWLOG", 0}, {"TIME", FlagNotAllow}, {"TOUCH", FlagWrite}, {"TTL", 0}, diff --git a/codis/pkg/proxy/proxy.go b/codis/pkg/proxy/proxy.go index c951694bfb..05cf9de8ef 100644 --- a/codis/pkg/proxy/proxy.go +++ b/codis/pkg/proxy/proxy.go @@ -216,14 +216,98 @@ func (p *Proxy) ConfigGet(key string) *redis.Resp { p.mu.Lock() defer p.mu.Unlock() switch key { - case "proxy_max_clients": - return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.ProxyMaxClients))) + case "jodis": + return redis.NewArray([]*redis.Resp{ + redis.NewBulkBytes([]byte("jodis_name")), + redis.NewBulkBytes([]byte(p.config.JodisName)), + redis.NewBulkBytes([]byte("jodis_addr")), + redis.NewBulkBytes([]byte(p.config.JodisAddr)), + redis.NewBulkBytes([]byte("jodis_auth")), + redis.NewBulkBytes([]byte(p.config.JodisAuth)), + redis.NewBulkBytes([]byte("jodis_timeout")), + redis.NewBulkBytes([]byte(p.config.JodisTimeout.Duration().String())), + redis.NewBulkBytes([]byte("jodis_compatible")), + redis.NewBulkBytes([]byte(strconv.FormatBool(p.config.JodisCompatible))), + }) + case "proxy": + return redis.NewArray([]*redis.Resp{ + redis.NewBulkBytes([]byte("proxy_datacenter")), + redis.NewBulkBytes([]byte(p.config.ProxyDataCenter)), + redis.NewBulkBytes([]byte("proxy_max_clients")), + redis.NewBulkBytes([]byte(strconv.Itoa(p.config.ProxyMaxClients))), + redis.NewBulkBytes([]byte("proxy_max_offheap_size")), + redis.NewBulkBytes([]byte(p.config.ProxyMaxOffheapBytes.HumanString())), + redis.NewBulkBytes([]byte("proxy_heap_placeholder")), + redis.NewBulkBytes([]byte(p.config.ProxyHeapPlaceholder.HumanString())), + }) + case "backend_ping_period": + return redis.NewBulkBytes([]byte(p.config.BackendPingPeriod.Duration().String())) + case "backend_buffer_size": + return redis.NewArray([]*redis.Resp{ + redis.NewBulkBytes([]byte("backend_recv_bufsize")), + redis.NewBulkBytes([]byte(p.config.BackendRecvBufsize.HumanString())), + redis.NewBulkBytes([]byte("backend_send_bufsize")), + redis.NewBulkBytes([]byte(p.config.BackendSendBufsize.HumanString())), + }) + case "backend_timeout": + return redis.NewArray([]*redis.Resp{ + redis.NewBulkBytes([]byte("backend_recv_timeout")), + redis.NewBulkBytes([]byte(p.config.BackendRecvTimeout.Duration().String())), + redis.NewBulkBytes([]byte("backend_send_timeout")), + redis.NewBulkBytes([]byte(p.config.BackendSendTimeout.Duration().String())), + }) + case "backend_max_pipeline": + return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.BackendMaxPipeline))) case "backend_primary_only": return redis.NewBulkBytes([]byte(strconv.FormatBool(p.config.BackendPrimaryOnly))) + case "max_slot_num": + return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.MaxSlotNum))) + case "backend_replica_parallel": + return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.BackendReplicaParallel))) + case "backend_keepalive_period": + return redis.NewBulkBytes([]byte(p.config.BackendKeepAlivePeriod.Duration().String())) + case "backend_number_databases": + return redis.NewBulkBytes([]byte(strconv.FormatInt(int64(p.config.BackendNumberDatabases), 10))) + case "session_buffer_size": + return redis.NewArray([]*redis.Resp{ + redis.NewBulkBytes([]byte("session_recv_bufsize")), + redis.NewBulkBytes([]byte(p.config.SessionRecvBufsize.HumanString())), + redis.NewBulkBytes([]byte("session_send_bufsize")), + redis.NewBulkBytes([]byte(p.config.SessionSendBufsize.HumanString())), + }) + case "session_timeout": + return redis.NewArray([]*redis.Resp{ + redis.NewBulkBytes([]byte("session_recv_timeout")), + redis.NewBulkBytes([]byte(p.config.SessionRecvTimeout.Duration().String())), + redis.NewBulkBytes([]byte("session_send_timeout")), + redis.NewBulkBytes([]byte(p.config.SessionSendTimeout.Duration().String())), + }) case "slowlog_log_slower_than": return redis.NewBulkBytes([]byte(strconv.FormatInt(p.config.SlowlogLogSlowerThan, 10))) - case "slowlog_max_len": - return redis.NewBulkBytes([]byte(strconv.FormatInt(p.config.SlowlogMaxLen, 10))) + case "metrics_report_server": + return redis.NewBulkBytes([]byte(p.config.MetricsReportServer)) + case "metrics_report_period": + return redis.NewBulkBytes([]byte(p.config.MetricsReportPeriod.Duration().String())) + case "metrics_report_influxdb": + return redis.NewArray([]*redis.Resp{ + redis.NewBulkBytes([]byte("metrics_report_influxdb_server")), + redis.NewBulkBytes([]byte(p.config.MetricsReportInfluxdbServer)), + redis.NewBulkBytes([]byte("metrics_report_influxdb_period")), + redis.NewBulkBytes([]byte(p.config.MetricsReportInfluxdbPeriod.Duration().String())), + redis.NewBulkBytes([]byte("metrics_report_influxdb_username")), + redis.NewBulkBytes([]byte(p.config.MetricsReportInfluxdbUsername)), + redis.NewBulkBytes([]byte("metrics_report_influxdb_database")), + redis.NewBulkBytes([]byte(p.config.MetricsReportInfluxdbDatabase)), + }) + case "metrics_report_statsd": + return redis.NewArray([]*redis.Resp{ + redis.NewBulkBytes([]byte("metrics_report_statsd_server")), + redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdServer)), + redis.NewBulkBytes([]byte("metrics_report_statsd_period")), + redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdPeriod.Duration().String())), + redis.NewBulkBytes([]byte("metrics_report_statsd_prefix")), + redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdPrefix)), + }) default: return redis.NewErrorf("unsupported key: %s", key) } @@ -233,6 +317,9 @@ func (p *Proxy) ConfigSet(key, value string) *redis.Resp { p.mu.Lock() defer p.mu.Unlock() switch key { + case "product_name": + p.config.ProductName = value + return redis.NewString([]byte("OK")) case "proxy_max_clients": n, err := strconv.Atoi(value) if err != nil { @@ -255,20 +342,6 @@ func (p *Proxy) ConfigSet(key, value string) *redis.Resp { } p.config.SlowlogLogSlowerThan = n return redis.NewString([]byte("OK")) - case "slowlog_max_len": - n, err := strconv.ParseInt(value, 10, 64) - if err != nil { - return redis.NewErrorf("err:%s", err) - } - - if n < 0 { - return redis.NewErrorf("invalid slowlog_max_len") - } - p.config.SlowlogMaxLen = n - if p.config.SlowlogMaxLen > 0 { - SlowLogSetMaxLen(p.config.SlowlogMaxLen) - } - return redis.NewString([]byte("OK")) default: return redis.NewErrorf("unsupported key: %s", key) } diff --git a/codis/pkg/proxy/session.go b/codis/pkg/proxy/session.go index 188a62ae8c..b4fb8f7e57 100644 --- a/codis/pkg/proxy/session.go +++ b/codis/pkg/proxy/session.go @@ -257,12 +257,8 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3) } index := getWholeCmd(r.Multi, cmd) - cmdLog := fmt.Sprintf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].", + log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].", time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index])) - log.Warnf("%s", cmdLog) - if s.config.SlowlogMaxLen > 0 { - SlowLogPush(&SlowLogEntry{SlowLogGetCurLogId(), r.ReceiveTime / 1e3, duration, cmdLog}) - } } return nil }) @@ -328,8 +324,6 @@ func (s *Session) handleRequest(r *Request, d *Router) error { return s.handleRequestExists(r, d) case "PCONFIG": return s.handlePConfig(r) - case "PSLOWLOG": - return s.handlePSlowLog(r) case "SLOTSINFO": return s.handleRequestSlotsInfo(r, d) case "SLOTSSCAN": @@ -726,61 +720,6 @@ func (s *Session) flushOpStats(force bool) { } } -func (s *Session) handlePSlowLog(r *Request) error { - if len(r.Multi) < 2 || len(r.Multi) > 4 { - r.Resp = redis.NewErrorf("ERR slowLog parameters") - return nil - } - var subCmd = strings.ToUpper(string(r.Multi[1].Value)) - switch subCmd { - case "GET": - if len(r.Multi) == 3 { - num, err := strconv.ParseInt(string(r.Multi[2].Value), 10, 64) - if err != nil { - r.Resp = redis.NewErrorf("ERR invalid slowLog number") - break - } - - r.Resp = SlowLogGetByNum(num) - } else if len(r.Multi) == 4 { - var ( - id int64 - num int64 - err error - ) - id, err = strconv.ParseInt(string(r.Multi[2].Value), 10, 64) - if err != nil { - r.Resp = redis.NewErrorf("ERR invalid slowLog start logId") - break - } - num, err = strconv.ParseInt(string(r.Multi[3].Value), 10, 64) - if err != nil { - r.Resp = redis.NewErrorf("ERR invalid slowLog number") - break - } - - r.Resp = SlowLogGetByIdAndNUm(id, num) - } else { - r.Resp = SlowLogGetByNum(10) - } - case "LEN": - if len(r.Multi) == 2 { - r.Resp = SlowLogGetLen() - } else { - r.Resp = redis.NewErrorf("ERR slowLog parameters") - } - case "RESET": - if len(r.Multi) == 2 { - r.Resp = SlowLogReset() - } else { - r.Resp = redis.NewErrorf("ERR slowLog parameters") - } - default: - r.Resp = redis.NewErrorf("ERR Unknown SLOWLOG subcommand or wrong args. Try GET, RESET, LEN.") - } - return nil -} - func (s *Session) handlePConfig(r *Request) error { if len(r.Multi) < 2 || len(r.Multi) > 4 { r.Resp = redis.NewErrorf("ERR config parameters") diff --git a/codis/pkg/proxy/slowlog.go b/codis/pkg/proxy/slowlog.go deleted file mode 100644 index 79c131c546..0000000000 --- a/codis/pkg/proxy/slowlog.go +++ /dev/null @@ -1,173 +0,0 @@ -package proxy - -import ( - "container/list" - "strconv" - "sync" - - "pika/codis/v2/pkg/proxy/redis" - "pika/codis/v2/pkg/utils/log" - "pika/codis/v2/pkg/utils/sync2/atomic2" -) - -const ( - PIKA_SLOWLOG_LENGTH_DEFAULT = 128000 - PIKA_SLOWLOG_LENGTH_MAX = 10000000 -) - -type SlowLogEntry struct { - id int64 - time int64 - duration int64 - cmd string -} - -type SlowLog struct { - sync.Mutex - logList *list.List - logId atomic2.Int64 - maxLen atomic2.Int64 -} - -var PSlowLog = &SlowLog{} - -func init() { - PSlowLog.logList = list.New() - PSlowLog.logId.Swap(0) - PSlowLog.maxLen.Swap(PIKA_SLOWLOG_LENGTH_DEFAULT) -} - -func SlowLogSetMaxLen(len int64) { - if len < 0 { - PSlowLog.maxLen.Swap(PIKA_SLOWLOG_LENGTH_DEFAULT) - } else if len > PIKA_SLOWLOG_LENGTH_MAX { - PSlowLog.maxLen.Swap(PIKA_SLOWLOG_LENGTH_MAX) - } else { - PSlowLog.maxLen.Swap(len) - } -} - -func SlowLogGetCurLogId() int64 { - return PSlowLog.logId.Incr() -} - -func SlowLogPush(entry *SlowLogEntry) { - if entry == nil || PSlowLog.maxLen <= 0 { - return - } - if PSlowLog.TryLock() { - defer PSlowLog.Unlock() - PSlowLog.logList.PushFront(entry) // push a ptr - for int64(PSlowLog.logList.Len()) > PSlowLog.maxLen.Int64() { - PSlowLog.logList.Remove(PSlowLog.logList.Back()) - } - } else { - log.Warnf("cant get slowlog lock, logid: %d, cmd: %s", entry.id, entry.cmd) - } -} - -func SlowLogGetLen() *redis.Resp { - PSlowLog.Lock() - defer PSlowLog.Unlock() - return redis.NewString([]byte(strconv.Itoa(PSlowLog.logList.Len()))) -} - -func SlowLogReset() *redis.Resp { - PSlowLog.Lock() - defer PSlowLog.Unlock() - PSlowLog.logId.Swap(0) - PSlowLog.logList.Init() - return redis.NewString([]byte("OK")) -} - -func SlowLogToResp(entry *SlowLogEntry) *redis.Resp { - if entry == nil { - return redis.NewArray(make([]*redis.Resp, 0)) - } - return redis.NewArray([]*redis.Resp{ - redis.NewInt([]byte(strconv.FormatInt(entry.id, 10))), - redis.NewInt([]byte(strconv.FormatInt(entry.time, 10))), - redis.NewInt([]byte(strconv.FormatInt(entry.duration, 10))), - redis.NewArray([]*redis.Resp{ - redis.NewBulkBytes([]byte(entry.cmd)), - }), - }) -} - -func SlowLogGetByNum(num int64) *redis.Resp { - PSlowLog.Lock() - defer PSlowLog.Unlock() - if num <= 0 { - return redis.NewArray(make([]*redis.Resp, 0)) - } else if num > int64(PSlowLog.logList.Len()) { - num = int64(PSlowLog.logList.Len()) - } - var res = make([]*redis.Resp, 0, num) - var iter = PSlowLog.logList.Front() - for i := int64(0); i < num; i++ { - if iter == nil || iter.Value == nil { - break - } - if entry, ok := iter.Value.(*SlowLogEntry); ok { - res = append(res, SlowLogToResp(entry)) - } else { - log.Warnf("slowLogGet cont parse iter.Value[%v] to slowLogEntry.", iter.Value) - } - iter = iter.Next() - } - return redis.NewArray(res) -} - -func SlowLogGetByIdAndNUm(id, num int64) *redis.Resp { - PSlowLog.Lock() - defer PSlowLog.Unlock() - - var smallestID int64 - var oldestNode = PSlowLog.logList.Back() - if oldestNode == nil || oldestNode.Value == nil { - log.Warnf("slowlogGet oldestNode or oldestNode.Value == nil, oldestNode: %v", oldestNode) - return redis.NewArray(make([]*redis.Resp, 0)) - } - - if entry, ok := oldestNode.Value.(*SlowLogEntry); ok { - smallestID = entry.id - } else { - log.Warnf("slowlogGet cont parse oldestNode.Value[%v] to slowlogEntry.", oldestNode.Value) - } - if id < smallestID || num < 0 { - return redis.NewArray(make([]*redis.Resp, 0)) - } - if num > id-smallestID+1 { - num = id - smallestID + 1 - } - - if num > int64(PSlowLog.logList.Len()) { - num = int64(PSlowLog.logList.Len()) - } - - var res = make([]*redis.Resp, num) - var iter = PSlowLog.logList.Front() - - for ; iter != nil && iter.Value != nil; iter = iter.Next() { - if entry, ok := iter.Value.(*SlowLogEntry); ok { - if id >= entry.id { - break - } - } else { - log.Warnf("slowlogGet cont parse iter.Value[%v] to slowlogEntry.", iter.Value) - } - } - - for i := int64(0); i < num; i++ { - if iter == nil || iter.Value == nil { - break - } - if entry, ok := iter.Value.(*SlowLogEntry); ok { - res = append(res, SlowLogToResp(entry)) - } else { - log.Warnf("slowlogGet cont parse iter.Value[%v] to slowlogEntry.", iter.Value) - } - iter = iter.Next() - } - return redis.NewArray(res) -} diff --git a/src/storage/tests/strings_test.cc b/src/storage/tests/strings_test.cc index e1c2d011f8..27759269f3 100644 --- a/src/storage/tests/strings_test.cc +++ b/src/storage/tests/strings_test.cc @@ -938,6 +938,9 @@ TEST_F(StringsTest, BitPosTest) { // PKSetexAt TEST_F(StringsTest, PKSetexAtTest) { +#ifdef OS_MACOSX + return ; +#endif int64_t unix_time; rocksdb::Env::Default()->GetCurrentTime(&unix_time); std::map ttl_ret;