Skip to content

Commit

Permalink
Batch cycles improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 authored Oct 1, 2024
1 parent dd8d566 commit 596eaad
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 10 deletions.
23 changes: 20 additions & 3 deletions core/payment/src/api/cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chrono::{DateTime, Utc};
use serde::Deserialize;
use ya_core_model::payment::local as pay_local;
use ya_core_model::payment::local::{
batch_cycle_response_to_json, ProcessBatchCycleInfo, ProcessBatchCycleSet,
batch_cycle_response_to_json, ProcessBatchCycleInfo, ProcessBatchCycleSet, ProcessPaymentsNow,
};
use ya_service_api_web::middleware::Identity;
use ya_service_bus::typed as bus;
Expand All @@ -16,6 +16,7 @@ pub fn register_endpoints(scope: Scope) -> Scope {
.route("/batchCycles", get().to(get_batch_cycles))
.route("/batchCycle/{platform}", get().to(get_batch_cycle))
.route("/batchCycle", post().to(set_batch_cycle))
.route("/batchCycle/{platform}/now", post().to(set_batch_cycle_now))
}

async fn get_batch_cycles(id: Identity) -> HttpResponse {
Expand Down Expand Up @@ -66,7 +67,7 @@ async fn get_batch_cycle(id: Identity, platform: web::Path<String>) -> HttpRespo
match bus::service(pay_local::BUS_ID)
.call(ProcessBatchCycleInfo {
node_id,
platform: platform.to_string(),
platform: platform.into_inner(),
})
.await
{
Expand All @@ -85,7 +86,23 @@ struct ProcessBatchCycleSetPost {
extra_pay_time_sec: Option<u64>,
next_update: Option<DateTime<Utc>>,
}
async fn set_batch_cycle_now(platform: web::Path<String>, id: Identity) -> HttpResponse {
let node_id = id.identity;

match bus::service(pay_local::BUS_ID)
.call(ProcessPaymentsNow {
node_id,
platform: platform.into_inner(),
skip_resolve: false,
skip_send: false,
})
.await
{
Ok(Ok(batch_cycle)) => response::ok(batch_cycle),
Ok(Err(e)) => response::server_error(&e),
Err(e) => response::server_error(&e),
}
}
async fn set_batch_cycle(body: web::Json<ProcessBatchCycleSetPost>, id: Identity) -> HttpResponse {
let node_id = id.identity;
let cycle_set = body.into_inner();
Expand All @@ -95,7 +112,7 @@ async fn set_batch_cycle(body: web::Json<ProcessBatchCycleSetPost>, id: Identity
let extra_pay_time = cycle_set
.extra_pay_time_sec
.map(core::time::Duration::from_secs)
.unwrap_or(DEFAULT_EXTRA_PAY_TIME.to_std().unwrap());
.unwrap_or(PAYMENT_CYCLE_DEFAULT_EXTRA_PAY_TIME.to_std().unwrap());
let next_update = cycle_set.next_update;

match bus::service(pay_local::BUS_ID)
Expand Down
4 changes: 3 additions & 1 deletion core/payment/src/dao.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ pub use self::allocation::AllocationReleaseStatus;
pub use self::allocation::AllocationStatus;
pub use self::allocation::{spend_from_allocation, SpendFromAllocationArgs};
pub use self::batch::{BatchDao, BatchItemFilter};
pub use self::cycle::{BatchCycleDao, DEFAULT_EXTRA_PAY_TIME};
pub use self::cycle::{
BatchCycleDao, PAYMENT_CYCLE_DEFAULT_EXTRA_PAY_TIME, PAYMENT_CYCLE_DEFAULT_INTERVAL,
};
pub use self::debit_note::DebitNoteDao;
pub use self::debit_note_event::DebitNoteEventDao;
pub use self::invoice::InvoiceDao;
Expand Down
37 changes: 31 additions & 6 deletions core/payment/src/dao/cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::models::cycle::{
use crate::schema::pay_batch_cycle::dsl;
use chrono::{DateTime, Duration, Utc};
use diesel::{self, BoolExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};
use lazy_static::lazy_static;
use std::env;
use ya_client_model::NodeId;
use ya_persistence::executor::{do_with_transaction, AsDao, ConnType, PoolType};
use ya_persistence::types::AdaptTimestamp;
Expand All @@ -22,8 +24,31 @@ impl<'c> AsDao<'c> for BatchCycleDao<'c> {
}
}

pub const DEFAULT_INTERVAL: Duration = Duration::minutes(5);
pub const DEFAULT_EXTRA_PAY_TIME: Duration = Duration::minutes(4);
fn get_default_payment_cycle_interval() -> chrono::Duration {
Duration::from_std(
humantime::parse_duration(
&env::var("PAYMENT_CYCLE_DEFAULT_INTERVAL").unwrap_or("24h".to_string()),
)
.expect("Failed to parse PAYMENT_CYCLE_DEFAULT_INTERVAL"),
)
.expect("Failed to convert PAYMENT_CYCLE_DEFAULT_INTERVAL to chrono::Duration")
}

fn get_default_payment_cycle_extra_pay_time() -> chrono::Duration {
Duration::from_std(
humantime::parse_duration(
&env::var("PAYMENT_CYCLE_DEFAULT_EXTRA_PAY_TIME").unwrap_or("1h".to_string()),
)
.expect("Failed to parse PAYMENT_CYCLE_DEFAULT_EXTRA_PAY_TIME"),
)
.expect("Failed to convert PAYMENT_CYCLE_DEFAULT_EXTRA_PAY_TIME to chrono::Duration")
}

lazy_static! {
pub static ref PAYMENT_CYCLE_DEFAULT_INTERVAL: Duration = get_default_payment_cycle_interval();
pub static ref PAYMENT_CYCLE_DEFAULT_EXTRA_PAY_TIME: Duration =
get_default_payment_cycle_extra_pay_time();
}

fn get_or_insert_default_entry_private(
conn: &ConnType,
Expand All @@ -47,8 +72,8 @@ fn get_or_insert_default_entry_private(
let batch_cycle = create_batch_cycle_based_on_interval(
node_id,
platform.clone(),
DEFAULT_INTERVAL,
DEFAULT_EXTRA_PAY_TIME,
*PAYMENT_CYCLE_DEFAULT_INTERVAL,
*PAYMENT_CYCLE_DEFAULT_EXTRA_PAY_TIME,
)
.expect("Failed to create default batch cycle");
diesel::insert_into(dsl::pay_batch_cycle)
Expand Down Expand Up @@ -139,7 +164,7 @@ impl<'c> BatchCycleDao<'c> {
owner_id,
platform.clone(),
interval,
safe_payout.unwrap_or(DEFAULT_EXTRA_PAY_TIME),
safe_payout.unwrap_or(*PAYMENT_CYCLE_DEFAULT_EXTRA_PAY_TIME),
) {
Ok(cycle) => cycle,
Err(err) => {
Expand All @@ -154,7 +179,7 @@ impl<'c> BatchCycleDao<'c> {
owner_id,
platform.clone(),
&cron,
safe_payout.unwrap_or(DEFAULT_EXTRA_PAY_TIME),
safe_payout.unwrap_or(*PAYMENT_CYCLE_DEFAULT_EXTRA_PAY_TIME),
) {
Ok(cycle) => cycle,
Err(err) => {
Expand Down

0 comments on commit 596eaad

Please sign in to comment.