Building an “actor” in F# with higher throughput than Akka and Erlang actors

The “Big Data” problem

Our free lunch is over!

The number of transistors in our CPUs is still increasing like Moore’s law predicted. However, the frequency of our chips is flat-lining. We can no longer expect to see a 2x or even 1.5x performance improvement every 18 months from code that doesn’t exploit parallelism.

“Big Data” has arrived

According to Cisco we are now in The Zettabyte Era.

Global IP traffic has increased eightfold over the past 5 years.

Globally, mobile data traffic will increase 18-fold between 2011 and 2016.

The Economist ran an article in 2010 on what some are calling “the industrial revolution of data”.

Oracle, IBM, Microsoft and SAP between them have spent more than $15 billion on buying software firms specialising in data management and analytics.

This industry is estimated to be worth more than $100 billion and growing at almost 10% a year, roughly twice as fast as the software business as a whole.

We need a different programming model

The most prevalent programming model today is probably Object Oriented Programming (OOP). This typically results in a system of objects encapsulating some mutable state and interacting with other objects through method calls.

Concurrent changes to mutable state need to be synchronized to avoid race hazards. In OOP there are two main ways of doing this: locks and lock/wait-free code.

Locks don’t scale well. They are inefficient under even moderate amounts of contention. At best, they involve threads occasionally spinning whilst doing no useful work. At worst, they involve multiple threads incurring the overhead of taking trips through the operating system.

Lock/wait-free code is generally far more efficient. However, it is hard to write. In fact, building good lock-free data structures is still the subject of published research; for example, this paper on building lock-free queues was published in August. Therefore, writing lock/wait-free code ad hoc is probably not going to scale to a whole system.
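To make the contrast concrete, here is a minimal F# sketch (ours, not taken from any of the benchmarks below) of the two approaches applied to a shared counter:

open System.Threading

// A counter protected by a lock: simple, but threads may block or spin under contention.
type LockedCounter() =
   let gate = obj()
   let mutable count = 0L
   member __.Increment() = lock gate (fun () -> count <- count + 1L)
   member __.Value = lock gate (fun () -> count)

// The same counter updated with a single atomic instruction: no blocking,
// but this style is hard to extend beyond trivial data structures.
type AtomicCounter() =
   let mutable count = 0L
   member __.Increment() = Interlocked.Increment(&count) |> ignore
   member __.Value = Interlocked.Read(&count)

The atomic version never blocks, but composing several such operations into one larger invariant is exactly where lock-free programming becomes hard.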

Fortunately, there are several alternatives to OOP that might help us code for these “Big Data” scale problems. Each alternative has benefits and drawbacks. For example, Functional Programming used alongside OOP promotes the use of immutable objects, which abates the need for synchronization. However, the GCs in the VMs we use (particularly the JVM and CLR) don’t always cope well with the associated increase in garbage.
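As a small illustration of that point (again ours), “updating” an immutable record produces a new value rather than mutating the old one, so threads can share the old value freely without synchronization, at the cost of extra allocation and garbage:

type Account = { Owner : string; Balance : int64 }

// Copy-and-update returns a fresh record; the original is never mutated,
// so any thread already holding it sees a consistent value with no locking.
let before = { Owner = "alice"; Balance = 100L }
let after = { before with Balance = before.Balance + 42L }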

In this article we will discuss the Actor Model, another alternative/modification to OOP. In particular, we will show how to construct actors in F# that are capable of processing a high throughput of messages.

What are actors?

Conceptually an actor has its own “thread” of execution.

  • It processes each message based on its current state.
    • This might involve changing its state through mutation, creating new actors or sending messages to existing actors.
  • It also designates how it will handle the next message.
    • This could be through recursion, looping or something else.

The Actor Model was invented by Carl Hewitt. Earlier this year Erik Meijer and Clemens Szyperski interviewed him for Channel 9 (see the video here). He said the following about actors:

They are the fundamental unit of computation.
They have to embody processing, storage and communication.

Language support

Erlang

Erlang was designed at Ericsson in 1986. It was built to support concurrency, fault-tolerance and hot-swapping. It is quite heavily used in industry through various database and queue offerings. For example, CouchDB, SimpleDB, Riak and RabbitMQ all make use of Erlang.

The Erlang VM has a garbage collector per process (i.e., per actor). Unfortunately, it is beyond the scope of this article to investigate how this changes the latency distribution of various systems in comparison to the same systems running on the JVM or CLR.

Scala

Scala has several implementations of actors. Akka is (at the time of writing) the most advanced of these. According to their website Akka supports:

50 million msg/sec on a single machine.

Small memory footprint; ~2.7 million actors per GB of heap.

See the Akka team blog for more details on how these were measured.

F#

F# has one implementation of actors, the MailboxProcessor, built into its core library. It is easy to use and integrates well with Asynchronous Workflows, another F# feature. However, it does have some shortcomings in comparison to the actor implementations described above. The primary issue is the lack of support for communicating between system processes or machines. Addressing this issue is beyond the scope of this article.
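As a quick illustration of the API (a minimal sketch, separate from the benchmarks that follow), the MailboxProcessor below answers each request over an AsyncReplyChannel:

type GreeterMsg =
   | Greet of string * AsyncReplyChannel<string>

// A tiny actor that replies to each request it receives.
let greeter =
   MailboxProcessor.Start <| fun inbox ->
      let rec loop () = async {
         let! msg = inbox.Receive()
         match msg with
         | Greet (name, reply) ->
            reply.Reply ("Hello, " + name)
            return! loop ()
      }
      loop ()

// A synchronous request/response from the caller's point of view.
let greeting = greeter.PostAndReply (fun channel -> Greet ("world", channel))

PostAndReply blocks the caller until the actor replies; Post is the fire-and-forget variant used in the throughput tests below.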

Other implementations

See this page on Wikipedia.

Performance: Erlang vs Akka

There are a few blog posts with code that compare the performance of Erlang to Akka. Paul Keeble measured the throughput of a single actor in both Erlang and Akka in this blog article. Franz Bettag later repeated this experiment here. I repeated the experiment on my 2.8GHz i7 with 4GB RAM running Windows 8. These were my results:

  • Erlang was capable of processing over 1,210,000 messages per second (on average over 10 runs of 3,000,000 messages).

Eshell V5.9.3  (abort with ^G)
1> client:runTest(3000000).
Count is {ok,300000000}
Test took 2.824 seconds
Throughput=1062322.9461756374 per sec
ok
2> client:runTest2(3000000).
Count is 300000000
Test took 2.465 seconds
Throughput=1217038.5395537526 per sec
ok
  • Akka was capable of processing over 2,620,000 messages per second (on average over 10 runs of 3,000,000 messages). We used Scala v2.9.2, Akka v2.0.4 and passed custom parameters to SBT in order to set the heap size and turn on various optimizations (see my github project for more details).

Multiple main classes detected, select one to run:

 [1] ag.bett.scala.test.BenchmarkAll
 [2] ag.bett.scala.test.akka.Application

Enter number: 1

[info] Running ag.bett.scala.test.BenchmarkAll
Warmup run!

[akka] Count is 300000000
[akka] Test took 1.195 seconds
[akka] Throughput=2510460.251046025 per sec

Warmup run finished!
Garbage Collection


[akka] Count is 300000000
[akka] Test took 1.096 seconds
[akka] Throughput=2737226.2773722624 per sec

[success] Total time: 12 s, completed 10-Dec-2012 22:14:01

To test the throughput each actor maintains a count. We send them each 3,000,000 messages from up to 8 parallel threads asking them to increase their counts by some fixed amount. We then record the average number of messages they process each second by inspecting the count with another message. See Paul Keeble’s code here on github for more details.

There is one thing to keep in mind about this test. In the interview with Carl Hewitt, Carl said this:

One actor is no actor.

This statement highlights what is wrong with these performance comparisons. Erlang was built for scaling out to millions of actors interacting with each other. The throughput of one actor doesn’t tell us anything particularly interesting. However, we will conveniently forget this for the rest of this article.

Repeating the experiment in F#

The Discriminated Union below defines the messages that the actor can accept.


type CounterMsg =
   | Add of int64
   | GetAndReset of (int64 -> unit)

The code below defines an actor. It pulls a message from a mailbox, processes it and recurses. An Add(n) message causes it to update its count. A GetAndReset(reply) message causes it to reset its count after replying with the current count.


let vanillaCounter = 
   MailboxProcessor.Start <| fun inbox ->
      let rec loop count = async {
         let! msg = inbox.Receive()
         match msg with
         | Add n -> return! loop (count + n)
         | GetAndReset reply ->
            reply count
            return! loop 0L
      }
      loop 0L

This actor was capable of processing over 930,000 messages per second (on average over 10 runs of 3,000,000 messages). This is under half the throughput of the Akka actor but very close to the throughput of the Erlang actor.
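As a usage sketch (ours, not part of the benchmark), the reply function carried by GetAndReset composes directly with the MailboxProcessor’s built-in PostAndReply, because AsyncReplyChannel.Reply has the required int64 -> unit shape:

// Post some increments, then read (and reset) the count synchronously.
vanillaCounter.Post (Add 5L)
vanillaCounter.Post (Add 37L)

let total =
   vanillaCounter.PostAndReply (fun (channel : AsyncReplyChannel<int64>) ->
      GetAndReset (fun n -> channel.Reply n))
// total is now 42L (assuming no other senders) and the actor's count is back to 0L.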

A simple actor implementation in F#

The code below implements a simple actor. It makes use of the highly efficient lock-free ConcurrentQueue that is built into the framework. However, it also uses event wait handles for “waking” the “sleeping” actor when new messages arrive. This is inefficient.


// Namespaces used by the mailbox implementations in the rest of the article.
open System
open System.Threading
open System.Collections.Concurrent

type 'a ISimpleActor =
   inherit IDisposable
   abstract Post : msg:'a -> unit
   abstract PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b

type 'a SimpleMailbox() =
   let msgs = ConcurrentQueue<'a>()
   let onMsg = new AutoResetEvent(false)

   member __.Receive() =
      let rec await() = async {
         let mutable value = Unchecked.defaultof<'a>
         let hasValue = msgs.TryDequeue(&value)
         if hasValue then return value
         else 
            let! _ = Async.AwaitWaitHandle onMsg
            return! await()
      }
      await()

   member __.Post msg = 
      msgs.Enqueue msg
      onMsg.Set() |> ignore

   member __.PostAndReply msgFactory =
      let value = ref Unchecked.defaultof<'b>
      use onReply = new AutoResetEvent(false) 
      let msg = msgFactory (fun x ->
         value := x
         onReply.Set() |> ignore
      )
      __.Post msg
      onReply.WaitOne() |> ignore
      !value

   interface 'a ISimpleActor with
      member __.Post msg = __.Post msg
      member __.PostAndReply msgFactory = __.PostAndReply msgFactory

   interface IDisposable with
      member __.Dispose() = onMsg.Dispose()

module SimpleActor =
   let Start f =
      let mailbox = new SimpleMailbox<_>()
      f mailbox |> Async.Start
      mailbox :> _ ISimpleActor

We used this implementation with the code below to reconstruct the same throughput test that we used above.


let simpleActor = 
   SimpleActor.Start <| fun inbox ->
      let rec loop count = async {
         let! msg = inbox.Receive()
         match msg with
         | Add n -> return! loop (count + n)
         | GetAndReset reply ->
            reply count
            return! loop 0L
      }
      loop 0L

This actor was capable of processing over 1,070,000 messages per second (on average over 10 runs of 3,000,000 messages). This is slightly faster than the built-in MailboxProcessor and tantalizingly close to Erlang’s actor performance. However, it still doesn’t come close to the Akka implementation.

An improved actor implementation in F#

We improved upon the implementation above by removing the need for the event wait handles in the Post(...) method.

We did this by introducing a count variable that keeps track of the number of messages that are left to be processed by the actor.

When a sender posts a message to this actor, it atomically increments the message count. If it posted the first message (i.e., the count is now 1), it takes responsibility for ensuring the actor processes its incoming messages until the count is reduced back to 0. This might involve scheduling the processing of messages on a different thread. However, for this particular problem, we found that doing so reduced throughput by between 10% and 20%.

When the “responsible” thread has processed a message it atomically decrements the count. If the count reaches 0 then it effectively “loses responsibility” and can continue to execute the rest of the work it has to do.

We made one further optimization. This was to avoid using the ConcurrentQueue for the first message when the count is 0. This improved throughput by approximately 15%.


type 'a ISharedActor =
   abstract Post : msg:'a -> unit
   abstract PostAndReply : msgFactory:(('b -> unit) -> 'a) -> 'b

type 'a SharedMailbox() =
   let msgs = ConcurrentQueue<'a>()
   let mutable isStarted = false
   let mutable msgCount = 0
   let mutable react = Unchecked.defaultof<'a -> unit>
   let mutable currentMessage = Unchecked.defaultof<'a>

   let rec execute(isFirst) =

      let inline consumeAndLoop() =
         react currentMessage
         currentMessage <- Unchecked.defaultof<'a>
         let newCount = Interlocked.Decrement &msgCount
         if newCount <> 0 then execute false

      if isFirst then consumeAndLoop()
      else
         let hasMessage = msgs.TryDequeue(&currentMessage)
         if hasMessage then consumeAndLoop()
         else 
            Thread.SpinWait 20
            execute false

   member __.Receive(callback) = 
      isStarted <- true
      react <- callback

   member __.Post msg =
      while not isStarted do Thread.SpinWait 20
      let newCount = Interlocked.Increment &msgCount
      if newCount = 1 then
         currentMessage <- msg
         // Might want to schedule this call on another thread.
         execute true
      else msgs.Enqueue msg

   member __.PostAndReply msgFactory =
      let value = ref Unchecked.defaultof<'b>
      use onReply = new AutoResetEvent(false)
      let msg = msgFactory (fun x ->
         value := x
         onReply.Set() |> ignore
      )
      __.Post msg
      onReply.WaitOne() |> ignore
      !value

   interface 'a ISharedActor with
      member __.Post msg = __.Post msg
      member __.PostAndReply msgFactory = __.PostAndReply msgFactory

module SharedActor =
   let Start f =
      let mailbox = new SharedMailbox<_>()
      f mailbox
      mailbox :> _ ISharedActor

We used this implementation with the code below to reconstruct the same throughput test that we used previously.


let sharedActor = 
   SharedActor.Start <| fun inbox ->
      let rec loop count =
         inbox.Receive(fun msg ->
            match msg with
            | Add n -> loop (count + n)
            | GetAndReset reply ->
               reply count
               loop 0L)
      loop 0L

This actor was capable of processing over 4,660,000 messages per second (on average over 10 runs of 3,000,000 messages). Therefore, this implementation provides a 3.8x throughput improvement on the Erlang implementation and a 1.7x throughput improvement on the Akka implementation.
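For reference, the sketch below is our reconstruction of the kind of driver described earlier (the real harness lives in the github project); the helper name runThroughputTest is ours, and the thread and message counts mirror the figures quoted above:

open System.Diagnostics

// Send totalMessages Add messages, split across `threads` parallel workers,
// then read the total back with a blocking GetAndReset round-trip.
let runThroughputTest threads totalMessages =
   let perThread = totalMessages / threads
   let watch = Stopwatch.StartNew()
   [| 1 .. threads |]
   |> Array.Parallel.iter (fun _ ->
      for _ in 1 .. perThread do
         sharedActor.Post (Add 100L))
   let total = sharedActor.PostAndReply GetAndReset
   watch.Stop()
   let sent = int64 threads * int64 perThread
   let seconds = watch.Elapsed.TotalSeconds
   printfn "Processed %d messages (sum %d) in %.3f s: %.0f msg/sec" sent total seconds (float sent / seconds)

runThroughputTest 8 3000000

Note that Post may run the handler inline on the calling thread when that thread takes “responsibility”, so the senders and the actor share the same pool of threads.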

Conclusion

We have compared some Scala, Erlang and F# actors. We found that Scala’s Akka allowed far greater throughputs than the default Erlang and F# implementations. We also built actors from scratch in F# that are capable of processing substantially higher throughputs than any of the default implementations. However, we noted that these are not very interesting or important results. What we really need to measure is the throughput and latency of realistically sized systems of actors interacting with one another. The Akka team have made a step in this direction. In their blog they look at how the total throughput of all actors in a system scales as more actors are added into that system.

We need more examples of how complex systems can be better described using the Actor Model than using OOP. Though it probably isn’t a silver bullet, the Actor Model is a promising solution to our “Big Data” problem because it is far easier to express concurrent interactions using actors than it is with OOP.

Possible next steps

  • Measure the (suspected positive) impact of per actor garbage collection that Erlang uses in comparison to the shared heap implementations of actors on the JVM and CLR
  • Measure the relationship between memory and the number of actors in a system on Akka, Erlang and F#
  • Measure the throughput across a large system of interacting actors in Akka, Erlang and F#
  • Implement cross machine actor communication in F#


18 Responses to Building an “actor” in F# with higher throughput than Akka and Erlang actors

  1. Pingback: Building an “actor” in F# with higher throughput than Akka and Erlang actors | Functional programming | Scoop.it

  2. Thanks for adding to the set of benchmarks out there, even more so because you acknowledge how useful such an experiment really is. Since you did not explicitly provide the source for your Akka test run (I’m assuming that you use Franz’ code), I just quickly typed it into my REPL using Akka 2.1.0-RC4 (just because that is the one we are currently staging for release). The performance difference to 2.0.4 should be small for all I know. In order to reproduce my steps, clone Akka, start “sbt akka-actor/console” and do as I did in this gist: https://gist.github.com/4246790
    (the second file can be “:paste”-ed into that “sbt akka-actor/console”)
    On my MacBook Pro from early 2011 (2.2GHz i7, 12GB) the first “simple” actor solution does roughly 2–3 million msgs/sec, but it fluctuates a lot; the usage of the parallel foreach makes thread scheduling a little chaotic (without .par it fluctuates less, around 2 million msgs/sec). Reducing the thread pool size to one for the actor under test and warming up the VM results in just shy of 6 million msgs/sec processed (without the .par for the sending it fluctuates even less and reaches 6 million).

    But as you noted, that is not the point. (Which is why I did not even try to tune settings.)

    Akka is very much about offering a complete toolkit for actually making use of actors to build fault-tolerant systems. Supervision hierarchies are essential for this, and to my knowledge none of the other systems have them (in Erlang you can link, but in Akka it is mandatory; I’d argue that the average application is better designed because of this). Without supervision there is no failure handling in an asynchronous system. Another corner-stone is distribution across network nodes. All single-host performance is not worth anything if your box keels over and dies. The actor model is not useful unless embedded in a complete toolkit.

  3. Pingback: F# Weekly #49, 2012 « Sergey Tihon's Blog

  4. says:

    You say: “However, they have neglected to provide any material on what methods they used to determine these figures or how we might reproduce them.”

    To be honest, how much time did you spend trying to find it? Because I found it in 3 seconds: https://www.google.se/search?q=50+million+messages+akka

  5. zbray says:

    Thanks for the link Viktor.

    BTW I think you guys at TypeSafe are doing a fantastic job at spreading ideas that need to be spread. Akka is a brilliant product. I recommend it to anyone using the JVM and struggling with concurrency. I wish we had something as mature as Akka in .Net!

    Hopefully you read the rest of the article. I tried to point out several times that the metric itself is pretty meaningless. Case studies are far more interesting. I think it would be even better if the claim wasn’t made at all, as it doesn’t really mean anything. Having said this, I still think the article should be referenced where the claim is made on the Akka website. Maybe I’m a bit anal but these things annoy me.

    Thanks for reading!

    • says:

      Hi Zack,

      I’m glad to hear that you enjoy Akka,

      I am also a tad anal at times, but you did claim that there wasn’t _any_ material on what methods we used or how to reproduce, I hope I have falsified that claim as we have an amazing blog article describing the benchmark, the machine and configuration: (http://letitcrash.com/post/20397701710/50-million-messages-per-second-on-a-single-machine)

      As for adding a link to the blog article from the claim, that’s something that we’ll consider doing.

      Our benchmark is not about measuring performance, it’s about measuring scalability, a measurement which is far more interesting (at least to us) than performance.

      Looking forward to seeing the article get updated with appropriate version and settings for Akka so we can actually compare numbers. To me it seems like you were running the Scala Actors benchmark instead of Akka, as the performance numbers were almost identical to the ones in the referenced blog article.

      Cheers,

      • zbray says:

        Thanks again Viktor – I’ll add a link to the blog articles for each of these claims. Hopefully, I’ll get a chance to check the benchmarks this evening and update the article if necessary. Also, I’m sorry I must have missed the last section measuring throughput against the number of actors. I agree this is a far more interesting measurement! I might update my conclusion to reference this as an example of an interesting measurement. I’d love to recreate it in F#/Erlang too and compare.

  6. says:

    Hi Zach,

    Sounds great, I am looking forward to reading it!

    Cheers,

    • zbray says:

      You were both right. I’m sorry. I must have used the wrong repository to run the tests. I’ve just updated the article and *hopefully* corrected any inconsistencies.

  7. zbray says:

    EDIT: Improved the performance from 3,320,000 msgs/sec to 4,660,000 msgs/sec by updating the test to use Array.Parallel instead of Async. See changes here: https://github.com/ZachBray/ActorPerformanceTests/commit/49b238968371cff0f0c4696bd662578f44791097

    • zbray says:

      NOTE: This has made the other F# measurements slightly out. They need to be re-measured, as they will probably improve too.

      • says:

        Since you are tuning the F# code I think it’s only fair to tune the Akka configuration, as Roland points out he can get it > 6 million msgs/s by just changing number of threads to use. (i.e. 0 code changes)

  8. Thanks for running the Akka test and sharing the code, your numbers now match mine. It would still be nice if you could acknowledge that the title of your post is still very misleading, though, since you explicitly tuned your F# solution to use as few threads as possible. The default Akka configuration is tuned for running many more actors than you have cores (which matches typical programs), which leads to scheduling artifacts when running only one actor. Your result for Akka is roughly comparable to the “simplistic” F# solution. A fair comparison of your tuned version would mean to configure a very small thread pool (1 thread should be enough for 1 actor), which is exactly what I did in my gist above.

    In order to show you that this is not idle speculation let me explain what is going on. Your i7 box very probably advertises eight cores which leads to a default thread-pool size of 16. The default settings lead to a new task being scheduled for every batch of five messages processed by an actor, and this scheduling happens of course from the preceding task’s run method. The executor will want to run the new task ASAP, meaning that it will dispatch the same actor onto a different thread each time (due to an overlap of a few byte-code instructions). This leads to a thread-hopping exercise done by the one actor, which of course is very inefficient (CPU caches, thread wake-up, etc.).

    The Akka default settings scale extremely well because this artifact of running the degenerate case goes away as soon as all threads are busy: instead of hopping, the executor will preferably run the actor on the current thread, tuning automatically to the behavior you had to manually build into your actor implementation. We have Doug Lea to thank for that!

    So, I’d like to ask you to either compare to sensible Akka settings (single-threaded pool or much higher batch size setting “akka.actor.default-dispatcher.throughput=1000”) or remove the comparison altogether. And as we all agree, a more meaningful test would not have one of these actors running but 100, at least.

  9. says:

    Still eagerly awaiting the final verdict here…

  10. Jon Harrop says:

    If the message is 10x bigger (e.g. a linked list with 10 elements) does Erlang run 10x slower because it deep copies whereas the shared-heap implementations run at the same speed?

    Throughput is certainly interesting but what about latency?

  11. Rick says:

    Each actor implementation is incomplete if it does not embody fault-tolerance and remoting from its roots. I'm sad to say that until now only one was keen enough to start its life taking remoting and fault-tolerance very seriously – Erlang. Akka is very nice but comes from a single-VM life and takes remoting as an add-on. F# and others ought to be ashamed to compare themselves with Erlang. Oh, wow. 2x performance in-memory… Who cares if you still forgot the basics? We are nearly 30 years on from Erlang's birth and nobody understands the basics.
    Please, if you ever think about starting an actor framework – start with failure-tolerance and remoting. Otherwise you end up with an almost perfect framework – like Akka.
    Akka is really good. Indeed. They enforce supervision! This is a further dimension compared to Erlang. But they forgot remoting! I'm so sad to see that Akka is not able to handle supervision over remoting like Erlang. So, they invented the wheel more than once. (JGroups, gossip, …) Nobody needs actor clustering – simply do actor remoting with supervision and you're done. Get the first step right. Sorry, hAkkers, but see this criticism as a high-degree acknowledgement of the good you did for the JVM victims.
    Certainly Akka is the best failure-tolerant, remoting actor framework on this silly planet.
    So I stay with them and they will do it. It makes sense what they did and what they intend to do.
    But, hey, don't compare apples with a supervision tree of apples…
    Cheers.

    • Rick, I honestly have no idea what you are talking about: everything in Akka has been designed with remoting in mind (we call it “location transparency” to avoid the misused term “transparent remoting”). Are you referring to the fact that the Akka project has not yet reached the end of its roadmap? There certainly are features and improvements missing, but it is impossible to tell from your comment what you mean.

      But then again you also write “Akka is the best failure-tolerant, remoting actor framework”; I’m not sure what to make of this praise now. Thanks, I guess.

      BTW: “nobody needs actor clustering” is quite the opposite from what I hear when talking with people.

  12. James says:

    All of the angle brackets have disappeared from your F# code
