From bd174a6d22b9673a58107de5e014abf9a4e49ead Mon Sep 17 00:00:00 2001 From: baerwang <52104949+baerwang@users.noreply.github.com> Date: Thu, 2 Nov 2023 20:33:38 +0800 Subject: [PATCH 1/8] chore: fix cache and optimization repeat docker builds (#2093) --- .github/workflows/pika.yml | 38 ++++++++++++++++++++++++++------------ ci/Dockerfile | 22 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 12 deletions(-) create mode 100644 ci/Dockerfile diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index bf1854f25a..1c66e343b8 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -9,6 +9,7 @@ on: env: # Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.) BUILD_TYPE: RelWithDebInfo + ARTIFACT_PIKA_NAME: artifact-pika jobs: build_on_ubuntu: @@ -50,15 +51,21 @@ jobs: id: cache-ubuntu with: key: ${{ runner.os }}-build-ubuntu-${{ hashFiles('**/CMakeLists.txt') }} - path: ${{ github.workspace }} - restore-keys: | - ${{ runner.os }}-build-ubuntu- + path: | + ${{ github.workspace }}/build + ${{ github.workspace }}/buildtrees + ${{ github.workspace }}/deps - name: Build if: ${{ steps.cache-ubuntu.outputs.cache-hit != 'true' }} # Build your program with the given configuration run: cmake --build build --config ${{ env.BUILD_TYPE }} + - uses: actions/upload-artifact@v3 + with: + name: ${{ env.ARTIFACT_PIKA_NAME }} + path: ${{ github.workspace }}/build/pika + - name: Test working-directory: ${{ github.workspace }}/build # Execute tests defined by the CMake configuration. @@ -120,9 +127,10 @@ jobs: id: cache-centos with: key: ${{ runner.os }}-build-centos-${{ hashFiles('**/CMakeLists.txt') }} - path: ${{ github.workspace }} - restore-keys: | - ${{ runner.os }}-build-centos- + path: | + ${{ github.workspace }}/build + ${{ github.workspace }}/buildtrees + ${{ github.workspace }}/deps - name: Build if: ${{ steps.cache-centos.outputs.cache-hit != 'true' }} @@ -150,7 +158,6 @@ jobs: cd ../tests/integration/ chmod +x integrate_test.sh sh integrate_test.sh - build_on_macos: runs-on: macos-latest @@ -179,9 +186,10 @@ jobs: id: cache-macos with: key: ${{ runner.os }}-build-macos-${{ hashFiles('**/CMakeLists.txt') }} - path: ${{ github.workspace }} - restore-keys: | - ${{ runner.os }}-build-macos- + path: | + ${{ github.workspace }}/build + ${{ github.workspace }}/buildtrees + ${{ github.workspace }}/deps - name: Build if: ${{ steps.cache-macos.outputs.cache-hit != 'true' }} @@ -213,6 +221,7 @@ jobs: build_pika_image: name: Build Pika Docker image runs-on: ubuntu-latest + needs: build_on_ubuntu steps: - name: Check out the repo uses: actions/checkout@v3 @@ -229,11 +238,16 @@ jobs: with: images: pikadb/pika + - uses: actions/download-artifact@v3 + with: + name: ${{ env.ARTIFACT_PIKA_NAME }} + path: artifact/ + - name: Build Docker image - uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671 + uses: docker/build-push-action@v5 with: context: . - file: ./Dockerfile + file: ./ci/Dockerfile push: false tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} diff --git a/ci/Dockerfile b/ci/Dockerfile new file mode 100644 index 0000000000..4ce5b3f947 --- /dev/null +++ b/ci/Dockerfile @@ -0,0 +1,22 @@ +FROM ubuntu:22.04 + +RUN apt-get update && apt-get install -y \ + ca-certificates \ + rsync && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists /var/cache/apt/archives + +ENV PIKA=/pika \ + PATH=${PIKA}:${PIKA}/bin:${PATH} + +WORKDIR ${PIKA} + +COPY artifact/pika ${PIKA}/bin/pika +COPY entrypoint.sh /entrypoint.sh +COPY conf/pika.conf ${PIKA}/conf/pika.conf + +ENTRYPOINT ["/entrypoint.sh"] + +EXPOSE 9221 + +CMD ["/pika/bin/pika", "-c", "/pika/conf/pika.conf"] \ No newline at end of file From 94bee9c8c703c0ca9c6df31c07fb9f4a28870cb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=81=E5=B0=8F=E5=B8=85?= <56024577+dingxiaoshuai123@users.noreply.github.com> Date: Thu, 2 Nov 2023 20:34:06 +0800 Subject: [PATCH 2/8] fix:fixed the return value error of the decr command. (#2092) --- src/storage/src/redis_strings.cc | 3 +- tests/integration/string_test.go | 47 +++++++++++++++++++++----------- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/src/storage/src/redis_strings.cc b/src/storage/src/redis_strings.cc index 594ac81969..f59c4011dc 100644 --- a/src/storage/src/redis_strings.cc +++ b/src/storage/src/redis_strings.cc @@ -348,8 +348,9 @@ Status RedisStrings::Decrby(const Slice& key, int64_t value, int64_t* ret) { int32_t timestamp = parsed_strings_value.timestamp(); std::string old_user_value = parsed_strings_value.value().ToString(); char* end = nullptr; + errno = 0; int64_t ival = strtoll(old_user_value.c_str(), &end, 10); - if (*end != 0) { + if (errno == ERANGE || *end != 0) { return Status::Corruption("Value is not a integer"); } if ((value >= 0 && LLONG_MIN + value > ival) || (value < 0 && LLONG_MAX + value < ival)) { diff --git a/tests/integration/string_test.go b/tests/integration/string_test.go index 760cb7bd1d..3ac8ec9395 100644 --- a/tests/integration/string_test.go +++ b/tests/integration/string_test.go @@ -182,23 +182,38 @@ var _ = Describe("String Commands", func() { Expect(err).NotTo(HaveOccurred()) }) + // fix: https://github.com/OpenAtomFoundation/pika/issues/2061 It("should Decr", func() { - set := client.Set(ctx, "key", "10", 0) - Expect(set.Err()).NotTo(HaveOccurred()) - Expect(set.Val()).To(Equal("OK")) - - decr := client.Decr(ctx, "key") - Expect(decr.Err()).NotTo(HaveOccurred()) - Expect(decr.Val()).To(Equal(int64(9))) - - set = client.Set(ctx, "key", "234293482390480948029348230948", 0) - Expect(set.Err()).NotTo(HaveOccurred()) - Expect(set.Val()).To(Equal("OK")) - - //decr = client.Decr(ctx, "key") - //Expect(set.Err()).NotTo(HaveOccurred()) - //Expect(decr.Err()).To(MatchError("ERR value is not an integer or out of range")) - //Expect(decr.Val()).To(Equal(int64(-1))) + basicSet := client.Set(ctx, "mykey", "10", 0) + Expect(basicSet.Err()).NotTo(HaveOccurred()) + Expect(basicSet.Val()).To(Equal("OK")) + basicDecr := client.Decr(ctx, "mykey") + Expect(basicDecr.Err()).NotTo(HaveOccurred()) + Expect(basicDecr.Val()).To(Equal(int64(9))) + basicDecr = client.Decr(ctx, "mykey") + Expect(basicDecr.Err()).NotTo(HaveOccurred()) + Expect(basicDecr.Val()).To(Equal(int64(8))) + + for i := 0; i < 5; i++ { + set := client.Set(ctx, "key", "234293482390480948029348230948", 0) + Expect(set.Err()).NotTo(HaveOccurred()) + Expect(set.Val()).To(Equal("OK")) + decr := client.Decr(ctx, "key") + Expect(decr.Err()).To(MatchError("ERR value is not an integer or out of range")) + + set = client.Set(ctx, "key", "-9223372036854775809", 0) + Expect(set.Err()).NotTo(HaveOccurred()) + Expect(set.Val()).To(Equal("OK")) + decr = client.Decr(ctx, "key") + Expect(decr.Err()).To(MatchError("ERR value is not an integer or out of range")) + + inter := randomInt(500) + set = client.Set(ctx, "key", inter, 0) + for j := 0; j < 200; j++ { + res := client.Decr(ctx, "key") + Expect(res.Err()).NotTo(HaveOccurred()) + } + } }) It("should DecrBy", func() { From d6ea4e2fd0406317d917a67e08c2f30e91021222 Mon Sep 17 00:00:00 2001 From: u6th9d Date: Sat, 4 Nov 2023 21:59:07 +0800 Subject: [PATCH 3/8] fix issue 2096: The setrange and setbit commands do not retain the expiration time of the original key (#2095) * fix setrange/bitop cmd lost key expire time * Update string_test.go --- src/storage/src/redis_strings.cc | 6 ++++++ tests/integration/string_test.go | 30 +++++++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/storage/src/redis_strings.cc b/src/storage/src/redis_strings.cc index f59c4011dc..2fa7501581 100644 --- a/src/storage/src/redis_strings.cc +++ b/src/storage/src/redis_strings.cc @@ -658,10 +658,12 @@ Status RedisStrings::SetBit(const Slice& key, int64_t offset, int32_t on, int32_ Status s = db_->Get(default_read_options_, key, &meta_value); if (s.ok() || s.IsNotFound()) { std::string data_value; + int32_t timestamp = 0; if (s.ok()) { ParsedStringsValue parsed_strings_value(&meta_value); if (!parsed_strings_value.IsStale()) { data_value = parsed_strings_value.value().ToString(); + timestamp = parsed_strings_value.timestamp(); } } size_t byte = offset >> 3; @@ -687,6 +689,7 @@ Status RedisStrings::SetBit(const Slice& key, int64_t offset, int32_t on, int32_ data_value.append(1, byte_val); } StringsValue strings_value(data_value); + strings_value.set_timestamp(timestamp); return db_->Put(rocksdb::WriteOptions(), key, strings_value.Encode()); } else { return s; @@ -803,6 +806,7 @@ Status RedisStrings::Setrange(const Slice& key, int64_t start_offset, const Slic ScopeRecordLock l(lock_mgr_, key); Status s = db_->Get(default_read_options_, key, &old_value); if (s.ok()) { + int32_t timestamp = 0; ParsedStringsValue parsed_strings_value(&old_value); parsed_strings_value.StripSuffix(); if (parsed_strings_value.IsStale()) { @@ -810,6 +814,7 @@ Status RedisStrings::Setrange(const Slice& key, int64_t start_offset, const Slic new_value = tmp.append(value.data()); *ret = static_cast(new_value.length()); } else { + timestamp = parsed_strings_value.timestamp(); if (static_cast(start_offset) > old_value.length()) { old_value.resize(start_offset); new_value = old_value.append(value.data()); @@ -824,6 +829,7 @@ Status RedisStrings::Setrange(const Slice& key, int64_t start_offset, const Slic } *ret = static_cast(new_value.length()); StringsValue strings_value(new_value); + strings_value.set_timestamp(timestamp); return db_->Put(default_write_options_, key, strings_value.Encode()); } else if (s.IsNotFound()) { std::string tmp(start_offset, '\0'); diff --git a/tests/integration/string_test.go b/tests/integration/string_test.go index 3ac8ec9395..dcf48e921c 100644 --- a/tests/integration/string_test.go +++ b/tests/integration/string_test.go @@ -240,6 +240,23 @@ var _ = Describe("String Commands", func() { Expect(get.Val()).To(Equal("hello")) }) + It("should SetBit", func() { + setBit := client.SetBit(ctx, "key_3s", 7, 1) + Expect(setBit.Err()).NotTo(HaveOccurred()) + Expect(setBit.Val()).To(Equal(int64(0))) + + Expect(client.Expire(ctx, "key_3s", 3*time.Second).Val()).To(Equal(true)) + Expect(client.TTL(ctx, "key_3s").Val()).NotTo(Equal(int64(-2))) + + setBit = client.SetBit(ctx, "key_3s", 69, 1) + Expect(client.TTL(ctx, "key_3s").Val()).NotTo(Equal(int64(-2))) + Expect(setBit.Err()).NotTo(HaveOccurred()) + Expect(setBit.Val()).To(Equal(int64(0))) + + time.Sleep(4 * time.Second) + Expect(client.TTL(ctx, "key_3s").Val()).To(Equal(time.Duration(-2))) + }) + It("should GetBit", func() { setBit := client.SetBit(ctx, "key", 7, 1) Expect(setBit.Err()).NotTo(HaveOccurred()) @@ -882,17 +899,24 @@ var _ = Describe("String Commands", func() { }) It("should SetRange", func() { - set := client.Set(ctx, "key", "Hello World", 0) + set := client.Set(ctx, "key_3s", "Hello World", 0) Expect(set.Err()).NotTo(HaveOccurred()) Expect(set.Val()).To(Equal("OK")) - range_ := client.SetRange(ctx, "key", 6, "Redis") + Expect(client.Expire(ctx, "key_3s", 3*time.Second).Val()).To(Equal(true)) + Expect(client.TTL(ctx, "key_3s").Val()).NotTo(Equal(int64(-2))) + + range_ := client.SetRange(ctx, "key_3s", 6, "Redis") Expect(range_.Err()).NotTo(HaveOccurred()) Expect(range_.Val()).To(Equal(int64(11))) - get := client.Get(ctx, "key") + get := client.Get(ctx, "key_3s") Expect(get.Err()).NotTo(HaveOccurred()) Expect(get.Val()).To(Equal("Hello Redis")) + Expect(client.TTL(ctx, "key_3s").Val()).NotTo(Equal(int64(-2))) + + time.Sleep(4 * time.Second) + Expect(client.TTL(ctx, "key_3s").Val()).To(Equal(time.Duration(-2))) }) It("should StrLen", func() { From 5a3102abdf6b7b74be91daf8056b380d19e051fc Mon Sep 17 00:00:00 2001 From: baerwang <52104949+baerwang@users.noreply.github.com> Date: Mon, 6 Nov 2023 13:49:22 +0800 Subject: [PATCH 4/8] style: change license (#2100) --- integrate_test.sh | 19 ++++------------ src/net/src/build_version.cc.in | 5 +++++ src/net/src/http_conn.cc | 1 + src/net/src/net_thread_name.h | 1 + src/net/src/net_util.cc | 1 + src/pstd/src/build_version.cc | 5 +++++ tests/integration/integrate_test.sh | 19 ++++------------ tools/pika-port/build2.sh | 2 +- tools/pika-port/build3.sh | 2 +- tools/pika-port/glog.sh | 2 +- tools/pika_operator/README.md | 22 +++++-------------- .../pika_operator/api/v1alpha1/additional.go | 7 ++++++ .../api/v1alpha1/groupversion_info.go | 17 ++++---------- .../pika_operator/api/v1alpha1/pika_types.go | 17 ++++---------- .../api/v1alpha1/zz_generated.deepcopy.go | 17 ++++---------- .../controllers/pika_controller.go | 17 ++++---------- .../controllers/pika_controller_test.go | 7 ++++++ tools/pika_operator/controllers/suite_test.go | 17 ++++---------- tools/pika_operator/hack/boilerplate.go.txt | 17 ++++---------- tools/pika_operator/main.go | 17 ++++---------- 20 files changed, 72 insertions(+), 140 deletions(-) diff --git a/integrate_test.sh b/integrate_test.sh index cc555b3fd0..e2e3337393 100644 --- a/integrate_test.sh +++ b/integrate_test.sh @@ -1,18 +1,7 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. An additional grant +# of patent rights can be found in the PATENTS file in the same directory. #!/bin/bash diff --git a/src/net/src/build_version.cc.in b/src/net/src/build_version.cc.in index cc4026a838..5087b21249 100644 --- a/src/net/src/build_version.cc.in +++ b/src/net/src/build_version.cc.in @@ -1,3 +1,8 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + #include "net/include/build_version.h" const char* net_build_git_sha = "net_build_git_sha:@@GIT_SHA@@"; const char* net_build_git_date = "net_build_git_date:@@GIT_DATE_TIME@@"; diff --git a/src/net/src/http_conn.cc b/src/net/src/http_conn.cc index 027afd5e36..bde5f46177 100644 --- a/src/net/src/http_conn.cc +++ b/src/net/src/http_conn.cc @@ -2,6 +2,7 @@ // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. + #include "net/include/http_conn.h" #include #include diff --git a/src/net/src/net_thread_name.h b/src/net/src/net_thread_name.h index 355cd66f1a..e85cd1a6df 100644 --- a/src/net/src/net_thread_name.h +++ b/src/net/src/net_thread_name.h @@ -2,6 +2,7 @@ // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. + #ifndef NET_THREAD_NAME_H #define NET_THREAD_NAME_H diff --git a/src/net/src/net_util.cc b/src/net/src/net_util.cc index 791df31e86..6f1f4692d0 100644 --- a/src/net/src/net_util.cc +++ b/src/net/src/net_util.cc @@ -2,6 +2,7 @@ // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. + #include "net/src/net_util.h" #include #include diff --git a/src/pstd/src/build_version.cc b/src/pstd/src/build_version.cc index 506f4d3829..7e8f1fd035 100644 --- a/src/pstd/src/build_version.cc +++ b/src/pstd/src/build_version.cc @@ -1,3 +1,8 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + #include "pstd/include/version.h" const char* pstd_build_git_sha = "pstd_build_git_sha:2f67b928b3ccd2f23109802aa9932a7af45abcd9"; const char* pstd_build_git_date = "pstd_build_git_date:2023-03-27"; diff --git a/tests/integration/integrate_test.sh b/tests/integration/integrate_test.sh index 4fbcb92da0..8c3319acec 100755 --- a/tests/integration/integrate_test.sh +++ b/tests/integration/integrate_test.sh @@ -1,18 +1,7 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. An additional grant +# of patent rights can be found in the PATENTS file in the same directory. go mod tidy go test \ No newline at end of file diff --git a/tools/pika-port/build2.sh b/tools/pika-port/build2.sh index b41358d50e..8db596c09e 100644 --- a/tools/pika-port/build2.sh +++ b/tools/pika-port/build2.sh @@ -3,7 +3,7 @@ # DESC : pika-port2 build script # AUTHOR : Alex Stocks # VERSION : 1.0 -# LICENCE : Apache License 2.0 +# LICENCE : BSD-3-Clause License # EMAIL : alexstocks@foxmail.com # MOD : 2019-01-22 19:54 # FILE : build.sh diff --git a/tools/pika-port/build3.sh b/tools/pika-port/build3.sh index dbf2133508..1648de2c89 100644 --- a/tools/pika-port/build3.sh +++ b/tools/pika-port/build3.sh @@ -3,7 +3,7 @@ # DESC : pika-port3 build script # AUTHOR : Alex Stocks # VERSION : 1.0 -# LICENCE : Apache License 2.0 +# LICENCE : BSD-3-Clause License # EMAIL : alexstocks@foxmail.com # MOD : 2019-01-22 19:54 # FILE : build.sh diff --git a/tools/pika-port/glog.sh b/tools/pika-port/glog.sh index 37b347250e..2c83fbc8e2 100644 --- a/tools/pika-port/glog.sh +++ b/tools/pika-port/glog.sh @@ -3,7 +3,7 @@ # DESC : glog build script # AUTHOR : Alex Stocks # VERSION : 1.0 -# LICENCE : Apache License 2.0 +# LICENCE : BSD-3-Clause License # EMAIL : alexstocks@foxmail.com # MOD : 2019-01-22 19:54 # FILE : build.sh diff --git a/tools/pika_operator/README.md b/tools/pika_operator/README.md index 23bc83435c..b5fd5c10d5 100644 --- a/tools/pika_operator/README.md +++ b/tools/pika_operator/README.md @@ -13,7 +13,7 @@ It is responsible for creating and managing the following resources: ## Getting Started -You’ll need a Kubernetes cluster to run against. You can use [MiniKube](https://minikube.sigs.k8s.io) +You’ll need a Kubernetes cluster to run against. You can use [MiniKube](https://minikube.sigs.k8s.io) or [KIND](https://kind.sigs.k8s.io) to get a local cluster for testing, or run against a remote cluster. **Note:** Your controller will automatically use the current context in your kubeconfig file (i.e. whatever @@ -22,7 +22,7 @@ cluster `kubectl cluster-info` shows). ### Running locally with MiniKube 1. Install [MiniKube](https://minikube.sigs.k8s.io/docs/start/) - + 2. Start a local cluster: ```sh @@ -147,17 +147,7 @@ More information can be found via the [Kubebuilder Documentation](https://book.k ## License -Copyright 2023. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - +Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +This source code is licensed under the BSD-style license found in the +LICENSE file in the root directory of this source tree. An additional grant +of patent rights can be found in the PATENTS file in the same directory. diff --git a/tools/pika_operator/api/v1alpha1/additional.go b/tools/pika_operator/api/v1alpha1/additional.go index 88774895c9..79b3552db1 100644 --- a/tools/pika_operator/api/v1alpha1/additional.go +++ b/tools/pika_operator/api/v1alpha1/additional.go @@ -1,3 +1,10 @@ +/* +Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +This source code is licensed under the BSD-style license found in the +LICENSE file in the root directory of this source tree. An additional grant +of patent rights can be found in the PATENTS file in the same directory. +*/ + package v1alpha1 const ( diff --git a/tools/pika_operator/api/v1alpha1/groupversion_info.go b/tools/pika_operator/api/v1alpha1/groupversion_info.go index 1686366033..b8d980b7aa 100644 --- a/tools/pika_operator/api/v1alpha1/groupversion_info.go +++ b/tools/pika_operator/api/v1alpha1/groupversion_info.go @@ -1,17 +1,8 @@ /* -Copyright 2023. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +This source code is licensed under the BSD-style license found in the +LICENSE file in the root directory of this source tree. An additional grant +of patent rights can be found in the PATENTS file in the same directory. */ // Package v1alpha1 contains API Schema definitions for the pika v1alpha1 API group diff --git a/tools/pika_operator/api/v1alpha1/pika_types.go b/tools/pika_operator/api/v1alpha1/pika_types.go index d14f53536a..0181424863 100644 --- a/tools/pika_operator/api/v1alpha1/pika_types.go +++ b/tools/pika_operator/api/v1alpha1/pika_types.go @@ -1,17 +1,8 @@ /* -Copyright 2023. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +This source code is licensed under the BSD-style license found in the +LICENSE file in the root directory of this source tree. An additional grant +of patent rights can be found in the PATENTS file in the same directory. */ package v1alpha1 diff --git a/tools/pika_operator/api/v1alpha1/zz_generated.deepcopy.go b/tools/pika_operator/api/v1alpha1/zz_generated.deepcopy.go index 062f4d1458..f1b8b25e64 100644 --- a/tools/pika_operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/tools/pika_operator/api/v1alpha1/zz_generated.deepcopy.go @@ -2,19 +2,10 @@ // +build !ignore_autogenerated /* -Copyright 2023. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +This source code is licensed under the BSD-style license found in the +LICENSE file in the root directory of this source tree. An additional grant +of patent rights can be found in the PATENTS file in the same directory. */ // Code generated by controller-gen. DO NOT EDIT. diff --git a/tools/pika_operator/controllers/pika_controller.go b/tools/pika_operator/controllers/pika_controller.go index f0e50b87c7..e263ad80ca 100644 --- a/tools/pika_operator/controllers/pika_controller.go +++ b/tools/pika_operator/controllers/pika_controller.go @@ -1,17 +1,8 @@ /* -Copyright 2023. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +This source code is licensed under the BSD-style license found in the +LICENSE file in the root directory of this source tree. An additional grant +of patent rights can be found in the PATENTS file in the same directory. */ package controllers diff --git a/tools/pika_operator/controllers/pika_controller_test.go b/tools/pika_operator/controllers/pika_controller_test.go index 2d1afa3adc..981a68bec5 100644 --- a/tools/pika_operator/controllers/pika_controller_test.go +++ b/tools/pika_operator/controllers/pika_controller_test.go @@ -1,3 +1,10 @@ +/* +Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +This source code is licensed under the BSD-style license found in the +LICENSE file in the root directory of this source tree. An additional grant +of patent rights can be found in the PATENTS file in the same directory. +*/ + package controllers import ( diff --git a/tools/pika_operator/controllers/suite_test.go b/tools/pika_operator/controllers/suite_test.go index 55367f4605..1fc54ac7ab 100644 --- a/tools/pika_operator/controllers/suite_test.go +++ b/tools/pika_operator/controllers/suite_test.go @@ -1,17 +1,8 @@ /* -Copyright 2023. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +This source code is licensed under the BSD-style license found in the +LICENSE file in the root directory of this source tree. An additional grant +of patent rights can be found in the PATENTS file in the same directory. */ package controllers diff --git a/tools/pika_operator/hack/boilerplate.go.txt b/tools/pika_operator/hack/boilerplate.go.txt index 65b8622718..8e060b602d 100644 --- a/tools/pika_operator/hack/boilerplate.go.txt +++ b/tools/pika_operator/hack/boilerplate.go.txt @@ -1,15 +1,6 @@ /* -Copyright 2023. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +This source code is licensed under the BSD-style license found in the +LICENSE file in the root directory of this source tree. An additional grant +of patent rights can be found in the PATENTS file in the same directory. */ \ No newline at end of file diff --git a/tools/pika_operator/main.go b/tools/pika_operator/main.go index 292d811708..a7b796e2d5 100644 --- a/tools/pika_operator/main.go +++ b/tools/pika_operator/main.go @@ -1,17 +1,8 @@ /* -Copyright 2023. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. +This source code is licensed under the BSD-style license found in the +LICENSE file in the root directory of this source tree. An additional grant +of patent rights can be found in the PATENTS file in the same directory. */ package main From 8364fe59b25ef3940eaaf34a880491679dfd4ac8 Mon Sep 17 00:00:00 2001 From: Zsh_Space <49178994+a105531816@users.noreply.github.com> Date: Tue, 7 Nov 2023 14:44:11 +0800 Subject: [PATCH 5/8] add pksetexat in pika-port (#2104) * add pksetexat * add pksetexat --- tools/pika-port/pika_port_3/master_conn.cc | 38 ++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/tools/pika-port/pika_port_3/master_conn.cc b/tools/pika-port/pika_port_3/master_conn.cc index fbb7cb3c66..0b7e7213b3 100644 --- a/tools/pika-port/pika_port_3/master_conn.cc +++ b/tools/pika-port/pika_port_3/master_conn.cc @@ -275,10 +275,44 @@ bool MasterConn::ProcessBinlogData(const net::RedisCmdArgsType& argv, const Port if (1 < argv.size()) { key = argv[1]; } - int ret = g_pika_port->SendRedisCommand(binlog_item.content(), key); + + std::string command; + if (argv[0] == "pksetexat"){ + //struct timeval now; + std::string temp(""); + std::string time_out(""); + std::string time_cmd(""); + int start; + int old_time_size; + int new_time_size; + int diff; + temp = argv[2]; + //gettimeofday(&now, NULL); + unsigned long int sec= time(NULL); + unsigned long int tot; + tot = std::stol(temp) - sec; + time_out = std::to_string(tot); + + command = binlog_item.content(); + command.erase(0,4); + command.replace(0, 13, "*4\r\n$5\r\nsetex"); + //"*4\r\n$5\r\nsetex\r\n$48\r\n1691478611637921200018685540810_4932190141418052\r\n$10\r\n1691483848\r\n$1681\r\n(\265/\375`\332\024=4"}} + start = 13 + 3 + std::to_string(key.size()).size() + 2 + key.size() +3; + old_time_size = std::to_string(temp.size()).size() + 2 + temp.size(); + new_time_size = std::to_string(time_out.size()).size() + 2 + time_out.size(); + diff = old_time_size - new_time_size; + command.erase(start, diff); + time_cmd = std::to_string(time_out.size()) + "\r\n" + time_out; + command.replace(start, new_time_size, time_cmd); + } else { + command = binlog_item.content(); + } + + int ret = g_pika_port->SendRedisCommand(command, key); if (ret != 0) { - LOG(WARNING) << "send redis command:" << binlog_item.ToString() << ", ret:" << ret; + LOG(WARNING) << "send redis command:" << command << ", ret:" << ret; } return true; } + From bdf90e81dce4cb49650f1a0d801e1c4e691d692f Mon Sep 17 00:00:00 2001 From: baerwang <52104949+baerwang@users.noreply.github.com> Date: Tue, 7 Nov 2023 15:57:41 +0800 Subject: [PATCH 6/8] chore: om cache (#2105) --- .github/workflows/pika.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index 1c66e343b8..c36645cb17 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -52,12 +52,10 @@ jobs: with: key: ${{ runner.os }}-build-ubuntu-${{ hashFiles('**/CMakeLists.txt') }} path: | - ${{ github.workspace }}/build ${{ github.workspace }}/buildtrees ${{ github.workspace }}/deps - name: Build - if: ${{ steps.cache-ubuntu.outputs.cache-hit != 'true' }} # Build your program with the given configuration run: cmake --build build --config ${{ env.BUILD_TYPE }} @@ -128,7 +126,6 @@ jobs: with: key: ${{ runner.os }}-build-centos-${{ hashFiles('**/CMakeLists.txt') }} path: | - ${{ github.workspace }}/build ${{ github.workspace }}/buildtrees ${{ github.workspace }}/deps @@ -187,7 +184,6 @@ jobs: with: key: ${{ runner.os }}-build-macos-${{ hashFiles('**/CMakeLists.txt') }} path: | - ${{ github.workspace }}/build ${{ github.workspace }}/buildtrees ${{ github.workspace }}/deps From 000cb2478de223b78a91c21c79e0b7264c0d7e11 Mon Sep 17 00:00:00 2001 From: baerwang <52104949+baerwang@users.noreply.github.com> Date: Wed, 8 Nov 2023 15:44:27 +0800 Subject: [PATCH 7/8] chore: clean cache (#2107) --- .github/workflows/pika.yml | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index c36645cb17..30e2ea8ad4 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -120,17 +120,7 @@ jobs: source /opt/rh/devtoolset-10/enable cmake -B build -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DUSE_PIKA_TOOLS=ON -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address - - name: Cache Build - uses: actions/cache@v3 - id: cache-centos - with: - key: ${{ runner.os }}-build-centos-${{ hashFiles('**/CMakeLists.txt') }} - path: | - ${{ github.workspace }}/buildtrees - ${{ github.workspace }}/deps - - name: Build - if: ${{ steps.cache-centos.outputs.cache-hit != 'true' }} run: | source /opt/rh/devtoolset-10/enable cmake --build build --config ${{ env.BUILD_TYPE }} @@ -178,17 +168,7 @@ jobs: export CC=/usr/local/opt/gcc@10/bin/gcc-10 cmake -B build -DCMAKE_C_COMPILER=/usr/local/opt/gcc@10/bin/gcc-10 -DUSE_PIKA_TOOLS=ON -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} -DCMAKE_CXX_FLAGS_DEBUG=-fsanitize=address - - name: Cache Build - uses: actions/cache@v3 - id: cache-macos - with: - key: ${{ runner.os }}-build-macos-${{ hashFiles('**/CMakeLists.txt') }} - path: | - ${{ github.workspace }}/buildtrees - ${{ github.workspace }}/deps - - name: Build - if: ${{ steps.cache-macos.outputs.cache-hit != 'true' }} run: | cmake --build build --config ${{ env.BUILD_TYPE }} From 8ed1123843f0d973af41da04a300fc8aee40cb53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=81=E5=B0=8F=E5=B8=85?= <56024577+dingxiaoshuai123@users.noreply.github.com> Date: Wed, 8 Nov 2023 17:56:49 +0800 Subject: [PATCH 8/8] Dynamic modification of configuration files&&Slow log (#2103) * Dynamic modification of configuration files.Check the correctness of the functionality. * Modified some specification issues. * Dynamically modify Codis configuration file and add slow query logs. --- codis/cmd/proxy/main.go | 2 + codis/config/proxy.toml | 6 + codis/pkg/proxy/config.go | 18 ++ codis/pkg/proxy/mapper.go | 30 +++ codis/pkg/proxy/proxy.go | 347 +++++++++++++++++++++-------------- codis/pkg/proxy/session.go | 117 +++++++++++- codis/pkg/proxy/slowlog.go | 173 +++++++++++++++++ codis/pkg/utils/configAux.go | 271 +++++++++++++++++++++++++++ 8 files changed, 821 insertions(+), 143 deletions(-) create mode 100644 codis/pkg/proxy/slowlog.go create mode 100644 codis/pkg/utils/configAux.go diff --git a/codis/cmd/proxy/main.go b/codis/cmd/proxy/main.go index 92ebe47e87..8d7da2381f 100644 --- a/codis/cmd/proxy/main.go +++ b/codis/cmd/proxy/main.go @@ -112,6 +112,8 @@ Options: if err := config.LoadFromFile(s); err != nil { log.PanicErrorf(err, "load config %s failed", s) } + config.ConfigFileName = s + log.Warnf("option --config = %s", s) } models.SetMaxSlotNum(config.MaxSlotNum) if s, ok := utils.Argument(d, "--host-admin"); ok { diff --git a/codis/config/proxy.toml b/codis/config/proxy.toml index 5aa6e18a73..e69bc02f4a 100644 --- a/codis/config/proxy.toml +++ b/codis/config/proxy.toml @@ -99,6 +99,12 @@ session_keepalive_period = "75s" # Set session to be sensitive to failures. Default is false, instead of closing socket, proxy will send an error response to client. session_break_on_failure = false +# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log +slowlog_log_slower_than = 100000 + +# set the number of slowlog in memory, max len is 10000000. (0 to disable) +slowlog_max_len = 128000 + # Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period. metrics_report_server = "" metrics_report_period = "1s" diff --git a/codis/pkg/proxy/config.go b/codis/pkg/proxy/config.go index e101df44b6..c475443dc0 100644 --- a/codis/pkg/proxy/config.go +++ b/codis/pkg/proxy/config.go @@ -115,6 +115,12 @@ session_keepalive_period = "75s" # Set session to be sensitive to failures. Default is false, instead of closing socket, proxy will send an error response to client. session_break_on_failure = false +# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log +slowlog_log_slower_than = 100000 + +# set the number of slowlog in memory, max len is 10000000. (0 to disable) +slowlog_max_len = 128000 + # Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period. metrics_report_server = "" metrics_report_period = "1s" @@ -176,6 +182,9 @@ type Config struct { SessionKeepAlivePeriod timesize.Duration `toml:"session_keepalive_period" json:"session_keepalive_period"` SessionBreakOnFailure bool `toml:"session_break_on_failure" json:"session_break_on_failure"` + SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"` + SlowlogMaxLen int64 `toml:"slowlog_max_len" json:"slowlog_max_len"` + MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"` MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"` MetricsReportInfluxdbServer string `toml:"metrics_report_influxdb_server" json:"metrics_report_influxdb_server"` @@ -186,6 +195,7 @@ type Config struct { MetricsReportStatsdServer string `toml:"metrics_report_statsd_server" json:"metrics_report_statsd_server"` MetricsReportStatsdPeriod timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"` MetricsReportStatsdPrefix string `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"` + ConfigFileName string `toml:"-" json:"config_file_name"` } func NewDefaultConfig() *Config { @@ -302,6 +312,13 @@ func (c *Config) Validate() error { return errors.New("invalid session_keepalive_period") } + if c.SlowlogLogSlowerThan < 0 { + return errors.New("invalid slowlog_log_slower_than") + } + if c.SlowlogMaxLen < 0 { + return errors.New("invalid slowlog_max_len") + } + if c.MetricsReportPeriod < 0 { return errors.New("invalid metrics_report_period") } @@ -311,5 +328,6 @@ func (c *Config) Validate() error { if c.MetricsReportStatsdPeriod < 0 { return errors.New("invalid metrics_report_statsd_period") } + return nil } diff --git a/codis/pkg/proxy/mapper.go b/codis/pkg/proxy/mapper.go index 365ff642d3..3095c5548a 100644 --- a/codis/pkg/proxy/mapper.go +++ b/codis/pkg/proxy/mapper.go @@ -6,6 +6,7 @@ package proxy import ( "bytes" "hash/crc32" + "strconv" "strings" "pika/codis/v2/pkg/proxy/redis" @@ -228,6 +229,8 @@ func init() { {"SUNION", 0}, {"SUNIONSTORE", FlagWrite}, {"SYNC", FlagNotAllow}, + {"PCONFIG", 0}, + {"PSLOWLOG", 0}, {"TIME", FlagNotAllow}, {"TOUCH", FlagWrite}, {"TTL", 0}, @@ -318,3 +321,30 @@ func getHashKey(multi []*redis.Resp, opstr string) []byte { } return nil } + +func getWholeCmd(multi []*redis.Resp, cmd []byte) int { + var ( + index = 0 + bytes = 0 + ) + for i := 0; i < len(multi); i++ { + if index < len(cmd) { + index += copy(cmd[index:], multi[i].Value) + if i < len(multi)-i { + index += copy(cmd[index:], []byte(" ")) + } + } + bytes += len(multi[i].Value) + + if i == len(multi)-1 && index == len(cmd) { + more := []byte("... " + strconv.Itoa(len(multi)) + " elements " + strconv.Itoa(bytes) + " bytes.") + index = len(cmd) - len(more) + if index < 0 { + index = 0 + } + index += copy(cmd[index:], more) + break + } + } + return index +} diff --git a/codis/pkg/proxy/proxy.go b/codis/pkg/proxy/proxy.go index d830182286..c951694bfb 100644 --- a/codis/pkg/proxy/proxy.go +++ b/codis/pkg/proxy/proxy.go @@ -11,11 +11,13 @@ import ( "os/exec" "path/filepath" "runtime" + "strconv" "strings" "sync" "time" "pika/codis/v2/pkg/models" + "pika/codis/v2/pkg/proxy/redis" "pika/codis/v2/pkg/utils" "pika/codis/v2/pkg/utils/errors" "pika/codis/v2/pkg/utils/log" @@ -60,82 +62,82 @@ func New(config *Config) (*Proxy, error) { return nil, errors.Trace(err) } - s := &Proxy{} - s.config = config - s.exit.C = make(chan struct{}) - s.router = NewRouter(config) - s.ignore = make([]byte, config.ProxyHeapPlaceholder.Int64()) + p := &Proxy{} + p.config = config + p.exit.C = make(chan struct{}) + p.router = NewRouter(config) + p.ignore = make([]byte, config.ProxyHeapPlaceholder.Int64()) - s.model = &models.Proxy{ + p.model = &models.Proxy{ StartTime: time.Now().String(), } - s.model.ProductName = config.ProductName - s.model.DataCenter = config.ProxyDataCenter - s.model.Pid = os.Getpid() - s.model.Pwd, _ = os.Getwd() + p.model.ProductName = config.ProductName + p.model.DataCenter = config.ProxyDataCenter + p.model.Pid = os.Getpid() + p.model.Pwd, _ = os.Getwd() if b, err := exec.Command("uname", "-a").Output(); err != nil { log.WarnErrorf(err, "run command uname failed") } else { - s.model.Sys = strings.TrimSpace(string(b)) + p.model.Sys = strings.TrimSpace(string(b)) } - s.model.Hostname = utils.Hostname + p.model.Hostname = utils.Hostname - if err := s.setup(config); err != nil { - s.Close() + if err := p.setup(config); err != nil { + p.Close() return nil, err } - log.Warnf("[%p] create new proxy:\n%s", s, s.model.Encode()) + log.Warnf("[%p] create new proxy:\n%s", p, p.model.Encode()) unsafe2.SetMaxOffheapBytes(config.ProxyMaxOffheapBytes.Int64()) - go s.serveAdmin() - go s.serveProxy() + go p.serveAdmin() + go p.serveProxy() - s.startMetricsJson() - s.startMetricsInfluxdb() - s.startMetricsStatsd() + p.startMetricsJson() + p.startMetricsInfluxdb() + p.startMetricsStatsd() - return s, nil + return p, nil } -func (s *Proxy) setup(config *Config) error { +func (p *Proxy) setup(config *Config) error { proto := config.ProtoType if l, err := net.Listen(proto, config.ProxyAddr); err != nil { return errors.Trace(err) } else { - s.lproxy = l + p.lproxy = l x, err := utils.ReplaceUnspecifiedIP(proto, l.Addr().String(), config.HostProxy) if err != nil { return err } - s.model.ProtoType = proto - s.model.ProxyAddr = x + p.model.ProtoType = proto + p.model.ProxyAddr = x } proto = "tcp" if l, err := net.Listen(proto, config.AdminAddr); err != nil { return errors.Trace(err) } else { - s.ladmin = l + p.ladmin = l x, err := utils.ReplaceUnspecifiedIP(proto, l.Addr().String(), config.HostAdmin) if err != nil { return err } - s.model.AdminAddr = x + p.model.AdminAddr = x } - s.model.Token = rpc.NewToken( + p.model.Token = rpc.NewToken( config.ProductName, - s.lproxy.Addr().String(), - s.ladmin.Addr().String(), + p.lproxy.Addr().String(), + p.ladmin.Addr().String(), ) - s.xauth = rpc.NewXAuth( + p.xauth = rpc.NewXAuth( config.ProductName, config.ProductAuth, - s.model.Token, + p.model.Token, ) if config.JodisAddr != "" { @@ -144,171 +146,242 @@ func (s *Proxy) setup(config *Config) error { return err } if config.JodisCompatible { - s.model.JodisPath = filepath.Join("/zk/codis", fmt.Sprintf("db_%s", config.ProductName), "proxy", s.model.Token) + p.model.JodisPath = filepath.Join("/zk/codis", fmt.Sprintf("db_%s", config.ProductName), "proxy", p.model.Token) } else { - s.model.JodisPath = models.JodisPath(config.ProductName, s.model.Token) + p.model.JodisPath = models.JodisPath(config.ProductName, p.model.Token) } - s.jodis = NewJodis(c, s.model) + p.jodis = NewJodis(c, p.model) } - s.model.MaxSlotNum = config.MaxSlotNum + p.model.MaxSlotNum = config.MaxSlotNum return nil } -func (s *Proxy) Start() error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) Start() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } - if s.online { + if p.online { return nil } - s.online = true - s.router.Start() - if s.jodis != nil { - s.jodis.Start() + p.online = true + p.router.Start() + if p.jodis != nil { + p.jodis.Start() } return nil } -func (s *Proxy) Close() error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return nil } - s.closed = true - close(s.exit.C) + p.closed = true + close(p.exit.C) - if s.jodis != nil { - s.jodis.Close() + if p.jodis != nil { + p.jodis.Close() } - if s.ladmin != nil { - s.ladmin.Close() + if p.ladmin != nil { + p.ladmin.Close() } - if s.lproxy != nil { - s.lproxy.Close() + if p.lproxy != nil { + p.lproxy.Close() } - if s.router != nil { - s.router.Close() + if p.router != nil { + p.router.Close() } return nil } -func (s *Proxy) XAuth() string { - return s.xauth +func (p *Proxy) XAuth() string { + return p.xauth } -func (s *Proxy) Model() *models.Proxy { - return s.model +func (p *Proxy) Model() *models.Proxy { + return p.model } -func (s *Proxy) Config() *Config { - return s.config +func (p *Proxy) Config() *Config { + p.mu.Lock() + defer p.mu.Unlock() + return p.config } -func (s *Proxy) IsOnline() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.online && !s.closed +func (p *Proxy) ConfigGet(key string) *redis.Resp { + p.mu.Lock() + defer p.mu.Unlock() + switch key { + case "proxy_max_clients": + return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.ProxyMaxClients))) + case "backend_primary_only": + return redis.NewBulkBytes([]byte(strconv.FormatBool(p.config.BackendPrimaryOnly))) + case "slowlog_log_slower_than": + return redis.NewBulkBytes([]byte(strconv.FormatInt(p.config.SlowlogLogSlowerThan, 10))) + case "slowlog_max_len": + return redis.NewBulkBytes([]byte(strconv.FormatInt(p.config.SlowlogMaxLen, 10))) + default: + return redis.NewErrorf("unsupported key: %s", key) + } +} + +func (p *Proxy) ConfigSet(key, value string) *redis.Resp { + p.mu.Lock() + defer p.mu.Unlock() + switch key { + case "proxy_max_clients": + n, err := strconv.Atoi(value) + if err != nil { + return redis.NewErrorf("err:%s", err) + } + if n <= 0 { + return redis.NewErrorf("invalid proxy_max_clients") + } + p.config.ProxyMaxClients = n + return redis.NewString([]byte("OK")) + case "backend_primary_only": + return redis.NewErrorf("not currently supported") + case "slowlog_log_slower_than": + n, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return redis.NewErrorf("err:%s", err) + } + if n < 0 { + return redis.NewErrorf("invalid slowlog_log_slower_than") + } + p.config.SlowlogLogSlowerThan = n + return redis.NewString([]byte("OK")) + case "slowlog_max_len": + n, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return redis.NewErrorf("err:%s", err) + } + + if n < 0 { + return redis.NewErrorf("invalid slowlog_max_len") + } + p.config.SlowlogMaxLen = n + if p.config.SlowlogMaxLen > 0 { + SlowLogSetMaxLen(p.config.SlowlogMaxLen) + } + return redis.NewString([]byte("OK")) + default: + return redis.NewErrorf("unsupported key: %s", key) + } +} + +func (p *Proxy) ConfigRewrite() *redis.Resp { + p.mu.Lock() + defer p.mu.Unlock() + utils.RewriteConfig(*(p.config), p.config.ConfigFileName, "=", true) + return redis.NewString([]byte("OK")) +} + +func (p *Proxy) IsOnline() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.online && !p.closed } -func (s *Proxy) IsClosed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.closed +func (p *Proxy) IsClosed() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.closed } -func (s *Proxy) HasSwitched() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.router.HasSwitched() +func (p *Proxy) HasSwitched() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.router.HasSwitched() } -func (s *Proxy) Slots() []*models.Slot { - s.mu.Lock() - defer s.mu.Unlock() - return s.router.GetSlots() +func (p *Proxy) Slots() []*models.Slot { + p.mu.Lock() + defer p.mu.Unlock() + return p.router.GetSlots() } -func (s *Proxy) FillSlot(m *models.Slot) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) FillSlot(m *models.Slot) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } - return s.router.FillSlot(m) + return p.router.FillSlot(m) } -func (s *Proxy) FillSlots(slots []*models.Slot) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) FillSlots(slots []*models.Slot) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } for _, m := range slots { - if err := s.router.FillSlot(m); err != nil { + if err := p.router.FillSlot(m); err != nil { return err } } return nil } -func (s *Proxy) SwitchMasters(masters map[int]string) error { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) SwitchMasters(masters map[int]string) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return ErrClosedProxy } - s.ha.masters = masters + p.ha.masters = masters if len(masters) != 0 { - s.router.SwitchMasters(masters) + p.router.SwitchMasters(masters) } return nil } -func (s *Proxy) GetSentinels() ([]string, map[int]string) { - s.mu.Lock() - defer s.mu.Unlock() - if s.closed { +func (p *Proxy) GetSentinels() ([]string, map[int]string) { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { return nil, nil } - return s.ha.servers, s.ha.masters + return p.ha.servers, p.ha.masters } -func (s *Proxy) serveAdmin() { - if s.IsClosed() { +func (p *Proxy) serveAdmin() { + if p.IsClosed() { return } - defer s.Close() + defer p.Close() - log.Warnf("[%p] admin start service on %s", s, s.ladmin.Addr()) + log.Warnf("[%p] admin start service on %s", p, p.ladmin.Addr()) eh := make(chan error, 1) go func(l net.Listener) { h := http.NewServeMux() - h.Handle("/", newApiServer(s)) + h.Handle("/", newApiServer(p)) hs := &http.Server{Handler: h} eh <- hs.Serve(l) - }(s.ladmin) + }(p.ladmin) select { - case <-s.exit.C: - log.Warnf("[%p] admin shutdown", s) + case <-p.exit.C: + log.Warnf("[%p] admin shutdown", p) case err := <-eh: - log.ErrorErrorf(err, "[%p] admin exit on error", s) + log.ErrorErrorf(err, "[%p] admin exit on error", p) } } -func (s *Proxy) serveProxy() { - if s.IsClosed() { +func (p *Proxy) serveProxy() { + if p.IsClosed() { return } - defer s.Close() + defer p.Close() - log.Warnf("[%p] proxy start service on %s", s, s.lproxy.Addr()) + log.Warnf("[%p] proxy start service on %s", p, p.lproxy.Addr()) eh := make(chan error, 1) go func(l net.Listener) (err error) { @@ -316,40 +389,40 @@ func (s *Proxy) serveProxy() { eh <- err }() for { - c, err := s.acceptConn(l) + c, err := p.acceptConn(l) if err != nil { return err } - NewSession(c, s.config).Start(s.router) + NewSession(c, p.config, p).Start(p.router) } - }(s.lproxy) + }(p.lproxy) - if d := s.config.BackendPingPeriod.Duration(); d != 0 { - go s.keepAlive(d) + if d := p.config.BackendPingPeriod.Duration(); d != 0 { + go p.keepAlive(d) } select { - case <-s.exit.C: - log.Warnf("[%p] proxy shutdown", s) + case <-p.exit.C: + log.Warnf("[%p] proxy shutdown", p) case err := <-eh: - log.ErrorErrorf(err, "[%p] proxy exit on error", s) + log.ErrorErrorf(err, "[%p] proxy exit on error", p) } } -func (s *Proxy) keepAlive(d time.Duration) { +func (p *Proxy) keepAlive(d time.Duration) { var ticker = time.NewTicker(math2.MaxDuration(d, time.Second)) defer ticker.Stop() for { select { - case <-s.exit.C: + case <-p.exit.C: return case <-ticker.C: - s.router.KeepAlive() + p.router.KeepAlive() } } } -func (s *Proxy) acceptConn(l net.Listener) (net.Conn, error) { +func (p *Proxy) acceptConn(l net.Listener) (net.Conn, error) { var delay = &DelayExp2{ Min: 10, Max: 500, Unit: time.Millisecond, @@ -358,7 +431,7 @@ func (s *Proxy) acceptConn(l net.Listener) (net.Conn, error) { c, err := l.Accept() if err != nil { if e, ok := err.(net.Error); ok && e.Temporary() { - log.WarnErrorf(err, "[%p] proxy accept new connection failed", s) + log.WarnErrorf(err, "[%p] proxy accept new connection failed", p) delay.Sleep() continue } @@ -458,24 +531,24 @@ const ( StatsFull = StatsFlags(^uint32(0)) ) -func (s *Proxy) Overview(flags StatsFlags) *Overview { +func (p *Proxy) Overview(flags StatsFlags) *Overview { o := &Overview{ Version: utils.Version, Compile: utils.Compile, - Config: s.Config(), - Model: s.Model(), - Stats: s.Stats(flags), + Config: p.Config(), + Model: p.Model(), + Stats: p.Stats(flags), } if flags.HasBit(StatsSlots) { - o.Slots = s.Slots() + o.Slots = p.Slots() } return o } -func (s *Proxy) Stats(flags StatsFlags) *Stats { +func (p *Proxy) Stats(flags StatsFlags) *Stats { stats := &Stats{} - stats.Online = s.IsOnline() - stats.Closed = s.IsClosed() + stats.Online = p.IsOnline() + stats.Closed = p.IsClosed() stats.Ops.Total = OpTotal() stats.Ops.Fails = OpFails() @@ -496,7 +569,7 @@ func (s *Proxy) Stats(flags StatsFlags) *Stats { stats.Rusage.Raw = u.Usage } - stats.Backend.PrimaryOnly = s.Config().BackendPrimaryOnly + stats.Backend.PrimaryOnly = p.Config().BackendPrimaryOnly if flags.HasBit(StatsRuntime) { var r runtime.MemStats diff --git a/codis/pkg/proxy/session.go b/codis/pkg/proxy/session.go index 547a13d21f..188a62ae8c 100644 --- a/codis/pkg/proxy/session.go +++ b/codis/pkg/proxy/session.go @@ -8,6 +8,7 @@ import ( "fmt" "net" "strconv" + "strings" "sync" "time" @@ -44,6 +45,7 @@ type Session struct { broken atomic2.Bool config *Config + proxy *Proxy authorized bool } @@ -62,7 +64,7 @@ func (s *Session) String() string { return string(b) } -func NewSession(sock net.Conn, config *Config) *Session { +func NewSession(sock net.Conn, config *Config, proxy *Proxy) *Session { c := redis.NewConn(sock, config.SessionRecvBufsize.AsInt(), config.SessionSendBufsize.AsInt(), @@ -72,7 +74,7 @@ func NewSession(sock net.Conn, config *Config) *Session { c.SetKeepAlivePeriod(config.SessionKeepAlivePeriod.Duration()) s := &Session{ - Conn: c, config: config, + Conn: c, config: config, proxy: proxy, CreateUnix: time.Now().Unix(), } s.stats.opmap = make(map[string]*opStats, 16) @@ -206,11 +208,11 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { }) s.flushOpStats(true) }() - var ( breakOnFailure = s.config.SessionBreakOnFailure maxPipelineLen = s.config.SessionMaxPipeline ) + var cmd = make([]byte, 128) p := s.Conn.FlushEncoder() p.MaxInterval = time.Millisecond @@ -239,7 +241,7 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { } nowTime := time.Now().UnixNano() duration := int64((nowTime - r.ReceiveTime) / 1e3) - if duration >= 50000 { + if duration >= s.config.SlowlogLogSlowerThan { //client -> proxy -> server -> porxy -> client //Record the waiting time from receiving the request from the client to sending it to the backend server //the waiting time from sending the request to the backend server to receiving the response from the server @@ -254,8 +256,13 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) { if r.ReceiveFromServerTime > 0 { d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3) } - log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d", - time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen) + index := getWholeCmd(r.Multi, cmd) + cmdLog := fmt.Sprintf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].", + time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index])) + log.Warnf("%s", cmdLog) + if s.config.SlowlogMaxLen > 0 { + SlowLogPush(&SlowLogEntry{SlowLogGetCurLogId(), r.ReceiveTime / 1e3, duration, cmdLog}) + } } return nil }) @@ -319,6 +326,10 @@ func (s *Session) handleRequest(r *Request, d *Router) error { return s.handleRequestDel(r, d) case "EXISTS": return s.handleRequestExists(r, d) + case "PCONFIG": + return s.handlePConfig(r) + case "PSLOWLOG": + return s.handlePSlowLog(r) case "SLOTSINFO": return s.handleRequestSlotsInfo(r, d) case "SLOTSSCAN": @@ -714,3 +725,97 @@ func (s *Session) flushOpStats(force bool) { s.stats.opmap = make(map[string]*opStats, 32) } } + +func (s *Session) handlePSlowLog(r *Request) error { + if len(r.Multi) < 2 || len(r.Multi) > 4 { + r.Resp = redis.NewErrorf("ERR slowLog parameters") + return nil + } + var subCmd = strings.ToUpper(string(r.Multi[1].Value)) + switch subCmd { + case "GET": + if len(r.Multi) == 3 { + num, err := strconv.ParseInt(string(r.Multi[2].Value), 10, 64) + if err != nil { + r.Resp = redis.NewErrorf("ERR invalid slowLog number") + break + } + + r.Resp = SlowLogGetByNum(num) + } else if len(r.Multi) == 4 { + var ( + id int64 + num int64 + err error + ) + id, err = strconv.ParseInt(string(r.Multi[2].Value), 10, 64) + if err != nil { + r.Resp = redis.NewErrorf("ERR invalid slowLog start logId") + break + } + num, err = strconv.ParseInt(string(r.Multi[3].Value), 10, 64) + if err != nil { + r.Resp = redis.NewErrorf("ERR invalid slowLog number") + break + } + + r.Resp = SlowLogGetByIdAndNUm(id, num) + } else { + r.Resp = SlowLogGetByNum(10) + } + case "LEN": + if len(r.Multi) == 2 { + r.Resp = SlowLogGetLen() + } else { + r.Resp = redis.NewErrorf("ERR slowLog parameters") + } + case "RESET": + if len(r.Multi) == 2 { + r.Resp = SlowLogReset() + } else { + r.Resp = redis.NewErrorf("ERR slowLog parameters") + } + default: + r.Resp = redis.NewErrorf("ERR Unknown SLOWLOG subcommand or wrong args. Try GET, RESET, LEN.") + } + return nil +} + +func (s *Session) handlePConfig(r *Request) error { + if len(r.Multi) < 2 || len(r.Multi) > 4 { + r.Resp = redis.NewErrorf("ERR config parameters") + return nil + } + + var subCmd = strings.ToUpper(string(r.Multi[1].Value)) + switch subCmd { + case "GET": + if len(r.Multi) == 3 { + key := strings.ToLower(string(r.Multi[2].Value)) + r.Resp = s.proxy.ConfigGet(key) + } else { + r.Resp = redis.NewErrorf("ERR config get parameters.") + } + case "SET": + if len(r.Multi) == 3 { + key := strings.ToLower(string(r.Multi[2].Value)) + value := "" + r.Resp = s.proxy.ConfigSet(key, value) + } else if len(r.Multi) == 4 { + key := strings.ToLower(string(r.Multi[2].Value)) + value := string(r.Multi[3].Value) + r.Resp = s.proxy.ConfigSet(key, value) + } else { + r.Resp = redis.NewErrorf("ERR config set parameters.") + } + case "REWRITE": + if len(r.Multi) == 2 { + r.Resp = s.proxy.ConfigRewrite() + } else { + r.Resp = redis.NewErrorf("ERR config rewrite parameters") + } + default: + r.Resp = redis.NewErrorf("ERR Unknown CONFIG subcommand or wrong args. Try GET, SET, REWRITE.") + } + return nil +} diff --git a/codis/pkg/proxy/slowlog.go b/codis/pkg/proxy/slowlog.go new file mode 100644 index 0000000000..79c131c546 --- /dev/null +++ b/codis/pkg/proxy/slowlog.go @@ -0,0 +1,173 @@ +package proxy + +import ( + "container/list" + "strconv" + "sync" + + "pika/codis/v2/pkg/proxy/redis" + "pika/codis/v2/pkg/utils/log" + "pika/codis/v2/pkg/utils/sync2/atomic2" +) + +const ( + PIKA_SLOWLOG_LENGTH_DEFAULT = 128000 + PIKA_SLOWLOG_LENGTH_MAX = 10000000 +) + +type SlowLogEntry struct { + id int64 + time int64 + duration int64 + cmd string +} + +type SlowLog struct { + sync.Mutex + logList *list.List + logId atomic2.Int64 + maxLen atomic2.Int64 +} + +var PSlowLog = &SlowLog{} + +func init() { + PSlowLog.logList = list.New() + PSlowLog.logId.Swap(0) + PSlowLog.maxLen.Swap(PIKA_SLOWLOG_LENGTH_DEFAULT) +} + +func SlowLogSetMaxLen(len int64) { + if len < 0 { + PSlowLog.maxLen.Swap(PIKA_SLOWLOG_LENGTH_DEFAULT) + } else if len > PIKA_SLOWLOG_LENGTH_MAX { + PSlowLog.maxLen.Swap(PIKA_SLOWLOG_LENGTH_MAX) + } else { + PSlowLog.maxLen.Swap(len) + } +} + +func SlowLogGetCurLogId() int64 { + return PSlowLog.logId.Incr() +} + +func SlowLogPush(entry *SlowLogEntry) { + if entry == nil || PSlowLog.maxLen <= 0 { + return + } + if PSlowLog.TryLock() { + defer PSlowLog.Unlock() + PSlowLog.logList.PushFront(entry) // push a ptr + for int64(PSlowLog.logList.Len()) > PSlowLog.maxLen.Int64() { + PSlowLog.logList.Remove(PSlowLog.logList.Back()) + } + } else { + log.Warnf("cant get slowlog lock, logid: %d, cmd: %s", entry.id, entry.cmd) + } +} + +func SlowLogGetLen() *redis.Resp { + PSlowLog.Lock() + defer PSlowLog.Unlock() + return redis.NewString([]byte(strconv.Itoa(PSlowLog.logList.Len()))) +} + +func SlowLogReset() *redis.Resp { + PSlowLog.Lock() + defer PSlowLog.Unlock() + PSlowLog.logId.Swap(0) + PSlowLog.logList.Init() + return redis.NewString([]byte("OK")) +} + +func SlowLogToResp(entry *SlowLogEntry) *redis.Resp { + if entry == nil { + return redis.NewArray(make([]*redis.Resp, 0)) + } + return redis.NewArray([]*redis.Resp{ + redis.NewInt([]byte(strconv.FormatInt(entry.id, 10))), + redis.NewInt([]byte(strconv.FormatInt(entry.time, 10))), + redis.NewInt([]byte(strconv.FormatInt(entry.duration, 10))), + redis.NewArray([]*redis.Resp{ + redis.NewBulkBytes([]byte(entry.cmd)), + }), + }) +} + +func SlowLogGetByNum(num int64) *redis.Resp { + PSlowLog.Lock() + defer PSlowLog.Unlock() + if num <= 0 { + return redis.NewArray(make([]*redis.Resp, 0)) + } else if num > int64(PSlowLog.logList.Len()) { + num = int64(PSlowLog.logList.Len()) + } + var res = make([]*redis.Resp, 0, num) + var iter = PSlowLog.logList.Front() + for i := int64(0); i < num; i++ { + if iter == nil || iter.Value == nil { + break + } + if entry, ok := iter.Value.(*SlowLogEntry); ok { + res = append(res, SlowLogToResp(entry)) + } else { + log.Warnf("slowLogGet cont parse iter.Value[%v] to slowLogEntry.", iter.Value) + } + iter = iter.Next() + } + return redis.NewArray(res) +} + +func SlowLogGetByIdAndNUm(id, num int64) *redis.Resp { + PSlowLog.Lock() + defer PSlowLog.Unlock() + + var smallestID int64 + var oldestNode = PSlowLog.logList.Back() + if oldestNode == nil || oldestNode.Value == nil { + log.Warnf("slowlogGet oldestNode or oldestNode.Value == nil, oldestNode: %v", oldestNode) + return redis.NewArray(make([]*redis.Resp, 0)) + } + + if entry, ok := oldestNode.Value.(*SlowLogEntry); ok { + smallestID = entry.id + } else { + log.Warnf("slowlogGet cont parse oldestNode.Value[%v] to slowlogEntry.", oldestNode.Value) + } + if id < smallestID || num < 0 { + return redis.NewArray(make([]*redis.Resp, 0)) + } + if num > id-smallestID+1 { + num = id - smallestID + 1 + } + + if num > int64(PSlowLog.logList.Len()) { + num = int64(PSlowLog.logList.Len()) + } + + var res = make([]*redis.Resp, num) + var iter = PSlowLog.logList.Front() + + for ; iter != nil && iter.Value != nil; iter = iter.Next() { + if entry, ok := iter.Value.(*SlowLogEntry); ok { + if id >= entry.id { + break + } + } else { + log.Warnf("slowlogGet cont parse iter.Value[%v] to slowlogEntry.", iter.Value) + } + } + + for i := int64(0); i < num; i++ { + if iter == nil || iter.Value == nil { + break + } + if entry, ok := iter.Value.(*SlowLogEntry); ok { + res = append(res, SlowLogToResp(entry)) + } else { + log.Warnf("slowlogGet cont parse iter.Value[%v] to slowlogEntry.", iter.Value) + } + iter = iter.Next() + } + return redis.NewArray(res) +} diff --git a/codis/pkg/utils/configAux.go b/codis/pkg/utils/configAux.go new file mode 100644 index 0000000000..7dc68286ad --- /dev/null +++ b/codis/pkg/utils/configAux.go @@ -0,0 +1,271 @@ +package utils + +import ( + "bufio" + "fmt" + "io" + "os" + "reflect" + "strconv" + "strings" + + "pika/codis/v2/pkg/utils/bytesize" + "pika/codis/v2/pkg/utils/errors" + "pika/codis/v2/pkg/utils/log" + "pika/codis/v2/pkg/utils/timesize" +) + +const ( + TypeConf = iota + TypeComment +) + +type ConfItem struct { + confType int // 0 means conf, 1 means comment + name string + value string +} + +type DeployConfig struct { + items []*ConfItem + confMap map[string]*ConfItem + sep string +} + +func (c *DeployConfig) Init(path string, sep string) error { + c.confMap = make(map[string]*ConfItem) + c.sep = sep + + f, err := os.Open(path) + if err != nil { + return err + } + defer func(f *os.File) { + err := f.Close() + if err != nil { + log.WarnErrorf(err, "Close %s failed.\n", path) + } + }(f) + + r := bufio.NewReader(f) + for { + b, _, err := r.ReadLine() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + + line := strings.TrimSpace(string(b)) + + item := &ConfItem{} + if strings.Index(line, "#") == 0 || len(line) == 0 { + item.confType = TypeComment + item.name = line + c.items = append(c.items, item) + continue + } + index := strings.Index(line, sep) + if index <= 0 { + continue + } + key := strings.TrimSpace(line[:index]) + value := strings.TrimSpace(line[index+1:]) + if len(key) == 0 { + continue + } + item.confType = TypeConf + item.name = key + item.value = value + c.items = append(c.items, item) + c.confMap[item.name] = item + } +} + +func (c *DeployConfig) Reset(conf interface{}, isWrap bool) { + obj := reflect.ValueOf(conf) + + for i := 0; i < obj.NumField(); i++ { + fieldInfo := obj.Type().Field(i) + name := fieldInfo.Tag.Get("toml") + if name == "" || name == "-" { + continue + } + var value string + switch v := obj.Field(i).Interface().(type) { + case string: + value = strings.Trim(strings.TrimSpace(v), "\"") + if value == "" { + continue + } + if isWrap { + err := c.Set(name, "\""+value+"\"") + if err != nil { + log.WarnErrorf(err, "Set string with wrap failed!") + } + } else { + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set string without wrap failed!") + } + } + case int: + value = strconv.Itoa(v) + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set int failed!") + } + case int32: + value = strconv.FormatInt(int64(v), 10) + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set int32 failed!") + } + + case int64: + value = strconv.FormatInt(v, 10) + err := c.Set(name, value) + if err != nil { + log.WarnErrorf(err, "Set int64 failed!") + } + case bool: + if v { + err := c.Set(name, "true") + if err != nil { + log.WarnErrorf(err, "Set bool value failed!") + } + } else { + err := c.Set(name, "false") + if err != nil { + log.WarnErrorf(err, "Set bool value failed!") + } + } + case timesize.Duration: + if ret, err := v.MarshalText(); err != nil { + log.WarnErrorf(err, "config set %s failed.\n", name) + } else { + value = string(ret[:]) + err := c.Set(name, "\""+value+"\"") + if err != nil { + log.WarnErrorf(err, "Set timesize failed!") + } + } + + case bytesize.Int64: + if ret, err := v.MarshalText(); err != nil { + log.WarnErrorf(err, "config set %s failed.\n", name) + } else { + value = string(ret[:]) + err := c.Set(name, "\""+value+"\"") + if err != nil { + log.WarnErrorf(err, "Set bytesize failed!") + } + } + + default: + log.Warnf("value error: %v\n", v) + continue + } + } + +} + +func (c *DeployConfig) Set(key string, value string) error { + key = strings.TrimSpace(key) + value = strings.TrimSpace(value) + + log.Infof("Set key : %s, value: %s\n", key, value) + + if len(key) == 0 || len(value) == 0 { + return errors.New("key or value is null") + } + + item, found := c.confMap[key] + if found { + item.value = value + } else { + item := &ConfItem{ + confType: TypeConf, + name: key, + value: value, + } + c.items = append(c.items, item) + c.confMap[item.name] = item + } + return nil +} + +func (c *DeployConfig) Get(key string) string { + item, found := c.confMap[key] + if !found { + return "" + } + return item.value +} + +func (c *DeployConfig) Show() { + log.Infof("Show config, len = %d\n", len(c.items)) + for index, item := range c.items { + if item.confType == TypeComment { + // Comment format: id: context + log.Infof("%d: %s\n", index, item.name) + } else { + // Configuration format: id: key = value or id: key value + if len(strings.TrimSpace(c.sep)) > 0 { + log.Infof("%d: %s %s %s\n", index, item.name, c.sep, item.value) + } else { + log.Infof("%d: %s%s%s\n", index, item.name, c.sep, item.value) + } + } + } +} + +func (c *DeployConfig) ReWrite(confName string) error { + f, err := os.Create(confName) + if err != nil { + log.WarnErrorf(err, "create %s failed.\n", confName) + return err + } + defer func(f *os.File) { + err := f.Close() + if err != nil { + log.WarnErrorf(err, "Close %s failed.\n", confName) + } + }(f) + + w := bufio.NewWriter(f) + var lineStr string + for _, item := range c.items { + if item.confType == TypeComment { + lineStr = fmt.Sprintf("%s", item.name) + } else { + if len(strings.TrimSpace(c.sep)) > 0 { + lineStr = fmt.Sprintf("%s %s %s", item.name, c.sep, item.value) + } else { + lineStr = fmt.Sprintf("%s%s%s", item.name, c.sep, item.value) + } + } + fmt.Fprintln(w, lineStr) + } + return w.Flush() +} + +func RewriteConfig(postConf interface{}, defaultConf string, sep string, isWrap bool) error { + conf := &DeployConfig{} + err := conf.Init(defaultConf, sep) + if err != nil { + log.WarnErrorf(err, "open %s file failed.\n", defaultConf) + return err + } + conf.Reset(postConf, isWrap) + conf.Show() + var newConf = defaultConf + ".tmp" + if err = conf.ReWrite(newConf); err != nil { + return err + } + if err = os.Remove(defaultConf); err != nil { + return err + } + return os.Rename(newConf, defaultConf) +}