From 30da5e7603e5373b96762214f82b0329a5d4000d Mon Sep 17 00:00:00 2001 From: Fabio Pinheiro Date: Wed, 22 Nov 2023 14:00:17 +0000 Subject: [PATCH] feat: websockets support (#172) Support DIDComm via Websockets Support WebSocket URI as endpoints Update scala-did to 0.1.0-M15 Use of Scala DID framework and Transport Remove module 'http-utils' Support multi endpoint in the config file Better error handling Configuration update Cleanup duplicated code Work for https://github.com/input-output-hk/atala-prism-mediator/issues/144 --------- Signed-off-by: Pete Vielhaber Co-authored-by: Pete Vielhaber --- .github/workflows/integration-tests.yml | 4 +- README.md | 2 +- build.sbt | 21 +- docker-compose.yml | 2 +- .../mediator/comm/MessageDispatcherJVM.scala | 55 --- .../atala/mediator/utils/HttpHelpers.scala | 54 --- .../atala/mediator/comm/DispatcherFail.scala | 6 - .../mediator/comm/MessageDispatcher.scala | 17 - .../iohk/atala/mediator/comm/TapMessage.scala | 10 - .../iohk/atala/mediator/utils/MyHeaders.scala | 5 - .../charts/mediator/templates/deployment.yaml | 4 +- mediator/src/main/resources/application.conf | 4 +- .../mediator/AgentExecutorMediator.scala | 275 ++++++++++++++ .../iohk/atala/mediator/DIDCommRoutes.scala | 79 ++++ .../scala/io/iohk/atala/mediator/Error.scala | 7 +- .../atala/mediator/{app => }/IndexHtml.scala | 2 +- .../atala/mediator/MediaTypesExtension.scala | 15 + .../iohk/atala/mediator/MediatorAgent.scala | 104 ++++++ .../{app => }/MediatorStandalone.scala | 36 +- .../io/iohk/atala/mediator/OperatorImp.scala | 48 +++ .../iohk/atala/mediator/actions/Action.scala | 13 - .../atala/mediator/actions/ActionUtils.scala | 158 -------- .../mediator/actions/ProtocolExecute.scala | 88 ----- .../atala/mediator/app/MediatorAgent.scala | 351 ------------------ .../iohk/atala/mediator/db/DataModels.scala | 62 +++- .../atala/mediator/db/MessageItemRepo.scala | 5 +- .../atala/mediator/db/UserAccountRepo.scala | 6 +- .../protocols/BasicMessageExecuter.scala | 25 -- .../protocols/DiscoverFeaturesExecuter.scala | 17 +- .../protocols/ForwardMessageExecuter.scala | 14 +- .../MediatorCoordinationExecuter.scala | 24 +- .../protocols/MissingProtocolExecuter.scala | 9 +- .../protocols/NullProtocolExecuter.scala | 21 -- .../mediator/protocols/PickupExecuter.scala | 37 +- .../atala/mediator/protocols/Problems.scala | 25 +- .../protocols/TrustPingExecuter.scala | 46 --- .../io/iohk/atala/mediator/db/AgentStub.scala | 6 +- .../mediator/db/MessageItemRepoSpec.scala | 8 +- .../mediator/db/UserAccountRepoSpec.scala | 4 +- .../DiscoverFeaturesExecuterSpec.scala | 41 +- .../ForwardMessageExecutorSpec.scala | 37 +- .../MediatorCoordinationExecuterSpec.scala | 73 ++-- .../protocols/PickupExecuterSpec.scala | 76 ++-- .../io/iohk/atala/mediator/MediatorInfo.scala | 2 +- 44 files changed, 797 insertions(+), 1101 deletions(-) delete mode 100644 http-utils/jvm/src/main/scala/io/iohk/atala/mediator/comm/MessageDispatcherJVM.scala delete mode 100644 http-utils/jvm/src/main/scala/io/iohk/atala/mediator/utils/HttpHelpers.scala delete mode 100644 http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/DispatcherFail.scala delete mode 100644 http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/MessageDispatcher.scala delete mode 100644 http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/TapMessage.scala delete mode 100644 http-utils/shared/src/main/scala/io/iohk/atala/mediator/utils/MyHeaders.scala create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala rename mediator/src/main/scala/io/iohk/atala/mediator/{app => }/IndexHtml.scala (97%) create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/MediaTypesExtension.scala create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/MediatorAgent.scala rename mediator/src/main/scala/io/iohk/atala/mediator/{app => }/MediatorStandalone.scala (79%) create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala delete mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/actions/Action.scala delete mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala delete mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala delete mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala delete mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala delete mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala delete mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index c042181a..6f125440 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -25,8 +25,8 @@ jobs: ATALA_GITHUB_ACTOR: ${{ secrets.ATALA_GITHUB_ACTOR }} ATALA_GITHUB_TOKEN: ${{ secrets.ATALA_GITHUB_TOKEN }} REPORTS_DIR: "didcomm-v2-mediator-test-suite/target/site/serenity" - DIDCOMM_V2_TESTSUITE_VERSION: "a6288ef6536cd15181c30896841f5c33ef0c050b" # old "v0.1.0" - MEDIATOR_DID: "did:peer:2.Ez6LSghwSE437wnDE1pt3X6hVDUQzSjsHzinpX3XFvMjRAm7y.Vz6Mkhh1e5CEYYq6JBUcTZ6Cp2ranCWRrv7Yax3Le4N59R6dd.SeyJ0IjoiZG0iLCJzIjoiaHR0cDovL2xvY2FsaG9zdDo4MDgwIiwiciI6W10sImEiOlsiZGlkY29tbS92MiJdfQ" + DIDCOMM_V2_TESTSUITE_VERSION: "13882b256bfdbe8ba738c2798dffc0ce25ca52e4" + MEDIATOR_DID: "did:peer:2.Ez6LSghwSE437wnDE1pt3X6hVDUQzSjsHzinpX3XFvMjRAm7y.Vz6Mkhh1e5CEYYq6JBUcTZ6Cp2ranCWRrv7Yax3Le4N59R6dd.SeyJ0IjoiZG0iLCJzIjoiaHR0cDovL2xvY2FsaG9zdDo4MDgwIiwiciI6W10sImEiOlsiZGlkY29tbS92MiJdfQ.SeyJ0IjoiZG0iLCJzIjoid3M6Ly9sb2NhbGhvc3Q6ODA4MC93cyIsInIiOltdLCJhIjpbImRpZGNvbW0vdjIiXX0" steps: - name: Checkout mediator uses: actions/checkout@v3 diff --git a/README.md b/README.md index 9328f82d..5c49cf1f 100644 --- a/README.md +++ b/README.md @@ -136,7 +136,7 @@ To set up the mediator identity: - `KEY_AGREEMENT_X` - is the key agreement public key (MUST be a X25519 OKP key type). - `KEY_AUTHENTICATION_D` - is the key authentication private key (MUST be an Ed25519 OKP key type). - `KEY_AUTHENTICATION_X` - is the key authentication public key (MUST be an Ed25519 OKP key type). -- `SERVICE_ENDPOINT` - is the endpoint of the mediator. Where the mediator will be listening to incoming DID Comm messages. +- `SERVICE_ENDPOINTS` - is the list of endpoints of the mediator split by ';' where the mediator will listen to incoming DIDComm messages. #### mediator-storage To set up the mediator storage (MongoDB): diff --git a/build.sbt b/build.sbt index cffa8ab0..6ae1a121 100644 --- a/build.sbt +++ b/build.sbt @@ -9,8 +9,7 @@ inThisBuild( /** Versions */ lazy val V = new { - val scalaDID = "0.1.0-M13" -// val scalajsJavaSecureRandom = "1.0.0" + val scalaDID = "0.1.0-M15" // FIXME another bug in the test framework https://github.com/scalameta/munit/issues/554 val munit = "1.0.0-M10" // "0.7.29" @@ -49,6 +48,7 @@ lazy val D = new { val scalaDID = Def.setting("app.fmgp" %%% "did" % V.scalaDID) val scalaDID_imp = Def.setting("app.fmgp" %%% "did-imp" % V.scalaDID) val scalaDID_peer = Def.setting("app.fmgp" %%% "did-method-peer" % V.scalaDID) + val scalaDID_framework = Def.setting("app.fmgp" %%% "did-framework" % V.scalaDID) // /** The [[java.security.SecureRandom]] is used by the [[java.util.UUID.randomUUID()]] method in [[MsgId]]. // * @@ -175,19 +175,6 @@ lazy val buildInfoConfigure: Project => Project = _.enablePlugins(BuildInfoPlugi ), ) -lazy val httpUtils = crossProject(JSPlatform, JVMPlatform) // project - .in(file("http-utils")) - .settings(publish / skip := true) - .settings((setupTestConfig): _*) - .settings( - libraryDependencies += D.scalaDID.value, - ) - .jsConfigure(scalaJSBundlerConfigure) - .jsSettings(Compile / npmDependencies ++= NPM.sha256) - .jvmSettings( - libraryDependencies += D.zioHttp.value, - ) - lazy val mediator = project .in(file("mediator")) .configure(buildInfoConfigure) @@ -204,6 +191,7 @@ lazy val mediator = project .settings( libraryDependencies += D.scalaDID_imp.value, libraryDependencies += D.scalaDID_peer.value, + libraryDependencies += D.scalaDID_framework.value, libraryDependencies += D.zioHttp.value, libraryDependencies ++= Seq( D.zioConfig.value, @@ -226,7 +214,7 @@ lazy val mediator = project testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework") ) .settings( - Compile / mainClass := Some("io.iohk.atala.mediator.app.MediatorStandalone"), + Compile / mainClass := Some("io.iohk.atala.mediator.MediatorStandalone"), Docker / maintainer := "atala-coredid@iohk.io", Docker / dockerUsername := Some("input-output-hk"), Docker / dockerRepository := Some("ghcr.io"), @@ -254,7 +242,6 @@ lazy val mediator = project Runtime / managedClasspath += (Assets / packageBin).value, ) .enablePlugins(WebScalaJSBundlerPlugin) - .dependsOn(httpUtils.jvm) // did, didExample, .enablePlugins(JavaAppPackaging, DockerPlugin) lazy val webapp = project diff --git a/docker-compose.yml b/docker-compose.yml index 950067b8..7299ff26 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,7 +23,7 @@ services: - KEY_AGREEMENT_X=Sr4SkIskjN_VdKTn0zkjYbhGTWArdUNE4j_DmUpnQGw - KEY_AUTHENTICATION_D=INXCnxFEl0atLIIQYruHzGd5sUivMRyQOzu87qVerug - KEY_AUTHENTICATION_X=MBjnXZxkMcoQVVL21hahWAw43RuAG-i64ipbeKKqwoA - - SERVICE_ENDPOINT=http://localhost:8080 + - SERVICE_ENDPOINTS=http://localhost:8080;ws://localhost:8080/ws - MONGODB_USER=admin - MONGODB_PASSWORD=admin - MONGODB_PROTOCOL=mongodb diff --git a/http-utils/jvm/src/main/scala/io/iohk/atala/mediator/comm/MessageDispatcherJVM.scala b/http-utils/jvm/src/main/scala/io/iohk/atala/mediator/comm/MessageDispatcherJVM.scala deleted file mode 100644 index 2704c260..00000000 --- a/http-utils/jvm/src/main/scala/io/iohk/atala/mediator/comm/MessageDispatcherJVM.scala +++ /dev/null @@ -1,55 +0,0 @@ -package io.iohk.atala.mediator.comm - -import fmgp.crypto.error.* -import fmgp.did.* -import fmgp.did.comm.* -import io.iohk.atala.mediator.comm.* -import io.iohk.atala.mediator.utils.MyHeaders -import zio.* -import zio.http.{MediaType => ZMediaType, *} -import zio.json.* - -import scala.util.chaining._ - -object MessageDispatcherJVM { - val layer: ZLayer[Client, Throwable, MessageDispatcher] = - ZLayer.fromZIO( - ZIO - .service[Client] - .map(MessageDispatcherJVM(_)) - ) -} - -class MessageDispatcherJVM(client: Client) extends MessageDispatcher { - def send( - msg: EncryptedMessage, - /*context*/ - destination: String, - xForwardedHost: Option[String], - ): ZIO[Any, DispatcherError, String] = { - val contentTypeHeader = msg.`protected`.obj.typ - .getOrElse(MediaTypes.ENCRYPTED) - // .pipe(e => Header.ContentType(ZMediaType(e.mainType, e.subType))) FIXME - .pipe(e => Header.ContentType(ZMediaType.application.any.copy(subType = "didcomm-encrypted+json"))) - val xForwardedHostHeader = xForwardedHost.map(x => Header.Custom(customName = MyHeaders.xForwardedHost, x)) - - for { - res <- Client - .request( - Request - .post(destination, Body.fromCharSequence(msg.toJson)) - .setHeaders(Headers(Seq(Some(contentTypeHeader), xForwardedHostHeader).flatten)) - ) - .tapError(ex => ZIO.logWarning(s"Fail when calling '$destination': ${ex.toString}")) - .mapError(ex => DispatcherError(ex)) - data <- res.body.asString - .tapError(ex => ZIO.logError(s"Fail parce http response body: ${ex.toString}")) - .mapError(ex => DispatcherError(ex)) - _ <- res.status.isError match - case true => ZIO.logWarning(data) - case false => ZIO.logInfo(data) - } yield (data) - } - .provideSomeLayer(Scope.default) - .provideEnvironment(ZEnvironment(client)) -} diff --git a/http-utils/jvm/src/main/scala/io/iohk/atala/mediator/utils/HttpHelpers.scala b/http-utils/jvm/src/main/scala/io/iohk/atala/mediator/utils/HttpHelpers.scala deleted file mode 100644 index 1038b029..00000000 --- a/http-utils/jvm/src/main/scala/io/iohk/atala/mediator/utils/HttpHelpers.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* -package io.iohk.atala.mediator.utils - -import zio.* -import zio.http.* -import java.util.concurrent.TimeUnit - -object MiddlewareUtils { - - final def serverTime: RequestHandlerMiddleware[Nothing, Any, Nothing, Any] = HttpAppMiddleware.patchZIO(_ => - for { - currentMilliseconds <- Clock.currentTime(TimeUnit.MILLISECONDS) - withHeader = Response.Patch.addHeader("X-Time", currentMilliseconds.toString) - } yield withHeader, - ) - - final def annotateHeaders: RequestHandlerMiddleware[Nothing, Any, Nothing, Any] = - new RequestHandlerMiddleware.Simple[Any, Nothing] { - override def apply[R1 <: Any, Err1 >: Nothing]( - handler: Handler[R1, Err1, Request, Response], - )(implicit trace: Trace): Handler[R1, Err1, Request, Response] = - Handler.fromFunctionZIO { (request: Request) => - - def annotations = request.headers.toSet.flatMap(h => - h.headerName.toLowerCase() match { - case "fly-client-ip" => Some(LogAnnotation("client-ip", h.renderedValue)) - case "fly-request-id" => Some(LogAnnotation("fly-request-id", h.renderedValue)) - case "x-request-id" => Some(LogAnnotation("x-request-id", h.renderedValue)) - case "user-agent" => Some(LogAnnotation("user-agent", h.renderedValue)) - case "host" => Some(LogAnnotation("host", h.renderedValue)) - case _ => None - } - ) - - val requestHandler = handler - .runZIO(request) - .sandbox - .exit - .timed - .tap { - case (duration, Exit.Success(response)) => - ZIO.log(s"${response.status.code} ${request.method} ${request.url.encode} ${duration.toMillis}ms") - case (duration, Exit.Failure(cause)) => - ZIO.log(s"Failed ${request.method} ${request.url.encode} ${duration.toMillis}ms: " + cause.prettyPrint) - } - .flatMap(_._2) - .unsandbox - - ZIO.logAnnotate(annotations)(requestHandler) - } - } - -} - */ diff --git a/http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/DispatcherFail.scala b/http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/DispatcherFail.scala deleted file mode 100644 index a3be3f29..00000000 --- a/http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/DispatcherFail.scala +++ /dev/null @@ -1,6 +0,0 @@ -package io.iohk.atala.mediator.comm - -case class DispatcherError(error: String) -object DispatcherError { - def apply(throwable: Throwable) = new DispatcherError(throwable.getClass.getName() + ":" + throwable.getMessage) -} diff --git a/http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/MessageDispatcher.scala b/http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/MessageDispatcher.scala deleted file mode 100644 index 06c2aece..00000000 --- a/http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/MessageDispatcher.scala +++ /dev/null @@ -1,17 +0,0 @@ -package io.iohk.atala.mediator.comm - -import fmgp.crypto.error.* -import fmgp.did.* -import fmgp.did.comm.* -import io.iohk.atala.mediator.utils.MyHeaders -import zio.* -import zio.json.* - -trait MessageDispatcher { - def send( - msg: EncryptedMessage, - /*context*/ - destination: String, - xForwardedHost: Option[String], - ): ZIO[Any, DispatcherError, String] -} diff --git a/http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/TapMessage.scala b/http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/TapMessage.scala deleted file mode 100644 index 4e7a686f..00000000 --- a/http-utils/shared/src/main/scala/io/iohk/atala/mediator/comm/TapMessage.scala +++ /dev/null @@ -1,10 +0,0 @@ -package io.iohk.atala.mediator.comm - -import zio.json.* -import fmgp.did.comm.* -final case class TapMessage(msg: EncryptedMessage, decrypted: PlaintextMessage) - -object TapMessage { - given decoder: JsonDecoder[TapMessage] = DeriveJsonDecoder.gen[TapMessage] - given encoder: JsonEncoder[TapMessage] = DeriveJsonEncoder.gen[TapMessage] -} diff --git a/http-utils/shared/src/main/scala/io/iohk/atala/mediator/utils/MyHeaders.scala b/http-utils/shared/src/main/scala/io/iohk/atala/mediator/utils/MyHeaders.scala deleted file mode 100644 index 9558755c..00000000 --- a/http-utils/shared/src/main/scala/io/iohk/atala/mediator/utils/MyHeaders.scala +++ /dev/null @@ -1,5 +0,0 @@ -package io.iohk.atala.mediator.utils - -object MyHeaders { - final val xForwardedHost: CharSequence = "x-forwarded-host" -} diff --git a/infrastructure/charts/mediator/templates/deployment.yaml b/infrastructure/charts/mediator/templates/deployment.yaml index f9e4e59c..72718338 100644 --- a/infrastructure/charts/mediator/templates/deployment.yaml +++ b/infrastructure/charts/mediator/templates/deployment.yaml @@ -39,5 +39,5 @@ spec: value: "27017" - name: MONGODB_DB_NAME value: "mediator" - - name: SERVICE_ENDPOINT - value: "https://{{ index .Values.ingress.applicationUrls 0 }}" + - name: SERVICE_ENDPOINTS + value: "https://{{ index .Values.ingress.applicationUrls 0 }};https://{{ index .Values.ingress.applicationUrls 0 }}/ws" diff --git a/mediator/src/main/resources/application.conf b/mediator/src/main/resources/application.conf index 7ab2a465..3ae8f0c4 100644 --- a/mediator/src/main/resources/application.conf +++ b/mediator/src/main/resources/application.conf @@ -16,8 +16,8 @@ mediator = { x = "MBjnXZxkMcoQVVL21hahWAw43RuAG-i64ipbeKKqwoA" x = ${?KEY_AUTHENTICATION_X} } - endpoint = "http://localhost:8080" - endpoint = ${?SERVICE_ENDPOINT} + endpoints = "http://localhost:8080;ws://localhost:8080/ws" + endpoints = ${?SERVICE_ENDPOINTS} } server.http.port = 8080 server.http.port = ${?PORT} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala new file mode 100644 index 00000000..c842b1c0 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala @@ -0,0 +1,275 @@ +package io.iohk.atala.mediator + +import zio._ +import zio.json._ +import zio.stream._ +import zio.http._ + +import fmgp.crypto.error._ +import fmgp.did._ +import fmgp.did.comm._ +import fmgp.did.comm.protocol._ +import fmgp.did.framework._ +import io.iohk.atala.mediator.db.{UserAccountRepo, MessageItemRepo} +import io.iohk.atala.mediator.protocols.Problems +import fmgp.did.comm.protocol.reportproblem2.ProblemReport + +case class AgentExecutorMediator( + agent: Agent, + transportManager: Ref[TransportManager], + protocolHandler: ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], + userAccountRepo: UserAccountRepo, + messageItemRepo: MessageItemRepo, +) extends AgentExecutar { + val scope = Scope.global // TODO do not use global + val indentityLayer = ZLayer.succeed(agent) + val userAccountRepoLayer = ZLayer.succeed(userAccountRepo) + val messageItemRepoLayer = ZLayer.succeed(messageItemRepo) + override def subject: DIDSubject = agent.id.asDIDSubject + + override def acceptTransport( + transport: TransportDIDComm[Any] + ): URIO[Operations & Resolver, Unit] = + transport.inbound + .mapZIO(msg => jobExecuterProtocol(msg, transport)) + .runDrain + .forkIn(scope) + .unit // From Fiber.Runtime[fmgp.util.Transport.InErr, Unit] to Unit + + override def receiveMsg( + msg: SignedMessage | EncryptedMessage, + transport: TransportDIDComm[Any] + ): URIO[Operations & Resolver, Unit] = + for { + job <- acceptTransport(transport) + ret <- jobExecuterProtocol(msg, transport) // Run a single time (for the message already read) + } yield () + + def jobExecuterProtocol( + msg: SignedMessage | EncryptedMessage, + transport: TransportDIDComm[Any], + ): URIO[Operations & Resolver, Unit] = + this + .receiveMessage(msg, transport) + .tapError(ex => ZIO.logError(ex.toString)) + .provideSomeLayer(this.indentityLayer) + .provideSomeLayer(userAccountRepoLayer ++ messageItemRepoLayer) + .provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => e ++ ZEnvironment(protocolHandler)) + .orDieWith(ex => new RuntimeException(ex.toString)) + + def receiveMessage( + msg: SignedMessage | EncryptedMessage, + transport: TransportDIDComm[Any] + ): ZIO[ + OperatorImp.Services & ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], + MediatorError | StorageError, + Unit + ] = ZIO.logAnnotate("msg_sha256", msg.sha256) { + for { + _ <- ZIO.logDebug(s"Receive message with sha256: '${msg.sha256}'") + agent <- ZIO.service[Agent] + recipientsSubject <- msg match + case eMsg: EncryptedMessage => ZIO.succeed(eMsg.recipientsSubject) + case sMsg: SignedMessage => + ZIO + .fromEither(sMsg.payloadAsPlaintextMessage) + .map(_.to.toSet.flatten.map(_.toDIDSubject)) + .mapError(didFail => MediatorDidError(didFail)) + _ <- transportManager.get.flatMap { m => + ZIO.foreach(recipientsSubject)(subject => m.publish(subject.asTO, msg)) + } + _ <- + if (!recipientsSubject.contains(agent.id.asDIDSubject)) { + ZIO.logError(s"This agent '${agent.id.asDIDSubject}' is not a recipient") // TODO send a FAIL!!!!!! + } else { + for { + pMsgOrReplay <- AgentExecutorMediator + .decrypt(msg) + .tap { pMsg => + pMsg.from match + case None => ZIO.unit + case Some(from) => transportManager.update { _.link(from.asFROMTO, transport) } + } + .map(Right(_)) + .catchAll { didFail => + for { + _ <- ZIO.logWarning(s"Error Mediator fail to decrypt: $didFail") + agent <- ZIO.service[Agent] + problemReport = Problems.decryptFail( + from = agent.id.asFROM, + comment = "Fail to decrypt Message: " + didFail + ) + } yield Left(problemReport) + } + _ <- processMessage(msg, pMsgOrReplay, transport) + } yield () + } + } yield () + } + + def processMessage( + originalMsg: SignedMessage | EncryptedMessage, + pMsgOrProblemReport: Either[ProblemReport, PlaintextMessage], + transport: TransportDIDComm[Any] + ): ZIO[ + ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError] & OperatorImp.Services, + MediatorError | StorageError, + Unit + ] = + for { + action <- pMsgOrProblemReport match + case Left(problemReport) => ZIO.succeed(Reply(problemReport.toPlaintextMessage)) + case Right(plaintextMessage) => + for { + messageItemRepo <- ZIO.service[MessageItemRepo] + maybeProblemReport <- messageItemRepo + .insert(originalMsg) // store all message + .map(_ /*WriteResult*/ => None) + .catchSome { + case StorageCollection(error) => + // This deals with connection errors to the database. + ZIO.logWarning(s"Error StorageCollection: $error") *> + ZIO + .service[Agent] + .map(agent => + Some( + Problems.storageError( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + ) + ) + case StorageThrowable(error) => + ZIO.logWarning(s"Error StorageThrowable: $error") *> + ZIO + .service[Agent] + .map(agent => + Some( + Problems.storageError( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + ) + ) + case DuplicateMessage(error) => + ZIO.logWarning(s"Error DuplicateMessageError: $error") *> + ZIO + .service[Agent] + .map(agent => + Some( + Problems.dejavuError( + to = plaintextMessage.from.map(_.asTO).toSet, + from = agent.id, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + ) + ) + ) + } + protocolHandler <- ZIO.service[ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError]] + goodAction <- maybeProblemReport match + case Some(problemReport) => ZIO.succeed(Reply(problemReport.toPlaintextMessage)) + case None => + protocolHandler + .program(plaintextMessage) + .catchSome { case ProtocolExecutionFailToParse(failToParse) => + for { + _ <- ZIO.logWarning(s"Error ProtocolExecutionFailToParse: $failToParse") + agent <- ZIO.service[Agent] + problemReport = Problems.malformedError( + to = plaintextMessage.from.toSet.map(_.asTO), + from = agent.id.asFROM, + pthid = plaintextMessage.id, + piuri = plaintextMessage.`type`, + comment = failToParse.error + ) + } yield (Reply(problemReport.toPlaintextMessage)) + } + .tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex")) + } yield goodAction + ret <- action match + case NoReply => ZIO.unit // TODO Maybe infor transport of immediately reply/close + case reply: AnyReply => + import fmgp.did.comm.Operations._ + for { + message <- { + reply.msg.to.toSeq.flatten match { + case Seq() => + reply.msg.from match + case Some(from) => sign(reply.msg) + case None => ZIO.logError(s"No sender or recipient: ${reply.msg}") *> ZIO.fail(NoSenderOrRecipient) + case tos => // TODO FIXME is case is not a response + reply.msg.from match + case Some(from) => authEncrypt(reply.msg) + case None => anonEncrypt(reply.msg) + } + }.mapError(didFail => MediatorDidError(didFail)) + _ <- pMsgOrProblemReport match + case Left(value) => transport.send(message) // REVIEW we are forcing the message to be synchronous + case Right(plaintextMessage) => { + plaintextMessage.return_route match + case Some(ReturnRoute.none) | None => + for { + transportDispatcher: TransportDispatcher <- transportManager.get + _ <- reply.msg.to.toSeq.flatten match { + case Seq() => + message match + case sMsg: SignedMessage => + transport.send(sMsg) // REVIEW we are forcing the message to be synchronous + case eMsg: EncryptedMessage => + ZIO.logWarning("This reply message will be sented to nobody: " + reply.msg.toJson) + case tos => + ZIO.foreachParDiscard(tos) { to => + transportDispatcher + .send(to = to, msg = message, thid = reply.msg.thid, pthid = reply.msg.pthid) + .mapError(didFail => MediatorDidError(didFail)) + } + } + } yield () + case Some(ReturnRoute.all) | Some(ReturnRoute.thread) => transport.send(message) + } + } yield () + } yield () + +} + +object AgentExecutorMediator { + + def make[S >: Resolver & Operations]( + agent: Agent, + protocolHandler: ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], + userAccountRepo: UserAccountRepo, + messageItemRepo: MessageItemRepo, + ): ZIO[TransportFactory, Nothing, AgentExecutar] = + for { + transportManager <- TransportManager.make + mediator = AgentExecutorMediator(agent, transportManager, protocolHandler, userAccountRepo, messageItemRepo) + } yield mediator + + // TODO move to another place & move validations and build a contex + def decrypt(msg: Message): ZIO[Agent & Resolver & Operations, DidFail, PlaintextMessage] = + for { + ops <- ZIO.service[Operations] + plaintextMessage <- msg match + case pm: PlaintextMessage => ZIO.succeed(pm) + case em: EncryptedMessage => + { + em.`protected`.obj match + case AnonProtectedHeader(epk, apv, typ, enc, alg) => ops.anonDecrypt(em) + case AuthProtectedHeader(epk, apv, skid, apu, typ, enc, alg) => ops.authDecrypt(em) + }.flatMap(decrypt _) + case sm: SignedMessage => + ops.verify(sm).flatMap { + case false => ZIO.fail(ValidationFailed) + case true => + sm.payload.content.fromJson[Message] match + case Left(error) => ZIO.fail(FailToParse(error)) + case Right(msg2) => decrypt(msg2) + } + } yield (plaintextMessage) + +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala b/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala new file mode 100644 index 00000000..79830b68 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala @@ -0,0 +1,79 @@ +package io.iohk.atala.mediator + +import zio._ +import zio.json._ +import zio.stream._ +import zio.http._ +import zio.http.Header.{AccessControlAllowOrigin, AccessControlAllowMethods} + +import fmgp.crypto._ +import fmgp.crypto.error._ +import fmgp.did._ +import fmgp.did.comm._ +import fmgp.did.framework._ +import fmgp.did.method.peer.DidPeerResolver +import fmgp.did.method.peer.DIDPeer.AgentDIDPeer + +object DIDCommRoutes { + + def app: HttpApp[Operator & Operations & Resolver] = routes.toHttpApp + + def routes: Routes[Operator & Operations & Resolver, Nothing] = Routes( + Method.GET / "ws" -> handler { (req: Request) => + for { + annotationMap <- ZIO.logAnnotations.map(_.map(e => LogAnnotation(e._1, e._2)).toSeq) + webSocketApp = TransportWSImp.createWebSocketAppWithOperator(annotationMap) + ret <- webSocketApp.toResponse + } yield (ret) + }, + Method.POST / trailing -> handler { (req: Request) => + val SignedTyp = MediaTypes.SIGNED.typ + val EncryptedTyp = MediaTypes.ENCRYPTED.typ + // FIXME after https://github.com/zio/zio-http/issues/2416 + // .header(Header.ContentType) + // .exists { h => + // h.mediaType.mainType == MediaTypes.mainType && + // (h.mediaType.subType == MediaTypes.SIGNED.subType || h.mediaType.subType == MediaTypes.ENCRYPTED.subType) + req.headers.get("content-type") match + case Some(`SignedTyp`) | Some(`EncryptedTyp`) => + (for { + data <- req.body.asString + msg <- data.fromJson[Message] match + case Left(value) => ZIO.fail(Response.badRequest(s"Fail to parse DID Comm Message because: $value")) + case Right(pMsg: PlaintextMessage) => ZIO.fail(Response.badRequest("Message must not be in Plaintext")) + case Right(sMsg: SignedMessage) => ZIO.succeed(sMsg) + case Right(eMsg: EncryptedMessage) => ZIO.succeed(eMsg) + inboundQueue <- Queue.bounded[SignedMessage | EncryptedMessage](1) + outboundQueue <- Queue.bounded[SignedMessage | EncryptedMessage](1) + transport = new TransportDIDComm[Any] { + def id: TransportID = TransportID.http(req.headers.get("request_id")) + def inbound: ZStream[Any, Transport.InErr, SignedMessage | EncryptedMessage] = + ZStream.fromQueue(inboundQueue) + def outbound: ZSink[Any, Transport.OutErr, SignedMessage | EncryptedMessage, Nothing, Unit] = + ZSink.fromQueue(outboundQueue) + + // TODO def close = inboundQueue.shutdown <&> outboundQueue.shutdown + } + operator <- ZIO.service[Operator] + fiber <- operator.receiveTransport(transport).fork + _ <- inboundQueue.offer(msg) + ret <- outboundQueue.take + .timeout(3.seconds) + .tap(e => ZIO.logWarning("Request Timeout").when(e.isEmpty)) + .map { + case None => Response.status(Status.Accepted) + case Some(msg: SignedMessage) => + Response(Status.Ok, Headers(MediaTypes.SIGNED.asContentType), Body.fromCharSequence(msg.toJson)) + case Some(msg: EncryptedMessage) => + Response(Status.Ok, Headers(MediaTypes.ENCRYPTED.asContentType), Body.fromCharSequence(msg.toJson)) + } + shutdown <- inboundQueue.shutdown <&> outboundQueue.shutdown + _ <- fiber.join + } yield ret) + .tapErrorCause(ZIO.logErrorCause("Error", _)) + .catchAllCause(cause => ZIO.succeed(Response.fromCause(cause))) + case Some(_) | None => ZIO.succeed(Response.badRequest(s"The content-type must be $SignedTyp or $EncryptedTyp")) + + }, + ) +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala b/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala index c4aa84a0..6fd298f1 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/Error.scala @@ -9,10 +9,9 @@ sealed trait MediatorError case class MediatorException(fail: MediatorError) extends Exception(fail.toString()) +final case class ProtocolExecutionFailToParse(error: FailToParse) extends MediatorError + final case class MediatorDidError(val error: DidFail) extends MediatorError -object MediatorDidError { - def apply(error: DidFail) = new MediatorDidError(error) -} final case class MediatorThrowable(val error: String) extends MediatorError object MediatorThrowable { @@ -38,7 +37,7 @@ object StorageThrowable { final case class DuplicateMessage(val error: String) extends StorageError object DuplicateMessage { - val code = 11000 + val code = 11000 def apply(throwable: Throwable) = new DuplicateMessage(throwable.getClass.getName() + ":" + throwable.getMessage) } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/IndexHtml.scala b/mediator/src/main/scala/io/iohk/atala/mediator/IndexHtml.scala similarity index 97% rename from mediator/src/main/scala/io/iohk/atala/mediator/app/IndexHtml.scala rename to mediator/src/main/scala/io/iohk/atala/mediator/IndexHtml.scala index 0af47f53..e9430ee8 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/IndexHtml.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/IndexHtml.scala @@ -1,4 +1,4 @@ -package io.iohk.atala.mediator.app +package io.iohk.atala.mediator import zio.http._ import fmgp.did.DID diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/MediaTypesExtension.scala b/mediator/src/main/scala/io/iohk/atala/mediator/MediaTypesExtension.scala new file mode 100644 index 00000000..61688189 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/MediaTypesExtension.scala @@ -0,0 +1,15 @@ +package io.iohk.atala.mediator + +import zio.http.Header +import zio.http.MediaType +import fmgp.did.comm.MediaTypes + +extension (mediaType: MediaTypes) + def asContentType = mediaType match + case MediaTypes.PLAINTEXT => + Header.ContentType(MediaType(MediaTypes.PLAINTEXT.mainType, MediaTypes.PLAINTEXT.subType)) + case MediaTypes.SIGNED => + Header.ContentType(MediaType(MediaTypes.SIGNED.mainType, MediaTypes.SIGNED.subType)) + case MediaTypes.ENCRYPTED | MediaTypes.ANONCRYPT | MediaTypes.AUTHCRYPT | MediaTypes.ANONCRYPT_SIGN | + MediaTypes.AUTHCRYPT_SIGN | MediaTypes.ANONCRYPT_AUTHCRYPT => + Header.ContentType(MediaType(MediaTypes.ENCRYPTED.mainType, MediaTypes.ENCRYPTED.subType)) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorAgent.scala b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorAgent.scala new file mode 100644 index 00000000..111a4969 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorAgent.scala @@ -0,0 +1,104 @@ +package io.iohk.atala.mediator + +import fmgp.crypto.* +import fmgp.crypto.error.* +import fmgp.did.* +import fmgp.did.comm.* +import fmgp.did.comm.protocol.* +import fmgp.did.comm.protocol.oobinvitation.OOBInvitation +import fmgp.did.comm.protocol.reportproblem2.ProblemReport +import io.iohk.atala.mediator.* +import io.iohk.atala.mediator.db.* +import io.iohk.atala.mediator.protocols.* +import io.netty.handler.codec.http.HttpHeaderNames +import reactivemongo.api.bson.{*, given} +import zio.* +import zio.http.* +import zio.json.* + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.Try +import scala.io.Source +import zio.http.Header.AccessControlAllowOrigin +import zio.http.Header.AccessControlAllowMethods +import zio.http.Header.HeaderType + +case class MediatorAgent( + override val id: DID, + override val keyStore: KeyStore, // Should we make it lazy with ZIO +) extends Agent { def keys: Seq[PrivateKey] = keyStore.keys.toSeq } + +object MediatorAgent { + + def make(id: DID, keyStore: KeyStore): ZIO[Any, Nothing, MediatorAgent] = ZIO.succeed(MediatorAgent(id, keyStore)) + + def didCommApp = { + Routes( + Method.GET / "headers" -> handler { (req: Request) => + val data = req.headers.toSeq.map(e => (e.headerName, e.renderedValue)) + ZIO.succeed(Response.text("HEADERS:\n" + data.mkString("\n") + "\nRemoteAddress:" + req.remoteAddress)).debug + }, + Method.GET / "health" -> handler { (req: Request) => ZIO.succeed(Response.ok) }, + Method.GET / "version" -> handler { (req: Request) => ZIO.succeed(Response.text(MediatorBuildInfo.version)) }, + Method.GET / "did" -> handler { (req: Request) => + for { + agent <- ZIO.service[MediatorAgent] + ret <- ZIO.succeed(Response.text(agent.id.string)) + } yield (ret) + }, + Method.GET / "invitation" -> handler { (req: Request) => + for { + agent <- ZIO.service[MediatorAgent] + annotationMap <- ZIO.logAnnotations.map(_.map(e => LogAnnotation(e._1, e._2)).toSeq) + invitation = OOBInvitation( + from = agent.id, + goal_code = Some("request-mediate"), + goal = Some("RequestMediate"), + accept = Some(Seq("didcomm/v2")), + ) + _ <- ZIO.log("New mediate invitation MsgID: " + invitation.id.value) + ret <- ZIO.succeed(Response.json(invitation.toPlaintextMessage.toJson)) + } yield (ret) + }, + Method.GET / "invitationOOB" -> handler { (req: Request) => + for { + agent <- ZIO.service[MediatorAgent] + annotationMap <- ZIO.logAnnotations.map(_.map(e => LogAnnotation(e._1, e._2)).toSeq) + invitation = OOBInvitation( + from = agent.id, + goal_code = Some("request-mediate"), + goal = Some("RequestMediate"), + accept = Some(Seq("didcomm/v2")), + ) + _ <- ZIO.log("New mediate invitation MsgID: " + invitation.id.value) + ret <- ZIO.succeed( + Response.text( + OutOfBand.from(invitation.toPlaintextMessage).makeURI("") + ) + ) + } yield (ret) + }, + Method.GET / trailing -> handler { (req: Request) => + for { + agent <- ZIO.service[MediatorAgent] + _ <- ZIO.log("index.html") + ret <- ZIO.succeed(IndexHtml.html(agent.id)) + } yield ret + }, + Method.GET / "public" / string("path") -> handler { (path: String, req: Request) => + // RoutesMiddleware + // TODO https://zio.dev/reference/stream/zpipeline/#:~:text=ZPipeline.gzip%20%E2%80%94%20The%20gzip%20pipeline%20compresses%20a%20stream%20of%20bytes%20as%20using%20gzip%20method%3A + val fullPath = s"public/$path" + val classLoader = Thread.currentThread().getContextClassLoader() + val headerContentType = fullPath match + case s if s.endsWith(".html") => Header.ContentType(MediaType.text.html) + case s if s.endsWith(".js") => Header.ContentType(MediaType.text.javascript) + case s if s.endsWith(".css") => Header.ContentType(MediaType.text.css) + case s if s.endsWith(".svg") => Header.ContentType(MediaType.image.`svg+xml`) + case s => Header.ContentType(MediaType.text.plain) + Handler.fromResource(fullPath).map(_.addHeader(headerContentType)) + }.flatten + ) + }.sandbox.toHttpApp + +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorStandalone.scala b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala similarity index 79% rename from mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorStandalone.scala rename to mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala index e7d36fa4..2e59d326 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorStandalone.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala @@ -1,4 +1,4 @@ -package io.iohk.atala.mediator.app +package io.iohk.atala.mediator import fmgp.crypto.* import fmgp.crypto.error.* @@ -6,11 +6,8 @@ import fmgp.did.* import fmgp.did.comm.* import fmgp.did.comm.protocol.* import fmgp.did.method.peer.* -import io.iohk.atala.mediator.actions.* -import io.iohk.atala.mediator.comm.* import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.protocols.* -import io.iohk.atala.mediator.utils.* import zio.* import zio.config.* import zio.config.magnolia.* @@ -24,12 +21,14 @@ import zio.stream.* import java.time.format.DateTimeFormatter import scala.io.Source -case class MediatorConfig(endpoint: java.net.URI, keyAgreement: OKPPrivateKey, keyAuthentication: OKPPrivateKey) { +import fmgp.did.framework.TransportFactoryImp +case class MediatorConfig(endpoints: String, keyAgreement: OKPPrivateKey, keyAuthentication: OKPPrivateKey) { val did = DIDPeer2.makeAgent( Seq(keyAgreement, keyAuthentication), - Seq(DIDPeerServiceEncoded(s = endpoint.toString())) + endpoints.split(";").toSeq.map(e => DIDPeerServiceEncoded(s = e.toString)) ) - val agentLayer = ZLayer(MediatorAgent.make(id = did.id, keyStore = did.keyStore)) + val agentLayer: ZLayer[Any, Nothing, MediatorAgent] = + ZLayer(MediatorAgent.make(id = did.id, keyStore = did.keyStore)) } case class DataBaseConfig( protocol: String, @@ -52,8 +51,8 @@ object MediatorStandalone extends ZIOAppDefault { allAnnotations |-| cause.highlight - override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] = - Runtime.removeDefaultLoggers >>> SLF4J.slf4j(mediatorColorFormat) + // override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] = + // Runtime.removeDefaultLoggers >>> SLF4J.slf4j(mediatorColorFormat) override val run = for { _ <- Console.printLine( // https://patorjk.com/software/taag/#p=display&f=ANSI%20Shadow&t=Mediator @@ -68,13 +67,12 @@ object MediatorStandalone extends ZIOAppDefault { ) configs = ConfigProvider.fromResourcePath() mediatorConfig <- configs.nested("identity").nested("mediator").load(deriveConfig[MediatorConfig]) + agentLayer = mediatorConfig.agentLayer _ <- ZIO.log(s"Mediator APP. See https://github.com/input-output-hk/atala-prism-mediator") _ <- ZIO.log(s"MediatorConfig: $mediatorConfig") _ <- ZIO.log(s"DID: ${mediatorConfig.did.id.string}") mediatorDbConfig <- configs.nested("database").nested("mediator").load(deriveConfig[DataBaseConfig]) _ <- ZIO.log(s"MediatorDb Connection String: ${mediatorDbConfig.displayConnectionString}") - myHub <- Hub.sliding[String](5) - _ <- ZStream.fromHub(myHub).run(ZSink.foreach((str: String) => ZIO.logInfo("HUB: " + str))).fork port <- configs .nested("http") .nested("server") @@ -87,20 +85,24 @@ object MediatorStandalone extends ZIOAppDefault { .nested("mediator") .load(Config.string("escalateTo")) _ <- ZIO.log(s"Problem reports escalated to : $escalateTo") - client = Scope.default >>> Client.default - inboundHub <- Hub.bounded[String](5) + transportFactory = Scope.default >>> (Client.default >>> TransportFactoryImp.layer) + repos = { + AsyncDriverResource.layer + >>> ReactiveMongoApi.layer(mediatorDbConfig.connectionString) + >>> (MessageItemRepo.layer ++ UserAccountRepo.layer) + } + // inboundHub <- Hub.bounded[String](5) myServer <- Server - .serve(MediatorAgent.didCommApp @@ (Middleware.cors)) + .serve((MediatorAgent.didCommApp ++ DIDCommRoutes.app) @@ (Middleware.cors)) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) - .provideSomeLayer(mediatorConfig.agentLayer) // .provideSomeLayer(AgentByHost.layer) + .provideSomeLayer(agentLayer) + .provideSomeLayer((agentLayer ++ transportFactory ++ repos) >>> OperatorImp.layer) .provideSomeLayer( AsyncDriverResource.layer >>> ReactiveMongoApi.layer(mediatorDbConfig.connectionString) >>> MessageItemRepo.layer.and(UserAccountRepo.layer).and(OutboxMessageRepo.layer) ) .provideSomeLayer(Operations.layerDefault) - .provideSomeLayer(client >>> MessageDispatcherJVM.layer) - .provideSomeEnvironment { (env: ZEnvironment[Server]) => env.add(myHub) } .provide(Server.defaultWithPort(port)) .debug .fork diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala b/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala new file mode 100644 index 00000000..806cdcf4 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala @@ -0,0 +1,48 @@ +package io.iohk.atala.mediator + +import zio.* + +import fmgp.crypto.error.DidFail +import fmgp.did.* +import fmgp.util.* +import fmgp.did.comm.* +import fmgp.did.comm.protocol.* +import fmgp.did.framework.* +import io.iohk.atala.mediator.protocols.* +import io.iohk.atala.mediator.db.{UserAccountRepo, MessageItemRepo} + +object OperatorImp { + type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo + + val protocolHandlerLayer: ULayer[ + ProtocolExecuter[Services, MediatorError | StorageError] + ] = + ZLayer.succeed( + ProtocolExecuterCollection( + BasicMessageExecuter.mapError(didFail => ProtocolExecutionFailToParse(didFail)), + (new TrustPingExecuter).mapError(didFail => ProtocolExecutionFailToParse(didFail)), + DiscoverFeaturesExecuter, + MediatorCoordinationExecuter, + ForwardMessageExecuter, + PickupExecuter + )(MissingProtocolExecuter) // (NullProtocolExecute.mapError(didFail => ProtocolExecutionFailToParse(didFail))) + ) + + val layer: ZLayer[MediatorAgent & UserAccountRepo & MessageItemRepo & TransportFactory, Nothing, Operator] = + protocolHandlerLayer >>> + ZLayer.fromZIO( + for { + protocolHandlerAux <- ZIO.service[ProtocolExecuter[Services, MediatorError | StorageError]] + mediator <- ZIO.service[MediatorAgent] + userAccountRepo <- ZIO.service[UserAccountRepo] + messageItemRepo <- ZIO.service[MessageItemRepo] + self <- AgentExecutorMediator.make(mediator, protocolHandlerAux, userAccountRepo, messageItemRepo) + _ <- ZIO.log("Operator: " + self.subject.toString) + operator = Operator( + selfOperator = self, + contacts = Seq(self) + ) + } yield operator + ) + +} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/Action.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/Action.scala deleted file mode 100644 index 32bb5846..00000000 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/Action.scala +++ /dev/null @@ -1,13 +0,0 @@ -package io.iohk.atala.mediator.actions - -import fmgp.did.comm.PlaintextMessage - -sealed trait Action -object NoReply extends Action -sealed trait AnyReply extends Action { def msg: PlaintextMessage } -// object AnyReply { def unapply(anyReply: AnyReply): Option[PlaintextMessage] = Some(anyReply.msg) } -sealed trait SyncReply extends AnyReply -sealed trait AsyncReply extends AnyReply -case class Reply(msg: PlaintextMessage) extends SyncReply with AsyncReply -case class SyncReplyOnly(msg: PlaintextMessage) extends SyncReply -case class AsyncReplyOnly(msg: PlaintextMessage) extends AsyncReply diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala deleted file mode 100644 index b5d71055..00000000 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ActionUtils.scala +++ /dev/null @@ -1,158 +0,0 @@ -package io.iohk.atala.mediator.actions - -import fmgp.crypto.error.* -import fmgp.did.* -import fmgp.did.comm.* -import fmgp.did.comm.Operations.* -import fmgp.did.comm.protocol.* -import fmgp.did.comm.protocol.basicmessage2.* -import fmgp.did.comm.protocol.trustping2.* -import io.iohk.atala.mediator.* -import io.iohk.atala.mediator.comm.* -import io.iohk.atala.mediator.db.* -import zio.* -import zio.json.* -import io.iohk.atala.mediator.protocols.MissingProtocolExecuter - -object ActionUtils { - - def packResponse( - originalMessage: Option[PlaintextMessage], - action: Action - ): ZIO[ - Operations & Agent & Resolver & MessageDispatcher & OutboxMessageRepo, - MediatorError, - Option[SignedMessage | EncryptedMessage] - ] = - action match { - case _: NoReply.type => ZIO.succeed(None) - case action: AnyReply => - val reply = action.msg - for { - - outboxRepo <- ZIO.service[OutboxMessageRepo] - xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value)) - // TODO forward message - maybeSyncReplyMsg: Option[SignedMessage | EncryptedMessage] <- reply.to.map(_.toSeq) match // TODO improve - case None => - ZIO.logWarning("Have a reply but the field 'to' is missing") *> - sign(reply) - .mapError(fail => MediatorDidError(fail)) - .map(Some(_)) - case Some(Seq()) => - ZIO.logWarning("Have a reply but the field 'to' is empty") *> - sign(reply) - .mapError(fail => MediatorDidError(fail)) - .map(Some(_)) - case Some(send2DIDs) => - for { - msg <- { - reply.from match - case Some(value) => authEncrypt(reply) - case None => anonEncrypt(reply) - }.mapError(fail => MediatorDidError(fail)) - - replyViaDIDCommMessagingProgramme = ZIO.foreach(send2DIDs) { to => - for { - messageDispatcher <- ZIO.service[MessageDispatcher] - resolver <- ZIO.service[Resolver] - - doc <- resolver - .didDocument(to) - .mapError(fail => MediatorDidError(fail)) - services = { - doc.service.toSeq.flatten - .collect { case service: DIDServiceDIDCommMessaging => - service - } - } - mURL = services.flatMap(_.endpoints.map(_.uri)).headOption // TODO head - jobToRun <- mURL match - case None => ZIO.logWarning(s"No url to send message") - case Some(url) => { - for { - _ <- ZIO.log(s"Send to url: $url") - response <- messageDispatcher - .send( - msg, - url, - None - // url match // FIXME REMOVE (use for local env) - // case http if http.startsWith("http://") => Some(url.drop(7).split(':').head.split('/').head) - // case https if https.startsWith("https://") => - // Some(url.drop(8).split(':').head.split('/').head) - // case _ => None - ) - .catchAll { case DispatcherError(error) => ZIO.logWarning(s"Dispatch Error: $error") } - - _ <- outboxRepo - .insert( - SentMessageItem( - msg = msg, - plaintext = reply, - recipient = Set(to), - distination = Some(url), - sendMethod = MessageSendMethod.HTTPS_POST, - result = response match - case str: String => Some(str) - case _: Unit => None - , - xRequestId = xRequestId - ) - ) // Maybe fork - .catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") } - } yield () - } - } yield () - } - returnTmp <- action match - case Reply(_) => - if ( - originalMessage // this condition is +- the opposite condition as below - .map { oMsg => oMsg.return_route.isEmpty || oMsg.return_route.contains(ReturnRoute.none) } - .getOrElse(true) // If originalMessage is None - ) (replyViaDIDCommMessagingProgramme *> ZIO.none) - else ZIO.some(msg) - case SyncReplyOnly(_) => ZIO.some(msg) - case AsyncReplyOnly(_) => replyViaDIDCommMessagingProgramme *> ZIO.none - } yield (returnTmp) - _ <- maybeSyncReplyMsg match { - case None => ZIO.unit - case Some(msg) => - ZIO // Store send message INLINE_REPLY - .succeed(msg) - .tap(msg => - outboxRepo - .insert( - SentMessageItem( - msg = msg, - plaintext = reply, - recipient = reply.to.getOrElse(Set.empty), - distination = None, - sendMethod = MessageSendMethod.INLINE_REPLY, - result = None, - xRequestId = xRequestId, - ) - ) - .catchAll { case error => ZIO.logError(s"Store Outbox Error: $error") } - ) - .when( - originalMessage - .map { oMsg => - { // Should replies use the same transport channel? - oMsg.return_route.contains(ReturnRoute.all) || oMsg.return_route.contains(ReturnRoute.thread) - } && { - msg match - case sMsg: SignedMessage => true // TODO If the Message is only sign shoud we reply back? - case eMsg: EncryptedMessage => // Is the reply back to the original sender/caller? - val recipients = eMsg.recipientsSubject.toSeq.map(subject => TO(subject.did)) - oMsg.from.map(_.asTO).exists(recipients.contains) - } - } - .getOrElse(false) // If originalMessage is None - ) - } - - } yield maybeSyncReplyMsg - } -} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala b/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala deleted file mode 100644 index 3738a443..00000000 --- a/mediator/src/main/scala/io/iohk/atala/mediator/actions/ProtocolExecute.scala +++ /dev/null @@ -1,88 +0,0 @@ -package io.iohk.atala.mediator.actions - -import fmgp.crypto.error.* -import fmgp.did.* -import fmgp.did.comm.* -import fmgp.did.comm.Operations.* -import fmgp.did.comm.protocol.* -import fmgp.did.comm.protocol.basicmessage2.* -import fmgp.did.comm.protocol.trustping2.* -import io.iohk.atala.mediator.* -import io.iohk.atala.mediator.comm.* -import io.iohk.atala.mediator.db.* -import io.iohk.atala.mediator.protocols.MissingProtocolExecuter -import zio.* -import zio.json.* -//TODO pick a better name // maybe "Protocol" only - -trait ProtocolExecuter[-R, +E] { // <: MediatorError | StorageError] { - - def supportedPIURI: Seq[PIURI] - - /** @return - * can return a Sync Reply Msg - * - * MUST be override - * {{{ - * override def execute[R1 <: R]( - * plaintextMessage: PlaintextMessage - * ): ZIO[R1, E, Option[SignedMessage | EncryptedMessage]] = - * program(plaintextMessage) *> ZIO.none - * }}} - */ - def execute[R1 <: R]( - plaintextMessage: PlaintextMessage - ): ZIO[R1, E, Option[SignedMessage | EncryptedMessage]] - - def program[R1 <: R](plaintextMessage: PlaintextMessage): ZIO[R1, E, Action] -} - -object ProtocolExecuter { - type Services = Resolver & Agent & Operations & MessageDispatcher & OutboxMessageRepo - type Erros = MediatorError | StorageError -} -case class ProtocolExecuterCollection[-R, +E]( - executers: ProtocolExecuter[R, E]* -)(fallback: ProtocolExecuter[R, E]) - extends ProtocolExecuter[R, E] { - - override def supportedPIURI: Seq[PIURI] = executers.flatMap(_.supportedPIURI) - - def selectExecutersFor(piuri: PIURI) = executers.find(_.supportedPIURI.contains(piuri)) - - override def execute[R1 <: R]( - plaintextMessage: PlaintextMessage, - ): ZIO[R, E, Option[SignedMessage | EncryptedMessage]] = - selectExecutersFor(plaintextMessage.`type`) match - // case None => NullProtocolExecuter.execute(plaintextMessage) - case None => fallback.execute(plaintextMessage) - case Some(px) => px.execute(plaintextMessage) - - override def program[R1 <: R]( - plaintextMessage: PlaintextMessage, - ): ZIO[R1, E, Action] = - selectExecutersFor(plaintextMessage.`type`) match - // case None => NullProtocolExecuter.program(plaintextMessage) - case None => fallback.program(plaintextMessage) - case Some(px) => px.program(plaintextMessage) -} - -trait ProtocolExecuterWithServices[ - -R <: ProtocolExecuter.Services, - +E >: MediatorError // ProtocolExecuter.Erros -] extends ProtocolExecuter[R, E] { - - override def execute[R1 <: R]( - plaintextMessage: PlaintextMessage, - // context: Context - ): ZIO[R1, E, Option[SignedMessage | EncryptedMessage]] = - program(plaintextMessage) - .tap(v => ZIO.logDebug(v.toString)) // DEBUG - .flatMap(action => ActionUtils.packResponse(Some(plaintextMessage), action)) - .debug - - override def program[R1 <: R]( - plaintextMessage: PlaintextMessage, - // context: Context - ): ZIO[R1, E, Action] -} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala b/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala deleted file mode 100644 index 31ae6f06..00000000 --- a/mediator/src/main/scala/io/iohk/atala/mediator/app/MediatorAgent.scala +++ /dev/null @@ -1,351 +0,0 @@ -package io.iohk.atala.mediator.app - -import fmgp.crypto.* -import fmgp.crypto.error.* -import fmgp.did.* -import fmgp.did.comm.* -import fmgp.did.comm.protocol.* -import fmgp.did.comm.protocol.oobinvitation.OOBInvitation -import fmgp.did.comm.protocol.reportproblem2.ProblemReport -import io.iohk.atala.mediator.* -import io.iohk.atala.mediator.actions.* -import io.iohk.atala.mediator.comm.* -import io.iohk.atala.mediator.db.* -import io.iohk.atala.mediator.protocols.* -import io.iohk.atala.mediator.utils.* -import io.netty.handler.codec.http.HttpHeaderNames -import reactivemongo.api.bson.{*, given} -import zio.* -import zio.http.* -import zio.json.* - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.util.Try -import scala.io.Source -import zio.http.Header.AccessControlAllowOrigin -import zio.http.Header.AccessControlAllowMethods -import zio.http.Header.HeaderType - -case class MediatorAgent( - override val id: DID, - override val keyStore: KeyStore, // Should we make it lazy with ZIO -) extends Agent { - override def keys: Seq[PrivateKey] = keyStore.keys.toSeq - - type Services = Resolver & Agent & Operations & MessageDispatcher & UserAccountRepo & MessageItemRepo & - OutboxMessageRepo - val protocolHandlerLayer: URLayer[UserAccountRepo & MessageItemRepo & OutboxMessageRepo, ProtocolExecuter[ - Services, - MediatorError | StorageError - ]] = - ZLayer.succeed( - ProtocolExecuterCollection[Services, MediatorError | StorageError]( - BasicMessageExecuter, - new TrustPingExecuter, - MediatorCoordinationExecuter, - ForwardMessageExecuter, - PickupExecuter, - )(fallback = MissingProtocolExecuter()) - ) - - val messageDispatcherLayer: ZLayer[Client, MediatorThrowable, MessageDispatcher] = - MessageDispatcherJVM.layer.mapError(ex => MediatorThrowable(ex)) - - // TODO move to another place & move validations and build a contex - def decrypt(msg: Message): ZIO[Agent & Resolver & Operations, MediatorError | ProblemReport, PlaintextMessage] = { - for { - ops <- ZIO.service[Operations] - plaintextMessage <- msg match - case pm: PlaintextMessage => ZIO.succeed(pm) - case em: EncryptedMessage => - { - em.`protected`.obj match - case AnonProtectedHeader(epk, apv, typ, enc, alg) => - ops - .anonDecrypt(em) - .mapError(ex => MediatorDidError(ex)) - case AuthProtectedHeader(epk, apv, skid, apu, typ, enc, alg) => - ops - .authDecrypt(em) - .mapError(ex => MediatorDidError(ex)) - }.flatMap(decrypt _) - case sm: SignedMessage => - ops - .verify(sm) - .mapError(ex => MediatorDidError(ex)) - .flatMap { - case false => ZIO.fail(MediatorDidError(ValidationFailed)) - case true => - sm.payload.content.fromJson[Message] match - case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) - case Right(msg2) => decrypt(msg2) - } - } yield (plaintextMessage) - } - - def receiveMessage(data: String): ZIO[ - Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo, - MediatorError | StorageError, - Option[SignedMessage | EncryptedMessage] - ] = - for { - msg <- data.fromJson[EncryptedMessage] match - case Left(error) => - ZIO.logError(s"Data is not a EncryptedMessage: $error") - *> ZIO.fail(MediatorDidError(FailToParse(error))) - case Right(message) => - ZIO.logDebug( - "Message's recipients KIDs: " + message.recipientsKid.mkString(",") + - "; DID: " + "Message's recipients DIDs: " + message.recipientsSubject.mkString(",") - ) *> ZIO.succeed(message) - maybeSyncReplyMsg <- receiveMessage(msg) - } yield (maybeSyncReplyMsg) - - private def receiveMessage(msg: EncryptedMessage): ZIO[ - Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo & OutboxMessageRepo, - MediatorError | StorageError, - Option[SignedMessage | EncryptedMessage] - ] = - ZIO - .logAnnotate("msgHash", msg.sha1) { - for { - _ <- ZIO.log("receivedMessage") - maybeSyncReplyMsg <- - if (!msg.recipientsSubject.contains(id)) - ZIO.logError(s"This mediator '${id.string}' is not a recipient") *> ZIO.none - else - { - for { - messageItemRepo <- ZIO.service[MessageItemRepo] - protocolHandler <- ZIO.service[ProtocolExecuter[Services, MediatorError | StorageError]] - plaintextMessage <- decrypt(msg) - maybeActionStorageError <- messageItemRepo - .insert(msg) // store all message - .map(_ /*WriteResult*/ => None) - .catchSome { - case StorageCollection(error) => - // This deals with connection errors to the database. - ZIO.logWarning(s"Error StorageCollection: $error") *> - ZIO - .service[Agent] - .map(agent => - Some( - Reply( - Problems - .storageError( - to = plaintextMessage.from.map(_.asTO).toSet, - from = agent.id, - pthid = plaintextMessage.id, - piuri = plaintextMessage.`type`, - ) - .toPlaintextMessage - ) - ) - ) - case StorageThrowable(error) => - ZIO.logWarning(s"Error StorageThrowable: $error") *> - ZIO - .service[Agent] - .map(agent => - Some( - Reply( - Problems - .storageError( - to = plaintextMessage.from.map(_.asTO).toSet, - from = agent.id, - pthid = plaintextMessage.id, - piuri = plaintextMessage.`type`, - ) - .toPlaintextMessage - ) - ) - ) - case DuplicateMessage(error) => - ZIO.logWarning(s"Error DuplicateMessageError: $error") *> - ZIO - .service[Agent] - .map(agent => - Some( - Reply( - Problems - .dejavuError( - to = plaintextMessage.from.map(_.asTO).toSet, - from = agent.id, - pthid = plaintextMessage.id, - piuri = plaintextMessage.`type`, - ) - .toPlaintextMessage - ) - ) - ) - } - // TODO Store context of the decrypt unwarping - // TODO SreceiveMessagetore context with MsgID and PIURI - agent <- ZIO.service[Agent] - ret <- { - maybeActionStorageError match - case Some(reply) => ActionUtils.packResponse(Some(plaintextMessage), reply) - case None => protocolHandler.execute(plaintextMessage) - }.tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex")) - .catchSome { case MediatorDidError(error) => - ZIO.logError(s"Error MediatorDidError: $error") *> - ActionUtils.packResponse( - Some(plaintextMessage), - Reply( - Problems - .malformedError( - to = plaintextMessage.from.map(_.asTO).toSet, - from = agent.id, - pthid = plaintextMessage.id, - piuri = plaintextMessage.`type`, - ) - .toPlaintextMessage - ) - ) - } - } yield ret - }.catchAll { - case ex: MediatorError => ZIO.fail(ex) - case pr: ProblemReport => ActionUtils.packResponse(None, Reply(pr.toPlaintextMessage)) - case ex: StorageCollection => ZIO.fail(ex) - case ex: StorageThrowable => ZIO.fail(ex) - case ex: DuplicateMessage => ZIO.fail(ex) - } - } yield maybeSyncReplyMsg - } - .provideSomeLayer( /*resolverLayer ++ indentityLayer ++*/ protocolHandlerLayer) - -} - -object MediatorAgent { - - def make(id: DID, keyStore: KeyStore): ZIO[Any, Nothing, MediatorAgent] = ZIO.succeed(MediatorAgent(id, keyStore)) - - def didCommApp = { - Routes( - Method.GET / "headers" -> handler { (req: Request) => - val data = req.headers.toSeq.map(e => (e.headerName, e.renderedValue)) - ZIO.succeed(Response.text("HEADERS:\n" + data.mkString("\n") + "\nRemoteAddress:" + req.remoteAddress)).debug - }, - Method.GET / "health" -> handler { (req: Request) => ZIO.succeed(Response.ok) }, - Method.GET / "version" -> handler { (req: Request) => ZIO.succeed(Response.text(MediatorBuildInfo.version)) }, - Method.GET / "did" -> handler { (req: Request) => - for { - agent <- ZIO.service[MediatorAgent] - ret <- ZIO.succeed(Response.text(agent.id.string)) - } yield (ret) - }, - Method.GET / "invitation" -> handler { (req: Request) => - for { - agent <- ZIO.service[MediatorAgent] - annotationMap <- ZIO.logAnnotations.map(_.map(e => LogAnnotation(e._1, e._2)).toSeq) - invitation = OOBInvitation( - from = agent.id, - goal_code = Some("request-mediate"), - goal = Some("RequestMediate"), - accept = Some(Seq("didcomm/v2")), - ) - _ <- ZIO.log("New mediate invitation MsgID: " + invitation.id.value) - ret <- ZIO.succeed(Response.json(invitation.toPlaintextMessage.toJson)) - } yield (ret) - }, - Method.GET / "invitationOOB" -> handler { (req: Request) => - for { - agent <- ZIO.service[MediatorAgent] - annotationMap <- ZIO.logAnnotations.map(_.map(e => LogAnnotation(e._1, e._2)).toSeq) - invitation = OOBInvitation( - from = agent.id, - goal_code = Some("request-mediate"), - goal = Some("RequestMediate"), - accept = Some(Seq("didcomm/v2")), - ) - _ <- ZIO.log("New mediate invitation MsgID: " + invitation.id.value) - ret <- ZIO.succeed( - Response.text( - OutOfBandPlaintext.from(invitation.toPlaintextMessage).makeURI("") - ) - ) - } yield (ret) - }, - Method.POST / trailing -> handler { (req: Request) => - if ( - req.headers - .get("content-type") - .exists { h => h == MediaTypes.SIGNED.typ || h == MediaTypes.ENCRYPTED.typ } - ) { - for { - agent <- ZIO.service[MediatorAgent] - data <- req.body.asString - .catchAll(ex => ZIO.fail(Response.badRequest("Unable to read the body of the request"))) - ret <- agent - .receiveMessage(data) - .map { - case None => Response(status = Status.Accepted) - case Some(value: SignedMessage) => - Response( - status = Status.Accepted, - headers = Headers(Header.ContentType(MediaType.apply("application", "didcomm-signed+json"))), - body = Body.fromCharSequence(value.toJson) - ) - case Some(value: EncryptedMessage) => - Response( - status = Status.Accepted, - headers = Headers(Header.ContentType(MediaType.apply("application", "didcomm-encrypted+json"))), - body = Body.fromCharSequence(value.toJson) - ) - } - .catchAll { - case MediatorDidError(error) => - ZIO.logError(s"Error MediatorDidError: $error") *> - ZIO.succeed(Response.status(Status.BadRequest)) - case MediatorThrowable(error) => - ZIO.logError(s"Error MediatorThrowable: $error") *> - ZIO.succeed(Response.status(Status.BadRequest)) - case StorageCollection(error) => - ZIO.logError(s"Error StorageCollection: $error") *> - ZIO.succeed(Response.status(Status.BadRequest)) - case StorageThrowable(error) => - ZIO.logError(s"Error StorageThrowable: $error") *> - ZIO.succeed(Response.status(Status.BadRequest)) - case DuplicateMessage(error) => - ZIO.logError(s"Error DuplicateKeyError: $error") *> - ZIO.succeed(Response.status(Status.BadRequest)) - case MissingProtocolError(piuri) => - ZIO.logError(s"MissingProtocolError ('$piuri')") *> - ZIO.succeed(Response.status(Status.BadRequest)) // TODO - } - } yield ret - } else - ZIO - .logError(s"Request Headers: ${req.headers.mkString(",")}") - .as( - Response - .text(s"The content-type must be ${MediaTypes.SIGNED.typ} or ${MediaTypes.ENCRYPTED.typ}") - .copy(status = Status.BadRequest) - ) - - }, - Method.GET / trailing -> handler { (req: Request) => - for { - agent <- ZIO.service[MediatorAgent] - _ <- ZIO.log("index.html") - ret <- ZIO.succeed(IndexHtml.html(agent.id)) - } yield ret - }, - Method.GET / "public" / string("path") -> handler { (path: String, req: Request) => - // RoutesMiddleware - // TODO https://zio.dev/reference/stream/zpipeline/#:~:text=ZPipeline.gzip%20%E2%80%94%20The%20gzip%20pipeline%20compresses%20a%20stream%20of%20bytes%20as%20using%20gzip%20method%3A - val fullPath = s"public/$path" - val classLoader = Thread.currentThread().getContextClassLoader() - val headerContentType = fullPath match - case s if s.endsWith(".html") => Header.ContentType(MediaType.text.html) - case s if s.endsWith(".js") => Header.ContentType(MediaType.text.javascript) - case s if s.endsWith(".css") => Header.ContentType(MediaType.text.css) - case s if s.endsWith(".svg") => Header.ContentType(MediaType.image.`svg+xml`) - case s => Header.ContentType(MediaType.text.plain) - Handler.fromResource(fullPath).map(_.addHeader(headerContentType)) - }.flatten - ) - }.sandbox.toHttpApp - -} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala index ac6e929e..5ac53cca 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/DataModels.scala @@ -6,6 +6,8 @@ import reactivemongo.api.bson.* import java.time.Instant import scala.util.Try import zio.json._ +import scala.util.Failure +import scala.util.Success type HASH = String // messages @@ -13,15 +15,63 @@ type XRequestID = String // x-request-id case class MessageItem( _id: HASH, - msg: EncryptedMessage, - headers: ProtectedHeader, + msg: SignedMessage | EncryptedMessage, + headers: ProtectedHeader | Seq[SignProtectedHeader], ts: String, xRequestId: Option[XRequestID] ) object MessageItem { - def apply(msg: EncryptedMessage, xRequestId: Option[XRequestID]): MessageItem = { - new MessageItem(msg.sha1, msg, msg.`protected`.obj, Instant.now().toString, xRequestId) + def apply(msg: SignedMessage | EncryptedMessage, xRequestId: Option[XRequestID]): MessageItem = + msg match { + case sMsg: SignedMessage => + new MessageItem( + msg.sha256, + msg, + sMsg.signatures.map(_.`protected`.obj), + Instant.now().toString, + xRequestId + ) + case eMsg: EncryptedMessage => + new MessageItem(msg.sha256, msg, eMsg.`protected`.obj, Instant.now().toString, xRequestId) + } + + given BSONWriter[ProtectedHeader | Seq[SignProtectedHeader]] with { + override def writeTry(obj: ProtectedHeader | Seq[SignProtectedHeader]): Try[BSONValue] = + obj match { + case obj: ProtectedHeader => + summon[BSONDocumentWriter[ProtectedHeader]].writeTry(obj) + case seq: Seq[_] => + val f = summon[BSONDocumentWriter[SignProtectedHeader]] + seq + .map(e => Try(e.asInstanceOf[SignProtectedHeader]).flatMap(ee => f.writeTry(ee))) + .foldLeft(Try(Seq.empty[BSONDocument]))((acc, e) => + (acc, e) match + case (Failure(exception), _) => Failure(exception) + case (Success(seq), Failure(exception)) => Failure(exception) + case (Success(seq), Success(value)) => Success(seq :+ value) + ) + .map(e => BSONArray(e)) + } } + + given BSONReader[ProtectedHeader | Seq[SignProtectedHeader]] with { + override def readTry(bson: BSONValue): Try[ProtectedHeader | Seq[SignProtectedHeader]] = { + bson match + case array: BSONArray => + val f = summon[BSONDocumentReader[SignProtectedHeader]] + array.values + .map(e => f.readTry(e)) + .foldLeft(Try(Seq.empty[SignProtectedHeader]))((acc, e) => + (acc, e) match + case (Failure(exception), _) => Failure(exception) + case (Success(seq), Failure(exception)) => Failure(exception) + case (Success(seq), Success(value)) => Success(seq :+ value) + ) + case obj: BSONDocument => summon[BSONDocumentReader[ProtectedHeader]].readTry(obj) + case _ => Failure(new RuntimeException("Must be a Document for a Array of Documents")) + } + } + given BSONDocumentWriter[MessageItem] = Macros.writer[MessageItem] given BSONDocumentReader[MessageItem] = Macros.reader[MessageItem] } @@ -79,7 +129,7 @@ object SentMessageItem { case sMsg: SignedMessage => new SentMessageItem( encrypt = msg, - hash = sMsg.sha1, // FIXME + hash = sMsg.sha256, headers = sMsg.signatures.headOption.flatMap(_.`protected`.obj.toJsonAST.toOption).getOrElse(ast.Json.Null), plaintext = plaintext, transport = Seq( @@ -95,7 +145,7 @@ object SentMessageItem { case eMsg: EncryptedMessage => new SentMessageItem( encrypt = msg, - hash = eMsg.sha1, + hash = eMsg.sha256, headers = eMsg.`protected`.obj.toJsonAST.getOrElse(ast.Json.Null), plaintext = plaintext, transport = Seq( diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala index 023e3ab7..b0e7ddc8 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/MessageItemRepo.scala @@ -1,7 +1,7 @@ package io.iohk.atala.mediator.db import fmgp.did.* -import fmgp.did.comm.EncryptedMessage +import fmgp.did.comm.{SignedMessage, EncryptedMessage} import io.iohk.atala.mediator.{DuplicateMessage, StorageCollection, StorageError, StorageThrowable} import reactivemongo.api.bson.* import reactivemongo.api.bson.collection.BSONCollection @@ -11,6 +11,7 @@ import zio.* import reactivemongo.core.errors.DatabaseException import scala.concurrent.ExecutionContext + object MessageItemRepo { def layer: ZLayer[ReactiveMongoApi, Throwable, MessageItemRepo] = ZLayer { @@ -27,7 +28,7 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon .map(_.collection(collectionName)) .mapError(ex => StorageCollection(ex)) - def insert(msg: EncryptedMessage): IO[StorageError, WriteResult] = { + def insert(msg: SignedMessage | EncryptedMessage): IO[StorageError, WriteResult] = { for { _ <- ZIO.logInfo("insert") xRequestId <- ZIO.logAnnotations.map(_.get(XRequestId.value)) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/db/UserAccountRepo.scala b/mediator/src/main/scala/io/iohk/atala/mediator/db/UserAccountRepo.scala index 47362a84..c2a239ad 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/db/UserAccountRepo.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/db/UserAccountRepo.scala @@ -145,7 +145,7 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon */ def addToInboxes( recipients: Set[DIDSubject], - msg: EncryptedMessage + msg: SignedMessage | EncryptedMessage ): ZIO[Any, StorageError, Int] = { def selector = BSONDocument( @@ -156,7 +156,7 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon BSONDocument( "$elemMatch" -> BSONDocument( - "hash" -> msg.sha1, + "hash" -> msg.sha256, "recipient" -> BSONDocument("$in" -> recipients.map(_.did)) ) ) @@ -167,7 +167,7 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon "$push" -> BSONDocument( "messagesRef" -> BSONDocument( "$each" -> - recipients.map(recipient => MessageMetaData(msg.sha1, recipient, xRequestId)) + recipients.map(recipient => MessageMetaData(msg.sha256, recipient, xRequestId)) ) ) ) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala deleted file mode 100644 index 4f128d1b..00000000 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/BasicMessageExecuter.scala +++ /dev/null @@ -1,25 +0,0 @@ -package io.iohk.atala.mediator.protocols - -import fmgp.crypto.error.FailToParse -import fmgp.did.comm.{EncryptedMessage, PIURI, PlaintextMessage, SignedMessage} -import fmgp.did.comm.protocol.basicmessage2.BasicMessage -import io.iohk.atala.mediator.{MediatorDidError, MediatorError, MediatorThrowable, StorageError} -import io.iohk.atala.mediator.actions.{Action, NoReply, ProtocolExecuter} -import zio.{Console, ZIO} - -object BasicMessageExecuter extends ProtocolExecuter[Any, MediatorError] { - - override def supportedPIURI: Seq[PIURI] = Seq(BasicMessage.piuri) - - override def execute[R]( - plaintextMessage: PlaintextMessage - ): ZIO[R, MediatorError, Option[SignedMessage | EncryptedMessage]] = - program(plaintextMessage).debug *> ZIO.none - - override def program[R1 <: Any](plaintextMessage: PlaintextMessage): ZIO[R1, MediatorError, Action] = for { - job <- BasicMessage.fromPlaintextMessage(plaintextMessage) match - case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) - case Right(bm) => Console.printLine(bm.toString).mapError(ex => MediatorThrowable(ex)) - } yield NoReply - -} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/DiscoverFeaturesExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/DiscoverFeaturesExecuter.scala index 537eb7fc..f04f807a 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/DiscoverFeaturesExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/DiscoverFeaturesExecuter.scala @@ -1,20 +1,19 @@ package io.iohk.atala.mediator.protocols +import zio.ZIO + import fmgp.crypto.error.FailToParse import fmgp.did.Agent import fmgp.did.comm.{PIURI, PlaintextMessage} +import fmgp.did.comm.protocol._ import fmgp.did.comm.protocol.discoverfeatures2._ -import io.iohk.atala.mediator.{MediatorDidError, MediatorError} -import io.iohk.atala.mediator.actions.{Action, NoReply, ProtocolExecuter, ProtocolExecuterWithServices, Reply} -import zio.ZIO +import io.iohk.atala.mediator.{ProtocolExecutionFailToParse, MediatorError} -object DiscoverFeaturesExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services, MediatorError] { +object DiscoverFeaturesExecuter extends ProtocolExecuter[Agent, MediatorError] { override def supportedPIURI: Seq[PIURI] = Seq(FeatureQuery.piuri, FeatureDisclose.piuri) - override def program[R1 <: Agent]( - plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError, Action] = { + override def program(plaintextMessage: PlaintextMessage): ZIO[Agent, MediatorError, Action] = { // the val is from the match to be definitely stable val piuriFeatureQuery = FeatureQuery.piuri val piuriFeatureDisclose = FeatureDisclose.piuri @@ -25,7 +24,7 @@ object DiscoverFeaturesExecuter extends ProtocolExecuterWithServices[ProtocolExe ret <- plaintextMessage.toFeatureQuery match case Left(error) => ZIO.logError(s"Fail in FeatureQuery: $error") *> - ZIO.fail(MediatorDidError(FailToParse(error))) + ZIO.fail(ProtocolExecutionFailToParse(FailToParse(error))) case Right(featureQuery) => for { _ <- ZIO.logInfo(featureQuery.toString()) @@ -72,7 +71,7 @@ object DiscoverFeaturesExecuter extends ProtocolExecuterWithServices[ProtocolExe case `piuriFeatureDisclose` => for { _ <- plaintextMessage.toFeatureDisclose match - case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) + case Left(error) => ZIO.fail(ProtocolExecutionFailToParse(FailToParse(error))) case Right(featureDisclose) => ZIO.logInfo(featureDisclose.toString()) } yield NoReply } diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala index 15275274..7ff5a554 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala @@ -5,24 +5,18 @@ import fmgp.did.* import fmgp.did.comm.* import fmgp.did.comm.protocol.* import fmgp.did.comm.protocol.routing2.* -import io.iohk.atala.mediator._ -import io.iohk.atala.mediator.actions.* +import fmgp.did.comm.protocol.reportproblem2.ProblemReport +import io.iohk.atala.mediator.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* -import fmgp.did.comm.protocol.reportproblem2.ProblemReport object ForwardMessageExecuter - extends ProtocolExecuterWithServices[ - ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo, - ProtocolExecuter.Erros - ] { + extends ProtocolExecuter[Agent & UserAccountRepo & MessageItemRepo, MediatorError | StorageError] { override def supportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri) - override def program[R1 <: UserAccountRepo & MessageItemRepo & Agent]( - plaintextMessage: PlaintextMessage - ): ZIO[R1, ProtocolExecuter.Erros, Action] = { + override def program(plaintextMessage: PlaintextMessage) = { // the val is from the match to be definitely stable val piuriForwardMessage = ForwardMessage.piuri diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala index 0683829e..0c784228 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuter.scala @@ -7,18 +7,14 @@ import fmgp.did.comm.Operations.* import fmgp.did.comm.protocol.* import fmgp.did.comm.protocol.mediatorcoordination2.* import io.iohk.atala.mediator.* -import io.iohk.atala.mediator.actions.* import io.iohk.atala.mediator.db.UserAccountRepo import io.iohk.atala.mediator.db.DidAccount import zio.* import zio.json.* +import io.iohk.atala.mediator.db.MessageItemRepo -object MediatorCoordinationExecuter - extends ProtocolExecuterWithServices[ - ProtocolExecuter.Services & UserAccountRepo, - ProtocolExecuter.Erros - ] { +object MediatorCoordinationExecuter extends ProtocolExecuter[Agent & UserAccountRepo, MediatorError | StorageError] { override def supportedPIURI: Seq[PIURI] = Seq( MediateRequest.piuri, @@ -30,9 +26,7 @@ object MediatorCoordinationExecuter Keylist.piuri, ) - override def program[R1 <: (UserAccountRepo)]( - plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError | StorageError, Action] = { + override def program(plaintextMessage: PlaintextMessage) = { // the val is from the match to be definitely stable val piuriMediateRequest = MediateRequest.piuri val piuriMediateGrant = MediateGrant.piuri @@ -54,7 +48,7 @@ object MediatorCoordinationExecuter case m: MediateGrant => ZIO.logWarning("MediateGrant") *> ZIO.succeed(NoReply) *> ZIO.succeed( - SyncReplyOnly( + Reply( Problems .unsupportedProtocolRole( from = m.to.asFROM, @@ -68,7 +62,7 @@ object MediatorCoordinationExecuter case m: MediateDeny => ZIO.logWarning("MediateDeny") *> ZIO.succeed(NoReply) *> ZIO.succeed( - SyncReplyOnly( + Reply( Problems .unsupportedProtocolRole( from = m.to.asFROM, @@ -90,7 +84,7 @@ object MediatorCoordinationExecuter case Right(value) => ZIO.log(s"MediateGrant: $value") *> ZIO.succeed(m.makeRespondMediateGrant.toPlaintextMessage) - } yield SyncReplyOnly(reply) + } yield Reply(reply) case m: KeylistUpdate => for { _ <- ZIO.logInfo("KeylistUpdate") @@ -130,11 +124,11 @@ object MediatorCoordinationExecuter result <- ZIO.succeed(m.makeKeylistResponse(updateResponse).toPlaintextMessage) } yield result - } yield SyncReplyOnly(res) + } yield Reply(res) case m: KeylistResponse => ZIO.logWarning("KeylistResponse") *> ZIO.succeed(NoReply) *> ZIO.succeed( - SyncReplyOnly( + Reply( Problems .unsupportedProtocolRole( from = m.to.asFROM, @@ -161,7 +155,7 @@ object MediatorCoordinationExecuter } } yield mResponse match case None => NoReply // TODO error report - case Some(response) => SyncReplyOnly(response.toPlaintextMessage) + case Some(response) => Reply(response.toPlaintextMessage) case m: Keylist => ZIO.logWarning("Keylist") *> ZIO.succeed(NoReply) } match case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) // TODO error report diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala index 4656c181..56f2b045 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/MissingProtocolExecuter.scala @@ -4,16 +4,15 @@ import zio.ZIO import fmgp.did.* import fmgp.did.comm.PlaintextMessage +import fmgp.did.comm.protocol.Reply import io.iohk.atala.mediator.MissingProtocolError -import io.iohk.atala.mediator.actions.ProtocolExecuter -import io.iohk.atala.mediator.actions.Reply -import io.iohk.atala.mediator.actions.ProtocolExecuterWithServices import io.iohk.atala.mediator.MediatorError +import fmgp.did.comm.protocol.ProtocolExecuter -case class MissingProtocolExecuter() extends ProtocolExecuterWithServices[ProtocolExecuter.Services, MediatorError] { +object MissingProtocolExecuter extends ProtocolExecuter[Agent, Nothing] { override def supportedPIURI = Seq() - override def program[R1 <: Agent](plaintextMessage: PlaintextMessage) = + override def program(plaintextMessage: PlaintextMessage) = ZIO .service[Agent] .map(agent => diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala deleted file mode 100644 index 65c211b3..00000000 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/NullProtocolExecuter.scala +++ /dev/null @@ -1,21 +0,0 @@ -package io.iohk.atala.mediator.protocols - -import fmgp.did.comm.PlaintextMessage -import io.iohk.atala.mediator.MissingProtocolError -import io.iohk.atala.mediator.actions.ProtocolExecuter -import zio.ZIO -import fmgp.did.comm.SignedMessage -import fmgp.did.comm.EncryptedMessage - -object NullProtocolExecuter extends ProtocolExecuter[Any, MissingProtocolError] { - - override def supportedPIURI = Seq() - - override def execute[Any]( - plaintextMessage: PlaintextMessage - ): ZIO[Any, MissingProtocolError, Option[SignedMessage | EncryptedMessage]] = - program(plaintextMessage).debug *> ZIO.none - - override def program[R1 <: Any](plaintextMessage: PlaintextMessage) = - ZIO.fail(MissingProtocolError(plaintextMessage.`type`)) -} diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index 38eb77bc..6ade8c30 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -7,15 +7,11 @@ import fmgp.did.comm.Operations.* import fmgp.did.comm.protocol.* import fmgp.did.comm.protocol.pickup3.* import io.iohk.atala.mediator.* -import io.iohk.atala.mediator.actions.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* -object PickupExecuter - extends ProtocolExecuterWithServices[ - ProtocolExecuter.Services & UserAccountRepo & MessageItemRepo, - ProtocolExecuter.Erros - ] { + +object PickupExecuter extends ProtocolExecuter[UserAccountRepo & MessageItemRepo, MediatorError | StorageError] { override def supportedPIURI: Seq[PIURI] = Seq( StatusRequest.piuri, @@ -26,9 +22,7 @@ object PickupExecuter LiveModeChange.piuri, ) - override def program[R1 <: UserAccountRepo & MessageItemRepo]( - plaintextMessage: PlaintextMessage - ): ZIO[R1, StorageError, Action] = { + override def program(plaintextMessage: PlaintextMessage) = { // the val is from the match to be definitely stable val piuriStatusRequest = StatusRequest.piuri val piuriStatus = Status.piuri @@ -76,11 +70,11 @@ object PickupExecuter total_bytes = None, // TODO live_delivery = None, // TODO ).toPlaintextMessage - } yield SyncReplyOnly(ret) + } yield Reply(ret) case m: Status => ZIO.logInfo("Status") *> ZIO.succeed( - SyncReplyOnly( + Reply( Problems .unsupportedProtocolRole( from = m.to.asFROM, @@ -134,10 +128,19 @@ object PickupExecuter messagesToReturn = if (m.recipient_did.isEmpty) allMessagesFor else { - allMessagesFor.filterNot( - _.msg.recipientsSubject - .map(_.did) - .forall(e => !m.recipient_did.map(_.toDID.did).contains(e)) + allMessagesFor.filterNot(item => + item.msg match { + case sMsg: SignedMessage => + sMsg.payloadAsPlaintextMessage + .map(_.to.toSeq.flatMap(i => i)) + .getOrElse(Seq.empty) + .map(_.toDID) // All Recipient Of The Message + .forall(e => !m.recipient_did.map(_.toDID.did).contains(e)) + case eMsg: EncryptedMessage => + eMsg.recipientsSubject + .map(_.did) + .forall(e => !m.recipient_did.map(_.toDID.did).contains(e)) + } ) } } yield MessageDelivery( @@ -149,7 +152,7 @@ object PickupExecuter ).toPlaintextMessage } - } yield SyncReplyOnly(ret) + } yield Reply(ret) case m: MessageDelivery => ZIO.logInfo("MessageDelivery") *> ZIO.succeed( @@ -175,7 +178,7 @@ object PickupExecuter case m: LiveModeChange => ZIO.logInfo("LiveModeChange Not Supported") *> ZIO.succeed( - SyncReplyOnly( + Reply( Problems .liveModeNotSupported( from = m.to.asFROM, diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala index 97c603f1..69054a38 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/Problems.scala @@ -15,7 +15,6 @@ object Problems { pthid: MsgID, piuri: PIURI, ) = ProblemReport( - // id: MsgID = MsgID(), to = to, from = from, pthid = pthid, @@ -32,7 +31,6 @@ object Problems { pthid: MsgID, piuri: PIURI, ) = ProblemReport( - // id: MsgID = MsgID(), to = Set(to), from = from, pthid = pthid, @@ -49,7 +47,6 @@ object Problems { pthid: MsgID, piuri: PIURI, ) = ProblemReport( - // id: MsgID = MsgID(), to = Set(to), from = from, pthid = pthid, @@ -66,7 +63,6 @@ object Problems { pthid: MsgID, piuri: PIURI, ) = ProblemReport( - // id: MsgID = MsgID(), to = Set(to), from = from, pthid = pthid, @@ -83,7 +79,6 @@ object Problems { pthid: MsgID, piuri: PIURI, ) = ProblemReport( - // id: MsgID = MsgID(), to = to, from = from, pthid = pthid, @@ -100,7 +95,6 @@ object Problems { pthid: MsgID, piuri: PIURI, ) = ProblemReport( - // id: MsgID = MsgID(), to = to, from = from, pthid = pthid, @@ -118,7 +112,6 @@ object Problems { piuri: PIURI, didNotEnrolled: DIDSubject ) = ProblemReport( - // id: MsgID = MsgID(), to = to.toSet, from = from, pthid = pthid, @@ -134,14 +127,28 @@ object Problems { from: FROM, pthid: MsgID, piuri: PIURI, + comment: String ) = ProblemReport( - // id: MsgID = MsgID(), to = to, from = from, pthid = pthid, ack = None, code = ProblemCode.ErroFail("msg", piuri.value), - comment = None, + comment = Some(comment), + args = None, + escalate_to = email, + ) + + def decryptFail( + from: FROM, + comment: String + ) = ProblemReport( + to = Set.empty, + from = from, + pthid = MsgID("?"), // TODO + ack = None, + code = ProblemCode.ErroFail("msg"), + comment = Some(comment), args = None, escalate_to = email, ) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala deleted file mode 100644 index 0b9b2772..00000000 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/TrustPingExecuter.scala +++ /dev/null @@ -1,46 +0,0 @@ -package io.iohk.atala.mediator.protocols - -import fmgp.crypto.error.FailToParse -import fmgp.did.Agent -import fmgp.did.comm.{PIURI, PlaintextMessage} -import fmgp.did.comm.protocol.trustping2.{ - TrustPing, - TrustPingResponse, - TrustPingWithOutRequestedResponse, - TrustPingWithRequestedResponse -} -import io.iohk.atala.mediator.{MediatorDidError, MediatorError} -import io.iohk.atala.mediator.actions.{Action, NoReply, ProtocolExecuter, ProtocolExecuterWithServices, Reply} -import zio.ZIO - -class TrustPingExecuter extends ProtocolExecuterWithServices[ProtocolExecuter.Services, MediatorError] { - - override def supportedPIURI: Seq[PIURI] = Seq(TrustPing.piuri, TrustPingResponse.piuri) - - override def program[R1 <: Agent]( - plaintextMessage: PlaintextMessage - ): ZIO[R1, MediatorError, Action] = { - // the val is from the match to be definitely stable - val piuriTrustPing = TrustPing.piuri - val piuriTrustPingResponse = TrustPingResponse.piuri - - plaintextMessage.`type` match - case `piuriTrustPing` => - TrustPing.fromPlaintextMessage(plaintextMessage) match - case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) - case Right(ping: TrustPingWithOutRequestedResponse) => ZIO.logInfo(ping.toString()) *> ZIO.succeed(NoReply) - case Right(ping: TrustPingWithRequestedResponse) => - for { - _ <- ZIO.logInfo(ping.toString()) - agent <- ZIO.service[Agent] - ret = ping.makeRespond - } yield Reply(ret.toPlaintextMessage) - case `piuriTrustPingResponse` => - for { - job <- TrustPingResponse.fromPlaintextMessage(plaintextMessage) match - case Left(error) => ZIO.fail(MediatorDidError(FailToParse(error))) - case Right(ping) => ZIO.logInfo(ping.toString()) - } yield NoReply - } - -} diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/db/AgentStub.scala b/mediator/src/test/scala/io/iohk/atala/mediator/db/AgentStub.scala index d2e3bf99..288ecd1f 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/db/AgentStub.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/db/AgentStub.scala @@ -3,7 +3,7 @@ import fmgp.crypto.{Curve, KTY, OKPPrivateKey} import fmgp.did.Agent import fmgp.did.comm.EncryptedMessage import fmgp.did.method.peer.{DIDPeer2, DIDPeerServiceEncoded} -import io.iohk.atala.mediator.app.{MediatorAgent, MediatorConfig} +import io.iohk.atala.mediator.{MediatorAgent, MediatorConfig} import zio.{ULayer, ZLayer} import java.net.URI @@ -16,9 +16,9 @@ object AgentStub { def keyAuthentication(d: String, x: String): OKPPrivateKey = OKPPrivateKey(kty = KTY.OKP, crv = Curve.Ed25519, d = d, x = x, kid = None) - val endpoint = new URI("http://localhost:8080") + val endpoints = "http://localhost:8080" val mediatorConfig = MediatorConfig( - endpoint, + endpoints, keyAgreement("Z6D8LduZgZ6LnrOHPrMTS6uU2u5Btsrk1SGs4fn8M7c", "Sr4SkIskjN_VdKTn0zkjYbhGTWArdUNE4j_DmUpnQGw"), keyAuthentication("INXCnxFEl0atLIIQYruHzGd5sUivMRyQOzu87qVerug", "MBjnXZxkMcoQVVL21hahWAw43RuAG-i64ipbeKKqwoA") ) diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala index 0840452c..06ccab53 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/db/MessageItemRepoSpec.scala @@ -42,11 +42,11 @@ object MessageItemRepoSpec extends ZIOSpecDefault with DidAccountStubSetup { for { messageItem <- ZIO.service[MessageItemRepo] msg <- ZIO.fromEither(encryptedMessageAlice) - result <- messageItem.findById(msg.sha1) + result <- messageItem.findById(msg.sha256) } yield { val outcome = result.forall { messageItem => messageItem.msg == msg && - messageItem._id == msg.sha1 && + messageItem._id == msg.sha256 && messageItem.xRequestId.contains("b373423c-c78f-4cbc-a3fe-89cbc1351835") } assertTrue(outcome) @@ -59,11 +59,11 @@ object MessageItemRepoSpec extends ZIOSpecDefault with DidAccountStubSetup { msg <- ZIO.fromEither(encryptedMessageAlice) msg2 <- ZIO.fromEither(encryptedMessageBob) msg2Added <- messageItem.insert(msg2) - result <- messageItem.findByIds(Seq(msg.sha1, msg2.sha1)) + result <- messageItem.findByIds(Seq(msg.sha256, msg2.sha256)) } yield { val outcome = result.forall { messageItem => Seq(msg, msg2).contains(messageItem.msg) && - Seq(msg.sha1, msg2.sha1).contains(messageItem._id) && + Seq(msg.sha256, msg2.sha256).contains(messageItem._id) && messageItem.xRequestId.contains("b373423c-c78f-4cbc-a3fe-89cbc1351835") } assertTrue(outcome) diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/db/UserAccountRepoSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/db/UserAccountRepoSpec.scala index 3a4eb9db..28a9cbff 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/db/UserAccountRepoSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/db/UserAccountRepoSpec.scala @@ -144,7 +144,7 @@ object UserAccountRepoSpec extends ZIOSpecDefault with DidAccountStubSetup with assertTrue(addedToInbox == 1) && assert(messageMetaData)( forall( - hasField("hash", (m: MessageMetaData) => m.hash, equalTo(msg.sha1)) + hasField("hash", (m: MessageMetaData) => m.hash, equalTo(msg.sha256)) && hasField("recipient", (m: MessageMetaData) => m.recipient, equalTo(bob)) && hasField("xRequestId", (m: MessageMetaData) => m.xRequestId, equalTo(Some(xRequestId))) ) @@ -156,7 +156,7 @@ object UserAccountRepoSpec extends ZIOSpecDefault with DidAccountStubSetup with for { userAccount <- ZIO.service[UserAccountRepo] msg <- ZIO.fromEither(encryptedMessageAlice) - markedDelivered <- userAccount.markAsDelivered(alice, Seq(msg.sha1)) + markedDelivered <- userAccount.markAsDelivered(alice, Seq(msg.sha256)) didAccount <- userAccount.getDidAccount(alice) } yield { val messageMetaData: Seq[MessageMetaData] = didAccount.map(_.messagesRef).getOrElse(Seq.empty) diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/DiscoverFeaturesExecuterSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/DiscoverFeaturesExecuterSpec.scala index 40170e7d..7537f91c 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/DiscoverFeaturesExecuterSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/DiscoverFeaturesExecuterSpec.scala @@ -3,10 +3,10 @@ package io.iohk.atala.mediator.protocols import fmgp.did.DIDSubject import fmgp.did.comm.protocol.reportproblem2.{ProblemCode, ProblemReport} import fmgp.did.comm.protocol.discoverfeatures2.* +import fmgp.did.comm.protocol.* import fmgp.did.comm.{EncryptedMessage, Operations, PlaintextMessage, SignedMessage, layerDefault} import fmgp.did.method.peer.DidPeerResolver import fmgp.util.Base64 -import io.iohk.atala.mediator.comm.MessageDispatcherJVM import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.db.MessageItemRepoSpec.encryptedMessageAlice import io.iohk.atala.mediator.protocols.DiscoverFeaturesExecuter @@ -24,8 +24,10 @@ import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* import reactivemongo.api.bson.BSONDocument import fmgp.did.DIDSubject.* import fmgp.did.comm.Operations.authDecrypt -import io.iohk.atala.mediator.app.MediatorAgent +import io.iohk.atala.mediator.MediatorAgent import io.iohk.atala.mediator.db.AgentStub.{bobAgent, bobAgentLayer} + +/** mediator/testOnly io.iohk.atala.mediator.protocols.DiscoverFeaturesExecuterSpec */ object DiscoverFeaturesExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with MessageSetup { override def spec = suite("DiscoverFeaturesExecuterSpec")( @@ -34,15 +36,16 @@ object DiscoverFeaturesExecuterSpec extends ZIOSpecDefault with DidAccountStubSe for { agent <- ZIO.service[MediatorAgent] msg <- ZIO.fromEither(plaintextDiscoverFeatureRequestMessage(bobAgent.id.did, agent.id.did)) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(bobAgentLayer) - featureDisclose <- ZIO.fromEither(decryptedMessage.asInstanceOf[PlaintextMessage].toFeatureDisclose) + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == FeatureDisclose.piuri) && - assertTrue(featureDisclose.disclosures.nonEmpty) && - assertTrue(featureDisclose.disclosures.head.id.contains("routing")) + action match + case reply: AnyReply => + reply.msg.toFeatureDisclose match + case Left(value) => assertTrue(false) + case Right(featureDisclose) => + assertTrue(featureDisclose.disclosures.nonEmpty) && + assertTrue(featureDisclose.disclosures.head.id.contains("routing")) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), test( @@ -52,21 +55,19 @@ object DiscoverFeaturesExecuterSpec extends ZIOSpecDefault with DidAccountStubSe for { agent <- ZIO.service[MediatorAgent] msg <- ZIO.fromEither(plaintextDiscoverFeatureRequestMessageNoMatch(bobAgent.id.did, agent.id.did)) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(bobAgentLayer) - featureDisclose <- ZIO.fromEither(decryptedMessage.asInstanceOf[PlaintextMessage].toFeatureDisclose) - + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == FeatureDisclose.piuri) && - assertTrue(featureDisclose.disclosures.isEmpty) - + action match + case reply: AnyReply => + reply.msg.toFeatureDisclose match + case Left(value) => assertTrue(false) + case Right(featureDisclose) => + assertTrue(featureDisclose.disclosures.isEmpty) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), ).provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(Operations.layerDefault) - .provideSomeLayer(Scope.default >>> Client.default >>> MessageDispatcherJVM.layer) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(AgentStub.agentLayer) .provideLayerShared(dataAccessLayer) @@ TestAspect.sequential diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala index da85e4db..f123364b 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala @@ -1,10 +1,10 @@ package io.iohk.atala.mediator.protocols -import fmgp.did.comm.protocol.reportproblem2.{ProblemCode, ProblemReport} +import fmgp.did.comm.protocol.reportproblem2.* +import fmgp.did.comm.protocol.* import fmgp.did.comm.{EncryptedMessage, Operations, PlaintextMessage, SignedMessage, layerDefault} import fmgp.did.method.peer.DidPeerResolver import fmgp.util.Base64 -import io.iohk.atala.mediator.comm.MessageDispatcherJVM import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.db.MessageItemRepoSpec.encryptedMessageAlice import io.iohk.atala.mediator.protocols.ForwardMessageExecuter @@ -19,6 +19,8 @@ import fmgp.did.DIDSubject import scala.concurrent.ExecutionContext.Implicits.global import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* import reactivemongo.api.bson.BSONDocument + +/** mediator/testOnly io.iohk.atala.mediator.protocols.ForwardMessageExecutorSpec */ object ForwardMessageExecutorSpec extends ZIOSpecDefault with DidAccountStubSetup with MessageSetup { override def spec = suite("ForwardMessageExecutorSpec")( @@ -28,23 +30,17 @@ object ForwardMessageExecutorSpec extends ZIOSpecDefault with DidAccountStubSetu userAccount <- ZIO.service[UserAccountRepo] result <- userAccount.createOrFindDidAccount(alice) msg <- ZIO.fromEither(plaintextForwardNotEnrolledDidMessage) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) + action <- executer.program(msg) } yield { - assertTrue(message.isInstanceOf[SignedMessage]) - val signedMessage = message.asInstanceOf[SignedMessage] - val jsonString = Base64.fromBase64url(signedMessage.payload.base64url).decodeToString - val problemReport = jsonString.fromJson[PlaintextMessage].flatMap(ProblemReport.fromPlaintextMessage) - assert(problemReport)( - isRight( - hasField("code", (p: ProblemReport) => p.code, equalTo(ProblemCode.ErroFail("req", "not_enroll"))) && - hasField( - "from", - (p: ProblemReport) => p.from, - equalTo(alice) + action match + case reply: AnyReply => + assert(reply.msg.toProblemReport)( + isRight( + hasField("code", (p: ProblemReport) => p.code, equalTo(ProblemCode.ErroFail("req", "not_enroll"))) && + hasField("from", (p: ProblemReport) => p.from, equalTo(alice)) ) - ) - ) + ) + case NoReply => assertTrue(false) } } @@ TestAspect.before(setupAndClean), test("Forward message for enrolled DID receives NoReply") { @@ -54,13 +50,14 @@ object ForwardMessageExecutorSpec extends ZIOSpecDefault with DidAccountStubSetu result <- userAccount.createOrFindDidAccount(alice) result <- userAccount.addAlias(owner = alice, newAlias = alice) msg <- ZIO.fromEither(plaintextForwardEnrolledDidMessage) - result <- executer.execute(msg) - } yield assertTrue(result.isEmpty) + action <- executer.program(msg) + } yield action match + case reply: AnyReply => assertTrue(false) + case NoReply => assertTrue(true) } @@ TestAspect.before(setupAndClean) ).provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(Operations.layerDefault) - .provideSomeLayer(Scope.default >>> Client.default >>> MessageDispatcherJVM.layer) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(AgentStub.agentLayer) .provideLayerShared(dataAccessLayer) @@ TestAspect.sequential diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuterSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuterSpec.scala index abd8b192..e90780ce 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuterSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MediatorCoordinationExecuterSpec.scala @@ -1,10 +1,10 @@ package io.iohk.atala.mediator.protocols -import fmgp.did.comm.protocol.reportproblem2.{ProblemCode, ProblemReport} +import fmgp.did.comm.protocol.reportproblem2.* +import fmgp.did.comm.protocol.* import fmgp.did.comm.{EncryptedMessage, Operations, PlaintextMessage, SignedMessage, layerDefault} import fmgp.did.method.peer.DidPeerResolver import fmgp.util.Base64 -import io.iohk.atala.mediator.comm.MessageDispatcherJVM import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.db.MessageItemRepoSpec.encryptedMessageAlice import io.iohk.atala.mediator.protocols.* @@ -15,7 +15,7 @@ import zio.json.* import zio.test.* import zio.test.Assertion.* import fmgp.did.{Agent, DIDSubject} -import io.iohk.atala.mediator.app.MediatorAgent +import io.iohk.atala.mediator.MediatorAgent import scala.concurrent.ExecutionContext.Implicits.global import reactivemongo.api.indexes.{Index, IndexType} @@ -24,6 +24,8 @@ import Operations.* import fmgp.did.comm.protocol.mediatorcoordination2.* import io.iohk.atala.mediator.db.AgentStub.* import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* + +/** mediator/testOnly io.iohk.atala.mediator.protocols.MediatorCoordinationExecuterSpec */ object MediatorCoordinationExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with MessageSetup { override def spec = { @@ -34,12 +36,11 @@ object MediatorCoordinationExecuterSpec extends ZIOSpecDefault with DidAccountSt for { agent <- ZIO.service[MediatorAgent] msg <- ZIO.fromEither(plaintextMediationRequestMessage(bobAgent.id.did, agent.id.did)) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(bobAgentLayer) + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == MediateGrant.piuri) + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == MediateGrant.piuri) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), test("MediationRequest message for already used alias did should get mediation deny") { @@ -50,12 +51,11 @@ object MediatorCoordinationExecuterSpec extends ZIOSpecDefault with DidAccountSt result <- userAccount.createOrFindDidAccount(alice) result <- userAccount.addAlias(owner = alice, newAlias = DIDSubject(aliceAgent.id.did)) msg <- ZIO.fromEither(plaintextMediationRequestMessage(aliceAgent.id.did, agent.id.did)) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(aliceAgentLayer) + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == MediateDeny.piuri) + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == MediateDeny.piuri) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), test("KeyList Update message Request should add alias and return keyList Response") { @@ -67,12 +67,11 @@ object MediatorCoordinationExecuterSpec extends ZIOSpecDefault with DidAccountSt msg <- ZIO.fromEither( plaintextKeyListUpdateRequestMessage(aliceAgent.id.did, agent.id.did, aliceAgent.id.did) ) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(aliceAgentLayer) + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == KeylistResponse.piuri) + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == KeylistResponse.piuri) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), test("KeyList remove alias message Request should remove alias and return keyList Response") { @@ -85,12 +84,11 @@ object MediatorCoordinationExecuterSpec extends ZIOSpecDefault with DidAccountSt msg <- ZIO.fromEither( plaintextKeyListRemoveAliasRequestMessage(aliceAgent.id.did, agent.id.did, bobAgent.id.did) ) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(aliceAgentLayer) + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == KeylistResponse.piuri) + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == KeylistResponse.piuri) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), test("KeyList remove alias non existing didAccount Request should return problem report") { @@ -102,28 +100,25 @@ object MediatorCoordinationExecuterSpec extends ZIOSpecDefault with DidAccountSt msg <- ZIO.fromEither( plaintextKeyListRemoveAliasRequestMessage(bobAgent.id.did, agent.id.did, bobAgent.id.did) ) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(bobAgentLayer) + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - val problemReport = ProblemReport.fromPlaintextMessage(plainText) - assert(problemReport)( - isRight( - hasField("code", (p: ProblemReport) => p.code, equalTo(ProblemCode.ErroFail("req", "not_enroll"))) && - hasField( - "from", - (p: ProblemReport) => p.from, - equalTo(agent.id.did) + action match + case reply: AnyReply => + assert(reply.msg.toProblemReport)( + isRight( + hasField("code", (p: ProblemReport) => p.code, equalTo(ProblemCode.ErroFail("req", "not_enroll"))) && + hasField( + "from", + (p: ProblemReport) => p.from, + equalTo(agent.id.did) + ) ) - ) - ) && assertTrue(plainText.`type` == ProblemReport.piuri) - + ) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean) ).provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(Operations.layerDefault) - .provideSomeLayer(Scope.default >>> Client.default >>> MessageDispatcherJVM.layer) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(AgentStub.agentLayer) .provideLayerShared(dataAccessLayer) @@ TestAspect.sequential diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala index fb124a27..2b8a5ff6 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala @@ -8,8 +8,7 @@ import fmgp.did.comm.protocol.pickup3.StatusRequest import fmgp.did.method.peer.DidPeerResolver import fmgp.did.{Agent, DIDSubject} import fmgp.util.Base64 -import io.iohk.atala.mediator.app.MediatorAgent -import io.iohk.atala.mediator.comm.MessageDispatcherJVM +import io.iohk.atala.mediator.MediatorAgent import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.db.AgentStub.* import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* @@ -25,13 +24,15 @@ import zio.test.* import zio.test.Assertion.* import scala.concurrent.ExecutionContext.Implicits.global +import fmgp.did.comm.protocol.* +/** mediator/testOnly io.iohk.atala.mediator.protocols.PickupExecuterSpec */ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with MessageSetup { override def spec = { suite("PickupExecuterSpec")( - test("Pickup Status message should return ProblemReport") { + test("Pickup Status message should return ProblemReport") { val executer = PickupExecuter for { mediatorAgent <- ZIO.service[MediatorAgent] @@ -42,29 +43,27 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M newAlias = DIDSubject(aliceAgent.id.did) ) msg <- ZIO.fromEither(plaintextStatusMessage(aliceAgent.id.did, mediatorAgent.id.did)) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(aliceAgentLayer) + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == ProblemReport.piuri) + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == ProblemReport.piuri) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), - test("Pickup StatusRequest message should return problem report for not enrolled did") { + test("Pickup StatusRequest message should return problem report for not enrolled did") { val executer = PickupExecuter for { mediatorAgent <- ZIO.service[MediatorAgent] userAccount <- ZIO.service[UserAccountRepo] msg <- ZIO.fromEither(plaintextStatusRequestMessage(aliceAgent.id.did, mediatorAgent.id.did)) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(aliceAgentLayer) + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == ProblemReport.piuri) + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == ProblemReport.piuri) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), - test("Pickup StatusRequest message should return Status Message") { + test("Pickup StatusRequest message should return Status Message") { val executer = PickupExecuter for { mediatorAgent <- ZIO.service[MediatorAgent] @@ -75,12 +74,11 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M newAlias = DIDSubject(aliceAgent.id.did) ) msg <- ZIO.fromEither(plaintextStatusRequestMessage(aliceAgent.id.did, mediatorAgent.id.did)) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(aliceAgentLayer) + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == Status.piuri) + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == Status.piuri) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), test("Pickup DeliveryRequest message return MessageDelivery and attachment message") { @@ -101,16 +99,16 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M msgForward <- ZIO.fromEither( plaintextForwardMessage(aliceAgent.id.did, mediatorAgent.id.did, encryptedBasicMessage.toJson) ) - result <- forwardMessageExecuter.execute(msgForward) + _ <- forwardMessageExecuter.program(msgForward) msg <- ZIO.fromEither( plaintextDeliveryRequestMessage(aliceAgent.id.did, mediatorAgent.id.did, aliceAgent.id.did) ) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(aliceAgentLayer) + action <- executer.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == MessageDelivery.piuri) && assertTrue(plainText.attachments.nonEmpty) + action match + case reply: AnyReply => + assertTrue(reply.msg.`type` == MessageDelivery.piuri) && assertTrue(reply.msg.attachments.nonEmpty) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), test("Delivery Request message for Pickup returns a Status Message when there are no messages available") { @@ -126,12 +124,11 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M msg <- ZIO.fromEither( plaintextDeliveryRequestMessage(aliceAgent.id.did, mediatorAgent.id.did, aliceAgent.id.did) ) - result <- pickupExecuter.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(aliceAgentLayer) + action <- pickupExecuter.program(msg) } yield { - val plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - assertTrue(plainText.`type` == Status.piuri) + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == Status.piuri) + case _ => assertTrue(false) } } @@ TestAspect.before(setupAndClean), test("Messages Received message should clear the messages from the queue") { @@ -152,26 +149,25 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M msgForward <- ZIO.fromEither( plaintextForwardMessage(aliceAgent.id.did, mediatorAgent.id.did, encryptedBasicMessage.toJson) ) - result <- forwardMessageExecuter.execute(msgForward) + _ <- forwardMessageExecuter.program(msgForward) msg <- ZIO.fromEither( plaintextDeliveryRequestMessage(aliceAgent.id.did, mediatorAgent.id.did, aliceAgent.id.did) ) - result <- executer.execute(msg) - message <- ZIO.fromOption(result) - decryptedMessage <- authDecrypt(message.asInstanceOf[EncryptedMessage]).provideSomeLayer(aliceAgentLayer) - plainText = decryptedMessage.asInstanceOf[PlaintextMessage] - attchmentID = plainText.attachments.map(_.flatMap(_.id).head).get + action1 <- executer.program(msg) + // plainText = decryptedMessage.asInstanceOf[PlaintextMessage] + attchmentID = action1.asInstanceOf[AnyReply].msg.attachments.map(_.flatMap(_.id).head).get messagesReceived <- ZIO.fromEither( plaintextMessagesReceivedRequestMessage(aliceAgent.id.did, mediatorAgent.id.did, attchmentID) ) - result <- executer.execute(messagesReceived) + action2 <- executer.program(messagesReceived) } yield { - assertTrue(result.isEmpty) + action2 match + case reply: AnyReply => assertTrue(false) + case NoReply => assertTrue(true) } } @@ TestAspect.before(setupAndClean) ).provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(Operations.layerDefault) - .provideSomeLayer(Scope.default >>> Client.default >>> MessageDispatcherJVM.layer) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(AgentStub.agentLayer) .provideLayerShared(dataAccessLayer) @@ TestAspect.sequential diff --git a/webapp/src/main/scala/io/iohk/atala/mediator/MediatorInfo.scala b/webapp/src/main/scala/io/iohk/atala/mediator/MediatorInfo.scala index b5e46041..d579cece 100644 --- a/webapp/src/main/scala/io/iohk/atala/mediator/MediatorInfo.scala +++ b/webapp/src/main/scala/io/iohk/atala/mediator/MediatorInfo.scala @@ -20,7 +20,7 @@ object MediatorInfo { val host = dom.window.location.host val scheme = dom.window.location.protocol val fullPath = s"${scheme}//${host}" - val qrCodeData = OutOfBandPlaintext.from(invitation.toPlaintextMessage).makeURI(s"$fullPath") + val qrCodeData = OutOfBand.from(invitation.toPlaintextMessage).makeURI(s"$fullPath") val divQRCode = div() {