Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

invoice rejection #2926

Merged
merged 13 commits into from
Jan 30, 2024
Merged
1 change: 0 additions & 1 deletion core/model/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ pub struct Ack {}
bitflags! {
#[derive(Serialize, Deserialize)]
pub struct AccountMode : usize {
const NONE = 0b000;
const RECV = 0b001;
const SEND = 0b010;
const ALL = Self::RECV.bits | Self::SEND.bits;
Expand Down
19 changes: 16 additions & 3 deletions core/model/src/payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,13 +710,24 @@ pub mod public {

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RejectInvoice {
pub struct RejectInvoiceV2 {
pub invoice_id: String,
pub rejection: Rejection,
pub issuer_id: NodeId,
}

impl RejectInvoiceV2 {
pub fn new(invoice_id: String, rejection: Rejection, issuer_id: NodeId) -> Self {
Self {
invoice_id,
rejection,
issuer_id,
}
}
}

impl RpcMessage for RejectInvoice {
const ID: &'static str = "RejectInvoice";
impl RpcMessage for RejectInvoiceV2 {
const ID: &'static str = "RejectInvoiceV2";
type Item = Ack;
type Error = AcceptRejectError;
}
Expand Down Expand Up @@ -763,6 +774,8 @@ pub mod public {
pub payments: Vec<SendPayment>,
/// Invoice acceptances.
pub invoice_accepts: Vec<AcceptInvoice>,
/// Invoice rejections.
pub invoice_rejects: Vec<RejectInvoiceV2>,
/// Debit note acceptances.
///
/// Only last debit note in chain is included per agreement.
Expand Down
22 changes: 21 additions & 1 deletion core/payment/examples/payment_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ fn fake_list_identities(identities: Vec<NodeId>) {
is_locked: false,
});
}
async move { Ok(accounts) }
std::future::ready(Ok(accounts))
});
}

Expand Down Expand Up @@ -204,6 +204,7 @@ async fn main() -> anyhow::Result<()> {
.provider_addr
.unwrap_or_else(|| provider_id.clone())
.to_lowercase();
let provider_pub_key = provider_account.public();

let requestor_pass: Protected = args.requestor_pass.clone().into();
let requestor_account = load_or_generate(&args.requestor_key_path, requestor_pass);
Expand All @@ -212,6 +213,7 @@ async fn main() -> anyhow::Result<()> {
.requestor_addr
.unwrap_or_else(|| requestor_id.clone())
.to_lowercase();
let requestor_pub_key = requestor_account.public();

log::info!(
"Provider ID: {}\nProvider address: {}\nRequestor ID: {}\nRequestor address: {}",
Expand Down Expand Up @@ -319,6 +321,24 @@ async fn main() -> anyhow::Result<()> {
utils::fake_get_agreement(args.agreement_id.clone(), agreement);
utils::provider::fake_get_agreement_id(args.agreement_id.clone());

bus::bind(identity::BUS_ID, {
let provider_key = provider_pub_key.clone();
let requestor_key = requestor_pub_key.clone();
move |msg: identity::GetPubKey| {
let node_id: &[u8; 20] = msg.0.as_ref();
let pub_key = if node_id == provider_key.address() {
Some(provider_key.bytes())
} else if node_id == requestor_key.address() {
Some(requestor_key.bytes())
} else {
None
}
.map(|bytes| bytes.to_vec())
.ok_or(identity::Error::NodeNotFound(Box::new(msg.0)));
std::future::ready(pub_key)
}
});

let provider_id = provider_id.parse()?;
let requestor_id = requestor_id.parse()?;
log::info!("bind remote...");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP INDEX pay_invoice_send_reject_idx;

ALTER TABLE pay_invoice REMOVE COLUMN send_reject;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE pay_invoice ADD COLUMN send_reject BOOLEAN NOT NULL DEFAULT FALSE;

CREATE INDEX pay_invoice_send_reject_idx ON pay_invoice (send_reject);
2 changes: 1 addition & 1 deletion core/payment/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) struct Account {

pub(crate) async fn init_account(account: Account) -> anyhow::Result<()> {
log::debug!("Initializing payment account {:?}...", account);
let mut mode = AccountMode::NONE;
let mut mode = AccountMode::empty();
mode.set(AccountMode::SEND, account.send);
mode.set(AccountMode::RECV, account.receive);
match bus::service(driver_bus_id(account.driver.clone()))
Expand Down
91 changes: 88 additions & 3 deletions core/payment/src/api/invoices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use metrics::{counter, timing};
use ya_client_model::payment::*;
use ya_core_model::payment::local::{SchedulePayment, BUS_ID as LOCAL_SERVICE};
use ya_core_model::payment::public::{
AcceptInvoice, AcceptRejectError, CancelError, CancelInvoice, SendError, SendInvoice,
BUS_ID as PUBLIC_SERVICE,
AcceptInvoice, AcceptRejectError, CancelError, CancelInvoice, RejectInvoiceV2, SendError,
SendInvoice, BUS_ID as PUBLIC_SERVICE,
};
use ya_core_model::payment::RpcMessageError;
use ya_net::RemoteEndpoint;
Expand Down Expand Up @@ -487,6 +487,7 @@ async fn accept_invoice(
response?;
} else {
log::debug!("AcceptInvoice not delivered");
// TODO: should this be issuer_id instead? It will notify itself....
sync_dao.upsert(node_id).await?;
SYNC_NOTIFS_NOTIFY.notify_one();
}
Expand Down Expand Up @@ -527,6 +528,90 @@ async fn reject_invoice(
path: Path<params::InvoiceId>,
query: Query<params::Timeout>,
body: Json<Rejection>,
id: Identity,
) -> HttpResponse {
response::not_implemented() // TODO
let start = Instant::now();

let invoice_id = path.invoice_id.clone();
let node_id = id.identity;
let rejection = body.into_inner();

log::debug!("Requested reject invoice [{}]", invoice_id);
counter!("payment.invoices.requestor.rejected.call", 1);

let dao: InvoiceDao = db.as_dao();
let sync_dao: SyncNotifsDao = db.as_dao();

log::trace!("Querying DB for Invoice [{}]", invoice_id);
let invoice = match dao.get(invoice_id.clone(), node_id).await {
Ok(Some(invoice)) => invoice,
Ok(None) => return response::not_found(),
Err(e) => return response::server_error(&e),
};

match invoice.status {
DocumentStatus::Received => (),
DocumentStatus::Rejected => return response::ok(Null),
DocumentStatus::Failed => (),
DocumentStatus::Accepted => return response::bad_request(&"Invoice accepted"),
DocumentStatus::Settled => return response::bad_request(&"Invoice settled"),
DocumentStatus::Cancelled => return response::bad_request(&"Invoice cancelled"),
DocumentStatus::Issued => return response::server_error(&"Illegal status: issued"),
}

let timeout = query.timeout.unwrap_or(params::DEFAULT_ACK_TIMEOUT);
let result = async move {
let issuer_id = invoice.issuer_id;
let reject_msg = RejectInvoiceV2::new(invoice_id.clone(), rejection.clone(), issuer_id);
match async move {
log::trace!("Rejecting Invoice [{}] in DB", invoice_id);
dao.reject(invoice_id.clone(), node_id, rejection).await?;
log::trace!("Invoice rejected successfully for [{}]", invoice_id);

log::debug!(
"Sending RejectInvoiceV2 [{}] to [{}]",
invoice_id,
issuer_id
);
let send_result = ya_net::from(node_id)
.to(issuer_id)
.service(PUBLIC_SERVICE)
.call(reject_msg)
.await;

if let Ok(response) = send_result {
log::debug!("RejectInvoiceV2 delivered");
dao.mark_reject_sent(invoice_id.clone(), node_id).await?;
response?;
} else {
log::debug!("RejectInvoiceV2 not delivered");
sync_dao.upsert(issuer_id).await?;
SYNC_NOTIFS_NOTIFY.notify_one();
}

Ok(())
}
.timeout(Some(timeout))
.await
{
Ok(Ok(_)) => {
counter!("payment.invoices.requestor.rejected", 1);
log::info!("Invoice [{}] rejected.", path.invoice_id);
response::ok(Null)
}
Ok(Err(Error::Rpc(RpcMessageError::AcceptReject(AcceptRejectError::BadRequest(
e,
))))) => response::bad_request(&e),
Ok(Err(e)) => response::server_error(&e),
Err(_) => response::timeout(&"Timeout rejecting Invoice on remote Node."),
}
}
.await;

timing!(
"payment.invoices.requestor.rejected.time",
start,
Instant::now()
);
result
}
83 changes: 68 additions & 15 deletions core/payment/src/dao/invoice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use diesel::{
};
use std::collections::{BTreeMap, HashMap};
use std::convert::TryFrom;
use ya_client_model::payment::{DocumentStatus, Invoice, InvoiceEventType, NewInvoice};
use ya_client_model::payment::{DocumentStatus, Invoice, InvoiceEventType, NewInvoice, Rejection};
use ya_client_model::NodeId;
use ya_core_model::payment::local::StatValue;
use ya_persistence::executor::{
Expand Down Expand Up @@ -413,20 +413,73 @@ impl<'c> InvoiceDao<'c> {
.await
}

// TODO: Implement reject invoice
// pub async fn reject(&self, invoice_id: String, owner_id: NodeId) -> DbResult<()> {
// do_with_transaction(self.pool, move |conn| {
// let (agreement_id, amount, role): (String, BigDecimalField, Role) = dsl::pay_invoice
// .find((&invoice_id, &owner_id))
// .select((dsl::agreement_id, dsl::amount, dsl::role))
// .first(conn)?;
// update_status(&invoice_id, &owner_id, &DocumentStatus::Accepted, conn)?;
// invoice_event::create::<()>(invoice_id, owner_id, InvoiceEventType::InvoiceRejectedEvent { ... }, None, conn)?;
//
// Ok(())
// })
// .await
// }
pub async fn reject(
&self,
invoice_id: String,
owner_id: NodeId,
rejection: Rejection,
) -> DbResult<()> {
do_with_transaction(self.pool, "invoice_reject", move |conn| {
let (agreement_id, amount, role): (String, BigDecimalField, Role) = dsl::pay_invoice
.find((&invoice_id, &owner_id))
.select((dsl::agreement_id, dsl::amount, dsl::role))
.first(conn)?;
update_status(&invoice_id, &owner_id, &DocumentStatus::Rejected, conn)?;
if role == Role::Requestor {
diesel::update(
dsl::pay_invoice
.filter(dsl::id.eq(invoice_id.clone()))
.filter(dsl::owner_id.eq(owner_id)),
)
.set(dsl::send_reject.eq(true))
.execute(conn)?;
}
invoice_event::create(
invoice_id,
owner_id,
InvoiceEventType::InvoiceRejectedEvent { rejection },
conn,
)?;
Ok(())
})
.await
}

pub async fn mark_reject_sent(&self, invoice_id: String, owner_id: NodeId) -> DbResult<()> {
do_with_transaction(self.pool, "mark_reject_sent", move |conn| {
diesel::update(
dsl::pay_invoice
.filter(dsl::id.eq(invoice_id))
.filter(dsl::owner_id.eq(owner_id)),
)
.set(dsl::send_reject.eq(false))
.execute(conn)?;
Ok(())
})
.await
}

pub async fn unsent_rejected(&self, owner_id: NodeId) -> DbResult<Vec<Invoice>> {
readonly_transaction(self.pool, "unsent_rejected", move |conn| {
let invoices: Vec<ReadObj> = query!()
.filter(dsl::owner_id.eq(owner_id))
.filter(dsl::send_reject.eq(true))
.filter(dsl::status.eq(DocumentStatus::Rejected.to_string()))
.load(conn)?;

let activities = activity_dsl::pay_invoice_x_activity
.inner_join(
dsl::pay_invoice.on(activity_dsl::owner_id
.eq(dsl::owner_id)
.and(activity_dsl::invoice_id.eq(dsl::id))),
)
.filter(dsl::owner_id.eq(owner_id))
.select(crate::schema::pay_invoice_x_activity::all_columns)
.load(conn)?;
join_invoices_with_activities(invoices, activities)
})
.await
}

pub async fn cancel(&self, invoice_id: String, owner_id: NodeId) -> DbResult<()> {
do_with_transaction(self.pool, "invoice_dao_cancel", move |conn| {
Expand Down
Loading
Loading