From 3ec8c3c2daee3d7cb661139d0e3afac30a38bb87 Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Mon, 27 Nov 2023 18:24:44 +0000 Subject: [PATCH 1/4] Pickup Protocol - Support for Live Mode #145 --- Mediator-Error_Handling.md | 32 ++++---- .../mediator/AgentExecutorMediator.scala | 15 +++- .../atala/mediator/MediatorStandalone.scala | 6 +- .../io/iohk/atala/mediator/OperatorImp.scala | 3 +- .../protocols/ForwardMessageExecuter.scala | 25 +++++- .../mediator/protocols/PickupExecuter.scala | 76 +++++++++++++++---- .../ForwardMessageExecutorSpec.scala | 7 +- .../protocols/PickupExecuterSpec.scala | 11 ++- 8 files changed, 132 insertions(+), 43 deletions(-) diff --git a/Mediator-Error_Handling.md b/Mediator-Error_Handling.md index 8be52319..035c150e 100644 --- a/Mediator-Error_Handling.md +++ b/Mediator-Error_Handling.md @@ -12,22 +12,22 @@ https://identity.foundation/didcomm-messaging/spec/#problem-reports This table defines the expected behavior of the mediator in different scenarios not covered by the specifications. -| Mediators | Atala Mediator | Roadmap Atala Mediator | │ | RootsId | Blocktrust | -|-------------|----------------|------------------------|---|---------|------------| -| Scenario G1 | G1C | - | │ | ? | ? | -| Scenario G2 | G2B [ATL-5840] | - | │ | | ? | -| Scenario G3 | Fallback G2B | [TODO] G3B | │ | | ? | -| Scenario G4 | G4B | - | │ | | ? | -| Scenario G5 | Fallback G4B | [TODO] G5B | │ | | ? | -| Scenario G6 | Fallback G4B | [WIP] G6B | │ | | ? | -| Scenario G7 | Fallback G4B | [TODO] G7B | │ | | ? | -| Scenario G8 | G8C | - | │ | | ? | -| | | | | | | -| Scenario M1 | M1B | - | │ | | ? | -| Scenario M2 | M2B | - | │ | | ? | -| Scenario M3 | Fallback G4 | M3B | │ | | ? | -| Scenario M4 | M4B | - | │ | | ? | -| Scenario M5 | M5A | [TODO] M5B | │ | | ? | +| Mediators | Atala Mediator | Roadmap Atala Mediator | +|-------------|----------------|------------------------| +| Scenario G1 | G1C | - | +| Scenario G2 | G2B [ATL-5840] | - | +| Scenario G3 | Fallback G2B | [TODO] G3B | +| Scenario G4 | G4B | - | +| Scenario G5 | Fallback G4B | [TODO] G5B | +| Scenario G6 | Fallback G4B | G6B | +| Scenario G7 | Fallback G4B | [TODO] G7B | +| Scenario G8 | G8C | - | +| | | | +| Scenario M1 | M1B | - | +| Scenario M2 | M2B | - | +| Scenario M3 | Fallback G4 | M3B | +| Scenario M4 | M4B | - | +| Scenario M5 | M5A | M5B [#145] | ### Scenarios Description diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala index c842b1c0..2e4976cc 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala @@ -16,7 +16,7 @@ import fmgp.did.comm.protocol.reportproblem2.ProblemReport case class AgentExecutorMediator( agent: Agent, - transportManager: Ref[TransportManager], + transportManager: Ref[MediatorTransportManager], protocolHandler: ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], userAccountRepo: UserAccountRepo, messageItemRepo: MessageItemRepo, @@ -54,7 +54,9 @@ case class AgentExecutorMediator( .tapError(ex => ZIO.logError(ex.toString)) .provideSomeLayer(this.indentityLayer) .provideSomeLayer(userAccountRepoLayer ++ messageItemRepoLayer) - .provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => e ++ ZEnvironment(protocolHandler)) + .provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => + e ++ ZEnvironment(protocolHandler) ++ ZEnvironment(transportManager) ++ ZEnvironment(transport) + ) .orDieWith(ex => new RuntimeException(ex.toString)) def receiveMessage( @@ -175,7 +177,12 @@ case class AgentExecutorMediator( case Some(problemReport) => ZIO.succeed(Reply(problemReport.toPlaintextMessage)) case None => protocolHandler - .program(plaintextMessage) + .program(plaintextMessage) // should we change the signature of the method or use the ZEnvironment + .provideSomeEnvironment( + (e: ZEnvironment[ + Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] + ]) => e ++ ZEnvironment(transport) + ) .catchSome { case ProtocolExecutionFailToParse(failToParse) => for { _ <- ZIO.logWarning(s"Error ProtocolExecutionFailToParse: $failToParse") @@ -246,7 +253,7 @@ object AgentExecutorMediator { messageItemRepo: MessageItemRepo, ): ZIO[TransportFactory, Nothing, AgentExecutar] = for { - transportManager <- TransportManager.make + transportManager <- MediatorTransportManager.make mediator = AgentExecutorMediator(agent, transportManager, protocolHandler, userAccountRepo, messageItemRepo) } yield mediator diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala index 2e59d326..c764f7e8 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorStandalone.scala @@ -6,22 +6,22 @@ import fmgp.did.* import fmgp.did.comm.* import fmgp.did.comm.protocol.* import fmgp.did.method.peer.* +import fmgp.did.framework.TransportFactoryImp import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.protocols.* import zio.* +import zio.stream.* import zio.config.* import zio.config.magnolia.* import zio.config.typesafe.* import zio.http.* import zio.json.* +import zio.logging.* import zio.logging.LogFormat.* import zio.logging.backend.SLF4J -import zio.logging.* -import zio.stream.* import java.time.format.DateTimeFormatter import scala.io.Source -import fmgp.did.framework.TransportFactoryImp case class MediatorConfig(endpoints: String, keyAgreement: OKPPrivateKey, keyAuthentication: OKPPrivateKey) { val did = DIDPeer2.makeAgent( Seq(keyAgreement, keyAuthentication), diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala b/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala index 806cdcf4..9c47e96b 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/OperatorImp.scala @@ -12,7 +12,8 @@ import io.iohk.atala.mediator.protocols.* import io.iohk.atala.mediator.db.{UserAccountRepo, MessageItemRepo} object OperatorImp { - type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo + type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] & + TransportDIDComm[Any] val protocolHandlerLayer: ULayer[ ProtocolExecuter[Services, MediatorError | StorageError] diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala index 7ff5a554..f8c94937 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecuter.scala @@ -10,9 +10,13 @@ import io.iohk.atala.mediator.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* +import fmgp.did.comm.protocol.pickup3.MessageDelivery object ForwardMessageExecuter - extends ProtocolExecuter[Agent & UserAccountRepo & MessageItemRepo, MediatorError | StorageError] { + extends ProtocolExecuter[ + Resolver & Operations & Agent & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager], + MediatorError | StorageError + ] { override def supportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri) @@ -34,6 +38,25 @@ object ForwardMessageExecuter for { _ <- repoMessageItem.insert(m.msg) _ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo") + + // For Live Mode + mediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]].flatMap(_.get) + agent <- ZIO.service[Agent] + messageDelivery = MessageDelivery( + thid = m.id, // FIXME what should I put here? + from = agent.id.asFROM, // Mediator agent + to = m.next.asTO, // Destination of the message that is being forward + recipient_did = None, + attachments = Map( + m.msg.sha256 -> m.msg + ) + ).toPlaintextMessage + eMsgDelivery <- Operations + .authEncrypt(messageDelivery) + .mapError(didFail => MediatorDidError(didFail)) + _ <- mediatorTransportManager + .sendForLiveMode(m.next.asTO, eMsgDelivery) + .mapError(didFail => MediatorDidError(didFail)) } yield NoReply } else { for { diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index 6ade8c30..5c8b2e0b 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -10,8 +10,13 @@ import io.iohk.atala.mediator.* import io.iohk.atala.mediator.db.* import zio.* import zio.json.* +import fmgp.did.framework._ -object PickupExecuter extends ProtocolExecuter[UserAccountRepo & MessageItemRepo, MediatorError | StorageError] { +object PickupExecuter + extends ProtocolExecuter[ + UserAccountRepo & MessageItemRepo & TransportDIDComm[Any] & Ref[MediatorTransportManager], + MediatorError | StorageError + ] { override def supportedPIURI: Seq[PIURI] = Seq( StatusRequest.piuri, @@ -176,19 +181,64 @@ object PickupExecuter extends ProtocolExecuter[UserAccountRepo & MessageItemRepo ) } yield NoReply case m: LiveModeChange => - ZIO.logInfo("LiveModeChange Not Supported") *> - ZIO.succeed( - Reply( - Problems - .liveModeNotSupported( - from = m.to.asFROM, - to = m.from.asTO, - pthid = m.id, - piuri = m.piuri, + for { + _ <- ZIO.logInfo("LiveModeChange") + // For Live Mode + refMediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]] + transport <- ZIO.service[TransportDIDComm[Any]] + ret <- + if (false) { // FIXME TODO transport type support Live Mode ? + // If sent with live_delivery set to true on a connection incapable of live delivery, a problem_report SHOULD be sent + ZIO + .log(s"Connection '${transport.id}' does not support Live Delivery") + .map(_ => + Problems + .liveModeNotSupported( + from = m.to.asFROM, + to = m.from.asTO, + pthid = m.id, + piuri = m.piuri, + ) + .toPlaintextMessage ) - .toPlaintextMessage - ) - ) + } else + for { + updateTask <- refMediatorTransportManager.update(tm => + if (m.live_delivery) tm.enableLiveMode(m.from.asFROMTO, transport.id) + else tm.disableLiveMode(m.from.asFROMTO, transport.id) + ) + + // Make the status reply + repoDidAccount <- ZIO.service[UserAccountRepo] + didRequestingMessages = m.from.asFROMTO + mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) + ret = mDidAccount match + case None => + Problems + .notEnroledError( + from = m.to.asFROM, + to = Some(m.from.asTO), + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + didNotEnrolled = didRequestingMessages.asFROM.toDIDSubject, + ) + .toPlaintextMessage + case Some(didAccount) => + val msgHash = didAccount.messagesRef.filter(_.state == false).map(_.hash) + Status( + thid = m.id, + from = m.to.asFROM, + to = m.from.asTO, + recipient_did = None, // m.recipient_did, + message_count = msgHash.size, + longest_waited_seconds = None, // TODO + newest_received_time = None, // TODO + oldest_received_time = None, // TODO + total_bytes = None, // TODO + live_delivery = None, // TODO + ).toPlaintextMessage + } yield ret + } yield Reply(ret) } match case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) case Right(program) => program diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala index f123364b..ddff01f7 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/ForwardMessageExecutorSpec.scala @@ -5,7 +5,9 @@ import fmgp.did.comm.protocol.* import fmgp.did.comm.{EncryptedMessage, Operations, PlaintextMessage, SignedMessage, layerDefault} import fmgp.did.method.peer.DidPeerResolver import fmgp.util.Base64 +import io.iohk.atala.mediator.MediatorTransportManager import io.iohk.atala.mediator.db.* +import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* import io.iohk.atala.mediator.db.MessageItemRepoSpec.encryptedMessageAlice import io.iohk.atala.mediator.protocols.ForwardMessageExecuter import zio.* @@ -17,7 +19,6 @@ import zio.test.Assertion.* import fmgp.did.DIDSubject import scala.concurrent.ExecutionContext.Implicits.global -import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* import reactivemongo.api.bson.BSONDocument /** mediator/testOnly io.iohk.atala.mediator.protocols.ForwardMessageExecutorSpec */ @@ -56,7 +57,9 @@ object ForwardMessageExecutorSpec extends ZIOSpecDefault with DidAccountStubSetu case NoReply => assertTrue(true) } @@ TestAspect.before(setupAndClean) - ).provideSomeLayer(DidPeerResolver.layerDidPeerResolver) + ) + .provideSomeLayer(io.iohk.atala.mediator.MediatorTransportManagerUtil.layerTest) + .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(Operations.layerDefault) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(AgentStub.agentLayer) diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala index 2b8a5ff6..edb21640 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala @@ -8,7 +8,7 @@ import fmgp.did.comm.protocol.pickup3.StatusRequest import fmgp.did.method.peer.DidPeerResolver import fmgp.did.{Agent, DIDSubject} import fmgp.util.Base64 -import io.iohk.atala.mediator.MediatorAgent +import io.iohk.atala.mediator.* import io.iohk.atala.mediator.db.* import io.iohk.atala.mediator.db.AgentStub.* import io.iohk.atala.mediator.db.EmbeddedMongoDBInstance.* @@ -166,11 +166,16 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case NoReply => assertTrue(true) } } @@ TestAspect.before(setupAndClean) - ).provideSomeLayer(DidPeerResolver.layerDidPeerResolver) + ) + .provideSomeLayer(MediatorTransportManagerUtil.layerTest) + .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(Operations.layerDefault) .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) .provideSomeLayer(AgentStub.agentLayer) - .provideLayerShared(dataAccessLayer) @@ TestAspect.sequential + .provideSomeEnvironment((e: ZEnvironment[UserAccountRepo & MessageItemRepo]) => + e ++ ZEnvironment(TransportUtil.newTransportEmpty) + ) + .provideLayerShared(dataAccessLayer) } val dataAccessLayer = EmbeddedMongoDBInstance.layer(port, hostIp) From 28aa23cd1b74a3fa3d09633e29a42e54cb2fd65c Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Mon, 27 Nov 2023 18:31:26 +0000 Subject: [PATCH 2/4] Missing files --- .../mediator/MediatorTransportManager.scala | 127 ++++++++++++++++++ .../MediatorTransportManagerUtil.scala | 14 ++ .../iohk/atala/mediator/TransportUtil.scala | 27 ++++ 3 files changed, 168 insertions(+) create mode 100644 mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala create mode 100644 mediator/src/test/scala/io/iohk/atala/mediator/MediatorTransportManagerUtil.scala create mode 100644 mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala new file mode 100644 index 00000000..2f4d7874 --- /dev/null +++ b/mediator/src/main/scala/io/iohk/atala/mediator/MediatorTransportManager.scala @@ -0,0 +1,127 @@ +package io.iohk.atala.mediator + +import zio._ +import zio.json._ +import zio.stream._ + +import fmgp.did._ +import fmgp.did.comm._ +import fmgp.did.framework._ +import fmgp.crypto.error._ +import fmgp.util._ + +type TransportID = String + +/** Based on the [[fmgp.did.framework.TransportDispatcher]] */ +case class MediatorTransportManager( + transports: Set[TransportDIDComm[Any]] = Set.empty, + ids: Map[FROMTO, Set[TransportID]] = Map.empty, + kids: Map[VerificationMethodReferenced, Set[TransportID]] = Map.empty, + liveMode: Map[FROMTO, Set[TransportID]] = Map.empty, + transportFactory: TransportFactory +) extends TransportDispatcher { + + override def openTransport(uri: String): UIO[TransportDIDComm[Any]] = + transportFactory.openTransport(uri) // FIXME TODO register Transport + + def link(vmr: VerificationMethodReferenced, transportID: TransportID): MediatorTransportManager = + if (!transports.map(_.id).contains(transportID)) this // if transport is close + else + kids.get(vmr) match + case Some(seq) if seq.contains(transportID) => this + case Some(seq) => this.copy(kids = kids + (vmr -> (seq + transportID))).link(vmr.did.asFROMTO, transportID) + case None => this.copy(kids = kids + (vmr -> Set(transportID))).link(vmr.did.asFROMTO, transportID) + + def link(from: FROMTO, transport: TransportDIDComm[Any]): MediatorTransportManager = link(from, transport.id) + def link(from: FROMTO, transportID: TransportID): MediatorTransportManager = + if (!transports.map(_.id).contains(transportID)) this // if transport is close + else + ids.get(from) match + case Some(seq) if seq.contains(transportID) => this + case Some(seq) => this.copy(ids = ids + (from -> (seq + transportID))) + case None => this.copy(ids = ids + (from -> Set(transportID))) + + def registerTransport(transport: TransportDIDComm[Any]) = + this.copy(transports = transports + transport) + + def unregisterTransport(transportID: TransportID) = this.copy( + transports = transports.filter(_.id != transportID), + ids = ids.map { case (did, ids) => (did, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty), + kids = kids.map { case (kid, ids) => (kid, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty), + liveMode = liveMode.map { case (did, ids) => (did, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty), + ) + + def enableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager = + this.copy( + liveMode = liveMode.updatedWith(subject) { + case Some(set) => Some(set - transportID).filter(_.isEmpty) + case None => None + } + ) + + def disableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager = + this.copy( + liveMode = liveMode.updatedWith(subject) { + case Some(set) => Some(set + transportID) + case None => Some(Set(transportID)) + } + ) + + def getLiveModeEnableConnections(subject: FROMTO): Seq[TransportDIDComm[Any]] = + liveMode.get(subject).toSeq.flatMap(transportId => transports.filter(t => transportId.contains(t.id))) + + def sendForLiveMode( + next: TO, + msg: /*lazy*/ => SignedMessage | EncryptedMessage + ): ZIO[Any, DidFail, Iterable[Unit]] = { + val transportIDs = this.liveMode.getOrElse(next.asFROMTO, Seq.empty) + val myChannels = transportIDs.flatMap(id => this.transports.find(_.id == id)) + ZIO.foreach(myChannels) { _.send(msg) } + } + + // TODO maybe rename to send + def publish(to: TO, msg: SignedMessage | EncryptedMessage): ZIO[Any, Nothing, Iterable[Unit]] = { + val transportIDs = this.ids.getOrElse(to.asFROMTO, Seq.empty) + val myChannels = transportIDs.flatMap(id => this.transports.find(_.id == id)) + ZIO.foreach(myChannels) { _.send(msg) } + } + + override def send( + to: TO, + msg: SignedMessage | EncryptedMessage, + thid: Option[MsgID], // TODO use + pthid: Option[MsgID], // TODO use + ): ZIO[Resolver & Agent & Operations, DidFail, Unit] = + sendViaDIDCommMessagingService(to, msg).unit + + override def sendViaDIDCommMessagingService( + to: TO, + msg: SignedMessage | EncryptedMessage + ): ZIO[Resolver & Agent & Operations, DidFail, Either[String, TransportDIDComm[Any]]] = + super.sendViaDIDCommMessagingService(to, msg) + +} + +object MediatorTransportManager { + + def make: URIO[TransportFactory, Ref[MediatorTransportManager]] = + for { + transportFactory <- ZIO.service[TransportFactory] + ref <- Ref.make(MediatorTransportManager(transportFactory = transportFactory)) + } yield ref + + def registerTransport(transport: TransportDIDComm[Any]) = + for { + socketManager <- ZIO.service[Ref[MediatorTransportManager]] + _ <- socketManager.update { _.registerTransport(transport) } + _ <- ZIO.log(s"RegisterTransport concluded") + } yield () + + def unregisterTransport(transportId: String) = + for { + socketManager <- ZIO.service[Ref[MediatorTransportManager]] + _ <- socketManager.update { case sm: MediatorTransportManager => sm.unregisterTransport(transportId) } + _ <- ZIO.log(s"Channel unregisterSocket") + } yield () + +} diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/MediatorTransportManagerUtil.scala b/mediator/src/test/scala/io/iohk/atala/mediator/MediatorTransportManagerUtil.scala new file mode 100644 index 00000000..d8742014 --- /dev/null +++ b/mediator/src/test/scala/io/iohk/atala/mediator/MediatorTransportManagerUtil.scala @@ -0,0 +1,14 @@ +package io.iohk.atala.mediator + +import zio._ +import fmgp.did.framework.TransportFactory +import zio.http._ +import fmgp.did.framework.TransportFactoryImp + +object MediatorTransportManagerUtil { + + // utility + def layerTest: ZLayer[Any, Nothing, Ref[MediatorTransportManager]] = + Scope.default >>> (Client.default >>> TransportFactoryImp.layer).orDie >>> + ZLayer.fromZIO(MediatorTransportManager.make) +} diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala b/mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala new file mode 100644 index 00000000..3d7f1bbd --- /dev/null +++ b/mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala @@ -0,0 +1,27 @@ +package io.iohk.atala.mediator + +import fmgp.did._ +import fmgp.did.comm._ +import fmgp.did.framework._ +import zio.stream._ + +object TransportUtil { + def newTransportEmpty: TransportDIDComm[Any] = + new Transport[Any, SignedMessage | EncryptedMessage, SignedMessage | EncryptedMessage] { + + def id: TransportID = "newTransportEmpty_Test" + def inbound: zio.stream.ZStream[ + Any, + Transport.InErr, + SignedMessage | EncryptedMessage + ] = ZStream.empty + def outbound: zio.stream.ZSink[ + Any, + Transport.OutErr, + SignedMessage | EncryptedMessage, + Nothing, + Unit + ] = ZSink.drain + + } +} From fa8f66d348f227663a092bc226ba0b83bcf8102f Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Wed, 29 Nov 2023 17:43:46 +0000 Subject: [PATCH 3/4] Done --- build.sbt | 2 +- .../iohk/atala/mediator/DIDCommRoutes.scala | 2 + .../mediator/protocols/PickupExecuter.scala | 92 +++++++++---------- .../iohk/atala/mediator/TransportUtil.scala | 36 +++++++- .../mediator/protocols/MessageSetup.scala | 22 +++++ .../protocols/PickupExecuterSpec.scala | 72 +++++++++++---- 6 files changed, 160 insertions(+), 66 deletions(-) diff --git a/build.sbt b/build.sbt index 6ae1a121..378cb09c 100644 --- a/build.sbt +++ b/build.sbt @@ -9,7 +9,7 @@ inThisBuild( /** Versions */ lazy val V = new { - val scalaDID = "0.1.0-M15" + val scalaDID = "0.1.0-M16" // FIXME another bug in the test framework https://github.com/scalameta/munit/issues/554 val munit = "1.0.0-M10" // "0.7.29" diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala b/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala index 79830b68..e6b74a6c 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/DIDCommRoutes.scala @@ -46,6 +46,8 @@ object DIDCommRoutes { inboundQueue <- Queue.bounded[SignedMessage | EncryptedMessage](1) outboundQueue <- Queue.bounded[SignedMessage | EncryptedMessage](1) transport = new TransportDIDComm[Any] { + def transmissionFlow = Transport.TransmissionFlow.BothWays + def transmissionType = Transport.TransmissionType.SingleTransmission def id: TransportID = TransportID.http(req.headers.get("request_id")) def inbound: ZStream[Any, Transport.InErr, SignedMessage | EncryptedMessage] = ZStream.fromQueue(inboundQueue) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index 5c8b2e0b..fe44e443 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -11,6 +11,7 @@ import io.iohk.atala.mediator.db.* import zio.* import zio.json.* import fmgp.did.framework._ +import fmgp.did.framework.Transport.TransmissionType object PickupExecuter extends ProtocolExecuter[ @@ -187,57 +188,56 @@ object PickupExecuter refMediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]] transport <- ZIO.service[TransportDIDComm[Any]] ret <- - if (false) { // FIXME TODO transport type support Live Mode ? - // If sent with live_delivery set to true on a connection incapable of live delivery, a problem_report SHOULD be sent - ZIO - .log(s"Connection '${transport.id}' does not support Live Delivery") - .map(_ => - Problems - .liveModeNotSupported( - from = m.to.asFROM, - to = m.from.asTO, - pthid = m.id, - piuri = m.piuri, - ) - .toPlaintextMessage - ) - } else - for { - updateTask <- refMediatorTransportManager.update(tm => - if (m.live_delivery) tm.enableLiveMode(m.from.asFROMTO, transport.id) - else tm.disableLiveMode(m.from.asFROMTO, transport.id) - ) - - // Make the status reply - repoDidAccount <- ZIO.service[UserAccountRepo] - didRequestingMessages = m.from.asFROMTO - mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) - ret = mDidAccount match - case None => + transport.transmissionType match // If sent with live_delivery set to true on a connection incapable of live delivery, a problem_report SHOULD be sent + case TransmissionType.SingleTransmission => // Like HTTP + ZIO + .log(s"Connection '${transport.id}' does not support Live Delivery") + .map(_ => Problems - .notEnroledError( + .liveModeNotSupported( from = m.to.asFROM, - to = Some(m.from.asTO), - pthid = m.id, // TODO CHECK pthid + to = m.from.asTO, + pthid = m.id, piuri = m.piuri, - didNotEnrolled = didRequestingMessages.asFROM.toDIDSubject, ) .toPlaintextMessage - case Some(didAccount) => - val msgHash = didAccount.messagesRef.filter(_.state == false).map(_.hash) - Status( - thid = m.id, - from = m.to.asFROM, - to = m.from.asTO, - recipient_did = None, // m.recipient_did, - message_count = msgHash.size, - longest_waited_seconds = None, // TODO - newest_received_time = None, // TODO - oldest_received_time = None, // TODO - total_bytes = None, // TODO - live_delivery = None, // TODO - ).toPlaintextMessage - } yield ret + ) + case TransmissionType.MultiTransmissions => // Like WS + for { + updateTask <- refMediatorTransportManager.update(tm => + if (m.live_delivery) tm.enableLiveMode(m.from.asFROMTO, transport.id) + else tm.disableLiveMode(m.from.asFROMTO, transport.id) + ) + // Make the status reply + repoDidAccount <- ZIO.service[UserAccountRepo] + didRequestingMessages = m.from.asFROMTO + mDidAccount <- repoDidAccount.getDidAccount(didRequestingMessages.toDID) + ret = mDidAccount match + case None => + Problems + .notEnroledError( + from = m.to.asFROM, + to = Some(m.from.asTO), + pthid = m.id, // TODO CHECK pthid + piuri = m.piuri, + didNotEnrolled = didRequestingMessages.asFROM.toDIDSubject, + ) + .toPlaintextMessage + case Some(didAccount) => + val msgHash = didAccount.messagesRef.filter(_.state == false).map(_.hash) + Status( + thid = m.id, + from = m.to.asFROM, + to = m.from.asTO, + recipient_did = None, // m.recipient_did, + message_count = msgHash.size, + longest_waited_seconds = None, // TODO + newest_received_time = None, // TODO + oldest_received_time = None, // TODO + total_bytes = None, // TODO + live_delivery = None, // TODO + ).toPlaintextMessage + } yield ret } yield Reply(ret) } match case Left(error) => ZIO.logError(error) *> ZIO.succeed(NoReply) diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala b/mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala index 3d7f1bbd..703a0158 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/TransportUtil.scala @@ -8,8 +8,42 @@ import zio.stream._ object TransportUtil { def newTransportEmpty: TransportDIDComm[Any] = new Transport[Any, SignedMessage | EncryptedMessage, SignedMessage | EncryptedMessage] { - + def transmissionFlow = ??? + def transmissionType = ??? def id: TransportID = "newTransportEmpty_Test" + def inbound: zio.stream.ZStream[Any, Transport.InErr, SignedMessage | EncryptedMessage] = ??? + def outbound: zio.stream.ZSink[Any, Transport.OutErr, SignedMessage | EncryptedMessage, Nothing, Unit] = ??? + } + + def newTransportEmptySingleTransmission: TransportDIDComm[Any] = + new Transport[Any, SignedMessage | EncryptedMessage, SignedMessage | EncryptedMessage] { + + def transmissionFlow = Transport.TransmissionFlow.BothWays + def transmissionType = Transport.TransmissionType.SingleTransmission + + def id: TransportID = "newTransportEmpty_Test_SingleTransmission" + def inbound: zio.stream.ZStream[ + Any, + Transport.InErr, + SignedMessage | EncryptedMessage + ] = ZStream.empty + def outbound: zio.stream.ZSink[ + Any, + Transport.OutErr, + SignedMessage | EncryptedMessage, + Nothing, + Unit + ] = ZSink.drain + + } + + def newTransportEmptyMultiTransmissions: TransportDIDComm[Any] = + new Transport[Any, SignedMessage | EncryptedMessage, SignedMessage | EncryptedMessage] { + + def transmissionFlow = Transport.TransmissionFlow.BothWays + def transmissionType = Transport.TransmissionType.MultiTransmissions + + def id: TransportID = "newTransportEmpty_Test_MultiTransmissions" def inbound: zio.stream.ZStream[ Any, Transport.InErr, diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MessageSetup.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MessageSetup.scala index 5f630579..9736ed81 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MessageSetup.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/MessageSetup.scala @@ -312,4 +312,26 @@ trait MessageSetup { | "typ" : "application/didcomm-plain+json" |}""".stripMargin.fromJson[PlaintextMessage] + val plaintextLiveModeEnable = (didFrom: String, mediatorDid: String) => s"""{ + | "id" : "f0f6c406-c247-4842-8d37-c9a4f77226d8", + | "type" : "https://didcomm.org/messagepickup/3.0/live-delivery-change", + | "to" : ["$mediatorDid"], + | "from" : "$didFrom", + | "thid" : "maybe-thid-if-responding", + | "body" : {"live_delivery" : true}, + | "return_route" : "all", + | "typ" : "application/didcomm-plain+json" + |}""".stripMargin.fromJson[PlaintextMessage] + + val plaintextLiveModeDisable = (didFrom: String, mediatorDid: String) => s"""{ + | "id" : "f0f6c406-c247-4842-8d37-c9a4f77226d8", + | "type" : "https://didcomm.org/messagepickup/3.0/live-delivery-change", + | "to" : ["$mediatorDid"], + | "from" : "$didFrom", + | "thid" : "maybe-thid-if-responding", + | "body" : {"live_delivery" : false}, + | "return_route" : "all", + | "typ" : "application/didcomm-plain+json" + |}""".stripMargin.fromJson[PlaintextMessage] + } diff --git a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala index edb21640..1c47c677 100644 --- a/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala +++ b/mediator/src/test/scala/io/iohk/atala/mediator/protocols/PickupExecuterSpec.scala @@ -49,7 +49,7 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case reply: AnyReply => assertTrue(reply.msg.`type` == ProblemReport.piuri) case _ => assertTrue(false) } - } @@ TestAspect.before(setupAndClean), + }, test("Pickup StatusRequest message should return problem report for not enrolled did") { val executer = PickupExecuter for { @@ -62,7 +62,7 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case reply: AnyReply => assertTrue(reply.msg.`type` == ProblemReport.piuri) case _ => assertTrue(false) } - } @@ TestAspect.before(setupAndClean), + }, test("Pickup StatusRequest message should return Status Message") { val executer = PickupExecuter for { @@ -80,8 +80,8 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case reply: AnyReply => assertTrue(reply.msg.`type` == Status.piuri) case _ => assertTrue(false) } - } @@ TestAspect.before(setupAndClean), - test("Pickup DeliveryRequest message return MessageDelivery and attachment message") { + }, + test("Pickup DeliveryRequest message return MessageDelivery and attachment message") { val executer = PickupExecuter val forwardMessageExecuter = ForwardMessageExecuter for { @@ -110,7 +110,7 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M assertTrue(reply.msg.`type` == MessageDelivery.piuri) && assertTrue(reply.msg.attachments.nonEmpty) case _ => assertTrue(false) } - } @@ TestAspect.before(setupAndClean), + }, test("Delivery Request message for Pickup returns a Status Message when there are no messages available") { val pickupExecuter = PickupExecuter for { @@ -130,8 +130,8 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case reply: AnyReply => assertTrue(reply.msg.`type` == Status.piuri) case _ => assertTrue(false) } - } @@ TestAspect.before(setupAndClean), - test("Messages Received message should clear the messages from the queue") { + }, + test("Messages Received message should clear the messages from the queue") { val executer = PickupExecuter val forwardMessageExecuter = ForwardMessageExecuter for { @@ -165,18 +165,54 @@ object PickupExecuterSpec extends ZIOSpecDefault with DidAccountStubSetup with M case reply: AnyReply => assertTrue(false) case NoReply => assertTrue(true) } - } @@ TestAspect.before(setupAndClean) - ) - .provideSomeLayer(MediatorTransportManagerUtil.layerTest) - .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) - .provideSomeLayer(Operations.layerDefault) - .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) - .provideSomeLayer(AgentStub.agentLayer) - .provideSomeEnvironment((e: ZEnvironment[UserAccountRepo & MessageItemRepo]) => - e ++ ZEnvironment(TransportUtil.newTransportEmpty) - ) - .provideLayerShared(dataAccessLayer) + }, + test("Pickup LiveMode over WS message should return Status Message") { + val executer = PickupExecuter + for { + mediatorAgent <- ZIO.service[MediatorAgent] + userAccount <- ZIO.service[UserAccountRepo] + _ <- userAccount.createOrFindDidAccount(DIDSubject(aliceAgent.id.did)) + _ <- userAccount.addAlias( + owner = DIDSubject(aliceAgent.id.did), + newAlias = DIDSubject(aliceAgent.id.did) + ) + msg <- ZIO.fromEither(plaintextLiveModeEnable(aliceAgent.id.did, mediatorAgent.id.did)) + action <- executer.program(msg) + } yield { + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == Status.piuri) + case _ => assertTrue(false) + } + } + .provideSomeLayer(ZLayer.succeed(TransportUtil.newTransportEmptyMultiTransmissions)), + test("Pickup LiveMode over SingleTransmission (HTTP) message should return ProblemReport") { + val executer = PickupExecuter + for { + mediatorAgent <- ZIO.service[MediatorAgent] + userAccount <- ZIO.service[UserAccountRepo] + _ <- userAccount.createOrFindDidAccount(DIDSubject(aliceAgent.id.did)) + _ <- userAccount.addAlias( + owner = DIDSubject(aliceAgent.id.did), + newAlias = DIDSubject(aliceAgent.id.did) + ) + msg <- ZIO.fromEither(plaintextLiveModeEnable(aliceAgent.id.did, mediatorAgent.id.did)) + action <- executer.program(msg) + } yield { + action match + case reply: AnyReply => assertTrue(reply.msg.`type` == ProblemReport.piuri) + case _ => assertTrue(false) + } + } + .provideSomeLayer(ZLayer.succeed(TransportUtil.newTransportEmptySingleTransmission)), + ) @@ TestAspect.sequential @@ TestAspect.before(setupAndClean) } + .provideSomeLayer(MediatorTransportManagerUtil.layerTest) + .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) + .provideSomeLayer(Operations.layerDefault) + .provideSomeLayer(DidPeerResolver.layerDidPeerResolver) + .provideSomeLayer(AgentStub.agentLayer) + .provideSomeLayer(ZLayer.succeed(TransportUtil.newTransportEmpty)) + .provideLayerShared(dataAccessLayer) val dataAccessLayer = EmbeddedMongoDBInstance.layer(port, hostIp) >>> AsyncDriverResource.layer From 241f9600544c88889d21c1570c828df82c0f283f Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Wed, 29 Nov 2023 19:31:16 +0000 Subject: [PATCH 4/4] Minor improvements --- .../io/iohk/atala/mediator/AgentExecutorMediator.scala | 10 +++++++--- .../iohk/atala/mediator/protocols/PickupExecuter.scala | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala index 2e4976cc..93be459a 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/AgentExecutorMediator.scala @@ -55,7 +55,7 @@ case class AgentExecutorMediator( .provideSomeLayer(this.indentityLayer) .provideSomeLayer(userAccountRepoLayer ++ messageItemRepoLayer) .provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => - e ++ ZEnvironment(protocolHandler) ++ ZEnvironment(transportManager) ++ ZEnvironment(transport) + e ++ ZEnvironment(protocolHandler) ++ ZEnvironment(transportManager) ) .orDieWith(ex => new RuntimeException(ex.toString)) @@ -63,7 +63,9 @@ case class AgentExecutorMediator( msg: SignedMessage | EncryptedMessage, transport: TransportDIDComm[Any] ): ZIO[ - OperatorImp.Services & ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], + Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] & + // instead of OperatorImp.Services + ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], MediatorError | StorageError, Unit ] = ZIO.logAnnotate("msg_sha256", msg.sha256) { @@ -114,7 +116,9 @@ case class AgentExecutorMediator( pMsgOrProblemReport: Either[ProblemReport, PlaintextMessage], transport: TransportDIDComm[Any] ): ZIO[ - ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError] & OperatorImp.Services, + Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] & + // instead of OperatorImp.Services + ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError], MediatorError | StorageError, Unit ] = diff --git a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala index fe44e443..acce9484 100644 --- a/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala +++ b/mediator/src/main/scala/io/iohk/atala/mediator/protocols/PickupExecuter.scala @@ -187,6 +187,7 @@ object PickupExecuter // For Live Mode refMediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]] transport <- ZIO.service[TransportDIDComm[Any]] + _ <- ZIO.log(s"The transport's transmissionType is of the type ${transport.transmissionType}") ret <- transport.transmissionType match // If sent with live_delivery set to true on a connection incapable of live delivery, a problem_report SHOULD be sent case TransmissionType.SingleTransmission => // Like HTTP