Separating Channels and Actors

Tim Watson edited this page Dec 1, 2018 · 13 revisions

Terminology and clarifications

For the purposes of this document/page:

  • Cloud Haskell refers to the current implementation of distributed-process (from this repository, in fact) and the other libraries authored by Duncan, Edsko, and other brilliant folks from Well Typed, and developed, maintained, and curated by the awesome people at Tweag I/O (Mathieu, Alexander, and Facundo, to mention a few) and Tim Watson

  • Akka refers to the Java/Scala implementation of Akka

  • Akka Streams refers to the Akka Scala API for Reactive Streams

  • Erlang refers to the open source implementation of Erlang/OTP

  • Actor refers to the universal primitive of concurrent computation under the actor model

  • In distributed-process terms, Actor refers to code running in the Process monad, which has been spawned on a local or remote node, using the primitives spawn, spawnLocal, spawnChan, and so on

  • In GHC-Haskell terms, an Actor in distributed-process is a forkIO thread which is managed by the local node infrastructure, in terms of its lifetime, and connection(s) to other actors in the system (whether distributed or otherwise)

  • Node infrastructure refers to the relationship between the local node and running processes in Cloud Haskell, viz. the data that is shared and communicated between forkIO threads, all of which is managed by the local node.

Motivation

  1. The need for actors to communicate with the outside world

Here when we say the outside world, we mean code that is not running in the Process monad, and which is not connected to the Cloud Haskell node infrastructure. For example, one might define a set of actors that run (either locally, or across a cluster of nodes) in Cloud Haskell, and provide a web based (i.e. HTTP) interface for managing (or otherwise interacting with) these actors. Equally, it seems likely (if not certain) that production code might need to interact with other services which are not implemented using the actor model (or even using Haskell, for that matter).

In most implementations, actors communicate with the outside world (and each other) using asynchronous message passing, usually offering some kind of opaque handle for third parties to use when sending messages. Examples of this include the ProcessId type from distributed-process, and the SendPort a type from the same library (which represents the sending end of a typed channel). Some implementations offer other means of addressing actors, for example:

  • The process registry in Erlang
  • The process registry in Cloud Haskell
  • Actor paths/URIs in Akka

Unlike in Erlang, asynchronous message passing is not a language level primitive in Haskell, so we need to consider how code running outside of the Process monad, and thus outside the context of a managed Cloud Haskell node, ought to interact with actors running within the system. One option is to use the runProcess function from Control.Distributed.Process.Node, which essentially does the following:

  • create an empty MVar
  • spawn a new Process to run the supplied code (i.e. forkIO and execute the given actor)
  • when the actor finishes (i.e. the Process code completes), write () to the MVar
  • have the calling thread wait on the MVar to determine that the actor has finished working

I've elided the details of error handling, and particularly asynchronous exception handling, and inheriting the masking state of the calling thread, and so on...
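The four steps above can be sketched using nothing but base. This is a minimal illustration, not the library's implementation: runAndWait is a hypothetical name, a plain IO action stands in for the Process code, and forkIO stands in for spawning the actor on a node.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Hypothetical stand-in for runProcess: an IO action plays the role of
-- the Process code, and forkIO plays the role of spawning the actor.
runAndWait :: IO () -> IO ()
runAndWait actor = do
  done <- newEmptyMVar        -- create an empty MVar
  _ <- forkIO $ do            -- run the supplied code in a new thread
    actor
    putMVar done ()           -- signal that the code has completed
  takeMVar done               -- the caller blocks until that signal arrives

-- Small usage example: the write is visible once runAndWait returns.
demo :: IO Int
demo = do
  ref <- newIORef 0
  runAndWait (writeIORef ref 42)
  readIORef ref
```

As in the description, error handling is left out: if the supplied action throws, the MVar is never filled and the caller is left waiting on it.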

If we want to send a message to an actor in the system from outside, then, we must forkIO a new thread and use thread synchronisation techniques (like MVar) to determine that the send completed. Since sending is asynchronous and should never fail - more on this later - we may instead choose to call forkProcess from the calling thread, since we're not waiting on a reply anyway. As long as we do not mind subsequent actions in the thread that called forkProcess racing with the code that sends the message, this approach is fine. What if we wish to avoid having to forkIO for every send, though? We can obviously define an Actor as a dispatcher to handle this:

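-- some type decls elided for brevity
import Control.Concurrent.STM (TQueue, atomically, newTQueueIO, readTQueue, writeTQueue)
import Control.Distributed.Process (Process, nsend)
import Control.Distributed.Process.Node (LocalNode, forkProcess)
import Control.Monad (forever)
import Control.Monad.IO.Class (liftIO)

-- let's assume type InputData = () or some such ...
type InputData = ()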
type ClientHandle = TQueue (String, InputData) -- usable outside `Process'

dispatcher :: TQueue (String, InputData) -> Process ()
dispatcher q = forever $ readQ q >>= uncurry nsend
  where readQ = liftIO . atomically . readTQueue

mkDispatcher :: LocalNode -> IO ClientHandle
mkDispatcher node = do
  q <- newTQueueIO
  _ <- forkProcess node (dispatcher q)  -- the dispatcher's ProcessId is discarded
  return q

someCodeThatNeedsToSend :: ClientHandle -> IO ()
someCodeThatNeedsToSend hClient = do
  -- this might be in IO, or in some other monad (a web framework, etc)
  thing <- getFantasticThingThatActorsCanProcess
  atomically $ writeTQueue hClient ("destination.actor.name", thing)

Except now we've lost a very useful property of using actors in the first place... We have no idea if the actor (viz. the forkIO thread we spawned using forkProcess earlier) is still running and consuming the TQueue. If we're expecting some side effect of sending the message to alter the state of the world at some point in time, this is obviously not good. To make the issue more concrete, let's have the client handle provide an explicit reply channel (we'll ignore the fact that this design ignores ordering when multiple clients write back...):

{-# LANGUAGE RecordWildCards #-}  -- needed for the ClientHandle{..} patterns below

type InputData = ()
type OutputData = ()
type SendQ = TQueue (String, InputData) -- usable outside `Process'
type ReplyQ = TQueue (String, OutputData)
data ClientHandle = ClientHandle { sendQ :: SendQ, replyQ :: ReplyQ }

dispatcher :: TQueue (String, InputData) -> Process ()
dispatcher q = forever $ readQ q >>= uncurry nsend
  where readQ = liftIO . atomically . readTQueue

listener :: TQueue (String, OutputData) -> Process ()
listener q = forever $ expect >>= liftIO . atomically . writeTQueue q

mkDispatcher :: LocalNode -> IO ClientHandle
mkDispatcher node = do
  sQ <- newTQueueIO
  rQ <- newTQueueIO
  _ <- forkProcess node (dispatcher sQ)
  _ <- forkProcess node (listener rQ)  -- without this, nothing ever writes to replyQ
  return $ ClientHandle sQ rQ

someCodeThatNeedsToSend :: ClientHandle -> IO ()
someCodeThatNeedsToSend ClientHandle{..} = do
  -- this might be in IO, or in some other monad (a web framework, etc)
  thing <- getFantasticThingThatActorsCanProcess
  atomically $ writeTQueue sendQ ("destination.actor.name", thing)

someCodeThatWantsToGetTheReply :: ClientHandle -> IO OutputData
someCodeThatWantsToGetTheReply ClientHandle{..} = do
  (_, reply) <- atomically $ readTQueue replyQ
  return reply

  1. As per the above, we have a leaky abstraction...

What happens if the process you thought was registered at that name has died? What happens if it wasn't registered? If we were just passing data around using STM, at least the runtime system would have a chance to detect the deadlock, but here it's kept invisible from us, and as far as the RTS is concerned there may be no deadlock, if other threads are looking at the CQueue for the listener (for example), and so on.
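To illustrate the point about plain STM, here is a minimal sketch using only base and stm. The name readOrphanedQueue is hypothetical. It reads from a queue that no other thread holds a reference to, so nothing can ever write to it.

```haskell
import Control.Concurrent.STM (atomically, newTQueueIO, readTQueue)
import Control.Exception (BlockedIndefinitelyOnSTM, try)

-- Block on a queue that is unreachable from every other thread.
-- Once the garbage collector notices that nothing can wake this thread,
-- GHC's runtime raises BlockedIndefinitelyOnSTM in it, which try turns
-- into a Left.
readOrphanedQueue :: IO (Either BlockedIndefinitelyOnSTM ())
readOrphanedQueue = do
  q <- newTQueueIO
  try (atomically (readTQueue q))
```

With GHC this typically returns a Left rather than hanging, although detection depends on the garbage collector. Once a dispatcher or listener process holds the queue as well, the queue stays reachable, so the runtime has no reason to raise anything even if that process is no longer doing useful work.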

Perhaps more to the point - since this discussion is not around correctness in and of itself - this places a lot of the burden for dealing with concurrency, cross-thread synchronised access to data, and all the things that go with that (e.g. exception safety, which I've blithely ignored in these examples) back on the client.

Cloud Haskell makes it possible to avoid these kinds of deadlocks by providing monitoring and supervision primitives; however, they're not usable outside of the context of the Cloud Haskell actor system. Therefore it is awkward, if not impossible, to integrate Cloud Haskell code cleanly and safely with code that runs outside of the Process monad, unless the actor system is central to the design of the running application, and the code which runs outside of the Process monad is a second class citizen to that which is managed by the CH node.
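For comparison, this is roughly what the monitored version looks like when the caller is itself an actor. It is only a sketch. callNamed is a hypothetical helper, and the request/reply protocol is an assumption: the server is sent the caller's ProcessId along with a String, and is expected to reply with a String. The primitives used (whereis, monitor, unmonitor, send, receiveWait, match, matchIf) are from Control.Distributed.Process.

```haskell
import Control.Distributed.Process

-- Hypothetical helper: send a request to a registered name and wait for
-- either a reply or a notification that the server has died.
callNamed :: String -> String -> Process (Either String String)
callNamed name request = do
  registered <- whereis name
  case registered of
    Nothing  -> return (Left ("no process registered as " ++ name))
    Just pid -> do
      ref  <- monitor pid            -- ask to be told if pid dies
      self <- getSelfPid
      send pid (self, request)       -- assumed protocol: (reply-to, payload)
      result <- receiveWait
        [ -- Crude: accepts any String in the mailbox as the reply.
          match (\reply -> return (Right (reply :: String)))
        , matchIf (\(ProcessMonitorNotification ref' _ _) -> ref' == ref)
                  (\(ProcessMonitorNotification _ _ reason) ->
                      return (Left ("server died: " ++ show reason)))
        ]
      unmonitor ref
      return result
```

Both failure cases from the questions above (nothing registered, or the process dying) surface as a Left instead of a silent hang. None of this is available to the TQueue-based client, because it requires a mailbox, and therefore the Process monad.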

I believe this is a major blocker for many people using Cloud Haskell in their projects, since in the example where we have a web server that wishes to communicate with the actor system, it is very difficult to get that right, and especially problematic when there are different thread management (and possibly exception management) policies applied by the web application framework on which the bulk of the application code is presumably being written.

Erlang solves this problem by rote - virtually everything is a process, even port drivers running code using the FFI have to emulate the behaviour of an actor in the system (though NIFs are an exception to this, sort of). It is neither pragmatic nor desirable for Cloud Haskell to take this approach. In Erlang, several web frameworks have emerged, most of which deal with the networking (non-blocking socket I/O) by hand. But whilst Akka has demonstrated some great results using actors to process web requests in the Play Framework, I don't think Cloud Haskell has any appetite to compete with the likes of warp. Fundamentally, Cloud Haskell code needs to operate seamlessly with code written to run outside of the framework.

  1. Recognising that the actor model does not solve every problem

There are plenty of reasons why code which is not structured using the actor model might want to send Haskell data types, or even Haskell closures (or static pointers), over the network. Not only do we need to recognise that any actor implementation needs to play nicely with the world outside of its managed infrastructure black box, we must also allow for the desire for distribution to occur outside of the actor model altogether.

Many fantastic and complex applications use network-transport without incorporating distributed-process and its Erlang-based semantics. I suspect that higher level abstractions such as typed channels and monitoring would be useful in these contexts, without forcing the actor model as a requirement for having these things.

  1. Actors don't compose well

I don't think this really needs much explanation. Typed channels should compose well, since conceptually they are very simple. I suspect we can build a very strong and concrete API around these.

There are other channel based concepts to consider too, including integration patterns such as splitting, re-sequencing, aggregating, dead wire tapping, and so on.
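As a rough illustration of why channels might compose well, here is a sketch built on plain STM. Everything in it is hypothetical (Sink, Source, premap, split, merge, newChannel) and none of it is part of distributed-process. It assumes the two ends of a channel can be modelled as an STM write action and an STM read action.

```haskell
import Control.Applicative ((<|>))
import Control.Concurrent.STM
import Control.Monad (replicateM)

-- Hypothetical channel ends, modelled as bare STM actions.
newtype Sink a   = Sink   { sendSTM :: a -> STM () }
newtype Source a = Source { recvSTM :: STM a }

-- Transforming received values is just fmap over the read action.
instance Functor Source where
  fmap f (Source s) = Source (fmap f s)

-- Adapt a sink so that it accepts a different input type.
premap :: (b -> a) -> Sink a -> Sink b
premap f (Sink k) = Sink (k . f)

newChannel :: IO (Sink a, Source a)
newChannel = do
  q <- newTQueueIO
  return (Sink (writeTQueue q), Source (readTQueue q))

-- Splitting: route each value to one of two sinks.
split :: (a -> Bool) -> Sink a -> Sink a -> Sink a
split p (Sink yes) (Sink no) = Sink (\x -> if p x then yes x else no x)

-- Aggregating: a left-biased merge of two sources.
merge :: Source a -> Source a -> Source a
merge (Source l) (Source r) = Source (l <|> r)

demo :: IO [String]
demo = do
  (evens, evenSrc) <- newChannel
  (odds,  oddSrc)  <- newChannel
  let input  = split even evens odds :: Sink Int
      output = merge (fmap (("even " ++) . show) evenSrc)
                     (fmap (("odd "  ++) . show) oddSrc)
  atomically (mapM_ (sendSTM input) [1, 2, 3])
  replicateM 3 (atomically (recvSTM output))
```

Because the merge is left-biased, demo drains the even channel first and returns ["even 2", "odd 1", "odd 3"]. Each combinator is a one-liner because a channel end carries no identity or lifecycle of its own, which is exactly what makes actors harder to compose.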