Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pickup Protocol - Support for Live Mode #145 #184

Merged
merged 5 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions Mediator-Error_Handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ https://identity.foundation/didcomm-messaging/spec/#problem-reports

This table defines the expected behavior of the mediator in different scenarios not covered by the specifications.

| Mediators | Atala Mediator | Roadmap Atala Mediator | │ | RootsId | Blocktrust |
|-------------|----------------|------------------------|---|---------|------------|
| Scenario G1 | G1C | - | │ | ? | ? |
| Scenario G2 | G2B [ATL-5840] | - | │ | | ? |
| Scenario G3 | Fallback G2B | [TODO] G3B | │ | | ? |
| Scenario G4 | G4B | - | │ | | ? |
| Scenario G5 | Fallback G4B | [TODO] G5B | │ | | ? |
| Scenario G6 | Fallback G4B | [WIP] G6B | │ | | ? |
| Scenario G7 | Fallback G4B | [TODO] G7B | │ | | ? |
| Scenario G8 | G8C | - | │ | | ? |
| | | | | | |
| Scenario M1 | M1B | - | │ | | ? |
| Scenario M2 | M2B | - | │ | | ? |
| Scenario M3 | Fallback G4 | M3B | │ | | ? |
| Scenario M4 | M4B | - | │ | | ? |
| Scenario M5 | M5A | [TODO] M5B | │ | | ? |
| Mediators | Atala Mediator | Roadmap Atala Mediator |
|-------------|----------------|------------------------|
| Scenario G1 | G1C | - |
| Scenario G2 | G2B [ATL-5840] | - |
| Scenario G3 | Fallback G2B | [TODO] G3B |
| Scenario G4 | G4B | - |
| Scenario G5 | Fallback G4B | [TODO] G5B |
| Scenario G6 | Fallback G4B | G6B |
| Scenario G7 | Fallback G4B | [TODO] G7B |
| Scenario G8 | G8C | - |
| | | |
| Scenario M1 | M1B | - |
| Scenario M2 | M2B | - |
| Scenario M3 | Fallback G4 | M3B |
| Scenario M4 | M4B | - |
| Scenario M5 | M5A | M5B [#145] |

### Scenarios Description

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ inThisBuild(

/** Versions */
lazy val V = new {
val scalaDID = "0.1.0-M15"
val scalaDID = "0.1.0-M16"

// FIXME another bug in the test framework https://github.com/scalameta/munit/issues/554
val munit = "1.0.0-M10" // "0.7.29"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import fmgp.did.comm.protocol.reportproblem2.ProblemReport

case class AgentExecutorMediator(
agent: Agent,
transportManager: Ref[TransportManager],
transportManager: Ref[MediatorTransportManager],
protocolHandler: ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
userAccountRepo: UserAccountRepo,
messageItemRepo: MessageItemRepo,
Expand Down Expand Up @@ -54,14 +54,18 @@ case class AgentExecutorMediator(
.tapError(ex => ZIO.logError(ex.toString))
.provideSomeLayer(this.indentityLayer)
.provideSomeLayer(userAccountRepoLayer ++ messageItemRepoLayer)
.provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) => e ++ ZEnvironment(protocolHandler))
.provideSomeEnvironment((e: ZEnvironment[Resolver & Operations]) =>
e ++ ZEnvironment(protocolHandler) ++ ZEnvironment(transportManager)
)
.orDieWith(ex => new RuntimeException(ex.toString))

def receiveMessage(
msg: SignedMessage | EncryptedMessage,
transport: TransportDIDComm[Any]
): ZIO[
OperatorImp.Services & ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] &
// instead of OperatorImp.Services
ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
MediatorError | StorageError,
Unit
] = ZIO.logAnnotate("msg_sha256", msg.sha256) {
Expand Down Expand Up @@ -112,7 +116,9 @@ case class AgentExecutorMediator(
pMsgOrProblemReport: Either[ProblemReport, PlaintextMessage],
transport: TransportDIDComm[Any]
): ZIO[
ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError] & OperatorImp.Services,
Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] &
// instead of OperatorImp.Services
ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
MediatorError | StorageError,
Unit
] =
Expand Down Expand Up @@ -175,7 +181,12 @@ case class AgentExecutorMediator(
case Some(problemReport) => ZIO.succeed(Reply(problemReport.toPlaintextMessage))
case None =>
protocolHandler
.program(plaintextMessage)
.program(plaintextMessage) // should we change the signature of the method or use the ZEnvironment
.provideSomeEnvironment(
(e: ZEnvironment[
Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager]
]) => e ++ ZEnvironment(transport)
)
.catchSome { case ProtocolExecutionFailToParse(failToParse) =>
for {
_ <- ZIO.logWarning(s"Error ProtocolExecutionFailToParse: $failToParse")
Expand Down Expand Up @@ -246,7 +257,7 @@ object AgentExecutorMediator {
messageItemRepo: MessageItemRepo,
): ZIO[TransportFactory, Nothing, AgentExecutar] =
for {
transportManager <- TransportManager.make
transportManager <- MediatorTransportManager.make
mediator = AgentExecutorMediator(agent, transportManager, protocolHandler, userAccountRepo, messageItemRepo)
} yield mediator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ object DIDCommRoutes {
inboundQueue <- Queue.bounded[SignedMessage | EncryptedMessage](1)
outboundQueue <- Queue.bounded[SignedMessage | EncryptedMessage](1)
transport = new TransportDIDComm[Any] {
def transmissionFlow = Transport.TransmissionFlow.BothWays
def transmissionType = Transport.TransmissionType.SingleTransmission
def id: TransportID = TransportID.http(req.headers.get("request_id"))
def inbound: ZStream[Any, Transport.InErr, SignedMessage | EncryptedMessage] =
ZStream.fromQueue(inboundQueue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ import fmgp.did.*
import fmgp.did.comm.*
import fmgp.did.comm.protocol.*
import fmgp.did.method.peer.*
import fmgp.did.framework.TransportFactoryImp
import io.iohk.atala.mediator.db.*
import io.iohk.atala.mediator.protocols.*
import zio.*
import zio.stream.*
import zio.config.*
import zio.config.magnolia.*
import zio.config.typesafe.*
import zio.http.*
import zio.json.*
import zio.logging.*
import zio.logging.LogFormat.*
import zio.logging.backend.SLF4J
import zio.logging.*
import zio.stream.*

import java.time.format.DateTimeFormatter
import scala.io.Source
import fmgp.did.framework.TransportFactoryImp
case class MediatorConfig(endpoints: String, keyAgreement: OKPPrivateKey, keyAuthentication: OKPPrivateKey) {
val did = DIDPeer2.makeAgent(
Seq(keyAgreement, keyAuthentication),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package io.iohk.atala.mediator

import zio._
import zio.json._
import zio.stream._

import fmgp.did._
import fmgp.did.comm._
import fmgp.did.framework._
import fmgp.crypto.error._
import fmgp.util._

type TransportID = String

/** Based on the [[fmgp.did.framework.TransportDispatcher]] */
case class MediatorTransportManager(
transports: Set[TransportDIDComm[Any]] = Set.empty,
ids: Map[FROMTO, Set[TransportID]] = Map.empty,
kids: Map[VerificationMethodReferenced, Set[TransportID]] = Map.empty,
liveMode: Map[FROMTO, Set[TransportID]] = Map.empty,
transportFactory: TransportFactory
) extends TransportDispatcher {

override def openTransport(uri: String): UIO[TransportDIDComm[Any]] =
transportFactory.openTransport(uri) // FIXME TODO register Transport

def link(vmr: VerificationMethodReferenced, transportID: TransportID): MediatorTransportManager =
if (!transports.map(_.id).contains(transportID)) this // if transport is close
else
kids.get(vmr) match
case Some(seq) if seq.contains(transportID) => this
case Some(seq) => this.copy(kids = kids + (vmr -> (seq + transportID))).link(vmr.did.asFROMTO, transportID)
case None => this.copy(kids = kids + (vmr -> Set(transportID))).link(vmr.did.asFROMTO, transportID)

def link(from: FROMTO, transport: TransportDIDComm[Any]): MediatorTransportManager = link(from, transport.id)
def link(from: FROMTO, transportID: TransportID): MediatorTransportManager =
if (!transports.map(_.id).contains(transportID)) this // if transport is close
else
ids.get(from) match
case Some(seq) if seq.contains(transportID) => this
case Some(seq) => this.copy(ids = ids + (from -> (seq + transportID)))
case None => this.copy(ids = ids + (from -> Set(transportID)))

def registerTransport(transport: TransportDIDComm[Any]) =
this.copy(transports = transports + transport)

def unregisterTransport(transportID: TransportID) = this.copy(
transports = transports.filter(_.id != transportID),
ids = ids.map { case (did, ids) => (did, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty),
kids = kids.map { case (kid, ids) => (kid, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty),
liveMode = liveMode.map { case (did, ids) => (did, ids.filter(_ != transportID)) }.filterNot(_._2.isEmpty),
)

def enableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager =
this.copy(
liveMode = liveMode.updatedWith(subject) {
case Some(set) => Some(set - transportID).filter(_.isEmpty)
case None => None
}
)

def disableLiveMode(subject: FROMTO, transportID: TransportID): MediatorTransportManager =
this.copy(
liveMode = liveMode.updatedWith(subject) {
case Some(set) => Some(set + transportID)
case None => Some(Set(transportID))
}
)

def getLiveModeEnableConnections(subject: FROMTO): Seq[TransportDIDComm[Any]] =
liveMode.get(subject).toSeq.flatMap(transportId => transports.filter(t => transportId.contains(t.id)))

def sendForLiveMode(
next: TO,
msg: /*lazy*/ => SignedMessage | EncryptedMessage
): ZIO[Any, DidFail, Iterable[Unit]] = {
val transportIDs = this.liveMode.getOrElse(next.asFROMTO, Seq.empty)
val myChannels = transportIDs.flatMap(id => this.transports.find(_.id == id))
ZIO.foreach(myChannels) { _.send(msg) }
}

// TODO maybe rename to send
def publish(to: TO, msg: SignedMessage | EncryptedMessage): ZIO[Any, Nothing, Iterable[Unit]] = {
val transportIDs = this.ids.getOrElse(to.asFROMTO, Seq.empty)
val myChannels = transportIDs.flatMap(id => this.transports.find(_.id == id))
ZIO.foreach(myChannels) { _.send(msg) }
}

override def send(
to: TO,
msg: SignedMessage | EncryptedMessage,
thid: Option[MsgID], // TODO use
pthid: Option[MsgID], // TODO use
): ZIO[Resolver & Agent & Operations, DidFail, Unit] =
sendViaDIDCommMessagingService(to, msg).unit

override def sendViaDIDCommMessagingService(
to: TO,
msg: SignedMessage | EncryptedMessage
): ZIO[Resolver & Agent & Operations, DidFail, Either[String, TransportDIDComm[Any]]] =
super.sendViaDIDCommMessagingService(to, msg)

}

object MediatorTransportManager {

def make: URIO[TransportFactory, Ref[MediatorTransportManager]] =
for {
transportFactory <- ZIO.service[TransportFactory]
ref <- Ref.make(MediatorTransportManager(transportFactory = transportFactory))
} yield ref

def registerTransport(transport: TransportDIDComm[Any]) =
for {
socketManager <- ZIO.service[Ref[MediatorTransportManager]]
_ <- socketManager.update { _.registerTransport(transport) }
_ <- ZIO.log(s"RegisterTransport concluded")
} yield ()

def unregisterTransport(transportId: String) =
for {
socketManager <- ZIO.service[Ref[MediatorTransportManager]]
_ <- socketManager.update { case sm: MediatorTransportManager => sm.unregisterTransport(transportId) }
_ <- ZIO.log(s"Channel unregisterSocket")
} yield ()

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import io.iohk.atala.mediator.protocols.*
import io.iohk.atala.mediator.db.{UserAccountRepo, MessageItemRepo}

object OperatorImp {
type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo
type Services = Resolver & Agent & Operations & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager] &
TransportDIDComm[Any]

val protocolHandlerLayer: ULayer[
ProtocolExecuter[Services, MediatorError | StorageError]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import io.iohk.atala.mediator.*
import io.iohk.atala.mediator.db.*
import zio.*
import zio.json.*
import fmgp.did.comm.protocol.pickup3.MessageDelivery

object ForwardMessageExecuter
extends ProtocolExecuter[Agent & UserAccountRepo & MessageItemRepo, MediatorError | StorageError] {
extends ProtocolExecuter[
Resolver & Operations & Agent & UserAccountRepo & MessageItemRepo & Ref[MediatorTransportManager],
MediatorError | StorageError
] {

override def supportedPIURI: Seq[PIURI] = Seq(ForwardMessage.piuri)

Expand All @@ -34,6 +38,25 @@ object ForwardMessageExecuter
for {
_ <- repoMessageItem.insert(m.msg)
_ <- ZIO.logInfo("Add next msg (of the ForwardMessage) to the Message Repo")

// For Live Mode
mediatorTransportManager <- ZIO.service[Ref[MediatorTransportManager]].flatMap(_.get)
agent <- ZIO.service[Agent]
messageDelivery = MessageDelivery(
thid = m.id, // FIXME what should I put here?
from = agent.id.asFROM, // Mediator agent
to = m.next.asTO, // Destination of the message that is being forward
recipient_did = None,
attachments = Map(
m.msg.sha256 -> m.msg
)
).toPlaintextMessage
eMsgDelivery <- Operations
.authEncrypt(messageDelivery)
.mapError(didFail => MediatorDidError(didFail))
_ <- mediatorTransportManager
.sendForLiveMode(m.next.asTO, eMsgDelivery)
.mapError(didFail => MediatorDidError(didFail))
} yield NoReply
} else {
for {
Expand Down
Loading
Loading