Skip to content

Commit

Permalink
chore: use actor ctx instead of loop to rerun agent -> balancer conne…
Browse files Browse the repository at this point in the history
…ction
  • Loading branch information
mcharytoniuk committed Nov 14, 2024
1 parent 6e9a8c5 commit a58e2ba
Showing 1 changed file with 22 additions and 24 deletions.
46 changes: 22 additions & 24 deletions src/agent/state_reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use actix::{fut::future::WrapFuture, AsyncContext};
use log::error;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::time;

use crate::balancer::status_update::StatusUpdate;
use crate::errors::result::Result;
Expand Down Expand Up @@ -33,33 +32,32 @@ impl actix::Actor for StateReporter {
let stats_endpoint_url = self.stats_endpoint_url.clone();
let status_update_tx = self.status_update_tx.clone();

ctx.spawn(
async move {
loop {
let rx = status_update_tx.subscribe();
let stream = tokio_stream::wrappers::BroadcastStream::new(rx);
let reqwest_body = reqwest::Body::wrap_stream(stream);
let fut = async move {
let rx = status_update_tx.subscribe();
let stream = tokio_stream::wrappers::BroadcastStream::new(rx);
let reqwest_body = reqwest::Body::wrap_stream(stream);

let result = reqwest::Client::new()
.post(stats_endpoint_url.clone())
.body(reqwest_body)
.send()
.await;
let result = reqwest::Client::new()
.post(stats_endpoint_url.clone())
.body(reqwest_body)
.send()
.await;

match result {
Ok(_) => {
error!("Mangement server connection closed");
}
Err(err) => {
error!("Management server error: {}", err);
}
}

time::sleep(time::Duration::from_secs(1)).await;
match result {
Ok(_) => {
error!("Management server connection closed");
}
Err(err) => {
error!("Management server error: {}", err);
}
}
.into_actor(self),
);
}
.into_actor(self);

ctx.wait(fut);
ctx.run_later(std::time::Duration::from_secs(1), |act, ctx| {
act.started(ctx);
});
}
}

Expand Down

0 comments on commit a58e2ba

Please sign in to comment.