Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: websockets support #172

Merged
merged 21 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ jobs:
ATALA_GITHUB_ACTOR: ${{ secrets.ATALA_GITHUB_ACTOR }}
ATALA_GITHUB_TOKEN: ${{ secrets.ATALA_GITHUB_TOKEN }}
REPORTS_DIR: "didcomm-v2-mediator-test-suite/target/site/serenity"
DIDCOMM_V2_TESTSUITE_VERSION: "a6288ef6536cd15181c30896841f5c33ef0c050b" # old "v0.1.0"
MEDIATOR_DID: "did:peer:2.Ez6LSghwSE437wnDE1pt3X6hVDUQzSjsHzinpX3XFvMjRAm7y.Vz6Mkhh1e5CEYYq6JBUcTZ6Cp2ranCWRrv7Yax3Le4N59R6dd.SeyJ0IjoiZG0iLCJzIjoiaHR0cDovL2xvY2FsaG9zdDo4MDgwIiwiciI6W10sImEiOlsiZGlkY29tbS92MiJdfQ"
DIDCOMM_V2_TESTSUITE_VERSION: "b396c77911d15223d64477dc2bca27ddcf8ad0e7" # https://github.com/input-output-hk/didcomm-v2-mediator-test-suite/pull/13/commits/b396c77911d15223d64477dc2bca27ddcf8ad0e7
# MEDIATOR_DID: "did:peer:2.Ez6LSghwSE437wnDE1pt3X6hVDUQzSjsHzinpX3XFvMjRAm7y.Vz6Mkhh1e5CEYYq6JBUcTZ6Cp2ranCWRrv7Yax3Le4N59R6dd.SeyJ0IjoiZG0iLCJzIjoiaHR0cDovL2xvY2FsaG9zdDo4MDgwIiwiciI6W10sImEiOlsiZGlkY29tbS92MiJdfQ"
MEDIATOR_DID: "did:peer:2.Ez6LSghwSE437wnDE1pt3X6hVDUQzSjsHzinpX3XFvMjRAm7y.Vz6Mkhh1e5CEYYq6JBUcTZ6Cp2ranCWRrv7Yax3Le4N59R6dd.SeyJ0IjoiZG0iLCJzIjoiaHR0cDovL2xvY2FsaG9zdDo4MDgwIiwiciI6W10sImEiOlsiZGlkY29tbS92MiJdfQ.SeyJ0IjoiZG0iLCJzIjoid3M6Ly9sb2NhbGhvc3Q6ODA4MC93cyIsInIiOltdLCJhIjpbImRpZGNvbW0vdjIiXX0"
steps:
- name: Checkout mediator
uses: actions/checkout@v3
Expand Down
7 changes: 4 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ inThisBuild(

/** Versions */
lazy val V = new {
val scalaDID = "0.1.0-M13"
// val scalajsJavaSecureRandom = "1.0.0"
val scalaDID = "0.1.0-M14"

// FIXME another bug in the test framework https://github.com/scalameta/munit/issues/554
val munit = "1.0.0-M10" // "0.7.29"
Expand Down Expand Up @@ -49,6 +48,7 @@ lazy val D = new {
val scalaDID = Def.setting("app.fmgp" %%% "did" % V.scalaDID)
val scalaDID_imp = Def.setting("app.fmgp" %%% "did-imp" % V.scalaDID)
val scalaDID_peer = Def.setting("app.fmgp" %%% "did-method-peer" % V.scalaDID)
val scalaDID_framework = Def.setting("app.fmgp" %%% "did-framework" % V.scalaDID)

// /** The [[java.security.SecureRandom]] is used by the [[java.util.UUID.randomUUID()]] method in [[MsgId]].
// *
Expand Down Expand Up @@ -204,6 +204,7 @@ lazy val mediator = project
.settings(
libraryDependencies += D.scalaDID_imp.value,
libraryDependencies += D.scalaDID_peer.value,
libraryDependencies += D.scalaDID_framework.value,
libraryDependencies += D.zioHttp.value,
libraryDependencies ++= Seq(
D.zioConfig.value,
Expand All @@ -226,7 +227,7 @@ lazy val mediator = project
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
)
.settings(
Compile / mainClass := Some("io.iohk.atala.mediator.app.MediatorStandalone"),
Compile / mainClass := Some("io.iohk.atala.mediator.MediatorStandalone"),
Docker / maintainer := "[email protected]",
Docker / dockerUsername := Some("input-output-hk"),
Docker / dockerRepository := Some("ghcr.io"),
Expand Down

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion mediator/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mediator = {
x = "MBjnXZxkMcoQVVL21hahWAw43RuAG-i64ipbeKKqwoA"
x = ${?KEY_AUTHENTICATION_X}
}
endpoint = "http://localhost:8080"
endpoint = ["http://localhost:8080", "ws://localhost:8080/ws"] #, "http://localhost:8080/http"]
endpoint = ${?SERVICE_ENDPOINT}
}
server.http.port = 8080
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package io.iohk.atala.mediator

import zio._
import zio.json._
import zio.stream._

import fmgp.crypto.error._
import fmgp.did._
import fmgp.did.comm._
import fmgp.did.comm.protocol._
import fmgp.util._
import io.iohk.atala.mediator.db.{UserAccountRepo, MessageItemRepo}

case class AgentExecutorMediator(
agent: Agent,
transportManager: Ref[TransportManager],
protocolHandler: ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
userAccountRepo: UserAccountRepo,
messageItemRepo: MessageItemRepo,
) extends AgentExecutar {
val scope = Scope.global // TODO do not use global
val indentityLayer = ZLayer.succeed(agent)
val userAccountRepoLayer = ZLayer.succeed(userAccountRepo)
val messageItemRepoLayer = ZLayer.succeed(messageItemRepo)
override def subject: DIDSubject = agent.id.asDIDSubject

override def receiveMsg(
msg: SignedMessage | EncryptedMessage,
transport: TransportDIDComm[Any]
): URIO[Operations & Resolver, Unit] = {
for {
job <- transport.inbound
.mapZIO(msg => jobExecuterProtocol(msg, transport))
.runDrain
.forkIn(scope)
ret <- jobExecuterProtocol(msg, transport) // Run a single time (for the message already read)
} yield ()
}.provideSomeLayer(userAccountRepoLayer ++ messageItemRepoLayer)

def jobExecuterProtocol(
msg: SignedMessage | EncryptedMessage,
transport: TransportDIDComm[Any],
): URIO[Operations & Resolver & UserAccountRepo & MessageItemRepo, Unit] =
this
.receiveMessage(msg, transport)
.tapError(ex => ZIO.log(ex.toString))
.provideSomeLayer(this.indentityLayer)
.provideSomeEnvironment((e: ZEnvironment[Resolver & Operations & UserAccountRepo & MessageItemRepo]) =>
e ++ ZEnvironment(protocolHandler)
)
.orDieWith(ex => new RuntimeException(ex.toString))

def receiveMessage(
msg: SignedMessage | EncryptedMessage,
transport: TransportDIDComm[Any]
): ZIO[
OperatorImp.Services & ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
MediatorError | StorageError,
Unit
] = ZIO.logAnnotate("msg_sha256", msg.sha256) {
for {
_ <- ZIO.logDebug(s"Receive message with sha256: '${msg.sha256}'")
agent <- ZIO.service[Agent]
recipientsSubject <- msg match
case eMsg: EncryptedMessage => ZIO.succeed(eMsg.recipientsSubject)
case sMsg: SignedMessage =>
ZIO
.fromEither(sMsg.payloadAsPlaintextMessage)
.map(_.to.toSet.flatten.map(_.toDIDSubject))
.mapError(didFail => MediatorDidError(didFail))
_ <- transportManager.get.flatMap { m =>
ZIO.foreach(recipientsSubject)(subject => m.publish(subject.asTO, msg))
}
_ <-
if (!recipientsSubject.contains(agent.id.asDIDSubject)) {
ZIO.logError(s"This agent '${agent.id.asDIDSubject}' is not a recipient") // TODO send a FAIL!!!!!!
} else
AgentExecutorMediator
.decrypt(msg)
.mapError(didFail => MediatorDidError(didFail))
.flatMap(pMsg => processMessage(pMsg, transport))
} yield ()
}

def processMessage(plaintextMessage: PlaintextMessage, transport: TransportDIDComm[Any]): ZIO[
ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError] & OperatorImp.Services,
MediatorError | StorageError,
Unit
] =
for {
_ <- plaintextMessage.from match
case None => ZIO.unit
case Some(from) => transportManager.update { _.link(from.asFROMTO, transport) }
protocolHandler <- ZIO.service[ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError]]
action <- protocolHandler
.program(plaintextMessage)
.tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex"))
ret <- action match
case NoReply => ZIO.unit // TODO Maybe infor transport of immediately reply
case reply: AnyReply =>
import fmgp.did.comm.Operations._
for {
message <- {
reply.msg.to.toSeq.flatten match {
case Seq() =>
reply.msg.from match
case Some(from) => sign(reply.msg)
case None => ZIO.logError(s"No sender or recipient: ${reply.msg}") *> ZIO.fail(NoSenderOrRecipient)
case tos => // TODO FIXME is case is not a response
reply.msg.from match
case Some(from) => authEncrypt(reply.msg)
case None => anonEncrypt(reply.msg)
}
}.mapError(didFail => MediatorDidError(didFail))
_ <- plaintextMessage.return_route match
case Some(ReturnRoute.none) | None => transport.send(message) // FIXME transportManager pick the best way
case Some(ReturnRoute.all) | Some(ReturnRoute.thread) => transport.send(message)
} yield ()
} yield ()

}

object AgentExecutorMediator {

def make[S >: Resolver & Operations](
agent: Agent,
protocolHandler: ProtocolExecuter[OperatorImp.Services, MediatorError | StorageError],
userAccountRepo: UserAccountRepo,
messageItemRepo: MessageItemRepo,
): ZIO[Any, Nothing, AgentExecutar] =
TransportManager.make.map(AgentExecutorMediator(agent, _, protocolHandler, userAccountRepo, messageItemRepo))

// TODO move to another place & move validations and build a contex
def decrypt(msg: Message): ZIO[Agent & Resolver & Operations, DidFail, PlaintextMessage] =
for {
ops <- ZIO.service[Operations]
plaintextMessage <- msg match
case pm: PlaintextMessage => ZIO.succeed(pm)
case em: EncryptedMessage =>
{
em.`protected`.obj match
case AnonProtectedHeader(epk, apv, typ, enc, alg) => ops.anonDecrypt(em)
case AuthProtectedHeader(epk, apv, skid, apu, typ, enc, alg) => ops.authDecrypt(em)
}.flatMap(decrypt _)
case sm: SignedMessage =>
ops.verify(sm).flatMap {
case false => ZIO.fail(ValidationFailed)
case true =>
sm.payload.content.fromJson[Message] match
case Left(error) => ZIO.fail(FailToParse(error))
case Right(msg2) => decrypt(msg2)
}
} yield (plaintextMessage)

}
Loading
Loading