Skip to content

Commit

Permalink
payment: allow skipping starting the sync job during service bind
Browse files Browse the repository at this point in the history
  • Loading branch information
kamirr committed Dec 4, 2023
1 parent 5713701 commit 155e4f6
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 8 deletions.
4 changes: 3 additions & 1 deletion core/payment/examples/payment_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ethsign::keyfile::Bytes;
use ethsign::{KeyFile, Protected, SecretKey};
use futures::Future;
use rand::Rng;
use ya_payment::service::BindOptions;

use std::convert::TryInto;
use std::io::Write;
Expand Down Expand Up @@ -245,10 +246,11 @@ async fn main() -> anyhow::Result<()> {
db.apply_migration(migrations::run_with_output)?;

ya_sb_router::bind_gsb_router(None).await?;

log::debug!("bind_gsb_router()");

let processor = PaymentProcessor::new(db.clone());
ya_payment::service::bind_service(&db, processor);
ya_payment::service::bind_service(&db, processor, BindOptions::default().run_sync_job(false));
log::debug!("bind_service()");

let driver_name = match args.driver {
Expand Down
3 changes: 2 additions & 1 deletion core/payment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![allow(unused_variables)] // Crate under development
use crate::processor::PaymentProcessor;
use futures::FutureExt;
use service::BindOptions;
use std::time::Duration;
use ya_core_model::payment::local as pay_local;
use ya_persistence::executor::DbExecutor;
Expand Down Expand Up @@ -52,7 +53,7 @@ impl PaymentService {
db.apply_migration(migrations::run_with_output)?;

let processor = PaymentProcessor::new(db.clone());
self::service::bind_service(&db, processor.clone());
self::service::bind_service(&db, processor.clone(), BindOptions::default());

tokio::task::spawn(async move {
processor.release_allocations(false).await;
Expand Down
2 changes: 1 addition & 1 deletion core/payment/src/payment_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub fn send_sync_notifs_job(db: DbExecutor) {
default_sleep
}
Ok(duration) => {
log::trace!("PaymentSyncNeeded sendout job done");
log::debug!("PaymentSyncNeeded sendout job done");
duration.unwrap_or(default_sleep)
}
};
Expand Down
39 changes: 34 additions & 5 deletions core/payment/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,35 @@ use ya_core_model::payment::public::{AcceptDebitNote, AcceptInvoice, PaymentSync
use ya_persistence::executor::DbExecutor;
use ya_service_bus::typed::{service, ServiceBinder};

pub fn bind_service(db: &DbExecutor, processor: PaymentProcessor) {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BindOptions {
/// Enables background job for synchronizing invoice / debit note document status.
///
/// This depends on the identity service being enabled to work. If you're working with a limited
/// subsets of services (e.g. in payment_api.rs example) you might wish to disable that.
pub run_sync_job: bool,
}

impl BindOptions {
/// Configure the `run_async_job` option.
pub fn run_sync_job(mut self, value: bool) -> Self {
self.run_sync_job = value;
self
}
}

impl Default for BindOptions {
fn default() -> Self {
BindOptions { run_sync_job: true }
}
}

pub fn bind_service(db: &DbExecutor, processor: PaymentProcessor, opts: BindOptions) {
log::debug!("Binding payment service to service bus");

let processor = Arc::new(Mutex::new(processor));
local::bind_service(db, processor.clone());
public::bind_service(db, processor);
public::bind_service(db, processor, opts);

log::debug!("Successfully bound payment service to service bus");
}
Expand Down Expand Up @@ -413,7 +436,11 @@ mod public {
use ya_core_model::payment::public::*;
use ya_persistence::types::Role;

pub fn bind_service(db: &DbExecutor, processor: Arc<Mutex<PaymentProcessor>>) {
pub fn bind_service(
db: &DbExecutor,
processor: Arc<Mutex<PaymentProcessor>>,
opts: BindOptions,
) {
log::debug!("Binding payment public service to service bus");

ServiceBinder::new(BUS_ID, db, processor)
Expand All @@ -429,8 +456,10 @@ mod public {
.bind_with_processor(send_payment)
.bind_with_processor(sync_payment);

send_sync_notifs_job(db.clone());
send_sync_requests(db.clone());
if opts.run_sync_job {
send_sync_notifs_job(db.clone());
send_sync_requests(db.clone());
}

log::debug!("Successfully bound payment public service to service bus");
}
Expand Down

0 comments on commit 155e4f6

Please sign in to comment.