diff --git a/dex/src/main/scala/com/wavesplatform/dex/actors/tx/ExchangeTransactionBroadcastActor.scala b/dex/src/main/scala/com/wavesplatform/dex/actors/tx/ExchangeTransactionBroadcastActor.scala index dcc7bb87ac..4fd64f427e 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/actors/tx/ExchangeTransactionBroadcastActor.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/actors/tx/ExchangeTransactionBroadcastActor.scala @@ -94,7 +94,11 @@ object ExchangeTransactionBroadcastActor { } case Command.Tick => - val updatedInProgress = inProgress.view.mapValues(_.decreasedAttempts) + //noinspection SortFilter + val updatedInProgress = inProgress.view + .mapValues(_.decreasedAttempts) + .toList + .sortBy { case (_, v) => v.tx.timestamp } .filter { case (txId, x) => val valid = x.isValid // This could be in Event.Broadcasted diff --git a/dex/src/test/scala/com/wavesplatform/dex/actors/tx/ExchangeTransactionBroadcastActorSpecification.scala b/dex/src/test/scala/com/wavesplatform/dex/actors/tx/ExchangeTransactionBroadcastActorSpecification.scala index 9a648c8273..a669e1f903 100644 --- a/dex/src/test/scala/com/wavesplatform/dex/actors/tx/ExchangeTransactionBroadcastActorSpecification.scala +++ b/dex/src/test/scala/com/wavesplatform/dex/actors/tx/ExchangeTransactionBroadcastActorSpecification.scala @@ -51,6 +51,38 @@ class ExchangeTransactionBroadcastActorSpecification broadcasted shouldBe Seq(event.tx) } + "ordered transactions broadcast on retry" in { + var broadcasted = Seq.empty[ExchangeTransaction] + val attempts = new AtomicInteger(0) + val actor = defaultActorWithSettings( + Settings( + interval = 20.millis, + maxPendingTime = 200.millis + ) + ) { tx => + if (attempts.incrementAndGet() > 2) { + broadcasted = broadcasted :+ tx + Future.successful(CheckedBroadcastResult.Confirmed) + } else Future.successful(CheckedBroadcastResult.Failed("Couldn't broadcast transaction", canRetry = true)) + } + + val client = testKit.createTestProbe[Observed]() + + val createdTs = System.currentTimeMillis() + val time = new TestTime(createdTs) + val event1 = sampleEvent(client.ref, createdTs - 500L, time) + val event2 = sampleEvent(client.ref, createdTs, time) + + actor ! event2 + actor ! event1 + + (1 to 2).foreach(_ => manualTime.timePasses(21.millis)) + + eventually { + broadcasted shouldBe Seq(event1.tx, event2.tx) + } + } + "send a response to a client, if a transaction" - { def test(result: CheckedBroadcastResult): Unit = { val actor = defaultActor { _ => @@ -264,45 +296,51 @@ class ExchangeTransactionBroadcastActorSpecification clientRef: ActorRef[Observed], createdTs: Long = System.currentTimeMillis(), time: Time = new TestTime() - ): ExchangeTransactionBroadcastActor.Command.Broadcast = { - val now = time.getTimestamp() - val expiration = now + 1.day.toMillis + ): ExchangeTransactionBroadcastActor.Command.Broadcast = ExchangeTransactionBroadcastActor.Command.Broadcast( clientRef = clientRef, addressSpendings = Map.empty, - tx = ExchangeTransactionV3 - .mk( - amountAssetDecimals = 8, - priceAssetDecimals = 8, - buyOrder = Order.buy( - sender = KeyPair(Array.emptyByteArray), - matcher = KeyPair(Array.emptyByteArray), - pair = pair, - amount = 100, - price = 6000000L, - timestamp = now, - expiration = expiration, - matcherFee = 100 - ), - sellOrder = Order.sell( - sender = KeyPair(Array.emptyByteArray), - matcher = KeyPair(Array.emptyByteArray), - pair = pair, - amount = 100, - price = 6000000L, - timestamp = now, - expiration = expiration, - matcherFee = 100 - ), + tx = sampleTransaction(createdTs, time) + ) + + private def sampleTransaction( + createdTs: Long = System.currentTimeMillis(), + time: Time = new TestTime() + ): ExchangeTransaction = { + val now = time.getTimestamp() + val expiration = now + 1.day.toMillis + ExchangeTransactionV3 + .mk( + amountAssetDecimals = 8, + priceAssetDecimals = 8, + buyOrder = Order.buy( + sender = KeyPair(Array.emptyByteArray), + matcher = KeyPair(Array.emptyByteArray), + pair = pair, amount = 100, price = 6000000L, - buyMatcherFee = 0L, - sellMatcherFee = 0L, - fee = 300000L, - timestamp = createdTs, - proofs = Proofs.empty - ).transaction - ) + timestamp = now, + expiration = expiration, + matcherFee = 100 + ), + sellOrder = Order.sell( + sender = KeyPair(Array.emptyByteArray), + matcher = KeyPair(Array.emptyByteArray), + pair = pair, + amount = 100, + price = 6000000L, + timestamp = now, + expiration = expiration, + matcherFee = 100 + ), + amount = 100, + price = 6000000L, + buyMatcherFee = 0L, + sellMatcherFee = 0L, + fee = 300000L, + timestamp = createdTs, + proofs = Proofs.empty + ).transaction } } diff --git a/waves-ext/src/main/resources/application.conf b/waves-ext/src/main/resources/application.conf index d45e184c6f..c3f6e929c8 100644 --- a/waves-ext/src/main/resources/application.conf +++ b/waves-ext/src/main/resources/application.conf @@ -25,9 +25,18 @@ waves { } -akka.actor.waves-dex-grpc-scheduler { - type = "Dispatcher" - executor = "thread-pool-executor" - thread-pool-executor.fixed-pool-size = 8 - throughput = 10 +akka.actor { + waves-dex-grpc-scheduler { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor.fixed-pool-size = 8 + throughput = 10 + } + + broadcast-dispatcher { + type = "Dispatcher" + executor = "thread-pool-executor" + thread-pool-executor.fixed-pool-size = 1 + throughput = 1 + } } diff --git a/waves-ext/src/main/scala/com/wavesplatform/dex/grpc/integration/DEXExtension.scala b/waves-ext/src/main/scala/com/wavesplatform/dex/grpc/integration/DEXExtension.scala index 8acbbab7d0..10b861be3c 100644 --- a/waves-ext/src/main/scala/com/wavesplatform/dex/grpc/integration/DEXExtension.scala +++ b/waves-ext/src/main/scala/com/wavesplatform/dex/grpc/integration/DEXExtension.scala @@ -31,6 +31,8 @@ class DEXExtension(context: ExtensionContext) extends Extension with ScorexLoggi executionModel = ExecutionModel.AlwaysAsyncExecution ) + private val broadcastEc = context.actorSystem.dispatchers.lookup("akka.actor.broadcast-dispatcher") + implicit val byteStrValueReader: ValueReader[ByteStr] = (config: Config, path: String) => { val str = config.as[String](path) decodeBase58(str, config) @@ -45,7 +47,7 @@ class DEXExtension(context: ExtensionContext) extends Extension with ScorexLoggi val lpAccountsFilePath: String = context.settings.config.as[String]("waves.dex.lp-accounts.file-path") val lpAccounts: Set[ByteStr] = lpAccountsFromPath(lpAccountsFilePath, context.settings.config) - apiService = new WavesBlockchainApiGrpcService(context, allowedBlockchainStateAccounts, lpAccounts) + apiService = new WavesBlockchainApiGrpcService(context, allowedBlockchainStateAccounts, lpAccounts, broadcastEc) val bindAddress = new InetSocketAddress(host, port) diff --git a/waves-ext/src/main/scala/com/wavesplatform/dex/grpc/integration/services/WavesBlockchainApiGrpcService.scala b/waves-ext/src/main/scala/com/wavesplatform/dex/grpc/integration/services/WavesBlockchainApiGrpcService.scala index 5588ef9206..60f955835e 100644 --- a/waves-ext/src/main/scala/com/wavesplatform/dex/grpc/integration/services/WavesBlockchainApiGrpcService.scala +++ b/waves-ext/src/main/scala/com/wavesplatform/dex/grpc/integration/services/WavesBlockchainApiGrpcService.scala @@ -44,11 +44,16 @@ import monix.reactive.subjects.ConcurrentSubject import shapeless.Coproduct import java.util.concurrent.ConcurrentHashMap -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} -class WavesBlockchainApiGrpcService(context: ExtensionContext, allowedBlockchainStateAccounts: Set[ByteStr], lpAccounts: Set[ByteStr])(implicit +class WavesBlockchainApiGrpcService( + context: ExtensionContext, + allowedBlockchainStateAccounts: Set[ByteStr], + lpAccounts: Set[ByteStr], + broadcastEc: ExecutionContext +)(implicit sc: Scheduler ) extends WavesBlockchainApiGrpc.WavesBlockchainApi with ScorexLogging { @@ -124,12 +129,18 @@ class WavesBlockchainApiGrpcService(context: ExtensionContext, allowedBlockchain .explicitGetErr() } - override def checkedBroadcast(request: CheckedBroadcastRequest): Future[CheckedBroadcastResponse] = + override def checkedBroadcast(request: CheckedBroadcastRequest): Future[CheckedBroadcastResponse] = { + //"sc" name is mandatory here in order to override outer "implicit sc" + implicit val sc: ExecutionContext = broadcastEc + + val maybeTx = request.transaction + .fold(GenericError("The signed transaction must be specified").asLeft[SignedExchangeTransaction])(_.asRight[GenericError]) + .flatMap(_.toVanilla) + log.info(s"Broadcasting (1) ${maybeTx.map(_.id().toString).getOrElse("*")}") + Future { for { - grpcTx <- request.transaction - .fold(GenericError("The signed transaction must be specified").asLeft[SignedExchangeTransaction])(_.asRight[GenericError]) - tx <- grpcTx.toVanilla + tx <- maybeTx isConfirmed <- context.transactionsApi.transactionById(tx.id()).fold(false)(_ => true).asRight isInUtx <- context.transactionsApi.unconfirmedTransactionById(tx.id()).fold(false)(_ => true).asRight } yield (tx, isConfirmed, isInUtx) @@ -153,6 +164,7 @@ class WavesBlockchainApiGrpcService(context: ExtensionContext, allowedBlockchain val message = Option(e.getMessage).getOrElse(e.getClass.getName) CheckedBroadcastResponse(CheckedBroadcastResponse.Result.Failed(CheckedBroadcastResponse.Failure(message))) } + } private def handleTxInUtx(tx: exchange.ExchangeTransaction): Future[CheckedBroadcastResponse.Result] = broadcastTransaction(tx).map { @@ -164,11 +176,13 @@ class WavesBlockchainApiGrpcService(context: ExtensionContext, allowedBlockchain } } - private def broadcastTransaction(tx: exchange.ExchangeTransaction): Future[TracedResult[ValidationError, Boolean]] = + private def broadcastTransaction(tx: exchange.ExchangeTransaction): Future[TracedResult[ValidationError, Boolean]] = { + log.info(s"Broadcasting (2) ${tx.id()}") context.transactionsApi.broadcastTransaction(tx).andThen { - case Success(r) => log.info(s"Broadcast ${tx.id()}: ${r.resultE}") + case Success(r) => log.info(s"Broadcasting (3) ${tx.id()}: ${r.resultE}") case Failure(e) => log.warn(s"Can't broadcast ${tx.id()}", e) } + } override def isFeatureActivated(request: IsFeatureActivatedRequest): Future[IsFeatureActivatedResponse] = Future { IsFeatureActivatedResponse(