Skip to content

Commit

Permalink
top定义移除req泛型
Browse files Browse the repository at this point in the history
  • Loading branch information
viciousstar committed Dec 14, 2023
1 parent c8f31f1 commit 388cd8c
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 153 deletions.
2 changes: 1 addition & 1 deletion agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use rt::spawn;
use protocol::{Parser, Result};
use stream::{Backend, Request};
type Endpoint = Backend<Request>;
type Topology = endpoint::TopologyProtocol<Endpoint, Request, Parser>;
type Topology = endpoint::TopologyProtocol<Endpoint, Parser>;

// 默认支持
fn main() -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion agent/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use stream::pipeline::copy_bidirectional;
use stream::{Backend, CheckedTopology, Request, StreamMetrics};

type Endpoint = Backend<Request>;
type Topology = endpoint::TopologyProtocol<Endpoint, Request, Parser>;
type Topology = endpoint::TopologyProtocol<Endpoint, Parser>;
// 一直侦听,直到成功侦听或者取消侦听(当前尚未支持取消侦听)
// 1. 尝试侦听之前,先确保服务配置信息已经更新完成
pub(super) async fn process_one(
Expand Down
27 changes: 12 additions & 15 deletions endpoint/src/cacheservice/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::PerformanceTuning;
use protocol::Bit;

#[derive(Clone)]
pub struct CacheService<E, Req, P> {
pub struct CacheService<E, P> {
// 一共有n组,每组1个连接。
// 排列顺序: master, master l1, slave, slave l1
streams: Distance<Shards<E>>,
Expand All @@ -29,10 +29,9 @@ pub struct CacheService<E, Req, P> {

// 保留本设置,非必要场景,减少一次slave访问
backend_no_storage: bool, // true:mc后面没有存储
_marker: std::marker::PhantomData<Req>,
}

impl<E, Req, P> From<P> for CacheService<E, Req, P> {
impl<E, P> From<P> for CacheService<E, P> {
#[inline]
fn from(parser: P) -> Self {
Self {
Expand All @@ -41,13 +40,12 @@ impl<E, Req, P> From<P> for CacheService<E, Req, P> {
exp_sec: 0,
// force_write_all: false, // 兼容考虑默认为false,set master失败后,不更新其他layers,新业务推荐用true
hasher: Default::default(),
_marker: Default::default(),
backend_no_storage: false,
}
}
}

impl<E, Req, P> discovery::Inited for CacheService<E, Req, P>
impl<E, P> discovery::Inited for CacheService<E, P>
where
E: discovery::Inited,
{
Expand All @@ -61,10 +59,9 @@ where
}
}

impl<E, Req, P> Hash for CacheService<E, Req, P>
impl<E, P> Hash for CacheService<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
E: Endpoint,
P: Protocol,
{
#[inline]
Expand All @@ -73,7 +70,7 @@ where
}
}

impl<E, Req, P> Topology for CacheService<E, Req, P>
impl<E, Req, P> Topology for CacheService<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
Expand All @@ -85,7 +82,7 @@ where
}
}

impl<E, Req, P> Endpoint for CacheService<E, Req, P>
impl<E, Req, P> Endpoint for CacheService<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
Expand Down Expand Up @@ -128,7 +125,7 @@ where
unsafe { self.streams.get_unchecked(idx).send(req) };
}
}
impl<E, Req: Request, P: Protocol> CacheService<E, Req, P>
impl<E, Req: Request, P: Protocol> CacheService<E, P>
where
E: Endpoint<Item = Req>,
{
Expand Down Expand Up @@ -195,10 +192,10 @@ where
(idx, try_next, write_back)
}
}
impl<E, Req, P> TopologyWrite for CacheService<E, Req, P>
impl<E, P> TopologyWrite for CacheService<E, P>
where
P: Protocol,
E: Endpoint<Item = Req>,
E: Endpoint,
{
#[inline]
fn update(&mut self, namespace: &str, cfg: &str) {
Expand All @@ -212,7 +209,7 @@ where
let dist = &ns.distribution.clone();

// 把所有的endpoints cache下来
let mut endpoints: Endpoints<'_, Req, P, E> =
let mut endpoints: Endpoints<'_, P, E> =
Endpoints::new(namespace, &self.parser, Memcache);
self.streams.take().into_iter().for_each(|shard| {
endpoints.cache(shard.into());
Expand Down Expand Up @@ -254,7 +251,7 @@ where
}

use std::fmt::{self, Display, Formatter};
impl<E, Req, P> Display for CacheService<E, Req, P> {
impl<E, P> Display for CacheService<E, P> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
Expand Down
25 changes: 11 additions & 14 deletions endpoint/src/kv/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,30 @@ use super::config::Years;
use super::strategy::Strategist;
use super::KVCtx;
#[derive(Clone)]
pub struct KvService<E, Req, P> {
pub struct KvService<E, P> {
shards: Shards<E>,
// selector: Selector,
strategist: Strategist,
parser: P,
cfg: Box<DnsConfig<KvNamespace>>,
_mark: std::marker::PhantomData<Req>,
}

impl<E, Req, P> From<P> for KvService<E, Req, P> {
impl<E, P> From<P> for KvService<E, P> {
#[inline]
fn from(parser: P) -> Self {
Self {
parser,
shards: Default::default(),
strategist: Default::default(),
cfg: Default::default(),
_mark: std::marker::PhantomData,
// selector: Selector::Random,
}
}
}

impl<E, Req, P> Hash for KvService<E, Req, P>
impl<E, P> Hash for KvService<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
E: Endpoint,
P: Protocol,
{
#[inline]
Expand All @@ -59,15 +56,15 @@ where
}
}

impl<E, Req, P> Topology for KvService<E, Req, P>
impl<E, Req, P> Topology for KvService<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
P: Protocol,
{
}

impl<E, Req, P> Endpoint for KvService<E, Req, P>
impl<E, Req, P> Endpoint for KvService<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
Expand Down Expand Up @@ -152,10 +149,10 @@ where
}
}

impl<E, Req, P> TopologyWrite for KvService<E, Req, P>
impl<E, P> TopologyWrite for KvService<E, P>
where
P: Protocol,
E: Endpoint<Item = Req>,
E: Endpoint,
{
fn need_load(&self) -> bool {
self.shards.len() != self.cfg.shards_url.len() || self.cfg.need_load()
Expand All @@ -170,10 +167,10 @@ where
}
}
}
impl<E, Req, P> KvService<E, Req, P>
impl<E, P> KvService<E, P>
where
P: Protocol,
E: Endpoint<Item = Req>,
E: Endpoint,
{
// #[inline]
fn take_or_build(
Expand Down Expand Up @@ -306,7 +303,7 @@ where
true
}
}
impl<E, Req, P> discovery::Inited for KvService<E, Req, P>
impl<E, P> discovery::Inited for KvService<E, P>
where
E: discovery::Inited,
{
Expand Down
30 changes: 13 additions & 17 deletions endpoint/src/msgque/topo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const OFFLINE_STOP_READ_SECONDS: u64 = 60 * 20;
const OFFLINE_CLEAN_SECONDS: u64 = OFFLINE_STOP_READ_SECONDS + 60 * 2;

#[derive(Clone)]
pub struct MsgQue<E, Req, P> {
pub struct MsgQue<E, P> {
service: String,

// 读写stream需要分开,读会有大量的空读
Expand All @@ -51,10 +51,9 @@ pub struct MsgQue<E, Req, P> {

timeout_write: Timeout,
timeout_read: Timeout,
_marker: std::marker::PhantomData<Req>,
}

impl<E, Req, P> From<P> for MsgQue<E, Req, P> {
impl<E, P> From<P> for MsgQue<E, P> {
#[inline]
fn from(parser: P) -> Self {
Self {
Expand All @@ -70,12 +69,11 @@ impl<E, Req, P> From<P> for MsgQue<E, Req, P> {
max_size: super::BLOCK_SIZE,
timeout_write: Timeout::from_millis(200),
timeout_read: Timeout::from_millis(100),
_marker: Default::default(),
}
}
}

impl<E, Req, P> discovery::Inited for MsgQue<E, Req, P>
impl<E, P> discovery::Inited for MsgQue<E, P>
where
E: discovery::Inited,
{
Expand Down Expand Up @@ -109,10 +107,9 @@ where

const PADDING: Hasher = Hasher::Padding(Padding);

impl<E, Req, P> Hash for MsgQue<E, Req, P>
impl<E, P> Hash for MsgQue<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
E: Endpoint,
P: Protocol,
{
#[inline]
Expand All @@ -121,7 +118,7 @@ where
}
}

impl<E, Req, P> Topology for MsgQue<E, Req, P>
impl<E, Req, P> Topology for MsgQue<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
Expand All @@ -136,7 +133,7 @@ where
}

//TODO: 验证的时候需要考虑512字节这种边界msg
impl<E, Req, P> Endpoint for MsgQue<E, Req, P>
impl<E, Req, P> Endpoint for MsgQue<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
Expand Down Expand Up @@ -209,10 +206,9 @@ where
}

//获得待读取的streams和qid,返回的bool指示是否从offline streams中读取,true为都offline stream
impl<E, Req, P> MsgQue<E, Req, P>
impl<E, P> MsgQue<E, P>
where
E: Endpoint<Item = Req>,
Req: Request,
E: Endpoint,
P: Protocol,
{
fn get_read_idx(&self, ctx: &Context, inited: bool, rw_count: usize) -> (bool, usize) {
Expand Down Expand Up @@ -245,9 +241,9 @@ where
}
}

impl<E, Req, P> MsgQue<E, Req, P>
impl<E, P> MsgQue<E, P>
where
E: Endpoint<Item = Req>,
E: Endpoint,
P: Protocol,
{
// 构建下线的队列
Expand Down Expand Up @@ -380,10 +376,10 @@ where
// }
}

impl<E, Req, P> TopologyWrite for MsgQue<E, Req, P>
impl<E, P> TopologyWrite for MsgQue<E, P>
where
P: Protocol,
E: Endpoint<Item = Req>,
E: Endpoint,
{
#[inline]
fn update(&mut self, name: &str, cfg: &str) {
Expand Down
Loading

0 comments on commit 388cd8c

Please sign in to comment.