Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move streams implementation to storage layer. #2242

Merged
merged 14 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 15 additions & 24 deletions include/pika_stream.h
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
// Copyright (c) 2018-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_STREAM_H_
#define PIKA_STREAM_H_

#include "include/acl.h"
#include "include/pika_command.h"
#include "include/pika_stream_base.h"
#include "include/pika_stream_meta_value.h"
#include "include/pika_stream_types.h"
#include "storage/src/redis_streams.h"
#include "storage/storage.h"

/*
* stream
*/

inline void ParseAddOrTrimArgsOrReply(CmdRes& res, const PikaCmdArgsType& argv, StreamAddTrimArgs& args, int* idpos,
bool is_xadd);
inline void ParseAddOrTrimArgsOrReply(CmdRes& res, const PikaCmdArgsType& argv, storage::StreamAddTrimArgs& args,
int* idpos, bool is_xadd);

inline void ParseReadOrReadGroupArgsOrReply(CmdRes& res, const PikaCmdArgsType& argv, StreamReadGroupReadArgs& args,
bool is_xreadgroup);
inline void ParseReadOrReadGroupArgsOrReply(CmdRes& res, const PikaCmdArgsType& argv,
storage::StreamReadGroupReadArgs& args, bool is_xreadgroup);

// @field_values is the result of ScanStream.
// field is the serialized message id,
Expand All @@ -39,11 +38,10 @@ class XAddCmd : public Cmd {

private:
std::string key_;
StreamAddTrimArgs args_;
storage::StreamAddTrimArgs args_;
int field_pos_{0};

void DoInitial() override;
inline void GenerateStreamIDOrReply(const StreamMetaValue& stream_meta);
};

class XDelCmd : public Cmd {
Expand All @@ -58,13 +56,10 @@ class XDelCmd : public Cmd {

private:
std::string key_;
std::vector<streamID> ids_;
std::vector<storage::streamID> ids_;

void DoInitial() override;
void Clear() override { ids_.clear(); }
inline void SetFirstOrLastIDOrReply(StreamMetaValue& stream_meta, const DB* db, bool is_set_first);
inline void SetFirstIDOrReply(StreamMetaValue& stream_meta, const DB* db);
inline void SetLastIDOrReply(StreamMetaValue& stream_meta, const DB* db);
};

class XReadCmd : public Cmd {
Expand All @@ -77,7 +72,7 @@ class XReadCmd : public Cmd {
Cmd* Clone() override { return new XReadCmd(*this); }

private:
StreamReadGroupReadArgs args_;
storage::StreamReadGroupReadArgs args_;

void DoInitial() override;
void Clear() override {
Expand All @@ -97,11 +92,7 @@ class XRangeCmd : public Cmd {

protected:
std::string key_;
streamID start_sid;
streamID end_sid;
int32_t count_{INT32_MAX};
bool start_ex_{false};
bool end_ex_{false};
storage::StreamScanArgs args_;

void DoInitial() override;
};
Expand Down Expand Up @@ -141,7 +132,7 @@ class XTrimCmd : public Cmd {

private:
std::string key_;
StreamAddTrimArgs args_;
storage::StreamAddTrimArgs args_;

void DoInitial() override;
};
Expand Down
255 changes: 0 additions & 255 deletions include/pika_stream_base.h

This file was deleted.

Loading
Loading