Skip to content

Commit

Permalink
DEX-827 Invalid balances (#316)
Browse files Browse the repository at this point in the history
* Fixed a bug with invalid balances;
* Added logs of balance changes;
* Added a test to reproduce the issue.
  • Loading branch information
vsuharnikov authored Jul 10, 2020
1 parent 9bf1d38 commit 1a4a964
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.wavesplatform.it.sync.api.ws

import cats.syntax.option._
import com.typesafe.config.{Config, ConfigFactory}
import com.wavesplatform.dex.api.websockets._
import com.wavesplatform.dex.api.websockets.connection.WsConnection
Expand Down Expand Up @@ -228,19 +229,37 @@ class WsAddressStreamTestSuite extends WsSuiteBase with TableDrivenPropertyCheck
placeAndAwaitAtDex(bo)
placeAndAwaitAtNode(mkOrderDP(alice, wavesUsdPair, SELL, 5.waves, 1.0))

assertChanges(wsc)(
Map(usd -> WsBalances(0, 10), Waves -> WsBalances(9.997, 0.003)), // Waves balance on Node = 10
Map(usd -> WsBalances(0, 5), Waves -> WsBalances(9.997, 0.0015)), // Waves balance on Node = 10
Map(Waves -> WsBalances(14.997, 0.0015)) // since balance increasing comes after transaction mining, + 5 - 0.0015, Waves balance on Node = 14.9985
)(
WsOrder.fromDomain(limitOrder),
WsOrder(limitOrder.id, status = OrderStatus.PartiallyFilled.name, filledAmount = 5.0, filledFee = 0.0015, avgWeighedPrice = 1.0)
)
eventually {
wsc.balanceChanges.squashed should matchTo(
Map(
usd -> WsBalances(0, 5),
Waves -> WsBalances(14.997, 0.0015) // since balance increasing comes after transaction mining, + 5 - 0.0015, Waves balance on Node = 14.9985
)
)

wsc.orderChanges.squashed should matchTo(
Map(
limitOrder.id -> WsOrder
.fromDomain(limitOrder)
.copy(
id = limitOrder.id,
status = OrderStatus.PartiallyFilled.name.some,
filledAmount = 5.0.some,
filledFee = 0.0015.some,
avgWeighedPrice = 1.0.some
)
))
}
wsc.clearMessages()

dex1.api.cancelAll(acc)
assertChanges(wsc, squash = false) { Map(usd -> WsBalances(5, 0), Waves -> WsBalances(14.9985, 0)) }(
WsOrder(bo.id(), status = OrderStatus.Cancelled.name)
)

eventually {
wsc.balanceChanges.squashed should matchTo(Map(usd -> WsBalances(5, 0), Waves -> WsBalances(14.9985, 0)))
wsc.orderChanges.squashed should matchTo(
Map(limitOrder.id -> WsOrder(bo.id(), status = OrderStatus.Cancelled.name))
)
}

wsc.close()
}
Expand Down Expand Up @@ -493,5 +512,46 @@ class WsAddressStreamTestSuite extends WsSuiteBase with TableDrivenPropertyCheck
}
}
}

"DEX-827 Wrong balance" in {
val btcBalance = 461
val carol = mkAccountWithBalance(25.waves -> Waves, btcBalance.btc -> btc)
val wsc = mkWsAddressConnection(carol)

val now = System.currentTimeMillis()
val order1 = mkOrderDP(carol, wavesBtcPair, BUY, 4.7.waves, 6, matcherFee = 0.003.waves, ts = now + 1)
val order2 = mkOrderDP(carol, wavesBtcPair, BUY, 4.7.waves, 6, matcherFee = 0.003.waves, ts = now + 2)
val order3 = mkOrderDP(carol, wavesBtcPair, SELL, 10.waves, 6, matcherFee = 0.003.waves)

dex1.api.place(order1)
dex1.api.place(order2)

placeAndAwaitAtDex(order3, OrderStatus.PartiallyFilled)
dex1.api.cancelAll(carol)

waitForOrderAtNode(order1)
waitForOrderAtNode(order2)
waitForOrderAtNode(order3)

wavesNode1.api.waitForHeightArise()

val expectedWavesBalance = 25.0 - 0.003 * 2 - 0.003 * 4.7 * 2 / 10

wavesNode1.api.balance(carol, Waves) shouldBe expectedWavesBalance.waves
wavesNode1.api.balance(carol, btc) shouldBe btcBalance.btc

dex1.api.tradableBalance(carol, wavesBtcPair) should matchTo(
Map(
Waves -> expectedWavesBalance.waves,
btc -> btcBalance.btc
)
)

wsc.balanceChanges.squashed should matchTo(
Map(
Waves -> WsBalances(expectedWavesBalance, 0),
btc -> WsBalances(btcBalance, 0)
))
}
}
}
11 changes: 0 additions & 11 deletions dex/src/main/scala/com/wavesplatform/dex/AddressActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -436,17 +436,6 @@ class AddressActor(owner: Address,
}

if (addressWsMutableState.hasActiveSubscriptions) {
// OrderExecuted event and ExchangeTransaction creation are separated in time!
// We should notify SpendableBalanceActor about balances changing, otherwise WS subscribers
// will receive balance changes (its reduction as a result of order partial execution) with
// sensible lag (only after exchange transaction will be put in UTX pool). The increase in
// the balance will be sent to subscribers after this tx will be forged

if (openVolumeDiff.nonEmpty) {
val correction = Group.inverse(openVolumeDiff)
spendableBalancesActor ! SpendableBalancesActor.Command.Subtract(owner, correction)
}

// Further improvements will be made in DEX-467
addressWsMutableState = status match {
case OrderStatus.Accepted => addressWsMutableState.putOrderUpdate(remaining.id, WsOrder.fromDomain(remaining, status))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package com.wavesplatform.dex

import akka.actor.{Actor, ActorRef, Status}
import cats.instances.long.catsKernelStdGroupForLong
import cats.syntax.either._
import cats.syntax.group.catsSyntaxGroup
import com.wavesplatform.dex.domain.account.Address
import com.wavesplatform.dex.domain.asset.Asset
import com.wavesplatform.dex.domain.utils.ScorexLogging
import com.wavesplatform.dex.error.{MatcherError, WavesNodeConnectionBroken}
import com.wavesplatform.dex.fp.MapImplicits.group
import com.wavesplatform.dex.grpc.integration.exceptions.WavesNodeConnectionLostException

import scala.concurrent.Future
Expand Down Expand Up @@ -83,6 +80,7 @@ class SpendableBalancesActor(spendableBalances: (Address, Set[Asset]) => Future[
case None =>
val addressState = state ++ incompleteStateChanges.getOrElse(address, Map.empty)
fullState += address -> addressState
log.info(s"[$address] Full state is set up") // We don't log values, because there is too much data from the old accounts
incompleteStateChanges -= address
addressState
}
Expand All @@ -95,14 +93,13 @@ class SpendableBalancesActor(spendableBalances: (Address, Set[Asset]) => Future[
val knownBalance = addressFullState orElse incompleteStateChanges.get(address) getOrElse Map.empty
val (clean, forAudit) = if (knownBalance.isEmpty) (stateUpdate, stateUpdate) else getCleanAndForAuditChanges(stateUpdate, knownBalance)

if (addressFullState.isDefined) fullState = fullState.updated(address, knownBalance ++ clean)
else incompleteStateChanges = incompleteStateChanges.updated(address, knownBalance ++ clean)
if (addressFullState.isDefined) {
fullState = fullState.updated(address, knownBalance ++ clean)
log.info(s"[$address] Full state updates: $clean")
} else incompleteStateChanges = incompleteStateChanges.updated(address, knownBalance ++ clean)

addressDirectory ! AddressDirectory.Envelope(address, AddressActor.Message.BalanceChanged(clean.keySet, forAudit))
}

// Subtract is called when there is a web socket connection and thus we have `fullState` for this address
case SpendableBalancesActor.Command.Subtract(address, balance) => fullState = fullState.updated(address, fullState(address) |-| balance)
}

/**
Expand All @@ -123,7 +120,6 @@ object SpendableBalancesActor {
trait Command
object Command {
final case class SetState(address: Address, state: Map[Asset, Long]) extends Command
final case class Subtract(address: Address, balance: Map[Asset, Long]) extends Command
final case class UpdateStates(changes: Map[Address, Map[Asset, Long]]) extends Command
}

Expand Down
Loading

0 comments on commit 1a4a964

Please sign in to comment.