diff --git a/it/src/main/scala/com/wavesplatform/it/api/AsyncMatcherHttpApi.scala b/it/src/main/scala/com/wavesplatform/it/api/AsyncMatcherHttpApi.scala index a67642aab15..b6668c6fdf9 100644 --- a/it/src/main/scala/com/wavesplatform/it/api/AsyncMatcherHttpApi.scala +++ b/it/src/main/scala/com/wavesplatform/it/api/AsyncMatcherHttpApi.scala @@ -1,7 +1,7 @@ package com.wavesplatform.it.api import com.google.common.primitives.Longs -import com.wavesplatform.account.{PrivateKeyAccount, PublicKeyAccount} +import com.wavesplatform.account.PrivateKeyAccount import com.wavesplatform.crypto import com.wavesplatform.http.api_key import com.wavesplatform.it.Node @@ -10,7 +10,7 @@ import com.wavesplatform.matcher.api.CancelOrderRequest import com.wavesplatform.state.ByteStr import com.wavesplatform.transaction.assets.exchange.{AssetPair, Order, OrderType} import com.wavesplatform.utils.Base58 -import org.asynchttpclient.Dsl.{get => _get, delete => _delete} +import org.asynchttpclient.Dsl.{delete => _delete, get => _get} import org.asynchttpclient.util.HttpConstants import org.asynchttpclient.{RequestBuilder, Response} import org.scalatest.Assertions @@ -26,6 +26,18 @@ object AsyncMatcherHttpApi extends Assertions { val DefaultMatcherFee: Int = 300000 + def cancelRequest(sender: PrivateKeyAccount, orderId: String): CancelOrderRequest = { + val req = CancelOrderRequest(sender, Some(ByteStr.decodeBase58(orderId).get), None, Array.emptyByteArray) + val signature = crypto.sign(sender, req.toSign) + req.copy(signature = signature) + } + + def batchCancelRequest(sender: PrivateKeyAccount, timestamp: Long): CancelOrderRequest = { + val req = CancelOrderRequest(sender, None, Some(timestamp), Array.emptyByteArray) + val signature = crypto.sign(sender, req.toSign) + req.copy(signature = signature) + } + implicit class MatcherAsyncHttpApi(matcherNode: Node) extends NodeAsyncHttpApi(matcherNode) { def matcherGet(path: String, @@ -43,11 +55,14 @@ object AsyncMatcherHttpApi extends Assertions { .build() ) - def matcherGetWithSignature(path: String, ts: Long, signature: ByteStr, f: RequestBuilder => RequestBuilder = identity): Future[Response] = + def matcherGetWithSignature(path: String, + sender: PrivateKeyAccount, + timestamp: Long = System.currentTimeMillis(), + f: RequestBuilder => RequestBuilder = identity): Future[Response] = retrying { _get(s"${matcherNode.matcherApiEndpoint}$path") - .setHeader("Timestamp", ts) - .setHeader("Signature", signature) + .setHeader("Timestamp", timestamp) + .setHeader("Signature", Base58.encode(crypto.sign(sender, sender.publicKey ++ Longs.toByteArray(timestamp)))) .build() } @@ -67,113 +82,72 @@ object AsyncMatcherHttpApi extends Assertions { ) def orderStatus(orderId: String, assetPair: AssetPair, waitForStatus: Boolean = true): Future[MatcherStatusResponse] = { - val (amountAsset, priceAsset) = parseAssetPair(assetPair) - matcherGet(s"/matcher/orderbook/$amountAsset/$priceAsset/$orderId", waitForStatus = waitForStatus) + matcherGet(s"/matcher/orderbook/${assetPair.toUri}/$orderId", waitForStatus = waitForStatus) .as[MatcherStatusResponse] } def transactionsByOrder(orderId: String): Future[Seq[ExchangeTransaction]] = matcherGet(s"/matcher/transactions/$orderId").as[Seq[ExchangeTransaction]] - def orderBook(assetPair: AssetPair): Future[OrderBookResponse] = { - val (amountAsset, priceAsset) = parseAssetPair(assetPair) - matcherGet(s"/matcher/orderbook/$amountAsset/$priceAsset").as[OrderBookResponse] - } + def orderBook(assetPair: AssetPair): Future[OrderBookResponse] = + matcherGet(s"/matcher/orderbook/${assetPair.toUri}").as[OrderBookResponse] - def deleteOrderBook(assetPair: AssetPair): Future[OrderBookResponse] = { - val (amountAsset, priceAsset) = parseAssetPair(assetPair) - retrying(_delete(s"${matcherNode.matcherApiEndpoint}/matcher/orderbook/$amountAsset/$priceAsset").withApiKey(matcherNode.apiKey).build()) + def deleteOrderBook(assetPair: AssetPair): Future[OrderBookResponse] = + retrying(_delete(s"${matcherNode.matcherApiEndpoint}/matcher/orderbook/${assetPair.toUri}").withApiKey(matcherNode.apiKey).build()) .as[OrderBookResponse] - } - def marketStatus(assetPair: AssetPair): Future[MarketStatusResponse] = { - val (amountAsset, priceAsset) = parseAssetPair(assetPair) - matcherGet(s"/matcher/orderbook/$amountAsset/$priceAsset/status").as[MarketStatusResponse] - } + def marketStatus(assetPair: AssetPair): Future[MarketStatusResponse] = + matcherGet(s"/matcher/orderbook/${assetPair.toUri}/status").as[MarketStatusResponse] - def parseAssetPair(assetPair: AssetPair): (String, String) = { - val amountAsset = assetPair.amountAsset match { - case None => "WAVES" - case _ => assetPair.amountAsset.get.base58 - } - val priceAsset = assetPair.priceAsset match { - case None => "WAVES" - case _ => assetPair.priceAsset.get.base58 + def cancelOrder(sender: Node, assetPair: AssetPair, orderId: String): Future[MatcherStatusResponse] = + cancelOrder(sender.privateKey, assetPair, orderId) + + def cancelOrder(sender: PrivateKeyAccount, assetPair: AssetPair, orderId: String): Future[MatcherStatusResponse] = + matcherPost(s"/matcher/orderbook/${assetPair.toUri}/cancel", cancelRequest(sender, orderId)).as[MatcherStatusResponse] + + def expectCancelRejected(sender: PrivateKeyAccount, assetPair: AssetPair, orderId: String): Future[Unit] = { + val requestUri = s"/matcher/orderbook/${assetPair.toUri}/cancel" + matcherPost(requestUri, cancelRequest(sender, orderId)).transform { + case Failure(UnexpectedStatusCodeException(_, 400, body)) if (Json.parse(body) \ "status").as[String] == "OrderCancelRejected" => Success(()) + case Failure(cause) => Failure(cause) + case Success(resp) => Failure(UnexpectedStatusCodeException(requestUri, resp.getStatusCode, resp.getResponseBody)) } - (amountAsset, priceAsset) } - def cancelOrder(sender: Node, assetPair: AssetPair, orderId: Option[String], timestamp: Option[Long] = None): Future[MatcherStatusResponse] = - cancelOrder(sender.privateKey, assetPair, orderId, timestamp) - - def cancelOrder(sender: PrivateKeyAccount, - assetPair: AssetPair, - orderId: Option[String], - timestamp: Option[Long]): Future[MatcherStatusResponse] = { - val privateKey = sender.privateKey - val publicKey = PublicKeyAccount(sender.publicKey) - val request = CancelOrderRequest(publicKey, orderId.map(ByteStr.decodeBase58(_).get), timestamp, Array.emptyByteArray) - val sig = crypto.sign(privateKey, request.toSign) - val signedRequest = request.copy(signature = sig) - val (amountAsset, priceAsset) = parseAssetPair(assetPair) - matcherPost(s"/matcher/orderbook/$amountAsset/$priceAsset/cancel", signedRequest).as[MatcherStatusResponse] - } + def cancelOrdersForPair(sender: Node, assetPair: AssetPair, timestamp: Long): Future[MatcherStatusResponse] = + matcherPost(s"/matcher/orderbook/${assetPair.toUri}/cancel", Json.toJson(batchCancelRequest(sender.privateKey, timestamp))) + .as[MatcherStatusResponse] - def cancelAllOrders(sender: Node, timestamp: Option[Long]): Future[MatcherStatusResponse] = { - val privateKey = sender.privateKey - val publicKey = sender.publicKey - val request = CancelOrderRequest(publicKey, None, timestamp, Array.emptyByteArray) - val sig = crypto.sign(privateKey, request.toSign) - val signedRequest = request.copy(signature = sig) - matcherPost(s"/matcher/orderbook/cancel", Json.toJson(signedRequest)).as[MatcherStatusResponse] - } + def cancelAllOrders(sender: Node, timestamp: Long): Future[MatcherStatusResponse] = + matcherPost(s"/matcher/orderbook/cancel", Json.toJson(batchCancelRequest(sender.privateKey, timestamp))).as[MatcherStatusResponse] - def cancelOrderWithApiKey(orderId: String): Future[MatcherStatusResponse] = { + def cancelOrderWithApiKey(orderId: String): Future[MatcherStatusResponse] = postWithAPiKey(s"/matcher/orders/cancel/$orderId").as[MatcherStatusResponse] - } - def fullOrdersHistory(sender: Node): Future[Seq[OrderbookHistory]] = { - val ts = System.currentTimeMillis() - matcherGetWithSignature(s"/matcher/orderbook/${sender.publicKeyStr}", ts, signature(sender, ts)).as[Seq[OrderbookHistory]] - } + def fullOrdersHistory(sender: Node): Future[Seq[OrderbookHistory]] = + matcherGetWithSignature(s"/matcher/orderbook/${sender.publicKeyStr}", sender.privateKey).as[Seq[OrderbookHistory]] def orderHistoryByPair(sender: Node, assetPair: AssetPair, activeOnly: Boolean = false): Future[Seq[OrderbookHistory]] = { - val ts = System.currentTimeMillis() - val (amountAsset, priceAsset) = parseAssetPair(assetPair) - matcherGetWithSignature(s"/matcher/orderbook/$amountAsset/$priceAsset/publicKey/${sender.publicKeyStr}?activeOnly=$activeOnly", - ts, - signature(sender, ts)) + matcherGetWithSignature(s"/matcher/orderbook/${assetPair.toUri}/publicKey/${sender.publicKeyStr}?activeOnly=$activeOnly", sender.privateKey) .as[Seq[OrderbookHistory]] } def activeOrderHistory(sender: Node): Future[Seq[OrderbookHistory]] = { - val ts = System.currentTimeMillis() - matcherGetWithSignature(s"/matcher/orderbook/${sender.publicKeyStr}?activeOnly=true", ts, signature(sender, ts)).as[Seq[OrderbookHistory]] + matcherGetWithSignature(s"/matcher/orderbook/${sender.publicKeyStr}?activeOnly=true", sender.privateKey).as[Seq[OrderbookHistory]] } def reservedBalance(sender: Node): Future[Map[String, Long]] = reservedBalance(sender.privateKey) - def reservedBalance(sender: PrivateKeyAccount): Future[Map[String, Long]] = { - val ts = System.currentTimeMillis() - val privateKey = sender.privateKey - val publicKey = sender.publicKey - val signature = ByteStr(crypto.sign(privateKey, publicKey ++ Longs.toByteArray(ts))) + def reservedBalance(sender: PrivateKeyAccount): Future[Map[String, Long]] = + matcherGetWithSignature(s"/matcher/balance/reserved/${Base58.encode(sender.publicKey)}", sender).as[Map[String, Long]] - matcherGetWithSignature(s"/matcher/balance/reserved/${Base58.encode(sender.publicKey)}", ts, signature).as[Map[String, Long]] - } + def tradableBalance(sender: Node, assetPair: AssetPair): Future[Map[String, Long]] = + matcherGet(s"/matcher/orderbook/${assetPair.toUri}/tradableBalance/${sender.address}").as[Map[String, Long]] - def tradableBalance(sender: Node, assetPair: AssetPair): Future[Map[String, Long]] = { - val (amountAsset, priceAsset) = parseAssetPair(assetPair) - matcherGet(s"/matcher/orderbook/$amountAsset/$priceAsset/tradableBalance/${sender.address}").as[Map[String, Long]] - } + def tradableBalance(sender: PrivateKeyAccount, assetPair: AssetPair): Future[Map[String, Long]] = + matcherGet(s"/matcher/orderbook/${assetPair.toUri}/tradableBalance/${sender.address}").as[Map[String, Long]] - def tradableBalance(sender: PrivateKeyAccount, assetPair: AssetPair): Future[Map[String, Long]] = { - val (amountAsset, priceAsset) = parseAssetPair(assetPair) - matcherGet(s"/matcher/orderbook/$amountAsset/$priceAsset/tradableBalance/${sender.address}").as[Map[String, Long]] - } - - def tradingMarkets(): Future[MarketDataInfo] = - matcherGet(s"/matcher/orderbook").as[MarketDataInfo] + def tradingMarkets(): Future[MarketDataInfo] = matcherGet(s"/matcher/orderbook").as[MarketDataInfo] def waitOrderStatus(assetPair: AssetPair, orderId: String, @@ -242,17 +216,13 @@ object AsyncMatcherHttpApi extends Assertions { def ordersByAddress(senderAddress: String, activeOnly: Boolean): Future[Seq[OrderbookHistory]] = matcherGetWithApiKey(s"/matcher/orders/$senderAddress?activeOnly=$activeOnly").as[Seq[OrderbookHistory]] - - private def signature(node: Node, timestamp: Long) = { - val privateKey = node.privateKey - val publicKey = node.publicKey.publicKey - ByteStr(crypto.sign(privateKey, publicKey ++ Longs.toByteArray(timestamp))) - } - } implicit class RequestBuilderOps(self: RequestBuilder) { def withApiKey(x: String): RequestBuilder = self.setHeader(api_key.name, x) } + implicit class AssetPairExt(val p: AssetPair) extends AnyVal { + def toUri: String = s"${AssetPair.assetIdStr(p.amountAsset)}/${AssetPair.assetIdStr(p.priceAsset)}" + } } diff --git a/it/src/main/scala/com/wavesplatform/it/api/SyncMatcherHttpApi.scala b/it/src/main/scala/com/wavesplatform/it/api/SyncMatcherHttpApi.scala index 70c3ce69505..6a083b444ba 100644 --- a/it/src/main/scala/com/wavesplatform/it/api/SyncMatcherHttpApi.scala +++ b/it/src/main/scala/com/wavesplatform/it/api/SyncMatcherHttpApi.scala @@ -6,18 +6,17 @@ import com.wavesplatform.it.Node import com.wavesplatform.transaction.assets.exchange.{AssetPair, Order, OrderType} import org.asynchttpclient.util.HttpConstants import org.asynchttpclient.{RequestBuilder, Response} -import org.scalatest.{Assertions, Matchers} import play.api.libs.json.{Format, Json, Writes} import scala.concurrent.Await import scala.concurrent.duration._ -object SyncMatcherHttpApi extends Assertions { +object SyncMatcherHttpApi { case class ErrorMessage(error: Int, message: String) implicit val errorMessageFormat: Format[ErrorMessage] = Json.format - implicit class MatcherNodeExtSync(m: Node) extends Matchers { + implicit class MatcherNodeExtSync(m: Node) { import com.wavesplatform.it.api.AsyncMatcherHttpApi.{MatcherAsyncHttpApi => async} @@ -93,21 +92,24 @@ object SyncMatcherHttpApi extends Assertions { waitTime: Duration = OrderRequestAwaitTime): Boolean = Await.result(async(m).expectIncorrectOrderPlacement(order, expectedStatusCode, expectedStatus), waitTime) - def cancelOrder(sender: Node, - assetPair: AssetPair, - orderId: Option[String], - timestamp: Option[Long] = None, - waitTime: Duration = OrderRequestAwaitTime): MatcherStatusResponse = - cancelOrder(sender.privateKey, assetPair, orderId, timestamp, waitTime) - - def cancelOrder(sender: PrivateKeyAccount, - assetPair: AssetPair, - orderId: Option[String], - timestamp: Option[Long], - waitTime: Duration): MatcherStatusResponse = - Await.result(async(m).cancelOrder(sender, assetPair, orderId, timestamp), waitTime) - - def cancelAllOrders(sender: Node, timestamp: Option[Long] = None, waitTime: Duration = OrderRequestAwaitTime): MatcherStatusResponse = + def expectCancelRejected(sender: PrivateKeyAccount, assetPair: AssetPair, orderId: String, waitTime: Duration = OrderRequestAwaitTime): Unit = + Await.result(async(m).expectCancelRejected(sender, assetPair, orderId), waitTime) + + def cancelOrder(sender: Node, assetPair: AssetPair, orderId: String, waitTime: Duration = OrderRequestAwaitTime): MatcherStatusResponse = + cancelOrder(sender.privateKey, assetPair, orderId, waitTime) + + def cancelOrder(sender: PrivateKeyAccount, assetPair: AssetPair, orderId: String, waitTime: Duration): MatcherStatusResponse = + Await.result(async(m).cancelOrder(sender, assetPair, orderId), waitTime) + + def cancelOrdersForPair(sender: Node, + assetPair: AssetPair, + timestamp: Long = System.currentTimeMillis(), + waitTime: Duration = OrderRequestAwaitTime): MatcherStatusResponse = + Await.result(async(m).cancelOrdersForPair(sender, assetPair, timestamp), waitTime) + + def cancelAllOrders(sender: Node, + timestamp: Long = System.currentTimeMillis(), + waitTime: Duration = OrderRequestAwaitTime): MatcherStatusResponse = Await.result(async(m).cancelAllOrders(sender, timestamp), waitTime) def cancelOrderWithApiKey(orderId: String, waitTime: Duration = OrderRequestAwaitTime): MatcherStatusResponse = diff --git a/it/src/test/scala/com/wavesplatform/it/sync/matcher/CancelOrderTestSuite.scala b/it/src/test/scala/com/wavesplatform/it/sync/matcher/CancelOrderTestSuite.scala index 73d5d36f5b2..64c314c6034 100644 --- a/it/src/test/scala/com/wavesplatform/it/sync/matcher/CancelOrderTestSuite.scala +++ b/it/src/test/scala/com/wavesplatform/it/sync/matcher/CancelOrderTestSuite.scala @@ -1,19 +1,16 @@ package com.wavesplatform.it.sync.matcher -import com.typesafe.config.{Config} +import com.typesafe.config.Config import com.wavesplatform.it.ReportingTestName import com.wavesplatform.it.api.SyncHttpApi._ import com.wavesplatform.it.api.SyncMatcherHttpApi._ +import com.wavesplatform.it.sync.matcher.config.MatcherPriceAssetConfig._ import com.wavesplatform.it.transactions.NodesFromDocker import com.wavesplatform.it.util._ -import com.wavesplatform.transaction.assets.exchange.OrderType.BUY -import com.wavesplatform.transaction.assets.exchange.{Order, OrderType} +import com.wavesplatform.transaction.assets.exchange.{AssetPair, OrderType} import org.scalatest.{BeforeAndAfterAll, CancelAfterFailure, FreeSpec, Matchers} import scala.concurrent.duration._ -import scala.math.BigDecimal.RoundingMode -import scala.util.{Try} -import com.wavesplatform.it.sync.matcher.config.MatcherPriceAssetConfig._ class CancelOrderTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll with CancelAfterFailure with NodesFromDocker with ReportingTestName { @@ -21,64 +18,85 @@ class CancelOrderTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll private def matcherNode = nodes.head - private def aliceNode = nodes(1) - private def bobNode = nodes(2) - matcherNode.signedIssue(createSignedIssueRequest(IssueUsdTx)) - matcherNode.signedIssue(createSignedIssueRequest(IssueWctTx)) - nodes.waitForHeightArise() + private val wavesBtcPair = AssetPair(None, Some(BtcId)) + + override protected def beforeAll(): Unit = { + super.beforeAll() + matcherNode.signedIssue(createSignedIssueRequest(IssueUsdTx)) + matcherNode.signedIssue(createSignedIssueRequest(IssueBtcTx)) + nodes.waitForHeightArise() + } + + "Order can be canceled" - { + "by sender" in { + val orderId = matcherNode.placeOrder(bobNode, wavesUsdPair, OrderType.SELL, 100.waves, 800).message.id + matcherNode.waitOrderStatus(wavesUsdPair, orderId, "Accepted", 1.minute) + matcherNode.cancelOrder(bobNode, wavesUsdPair, orderId) + matcherNode.waitOrderStatus(wavesUsdPair, orderId, "Cancelled", 1.minute) + matcherNode.orderHistoryByPair(bobNode, wavesUsdPair).collectFirst { + case o if o.id == orderId => o.status shouldEqual "Cancelled" + } + } + "with API key" in { + val orderId = matcherNode.placeOrder(bobNode, wavesUsdPair, OrderType.SELL, 100.waves, 800).message.id + matcherNode.waitOrderStatus(wavesUsdPair, orderId, "Accepted", 1.minute) + + matcherNode.cancelOrderWithApiKey(orderId) + matcherNode.waitOrderStatus(wavesUsdPair, orderId, "Cancelled", 1.minute) + + matcherNode.fullOrderHistory(bobNode).filter(_.id == orderId).head.status shouldBe "Cancelled" + matcherNode.orderHistoryByPair(bobNode, wavesUsdPair).filter(_.id == orderId).head.status shouldBe "Cancelled" - "cancel order using api-key" in { - val orderId = matcherNode.placeOrder(bobNode, wavesUsdPair, OrderType.SELL, 100.waves, 800).message.id - matcherNode.waitOrderStatus(wavesUsdPair, orderId, "Accepted", 1.minute) + val orderBook = matcherNode.orderBook(wavesUsdPair) + orderBook.bids shouldBe empty + orderBook.asks shouldBe empty + } + } - matcherNode.cancelOrderWithApiKey(orderId) - matcherNode.waitOrderStatus(wavesUsdPair, orderId, "Cancelled", 1.minute) + "Cancel is rejected" - { + "when request sender is not the sender of and order" in { + val orderId = matcherNode.placeOrder(bobNode, wavesUsdPair, OrderType.SELL, 100.waves, 800).message.id + matcherNode.waitOrderStatus(wavesUsdPair, orderId, "Accepted", 1.minute) - matcherNode.fullOrderHistory(bobNode).filter(_.id == orderId).head.status shouldBe "Cancelled" - matcherNode.orderHistoryByPair(bobNode, wavesUsdPair).filter(_.id == orderId).head.status shouldBe "Cancelled" - matcherNode.orderBook(wavesUsdPair).bids shouldBe empty - matcherNode.orderBook(wavesUsdPair).asks shouldBe empty + matcherNode.expectCancelRejected(matcherNode.privateKey, wavesUsdPair, orderId) + } } - "Alice and Bob trade WAVES-USD" - { - "place usd-waves order" in { - // Alice wants to sell USD for Waves - val orderId1 = matcherNode.placeOrder(bobNode, wavesUsdPair, OrderType.SELL, 100.waves, 800).message.id - val orderId2 = matcherNode.placeOrder(bobNode, wavesUsdPair, OrderType.SELL, 100.waves, 700).message.id - val bobSellOrder3 = matcherNode.placeOrder(bobNode, wavesUsdPair, OrderType.SELL, 100.waves, 600).message.id + "Batch cancel" - { + "works for" - { + "all orders placed by an address" in { + val usdOrderIds = 1 to 5 map { i => + matcherNode.placeOrder(bobNode, wavesUsdPair, OrderType.SELL, 100.waves + i, 400).message.id + } - matcherNode.fullOrderHistory(aliceNode) - matcherNode.fullOrderHistory(bobNode) + val btcOrderIds = 1 to 5 map { i => + matcherNode.placeOrder(bobNode, wavesBtcPair, OrderType.BUY, 100.waves + i, 400).message.id + } - matcherNode.waitOrderStatus(wavesUsdPair, bobSellOrder3, "Accepted", 1.minute) + (usdOrderIds ++ btcOrderIds).foreach(id => matcherNode.waitOrderStatus(wavesUsdPair, id, "Accepted")) - val aliceOrder = matcherNode.prepareOrder(aliceNode, wavesUsdPair, OrderType.BUY, 800, 0.00125.waves) - matcherNode.placeOrder(aliceOrder).message.id + matcherNode.cancelAllOrders(bobNode) - Thread.sleep(2000) - matcherNode.fullOrderHistory(aliceNode) - val orders = matcherNode.fullOrderHistory(bobNode) - for (orderId <- Seq(orderId1, orderId2)) { - orders.filter(_.id == orderId).head.status shouldBe "Accepted" + (usdOrderIds ++ btcOrderIds).foreach(id => matcherNode.waitOrderStatus(wavesUsdPair, id, "Cancelled")) } - } + "a pair" in { + val usdOrderIds = 1 to 5 map { i => + matcherNode.placeOrder(bobNode, wavesUsdPair, OrderType.SELL, 100.waves + i, 400).message.id + } - } + val btcOrderIds = 1 to 5 map { i => + matcherNode.placeOrder(bobNode, wavesBtcPair, OrderType.BUY, 100.waves + i, 400).message.id + } - def correctAmount(a: Long, price: Long): Long = { - val min = (BigDecimal(Order.PriceConstant) / price).setScale(0, RoundingMode.CEILING) - if (min > 0) - Try(((BigDecimal(a) / min).toBigInt() * min.toBigInt()).bigInteger.longValueExact()).getOrElse(Long.MaxValue) - else - a - } + (usdOrderIds ++ btcOrderIds).foreach(id => matcherNode.waitOrderStatus(wavesUsdPair, id, "Accepted")) - def receiveAmount(ot: OrderType, matchAmount: Long, matchPrice: Long): Long = - if (ot == BUY) correctAmount(matchAmount, matchPrice) - else { - (BigInt(matchAmount) * matchPrice / Order.PriceConstant).bigInteger.longValueExact() - } + matcherNode.cancelOrdersForPair(bobNode, wavesBtcPair) + btcOrderIds.foreach(id => matcherNode.waitOrderStatus(wavesUsdPair, id, "Cancelled")) + usdOrderIds.foreach(id => matcherNode.waitOrderStatus(wavesUsdPair, id, "Accepted")) + } + } + } } diff --git a/it/src/test/scala/com/wavesplatform/it/sync/matcher/MatcherRestartTestSuite.scala b/it/src/test/scala/com/wavesplatform/it/sync/matcher/MatcherRestartTestSuite.scala index 742c698c258..c1670ef4bf5 100644 --- a/it/src/test/scala/com/wavesplatform/it/sync/matcher/MatcherRestartTestSuite.scala +++ b/it/src/test/scala/com/wavesplatform/it/sync/matcher/MatcherRestartTestSuite.scala @@ -1,8 +1,10 @@ package com.wavesplatform.it.sync.matcher -import com.typesafe.config.{Config} +import com.typesafe.config.Config import com.wavesplatform.it.api.SyncHttpApi._ import com.wavesplatform.it.api.SyncMatcherHttpApi._ +import com.wavesplatform.it.sync._ +import com.wavesplatform.it.sync.matcher.config.MatcherDefaultConfig._ import com.wavesplatform.it.transactions.NodesFromDocker import com.wavesplatform.it.util._ import com.wavesplatform.it.{TransferSending, _} @@ -12,8 +14,6 @@ import org.scalatest.concurrent.Eventually import org.scalatest.{BeforeAndAfterAll, CancelAfterFailure, FreeSpec, Matchers} import scala.concurrent.duration._ -import com.wavesplatform.it.sync._ -import com.wavesplatform.it.sync.matcher.config.MatcherDefaultConfig._ class MatcherRestartTestSuite extends FreeSpec @@ -84,7 +84,7 @@ class MatcherRestartTestSuite orders2.asks.head.price shouldBe 2.waves * Order.PriceConstant } - val cancel = matcherNode.cancelOrder(aliceNode, aliceWavesPair, Some(firstOrder)) + val cancel = matcherNode.cancelOrder(aliceNode, aliceWavesPair, firstOrder) cancel.status should be("OrderCanceled") eventually { diff --git a/it/src/test/scala/com/wavesplatform/it/sync/matcher/MatcherTestSuite.scala b/it/src/test/scala/com/wavesplatform/it/sync/matcher/MatcherTestSuite.scala index 20328fdbac0..b0359514091 100644 --- a/it/src/test/scala/com/wavesplatform/it/sync/matcher/MatcherTestSuite.scala +++ b/it/src/test/scala/com/wavesplatform/it/sync/matcher/MatcherTestSuite.scala @@ -188,7 +188,7 @@ class MatcherTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll wit "order could be canceled and resubmitted again" in { // Alice cancels the very first order (100 left) - val status1 = matcherNode.cancelOrder(aliceNode, aliceWavesPair, Some(order1Response.message.id)) + val status1 = matcherNode.cancelOrder(aliceNode, aliceWavesPair, order1Response.message.id) status1.status should be("OrderCanceled") // Alice checks that the order book is empty @@ -315,81 +315,11 @@ class MatcherTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll wit // Cleanup nodes.waitForHeightArise() - matcherNode.cancelOrder(bobNode, bobWavesPair, Some(order8.message.id)).status should be("OrderCanceled") + matcherNode.cancelOrder(bobNode, bobWavesPair, order8.message.id).status should be("OrderCanceled") val transferBobId = aliceNode.transfer(aliceNode.address, bobNode.address, transferAmount, TransactionFee, None, None).id nodes.waitForHeightAriseAndTxPresent(transferBobId) } } - - "batch cancel" - { - val ordersNum = 5 - def fileOrders(n: Int, pair: AssetPair): Seq[String] = 0 until n map { _ => - val o = matcherNode.placeOrder(matcherNode.prepareOrder(aliceNode, pair, BUY, 100, 1.waves)) - o.status should be("OrderAccepted") - o.message.id - } - - val asset2 = - aliceNode.issue(aliceNode.address, "AliceCoin2", "AliceCoin for matcher's tests", someAssetAmount, 0, reissuable = false, 100000000L).id - nodes.waitForHeightAriseAndTxPresent(asset2) - val aliceWavesPair2 = AssetPair(ByteStr.decodeBase58(asset2).toOption, None) - - "canceling an order doesn't affect other orders for the same pair" in { - val orders = fileOrders(ordersNum, aliceWavesPair) - val (orderToCancel, ordersToRetain) = (orders.head, orders.tail) - - val cancel = matcherNode.cancelOrder(aliceNode, aliceWavesPair, Some(orderToCancel)) - cancel.status should be("OrderCanceled") - - ordersToRetain foreach { - matcherNode.waitOrderStatus(aliceWavesPair, _, "Accepted") - } - } - - "cancel orders by pair" ignore { - val ordersToCancel = fileOrders(orderLimit + ordersNum, aliceWavesPair) - val ordersToRetain = fileOrders(ordersNum, aliceWavesPair2) - val ts = Some(System.currentTimeMillis) - - val cancel = matcherNode.cancelOrder(aliceNode, aliceWavesPair, None, ts) - cancel.status should be("Cancelled") - - ordersToCancel foreach { - matcherNode.waitOrderStatus(aliceWavesPair, _, "Cancelled") - } - ordersToRetain foreach { - matcherNode.waitOrderStatus(aliceWavesPair2, _, "Accepted") - } - - // signed timestamp is mandatory - assertBadRequestAndMessage(matcherNode.cancelOrder(aliceNode, aliceWavesPair, None, None), "invalid signature") - - // timestamp reuse shouldn't be allowed - assertBadRequest(matcherNode.cancelOrder(aliceNode, aliceWavesPair, None, ts)) - } - - "cancel all orders" ignore { - val orders1 = fileOrders(orderLimit + ordersNum, aliceWavesPair) - val orders2 = fileOrders(orderLimit + ordersNum, aliceWavesPair2) - val ts = Some(System.currentTimeMillis) - - val cancel = matcherNode.cancelAllOrders(aliceNode, ts) - cancel.status should be("Cancelled") - - orders1 foreach { - matcherNode.waitOrderStatus(aliceWavesPair, _, "Cancelled") - } - orders2 foreach { - matcherNode.waitOrderStatus(aliceWavesPair2, _, "Cancelled") - } - - // signed timestamp is mandatory - assertBadRequestAndMessage(matcherNode.cancelAllOrders(aliceNode, None), "invalid signature") - - // timestamp reuse shouldn't be allowed - assertBadRequest(matcherNode.cancelAllOrders(aliceNode, ts)) - } - } } } diff --git a/it/src/test/scala/com/wavesplatform/it/sync/matcher/RestOrderLimitTestSuite.scala b/it/src/test/scala/com/wavesplatform/it/sync/matcher/RestOrderLimitTestSuite.scala index 6e64bd45e02..d8fc20ab07d 100644 --- a/it/src/test/scala/com/wavesplatform/it/sync/matcher/RestOrderLimitTestSuite.scala +++ b/it/src/test/scala/com/wavesplatform/it/sync/matcher/RestOrderLimitTestSuite.scala @@ -1,18 +1,18 @@ package com.wavesplatform.it.sync.matcher -import com.typesafe.config.ConfigFactory.parseString import com.typesafe.config.Config -import com.wavesplatform.it.{Node, ReportingTestName} +import com.typesafe.config.ConfigFactory.parseString import com.wavesplatform.it.api.SyncHttpApi._ import com.wavesplatform.it.api.SyncMatcherHttpApi._ import com.wavesplatform.it.sync.matcher.config.MatcherDefaultConfig._ import com.wavesplatform.it.sync.someAssetAmount import com.wavesplatform.it.transactions.NodesFromDocker +import com.wavesplatform.it.util._ +import com.wavesplatform.it.{Node, ReportingTestName} import com.wavesplatform.state.ByteStr.decodeBase58 import com.wavesplatform.transaction.assets.exchange.AssetPair import com.wavesplatform.transaction.assets.exchange.OrderType.{BUY, SELL} import org.scalatest.{BeforeAndAfterAll, CancelAfterFailure, FreeSpec, Matchers} -import com.wavesplatform.it.util._ import scala.concurrent.duration._ @@ -82,8 +82,8 @@ class RestOrderLimitTestSuite matcher.placeOrder(bob, bobPair, SELL, 1, 4.waves).message.id // fill filled2 matcher.placeOrder(bob, bobPair, SELL, 1, 3.waves).message.id // part fill partial2 - matcher.cancelOrder(alice, alicePair, Some(cancelled1)) - matcher.cancelOrder(alice, alicePair, Some(cancelled2)) + matcher.cancelOrder(alice, alicePair, cancelled1) + matcher.cancelOrder(alice, alicePair, cancelled2) matcher.waitOrderStatus(bobPair, cancelled2, "Cancelled", 1.minutes) val activeOrdersAllFive = Seq(partial2, active2, partial1, active1, active0) diff --git a/it/src/test/scala/com/wavesplatform/it/sync/matcher/RoundingIssuesTestSuite.scala b/it/src/test/scala/com/wavesplatform/it/sync/matcher/RoundingIssuesTestSuite.scala index 04640c6a9c4..28db7fae563 100644 --- a/it/src/test/scala/com/wavesplatform/it/sync/matcher/RoundingIssuesTestSuite.scala +++ b/it/src/test/scala/com/wavesplatform/it/sync/matcher/RoundingIssuesTestSuite.scala @@ -7,7 +7,7 @@ import com.wavesplatform.it.api.SyncHttpApi._ import com.wavesplatform.it.api.SyncMatcherHttpApi._ import com.wavesplatform.it.sync.matcher.config.MatcherPriceAssetConfig._ import com.wavesplatform.it.transactions.NodesFromDocker -import com.wavesplatform.transaction.assets.exchange.{OrderType} +import com.wavesplatform.transaction.assets.exchange.OrderType import org.scalatest.{BeforeAndAfterAll, CancelAfterFailure, FreeSpec, Matchers} import scala.concurrent.duration._ @@ -45,7 +45,7 @@ class RoundingIssuesTestSuite matcherNode.waitOrderStatusAndAmount(wavesUsdPair, submittedId, "Filled", Some(filledAmount), 1.minute) matcherNode.waitOrderStatusAndAmount(wavesUsdPair, counterId, "PartiallyFilled", Some(filledAmount), 1.minute) - matcherNode.cancelOrder(aliceNode, wavesUsdPair, Some(counterId)) + matcherNode.cancelOrder(aliceNode, wavesUsdPair, counterId) val tx = matcherNode.transactionsByOrder(counterId).head matcherNode.waitForTransaction(tx.id) @@ -76,7 +76,7 @@ class RoundingIssuesTestSuite withClue("Alice's reserved balance before cancel")(matcherNode.reservedBalance(aliceNode) shouldBe empty) - matcherNode.cancelOrder(bobNode, ethBtcPair, Some(counterId)) + matcherNode.cancelOrder(bobNode, ethBtcPair, counterId) val tx = matcherNode.transactionsByOrder(counterId).head matcherNode.waitForTransaction(tx.id) diff --git a/it/src/test/scala/com/wavesplatform/it/sync/matcher/SeveralPartialOrdersTestSuite.scala b/it/src/test/scala/com/wavesplatform/it/sync/matcher/SeveralPartialOrdersTestSuite.scala index 06cfbdf7f1b..77a2f12581f 100644 --- a/it/src/test/scala/com/wavesplatform/it/sync/matcher/SeveralPartialOrdersTestSuite.scala +++ b/it/src/test/scala/com/wavesplatform/it/sync/matcher/SeveralPartialOrdersTestSuite.scala @@ -77,7 +77,7 @@ class SeveralPartialOrdersTestSuite orderBook2.asks shouldBe List(LevelResponse(bobOrder2.amount, bobOrder2.price)) orderBook2.bids shouldBe empty - matcherNode.cancelOrder(bobNode, wavesUsdPair, Some(bobOrder2Id)) + matcherNode.cancelOrder(bobNode, wavesUsdPair, bobOrder2Id) nodes.waitForHeightArise() matcherNode.reservedBalance(bobNode) shouldBe empty diff --git a/it/src/test/scala/com/wavesplatform/it/sync/matcher/TradeBalanceAndRoundingTestSuite.scala b/it/src/test/scala/com/wavesplatform/it/sync/matcher/TradeBalanceAndRoundingTestSuite.scala index 1826960b50b..fff0350a633 100644 --- a/it/src/test/scala/com/wavesplatform/it/sync/matcher/TradeBalanceAndRoundingTestSuite.scala +++ b/it/src/test/scala/com/wavesplatform/it/sync/matcher/TradeBalanceAndRoundingTestSuite.scala @@ -125,7 +125,7 @@ class TradeBalanceAndRoundingTestSuite val orderId = matcherNode.fullOrderHistory(bobNode).head.id matcherNode.fullOrderHistory(bobNode).size should be(1) - matcherNode.cancelOrder(bobNode, wavesUsdPair, Some(orderId)) + matcherNode.cancelOrder(bobNode, wavesUsdPair, orderId) matcherNode.waitOrderStatus(wavesUsdPair, orderId, "Cancelled", 1.minute) matcherNode.tradableBalance(bobNode, wavesUsdPair)("WAVES") shouldBe bobNode.accountBalances(bobNode.address)._1 } @@ -160,7 +160,7 @@ class TradeBalanceAndRoundingTestSuite // Each side get fair amount of assets val exchangeTx = matcherNode.transactionsByOrder(aliceOrder.id().base58).headOption.getOrElse(fail("Expected an exchange transaction")) nodes.waitForHeightAriseAndTxPresent(exchangeTx.id) - matcherNode.cancelOrder(bobNode, wavesUsdPair, Some(bobOrder1Id)) + matcherNode.cancelOrder(bobNode, wavesUsdPair, bobOrder1Id) } } @@ -179,7 +179,7 @@ class TradeBalanceAndRoundingTestSuite val exchangeTx = matcherNode.transactionsByOrder(aliceOrderId).headOption.getOrElse(fail("Expected an exchange transaction")) nodes.waitForHeightAriseAndTxPresent(exchangeTx.id) - matcherNode.cancelOrder(bobNode, wctUsdPair, Some(bobOrderId)) + matcherNode.cancelOrder(bobNode, wctUsdPair, bobOrderId) matcherNode.waitOrderStatus(wctUsdPair, bobOrderId, "Cancelled", 1.minute) @@ -224,7 +224,7 @@ class TradeBalanceAndRoundingTestSuite val expectedReservedWaves = matcherFee - LimitOrder.getPartialFee(matcherFee, wctUsdSellAmount, executedAmount) matcherNode.reservedBalance(bobNode)("WAVES") shouldBe expectedReservedWaves - matcherNode.cancelOrder(bobNode, wctUsdPair, Some(matcherNode.fullOrderHistory(bobNode).head.id)) + matcherNode.cancelOrder(bobNode, wctUsdPair, matcherNode.fullOrderHistory(bobNode).head.id) } "reserved balance is empty after the total execution" in { @@ -267,7 +267,7 @@ class TradeBalanceAndRoundingTestSuite matcherNode.waitOrderStatus(wctWavesPair, bobOrderId, "Accepted", 1.minute) matcherNode.tradableBalance(bobNode, wctWavesPair)("WAVES") shouldBe matcherFee / 2 + receiveAmount(SELL, wctWavesSellAmount, wctWavesPrice) - matcherFee - matcherNode.cancelOrder(bobNode, wctWavesPair, Some(bobOrderId)) + matcherNode.cancelOrder(bobNode, wctWavesPair, bobOrderId) assertBadRequestAndResponse(matcherNode.placeOrder(bobNode, wctWavesPair, SELL, wctWavesSellAmount / 2, wctWavesPrice), "Not enough tradable balance") @@ -295,7 +295,7 @@ class TradeBalanceAndRoundingTestSuite nodes.waitForHeightAriseAndTxPresent(exchangeTx.id) matcherNode.reservedBalance(bobNode) shouldBe empty - matcherNode.cancelOrder(aliceNode, ethWavesPair, Some(counterId2)) + matcherNode.cancelOrder(aliceNode, ethWavesPair, counterId2) } } @@ -319,7 +319,7 @@ class TradeBalanceAndRoundingTestSuite val order = aliceOrders.find(_.id == aliceOrderId).getOrElse(throw new IllegalStateException(s"Alice should have the $aliceOrderId order")) order.status shouldBe "Filled" - matcherNode.cancelOrder(matcherNode, wavesUsdPair, Some(bobOrderId)) + matcherNode.cancelOrder(matcherNode, wavesUsdPair, bobOrderId) } def correctAmount(a: Long, price: Long): Long = { diff --git a/it/src/test/scala/com/wavesplatform/it/sync/matcher/TradersTestSuite.scala b/it/src/test/scala/com/wavesplatform/it/sync/matcher/TradersTestSuite.scala index 533f6428d33..bf17a32d912 100644 --- a/it/src/test/scala/com/wavesplatform/it/sync/matcher/TradersTestSuite.scala +++ b/it/src/test/scala/com/wavesplatform/it/sync/matcher/TradersTestSuite.scala @@ -87,7 +87,7 @@ class TradersTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll wit // Cleanup nodes.waitForHeightArise() - matcherNode.cancelOrder(bobNode, twoAssetsPair, Some(newestOrderId)).status should be("OrderCanceled") + matcherNode.cancelOrder(bobNode, twoAssetsPair, newestOrderId).status should be("OrderCanceled") val transferBackId = aliceNode.transfer(aliceNode.address, bobNode.address, 3050, TransactionFee, Some(bobNewAsset), None).id nodes.waitForHeightAriseAndTxPresent(transferBackId) @@ -112,7 +112,7 @@ class TradersTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll wit // Cleanup nodes.waitForHeightArise() - matcherNode.cancelOrder(bobNode, twoAssetsPair, Some(newestOrderId)).status should be("OrderCanceled") + matcherNode.cancelOrder(bobNode, twoAssetsPair, newestOrderId).status should be("OrderCanceled") val cancelLeaseId = bobNode.cancelLease(bobNode.address, leaseId, TransactionFee).id nodes.waitForHeightAriseAndTxPresent(cancelLeaseId) } @@ -136,7 +136,7 @@ class TradersTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll wit // Cleanup nodes.waitForHeightArise() - matcherNode.cancelOrder(bobNode, twoAssetsPair, Some(newestOrderId)).status should be("OrderCanceled") + matcherNode.cancelOrder(bobNode, twoAssetsPair, newestOrderId).status should be("OrderCanceled") val transferBackId = aliceNode.transfer(aliceNode.address, bobNode.address, transferAmount, TransactionFee, None, None).id nodes.waitForHeightAriseAndTxPresent(transferBackId) } @@ -164,7 +164,7 @@ class TradersTestSuite extends FreeSpec with Matchers with BeforeAndAfterAll wit // Cleanup nodes.waitForHeightArise() - matcherNode.cancelOrder(bobNode, bobWavesPair, Some(newestOrderId)).status should be("OrderCanceled") + matcherNode.cancelOrder(bobNode, bobWavesPair, newestOrderId).status should be("OrderCanceled") val cancelLeaseId = bobNode.cancelLease(bobNode.address, leaseId, TransactionFee).id nodes.waitForHeightAriseAndTxPresent(cancelLeaseId) } diff --git a/it/src/test/scala/com/wavesplatform/it/sync/matcher/config/MatcherPriceAssetConfig.scala b/it/src/test/scala/com/wavesplatform/it/sync/matcher/config/MatcherPriceAssetConfig.scala index bc6c73e0e95..28715f7da3a 100644 --- a/it/src/test/scala/com/wavesplatform/it/sync/matcher/config/MatcherPriceAssetConfig.scala +++ b/it/src/test/scala/com/wavesplatform/it/sync/matcher/config/MatcherPriceAssetConfig.scala @@ -124,12 +124,10 @@ object MatcherPriceAssetConfig { val orderLimit = 10 - private val updatedMatcherConfig = parseString(s""" - |waves.matcher { + private val updatedMatcherConfig = parseString(s"""waves.matcher { | price-assets = [ "$UsdId", "$BtcId", "WAVES" ] - | rest-order-limit=$orderLimit - |} - """.stripMargin) + | rest-order-limit = $orderLimit + |}""".stripMargin) val Configs: Seq[Config] = _Configs.map(updatedMatcherConfig.withFallback(_)) diff --git a/src/main/scala/com/wavesplatform/matcher/Matcher.scala b/src/main/scala/com/wavesplatform/matcher/Matcher.scala index 9dd86eb5fa3..d54887c11f3 100644 --- a/src/main/scala/com/wavesplatform/matcher/Matcher.scala +++ b/src/main/scala/com/wavesplatform/matcher/Matcher.scala @@ -67,7 +67,8 @@ class Matcher(actorSystem: ActorSystem, p => Option(marketStatuses.get(p)), orderBooksSnapshotCache, settings, - db + db, + NTP ) ) diff --git a/src/main/scala/com/wavesplatform/matcher/MatcherKeys.scala b/src/main/scala/com/wavesplatform/matcher/MatcherKeys.scala index 8703261bd49..bbaa3bf4a8b 100644 --- a/src/main/scala/com/wavesplatform/matcher/MatcherKeys.scala +++ b/src/main/scala/com/wavesplatform/matcher/MatcherKeys.scala @@ -88,5 +88,5 @@ object MatcherKeys { def finalizedPair(address: Address, pair: AssetPair, seqNr: Int): Key[Option[Order.Id]] = Key.opt(bytes(16, address.bytes.arr ++ pair.bytes ++ Ints.toByteArray(seqNr)), ByteStr(_), _.arr) - def lastOrderTimestamp(address: Address): Key[Option[Long]] = Key.opt(bytes(17, address.bytes.arr), Longs.fromByteArray, Longs.toByteArray) + def lastCommandTimestamp(address: Address): Key[Option[Long]] = Key.opt(bytes(17, address.bytes.arr), Longs.fromByteArray, Longs.toByteArray) } diff --git a/src/main/scala/com/wavesplatform/matcher/api/BatchCancel.scala b/src/main/scala/com/wavesplatform/matcher/api/BatchCancel.scala new file mode 100644 index 00000000000..b4ceefe34fd --- /dev/null +++ b/src/main/scala/com/wavesplatform/matcher/api/BatchCancel.scala @@ -0,0 +1,5 @@ +package com.wavesplatform.matcher.api +import com.wavesplatform.account.Address +import com.wavesplatform.transaction.assets.exchange.AssetPair + +case class BatchCancel(address: Address, assetPair: Option[AssetPair], timestamp: Long) diff --git a/src/main/scala/com/wavesplatform/matcher/api/DBUtils.scala b/src/main/scala/com/wavesplatform/matcher/api/DBUtils.scala index a827f3141c9..68d18f39a08 100644 --- a/src/main/scala/com/wavesplatform/matcher/api/DBUtils.scala +++ b/src/main/scala/com/wavesplatform/matcher/api/DBUtils.scala @@ -121,5 +121,5 @@ object DBUtils { key.parse(db.get(key.keyBytes)).getOrElse(0) } - def lastOrderTimestamp(db: DB, address: Address): Option[Long] = db.get(MatcherKeys.lastOrderTimestamp(address)) + def lastOrderTimestamp(db: DB, address: Address): Option[Long] = db.get(MatcherKeys.lastCommandTimestamp(address)) } diff --git a/src/main/scala/com/wavesplatform/matcher/api/MatcherApiRoute.scala b/src/main/scala/com/wavesplatform/matcher/api/MatcherApiRoute.scala index 30fc0ba92ff..c48f72af7e2 100644 --- a/src/main/scala/com/wavesplatform/matcher/api/MatcherApiRoute.scala +++ b/src/main/scala/com/wavesplatform/matcher/api/MatcherApiRoute.scala @@ -1,5 +1,7 @@ package com.wavesplatform.matcher.api +import java.util.concurrent.Executors + import akka.actor.ActorRef import akka.http.scaladsl.marshalling.ToResponseMarshallable import akka.http.scaladsl.model.StatusCodes @@ -21,7 +23,8 @@ import com.wavesplatform.state.ByteStr import com.wavesplatform.transaction.AssetAcc import com.wavesplatform.transaction.assets.exchange.OrderJson._ import com.wavesplatform.transaction.assets.exchange.{AssetPair, Order} -import com.wavesplatform.utils.{Base58, NTP, ScorexLogging} +import com.wavesplatform.utils.{Base58, NTP, ScorexLogging, Time} +import io.netty.util.concurrent.DefaultThreadFactory import io.swagger.annotations._ import javax.ws.rs.Path import kamon.Kamon @@ -29,8 +32,8 @@ import org.iq80.leveldb.DB import play.api.libs.json._ import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} @Path("/matcher") @@ -43,7 +46,8 @@ case class MatcherApiRoute(assetPairBuilder: AssetPairBuilder, getMarketStatus: AssetPair => Option[MarketStatus], orderBookSnapshot: OrderBookSnapshotHttpCache, wavesSettings: WavesSettings, - db: DB) + db: DB, + time: Time) extends ApiRoute with ScorexLogging { @@ -51,18 +55,20 @@ case class MatcherApiRoute(assetPairBuilder: AssetPairBuilder, import PathMatchers._ import wavesSettings._ - override val settings = wavesSettings.restAPISettings + override val settings = restAPISettings private val timer = Kamon.timer("matcher.api-requests") private val placeTimer = timer.refine("action" -> "place") private val cancelTimer = timer.refine("action" -> "cancel") private val openVolumeTimer = timer.refine("action" -> "open-volume") + private val batchCancelExecutor = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new DefaultThreadFactory("batch-cancel", true))) + override lazy val route: Route = pathPrefix("matcher") { matcherPublicKey ~ getOrderBook ~ marketStatus ~ place ~ getAssetPairAndPublicKeyOrderHistory ~ getPublicKeyOrderHistory ~ getAllOrderHistory ~ getTradableBalance ~ reservedBalance ~ orderStatus ~ - historyDelete ~ cancel ~ orderbooks ~ orderBookDelete ~ getTransactionsByOrder ~ forceCancelOrder ~ + historyDelete ~ cancel ~ cancelAll ~ orderbooks ~ orderBookDelete ~ getTransactionsByOrder ~ forceCancelOrder ~ getSettings } @@ -163,6 +169,19 @@ case class MatcherApiRoute(assetPairBuilder: AssetPairBuilder, } } + private def doCancel(order: Order): Future[MatcherResponse] = orderBook(order.assetPair) match { + case Some(orderBookRef) => + log.trace(s"Canceling ${order.id()} for ${order.sender.address}") + (orderBookRef ? CancelOrder(order.id())).mapTo[MatcherResponse] + case None => + log.debug(s"Order book for ${order.assetPair} was not found, canceling ${order.id()} anyway") + (orderHistory ? OrderHistoryActor.ForceCancelOrderFromHistory(order.id())) + .map { + case Some(_) => OrderCanceled(order.id()) + case None => OrderCancelRejected(s"Order ${order.id()} not found") + } + } + private def cancelOrder(orderId: ByteStr, senderPublicKey: Option[PublicKeyAccount]): ToResponseMarshallable = { val st = cancelTimer.start() DBUtils.orderInfo(db, orderId).status match { @@ -174,26 +193,35 @@ case class MatcherApiRoute(assetPairBuilder: AssetPairBuilder, log.warn(s"Order $orderId was not found in history") StatusCodes.NotFound case Some(order) if senderPublicKey.exists(_ != order.senderPublicKey) => - StatusCodes.BadRequest -> Json.obj("message" -> "Public Key mismatch") + OrderCancelRejected("Public key mismatch") case Some(order) => - orderBook(order.assetPair) match { - case Some(orderBookRef) => - log.trace(s"Cancelling ${order.id()} for ${senderPublicKey.map(_.address)}") - (orderBookRef ? CancelOrder(orderId)).mapTo[MatcherResponse].andThen { case _ => st.stop() } - case None => - log.debug(s"Order book for ${order.assetPair} was not found, cancelling $orderId anyway") - (orderHistory ? OrderHistoryActor.ForceCancelOrderFromHistory(orderId)) - .mapTo[Option[Order]] - .map { - case None => StatusCodes.NotFound - case Some(_) => StatusCodes.OK - } - .andThen { case _ => st.stop() } - } + doCancel(order).andThen { case _ => st.stop() } } } } + private def batchCancel(senderPublicKey: PublicKeyAccount, assetPair: Option[AssetPair], requestTimestamp: Long): Future[ToResponseMarshallable] = { + (orderHistory ? BatchCancel(senderPublicKey, assetPair, requestTimestamp)) + .mapTo[Either[String, Long]] + .flatMap { + case Left(e) => Future.successful[ToResponseMarshallable](OrderCancelRejected(e)) + case Right(_) => + val ordersToCancel = assetPair match { + case Some(p) => DBUtils.ordersByAddressAndPair(db, senderPublicKey, p, true, DBUtils.indexes.active.MaxElements) + case None => DBUtils.ordersByAddress(db, senderPublicKey, true, DBUtils.indexes.active.MaxElements) + } + + ordersToCancel + .foldLeft(Future.successful(Json.arr())) { + case (f, (order, _)) => + f.zipWith(doCancel(order)) { (r, mr) => + r :+ mr.json + }(batchCancelExecutor) + } + .map(arr => StatusCodes.OK -> Json.obj("status" -> "BatchCancelCompleted", "message" -> arr)) + } + } + @Path("/orderbook/{amountAsset}/{priceAsset}/cancel") @ApiOperation( value = "Cancel order", @@ -218,15 +246,45 @@ case class MatcherApiRoute(assetPairBuilder: AssetPairBuilder, withAssetPair(p) { pair => orderBook(pair).fold[Route](complete(StatusCodes.NotFound -> Json.obj("message" -> "Invalid asset pair"))) { _ => json[CancelOrderRequest] { req => - if (req.isSignatureValid()) req.orderId match { - case Some(id) => cancelOrder(id, Some(req.sender)) - case None => NotImplemented("Batch cancel is not supported yet") + if (req.isSignatureValid()) (req.orderId, req.timestamp) match { + case (Some(id), None) => cancelOrder(id, Some(req.sender)) + case (None, Some(reqTimestamp)) => + batchCancel(req.sender, Some(pair), reqTimestamp).andThen { + case Failure(exception) => log.debug(s"Error validating batch cancel request from ${req.sender.toAddress} for $pair", exception) + } + case _ => StatusCodes.BadRequest -> "Either timestamp or orderId must be provided" } else InvalidSignature } } } } + @Path("/orderbook/cancel") + @ApiOperation( + value = "Cancel all active orders", + httpMethod = "POST", + produces = "application/json", + consumes = "application/json" + ) + @ApiImplicitParams( + Array( + new ApiImplicitParam( + name = "body", + value = "Json with data", + required = true, + paramType = "body", + dataType = "com.wavesplatform.matcher.api.CancelOrderRequest" + ) + )) + def cancelAll: Route = (path("orderbook" / "cancel") & post) { + json[CancelOrderRequest] { req => + if (req.isSignatureValid()) + req.timestamp.fold[Future[ToResponseMarshallable]](Future.successful(OrderCancelRejected("Timestamp must be specified"))) { reqTimestamp => + batchCancel(req.sender, None, reqTimestamp) + } else InvalidSignature + } + } + @Path("/orderbook/{amountAsset}/{priceAsset}/delete") @Deprecated @ApiOperation( diff --git a/src/main/scala/com/wavesplatform/matcher/market/OrderHistoryActor.scala b/src/main/scala/com/wavesplatform/matcher/market/OrderHistoryActor.scala index 05318c43199..a5a91bfd478 100644 --- a/src/main/scala/com/wavesplatform/matcher/market/OrderHistoryActor.scala +++ b/src/main/scala/com/wavesplatform/matcher/market/OrderHistoryActor.scala @@ -2,6 +2,7 @@ package com.wavesplatform.matcher.market import akka.actor.{Actor, Props} import com.wavesplatform.matcher.MatcherSettings +import com.wavesplatform.matcher.api.BatchCancel import com.wavesplatform.matcher.market.OrderHistoryActor._ import com.wavesplatform.matcher.model.Events.{OrderAdded, OrderCanceled, OrderExecuted} import com.wavesplatform.matcher.model._ @@ -34,6 +35,8 @@ class OrderHistoryActor(db: DB, settings: MatcherSettings) extends Actor { cancelledTimer.measure(orderHistory.process(ev)) case ForceCancelOrderFromHistory(id) => forceCancelOrder(id) + case BatchCancel(address, _, ts) => + sender() ! orderHistory.updateTimestamp(address, ts) } def forceCancelOrder(id: ByteStr): Unit = { diff --git a/src/main/scala/com/wavesplatform/matcher/model/OrderHistory.scala b/src/main/scala/com/wavesplatform/matcher/model/OrderHistory.scala index f8ff684e8ba..46d4861a212 100644 --- a/src/main/scala/com/wavesplatform/matcher/model/OrderHistory.scala +++ b/src/main/scala/com/wavesplatform/matcher/model/OrderHistory.scala @@ -25,13 +25,32 @@ class OrderHistory(db: DB, settings: MatcherSettings) extends ScorexLogging { private val saveOpenVolumeTimer = timer.refine("action" -> "save-open-volume") private val saveOrderInfoTimer = timer.refine("action" -> "save-order-info") + private def updateTimestamp(rw: RW, address: Address, newTimestamp: Long): Either[String, Long] = { + val k = lastCommandTimestamp(address) + rw.get(k) match { + case None => + rw.put(k, Some(newTimestamp)) + Right(newTimestamp) + case Some(prevTimestamp) => + val earliestTimestamp = prevTimestamp - settings.orderTimestampDrift + if (newTimestamp < earliestTimestamp) { + Left(s"Timestamp must be >= $earliestTimestamp") + } else if (newTimestamp <= prevTimestamp) { + Right(prevTimestamp) + } else { + rw.put(k, Some(newTimestamp)) + Right(newTimestamp) + } + } + } + + def updateTimestamp(address: Address, newTimestamp: Long): Either[String, Long] = db.readWrite(rw => updateTimestamp(rw, address, newTimestamp)) + def process(event: OrderAdded): Unit = saveOrderInfoTimer.measure { db.readWrite { rw => - val order = event.order.order - val k = lastOrderTimestamp(order.sender) - val newestTimestamp = rw.get(k).fold(order.timestamp)(_.max(order.timestamp)) + val order = event.order.order - rw.put(k, Some(newestTimestamp)) + updateTimestamp(rw, order.sender, order.timestamp) val origInfo = rw.get(orderInfoOpt(order.id())) if (!origInfo.exists(_.status.isFinal)) {