Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add channel extend method in indexer #375

Merged
merged 22 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/indexer-coordinator/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@subql/indexer-coordinator",
"version": "2.0.5",
"version": "2.0.6",
"description": "",
"author": "SubQuery",
"license": "Apache-2.0",
Expand Down
5 changes: 3 additions & 2 deletions apps/indexer-coordinator/src/core/onchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,11 @@ export class OnChainService implements OnApplicationBootstrap {
const { status, expiredAt, terminatedAt } = channel;
const now = Math.floor(Date.now() / 1000);

const isOpenChannelClaimable = status === ChannelStatus.OPEN && expiredAt.lt(now);
// TODO terminate
// const isOpenChannelClaimable = status === ChannelStatus.OPEN && expiredAt.lt(now);
const isTerminateChannelClaimable =
status === ChannelStatus.TERMINATING && terminatedAt.lt(now);
if (!isOpenChannelClaimable && !isTerminateChannelClaimable) continue;
if (!isTerminateChannelClaimable) continue;

await this.contractService.sendTransaction(
`claim unfinalized plan for ${node.consumer}`,
Expand Down
8 changes: 8 additions & 0 deletions apps/indexer-coordinator/src/payg/payg.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ export class PaygResolver {
return this.paygService.update(id, spent, isFinal, indexerSign, consumerSign);
}

@Mutation(() => ChannelType)
channelExtend(
@Args('id') id: string,
@Args('expiration') expiration: number
) {
return this.paygService.extend(id, expiration);
}

@Mutation(() => ChannelType)
channelCheckpoint(@Args('id') id: string) {
return this.paygService.checkpoint(id);
Expand Down
27 changes: 25 additions & 2 deletions apps/indexer-coordinator/src/payg/payg.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ export class PaygService {
remote: '0',
lastFinal: false,
spent: spent.toString(),
expiredAt: 0,
});
}

Expand All @@ -127,7 +128,10 @@ export class PaygService {
channelEntity.deploymentId = bytes32ToCid(deploymentId);
channelEntity.total = total.toString();
channelEntity.onchain = spent.toString();
channelEntity.expiredAt = expiredAt.toNumber();
const newExpiredAt = expiredAt.toNumber();
if (channelEntity.expiredAt < newExpiredAt) {
channelEntity.expiredAt = newExpiredAt;
}
channelEntity.terminatedAt = terminatedAt.toNumber();
channelEntity.terminateByIndexer = terminateByIndexer;

Expand Down Expand Up @@ -171,6 +175,7 @@ export class PaygService {
spent: '0',
remote: '0',
lastFinal: false,
expiredAt: 0,
});
}

Expand Down Expand Up @@ -203,7 +208,10 @@ export class PaygService {
channelEntity.deploymentId = deployment.id;
channelEntity.total = total.toString();
channelEntity.onchain = spent.toString();
channelEntity.expiredAt = new Date(expiredAt).getTime() / 1000;
const newExpiredAt = new Date(expiredAt).getTime() / 1000;
if (channelEntity.expiredAt < newExpiredAt) {
channelEntity.expiredAt = newExpiredAt;
}
channelEntity.terminatedAt = new Date(terminatedAt).getTime() / 1000;
channelEntity.terminateByIndexer = terminateByIndexer;

Expand Down Expand Up @@ -355,6 +363,21 @@ export class PaygService {
}
}

async extend(id: string, expiration: number): Promise<Channel> {
const channel = await this.channel(id);
if (!channel) {
throw new Error(`channel not exist: ${id}`);
}

if (channel.expiredAt < expiration) {
channel.expiredAt = expiration;
}

logger.debug(`Extend state channel ${id}`);

return this.saveAndPublish(channel, PaygEvent.State);
}

async checkpoint(id: string): Promise<Channel> {
const channel = await this.channel(id);
if (!channel) {
Expand Down
4 changes: 3 additions & 1 deletion apps/indexer-coordinator/src/payg/payg.sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ export class PaygSyncService implements OnApplicationBootstrap {
const channel = await this.paygService.channel(id);
if (!channel) return;

channel.expiredAt = expiredAt;
if (channel.expiredAt < expiredAt) {
channel.expiredAt = expiredAt;
}
channel.terminatedAt = expiredAt;
await this.paygService.saveAndPublish(channel, PaygEvent.State);
}
Expand Down
2 changes: 2 additions & 0 deletions apps/indexer-proxy/doc/error-code-dictionary.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
- `1046` - Invalid request: payg query cannot fetch data from graphql.
- `1047` - Invalid request: payg query missing project and query field.
- `1048` - Invalid project price: price signer is not controller.
- `1049` - Invalid project price: extend price lower.
- `1050` - Payg conflict: local state > remote state + price * max_conflict_number.
- `1051` - Daily limit: reach the max limit of agreement in 24h.
- `1052` - Rate limit: reach the max limit of agreement in 1s.
Expand Down Expand Up @@ -88,3 +89,4 @@
- `1140` - Serialize: subquery's query is invalid.
- `1200` - Service exception: EVM RPC invalid
- `1201` - Service exception: EVM RPC last block
- `1202` - Service exception: indexer service exception.
2 changes: 1 addition & 1 deletion apps/indexer-proxy/proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "subql-indexer-proxy"
version = "2.0.5"
version = "2.0.6"
edition = "2021"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion apps/indexer-proxy/proxy/src/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub const PROJECT_QUERY: &str =
pub const PAYG_QUERY: &str = "query { getAlivePaygs { id price token expiration overflow } }";

pub const CHANNEL_QUERY: &str =
"query { getAliveChannels { id consumer agent total spent remote price lastFinal expiredAt } }";
"query { getAliveChannels { id consumer deploymentId agent total spent remote price lastFinal expiredAt } }";

pub fn poi_with_block(block: u64) -> String {
format!(
Expand Down
144 changes: 131 additions & 13 deletions apps/indexer-proxy/proxy/src/payg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ use base64::{engine::general_purpose, Engine as _};
use chrono::prelude::*;
use ethers::{
signers::LocalWallet,
types::{Address, U256},
types::{Address, H256, U256},
};
use redis::RedisResult;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use subql_indexer_utils::{
error::Error,
payg::{convert_sign_to_string, price_recover, price_sign, OpenState, QueryState},
payg::{
convert_sign_to_string, convert_string_to_sign, extend_recover, extend_sign, price_recover,
price_sign, OpenState, QueryState,
},
request::{graphql_request, GraphQLQuery},
tools::deployment_cid,
tools::{cid_deployment, deployment_cid},
types::Result,
};

Expand All @@ -49,9 +52,12 @@ use crate::metrics::{MetricsNetwork, MetricsQuery};
use crate::p2p::report_conflict;
use crate::project::{get_project, list_projects, Project};

const CURRENT_VERSION: u8 = 1;
const CURRENT_VERSION: u8 = 2;

pub struct StateCache {
pub expiration: i64,
pub agent: Address,
pub deployment: H256,
pub price: U256,
pub total: U256,
pub spent: U256,
Expand All @@ -63,24 +69,34 @@ pub struct StateCache {

impl StateCache {
fn from_bytes(bytes: &[u8]) -> Result<StateCache> {
if bytes.len() < 169 {
if bytes[0] != CURRENT_VERSION {
return Err(Error::Serialize(1136));
}
if bytes[0] != CURRENT_VERSION {

if bytes.len() < 229 {
return Err(Error::Serialize(1136));
}

let price = U256::from_little_endian(&bytes[1..33]);
let total = U256::from_little_endian(&bytes[33..65]);
let spent = U256::from_little_endian(&bytes[65..97]);
let remote = U256::from_little_endian(&bytes[97..129]);
let coordi = U256::from_little_endian(&bytes[129..161]);
let mut expiration_bytes = [0u8; 8];
expiration_bytes.copy_from_slice(&bytes[1..9]);
let expiration = i64::from_le_bytes(expiration_bytes);
let agent = Address::from_slice(&bytes[9..29]);
let deployment = H256::from_slice(&bytes[29..61]);

let price = U256::from_little_endian(&bytes[61..93]);
let total = U256::from_little_endian(&bytes[93..125]);
let spent = U256::from_little_endian(&bytes[125..157]);
let remote = U256::from_little_endian(&bytes[157..189]);
let coordi = U256::from_little_endian(&bytes[189..221]);
let mut conflict_bytes = [0u8; 8];
conflict_bytes.copy_from_slice(&bytes[161..169]);
conflict_bytes.copy_from_slice(&bytes[221..229]);
let conflict = i64::from_le_bytes(conflict_bytes);
let signer = ConsumerType::from_bytes(&bytes[169..])?;
let signer = ConsumerType::from_bytes(&bytes[229..])?;

Ok(StateCache {
expiration,
agent,
deployment,
price,
total,
spent,
Expand All @@ -94,6 +110,10 @@ impl StateCache {
fn to_bytes(&self) -> Vec<u8> {
let mut bytes = vec![CURRENT_VERSION];

bytes.extend(&self.expiration.to_le_bytes());
bytes.extend(self.agent.as_fixed_bytes());
bytes.extend(self.deployment.as_fixed_bytes());

let mut u256_bytes = [0u8; 32];
self.price.to_little_endian(&mut u256_bytes);
bytes.extend(u256_bytes);
Expand Down Expand Up @@ -433,10 +453,103 @@ pub async fn query_state(
Ok((data, signature, state_string))
}

pub async fn extend_channel(
channel: String,
expired: i64,
expiration: i32,
signature: String,
) -> Result<String> {
// check channel & signature
let channel_id = U256::from_str_radix(&channel.trim_start_matches("0x"), 16)
.map_err(|_e| Error::Serialize(1120))?;
let sign = convert_string_to_sign(&signature);

let (state_cache, _keyname) = fetch_channel_cache(channel_id).await?;

// check price
let project_id = deployment_cid(&state_cache.deployment);
let project = get_project(&project_id).await?;
if project.payg_price > state_cache.price {
return Err(Error::InvalidProjectPrice(1049));
}

let gap = if expired > state_cache.expiration {
expired - state_cache.expiration
} else {
state_cache.expiration - expired
};
if gap > 600 {
return Err(Error::InvalidProjectPrice(1049));
}

let account = ACCOUNT.read().await;
let indexer = account.indexer;
drop(account);

let signer = extend_recover(
channel_id,
indexer,
state_cache.agent,
U256::from(expired),
U256::from(expiration),
sign,
)?;

// check signer
if !state_cache.signer.contains(&signer) {
warn!(
"Extend: {:?} {} {:?} {:?} {} {} {}",
signer,
channel_id,
indexer,
state_cache.agent,
state_cache.expiration,
expiration,
convert_sign_to_string(&sign)
);
return Err(Error::InvalidSignature(1055));
}

// send to coordinator
let expired_at = expired + expiration as i64;
let mdata = format!(
r#"mutation {{
channelExtend(
id:"{:#X}",
expiration:{},
)
{{ id, expiredAt }}
}}"#,
channel_id, expired_at
);
let url = COMMAND.graphql_url();
let query = GraphQLQuery::query(&mdata);
graphql_request(&url, &query).await.map_err(|e| {
error!("{:?}", e);
Error::ServiceException(1202)
})?;

let account = ACCOUNT.read().await;
let indexer_sign = extend_sign(
channel_id,
indexer,
state_cache.agent,
U256::from(expired),
U256::from(expiration),
&account.controller,
)
.await?;
drop(account);

Ok(convert_sign_to_string(&indexer_sign))
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ChannelItem {
pub id: String,
pub consumer: String,
#[serde(rename = "deploymentId")]
pub deployment: String,
pub agent: String,
pub total: String,
pub spent: String,
Expand All @@ -463,6 +576,7 @@ pub async fn handle_channel(value: &Value) -> Result<()> {
.consumer
.parse()
.map_err(|_e| Error::Serialize(1121))?;
let deployment: H256 = cid_deployment(&channel.deployment);
let agent: Address = channel.agent.parse().unwrap_or(Address::zero());
let total = U256::from_dec_str(&channel.total).map_err(|_e| Error::Serialize(1122))?;
let spent = U256::from_dec_str(&channel.spent).map_err(|_e| Error::Serialize(1123))?;
Expand Down Expand Up @@ -494,6 +608,7 @@ pub async fn handle_channel(value: &Value) -> Result<()> {
None
};
let state_cache = if let Some(mut state_cache) = state_cache_op {
state_cache.expiration = channel.expired;
state_cache.total = total;
if state_cache.remote != remote {
debug!(
Expand Down Expand Up @@ -521,6 +636,9 @@ pub async fn handle_channel(value: &Value) -> Result<()> {
} else {
let signer = check_state_channel_consumer(consumer, agent).await?;
StateCache {
expiration: channel.expired,
agent,
deployment,
price,
total,
spent,
Expand Down
Loading
Loading