Skip to content

Commit

Permalink
feat: move streams implementation to storage layer. (OpenAtomFoundati…
Browse files Browse the repository at this point in the history
…on#2242)

* feat: move stream implememtation to storage with some bugs.

* fix: fix all problem of stream commands

* refactor: adjust stream code structure.

1. remove logic of tree_id generate, the tree will be implemented when supporting
   XGROUP commands.
2. move all the helper functions to redis_stream.h.

* feat: support some basic function of storage layer.

* fix: Add licence.

* fix: compile problem in macos.

* fix: length of stream now using int32_t.

* fix: Support some commands and API of blackwidow.

* KEYS
* Storage::DoCompactRange()
* Storage::PKHScanRange()
* Storage::PKHRSranRange()

* fix: build problem on ubuntu.

* fix: remote key lock.
  • Loading branch information
KKorpse authored Feb 1, 2024
1 parent 76e1530 commit d077c08
Show file tree
Hide file tree
Showing 16 changed files with 2,323 additions and 1,637 deletions.
39 changes: 15 additions & 24 deletions include/pika_stream.h
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
// Copyright (c) 2018-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_STREAM_H_
#define PIKA_STREAM_H_

#include "include/acl.h"
#include "include/pika_command.h"
#include "include/pika_stream_base.h"
#include "include/pika_stream_meta_value.h"
#include "include/pika_stream_types.h"
#include "storage/src/redis_streams.h"
#include "storage/storage.h"

/*
* stream
*/

inline void ParseAddOrTrimArgsOrReply(CmdRes& res, const PikaCmdArgsType& argv, StreamAddTrimArgs& args, int* idpos,
bool is_xadd);
inline void ParseAddOrTrimArgsOrReply(CmdRes& res, const PikaCmdArgsType& argv, storage::StreamAddTrimArgs& args,
int* idpos, bool is_xadd);

inline void ParseReadOrReadGroupArgsOrReply(CmdRes& res, const PikaCmdArgsType& argv, StreamReadGroupReadArgs& args,
bool is_xreadgroup);
inline void ParseReadOrReadGroupArgsOrReply(CmdRes& res, const PikaCmdArgsType& argv,
storage::StreamReadGroupReadArgs& args, bool is_xreadgroup);

// @field_values is the result of ScanStream.
// field is the serialized message id,
Expand All @@ -39,11 +38,10 @@ class XAddCmd : public Cmd {

private:
std::string key_;
StreamAddTrimArgs args_;
storage::StreamAddTrimArgs args_;
int field_pos_{0};

void DoInitial() override;
inline void GenerateStreamIDOrReply(const StreamMetaValue& stream_meta);
};

class XDelCmd : public Cmd {
Expand All @@ -58,13 +56,10 @@ class XDelCmd : public Cmd {

private:
std::string key_;
std::vector<streamID> ids_;
std::vector<storage::streamID> ids_;

void DoInitial() override;
void Clear() override { ids_.clear(); }
inline void SetFirstOrLastIDOrReply(StreamMetaValue& stream_meta, const DB* db, bool is_set_first);
inline void SetFirstIDOrReply(StreamMetaValue& stream_meta, const DB* db);
inline void SetLastIDOrReply(StreamMetaValue& stream_meta, const DB* db);
};

class XReadCmd : public Cmd {
Expand All @@ -77,7 +72,7 @@ class XReadCmd : public Cmd {
Cmd* Clone() override { return new XReadCmd(*this); }

private:
StreamReadGroupReadArgs args_;
storage::StreamReadGroupReadArgs args_;

void DoInitial() override;
void Clear() override {
Expand All @@ -97,11 +92,7 @@ class XRangeCmd : public Cmd {

protected:
std::string key_;
streamID start_sid;
streamID end_sid;
int32_t count_{INT32_MAX};
bool start_ex_{false};
bool end_ex_{false};
storage::StreamScanArgs args_;

void DoInitial() override;
};
Expand Down Expand Up @@ -141,7 +132,7 @@ class XTrimCmd : public Cmd {

private:
std::string key_;
StreamAddTrimArgs args_;
storage::StreamAddTrimArgs args_;

void DoInitial() override;
};
Expand Down
255 changes: 0 additions & 255 deletions include/pika_stream_base.h

This file was deleted.

Loading

0 comments on commit d077c08

Please sign in to comment.