Skip to content

Commit

Permalink
chore: persistent connection
Browse files Browse the repository at this point in the history
  • Loading branch information
mcharytoniuk committed Nov 13, 2024
1 parent d18bb5e commit e9a98be
Show file tree
Hide file tree
Showing 14 changed files with 127 additions and 85 deletions.
72 changes: 55 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ actix = "0.13.5"
actix-web = "4.9.0"
clap = { version = "4.5.20", features = ["derive"] }
env_logger = "0.11.5"
futures-util = { version = "0.3.31", features = ["tokio-io"] }
log = "0.4.22"
reqwest = { version = "0.12.9", features = ["json", "stream"] }
serde = { version = "1.0.215", features = ["derive"] }
Expand Down
5 changes: 1 addition & 4 deletions src/agent/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ pub struct Agent {
}

impl Agent {
pub fn new(
llamacpp_client: LlamacppClient,
name: Option<String>,
) -> Self {
pub fn new(llamacpp_client: LlamacppClient, name: Option<String>) -> Self {
Self {
id: uuid::Uuid::new_v4(),
name,
Expand Down
39 changes: 16 additions & 23 deletions src/agent/state_reporter.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,64 @@
use actix::{
AsyncContext,
fut::future::WrapFuture,
};
use actix::{fut::future::WrapFuture, AsyncContext};
use log::error;
use std::sync::Arc;
use tokio::time;
use tokio::sync::broadcast;
use tokio::time;

use crate::balancer::status_update::StatusUpdate;
use crate::errors::result::Result;

#[allow(dead_code)]
pub struct StateReporter {
management_addr: url::Url,
stats_endpoint_url: String,
status_update_rx: broadcast::Receiver<actix_web::web::Bytes>,
status_update_tx: Arc<broadcast::Sender<actix_web::web::Bytes>>,
}

impl StateReporter {
pub fn new(management_addr: url::Url) -> Self {
pub fn new(management_addr: url::Url) -> Result<Self> {
let (tx, rx) = broadcast::channel(1);

Self {
management_addr,
Ok(Self {
stats_endpoint_url: management_addr.join("/stream")?.to_string(),
status_update_rx: rx,
status_update_tx: Arc::new(tx),
}
})
}
}

impl actix::Actor for StateReporter {
type Context = actix::Context<Self>;

fn started(&mut self, ctx: &mut actix::Context<Self>) {
let management_addr = self.management_addr.clone();
let stats_endpoint_url = self.stats_endpoint_url.clone();
let status_update_tx = self.status_update_tx.clone();

ctx.spawn(
async move {
let mut interval = time::interval(time::Duration::from_secs(1));

loop {
interval.tick().await;

let rx = status_update_tx.subscribe();
let stream = tokio_stream::wrappers::BroadcastStream::new(rx);
let reqwest_body = reqwest::Body::wrap_stream(stream);

let result = reqwest::Client::new()
.post(management_addr.clone())
.post(stats_endpoint_url.clone())
.body(reqwest_body)
.send()
.await
;
.await;

match result {
Ok(_) => {
error!("Mangement server connection closed");
},
}
Err(err) => {
error!("Management server error: {}", err);
},
}
}

time::sleep(time::Duration::from_secs(1)).await;
}
}
.into_actor(self)
.into_actor(self),
);
}
}
Expand All @@ -72,8 +67,6 @@ impl actix::Handler<StatusUpdate> for StateReporter {
type Result = ();

fn handle(&mut self, msg: StatusUpdate, _ctx: &mut actix::Context<Self>) {
println!("Received status update: {:?}", msg);

serde_json::to_vec(&msg)
.map(|bytes| actix_web::web::Bytes::from(bytes))
.map_err(|err| {
Expand Down
1 change: 1 addition & 0 deletions src/balancer/http_route/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod receive_status_update;
17 changes: 17 additions & 0 deletions src/balancer/http_route/receive_status_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use actix_web::{post, web, Error, HttpResponse};
use futures_util::StreamExt as _;

pub fn register(cfg: &mut web::ServiceConfig) {
cfg.service(respond);
}

#[post("/stream")]
async fn respond(mut payload: web::Payload) -> Result<HttpResponse, Error> {
while let Some(chunk) = payload.next().await {
println!("Chunk: {:?}", chunk);
}

println!("Stream ended");

Ok(HttpResponse::Ok().finish())
}
1 change: 1 addition & 0 deletions src/balancer/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod http_route;
pub mod status_update;
8 changes: 2 additions & 6 deletions src/balancer/status_update.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize};

use crate::llamacpp::slot::Slot;

Expand All @@ -10,11 +10,7 @@ pub struct StatusUpdate {
}

impl StatusUpdate {
pub fn new(
agent_id: uuid::Uuid,
agent_name: Option<String>,
slots: Vec<Slot>
) -> Self {
pub fn new(agent_id: uuid::Uuid, agent_name: Option<String>, slots: Vec<Slot>) -> Self {
Self {
agent_id,
agent_name,
Expand Down
Loading

0 comments on commit e9a98be

Please sign in to comment.