Saturday, December 27, 2008

Learning Scala for its Actors

...and very likely staying on for the rest.

I have been looking at the Actor model over the past couple of weeks as an easy way to write safer multithreaded Java programs. My focus so far has been on Java frameworks, mainly because I have a better chance of building complex multithreaded systems at work than in the random stuff I do for fun, and work is a Java-heavy environment. So while I've been peripherally aware of Scala the language for a while now, mainly through Debashish Ghosh's blog, I've always thought of it as another mildly interesting fringe language that also runs on the JVM.

However, of late, I've been reading a lot about Scala actors, and I had some time over the Christmas holidays, so I decided to bite the bullet and learn the language. In a way, it was similar to my motivation for learning Ruby for its Rails framework. Having spent about a week with it so far, I find that it is both a beautiful and practical language, once you get the hang of it.

Scala's syntax is similar to Java's, but different. It's a bit like a lecture at college where you start off already knowing what the professor is talking about, so you space out for a bit, but when you tune back in you have no idea what's happening. Having had similar experiences reading Scala code in the past, I decided to play it safe and read Ted Neward's A Busy Java Developer's Guide to Scala series on developerWorks in date-ascending order. I then worked through Martin Odersky's Scala by Example ebook, available from the Scala site. Finally, I bought myself the Programming in Scala book by Odersky, Spoon and Venners as a Christmas present, which I am still reading.

In any case, having learned some Scala, I decided to rewrite the toy application from my last two posts (here and here) in Scala, using Scala's actors library. The result is shown below. It probably looks more like Java than Scala, but it is my first Scala program that actually does something I want to do (as opposed to the various tutorial examples).

// Source: src/main/scala/myjob/ActorManager.scala
package myjob

import java.lang._
import java.util.concurrent.CountDownLatch
import scala.actors._

object ActorManager {

  val latch = new CountDownLatch(3)
  def decrementLatch(): Unit = {
    latch.countDown
  }

  def main(args: Array[String]): Unit = {
    // start the actors
    DownloadActor.start
    IndexActor.start
    WriteActor.start
    // seed the download actor with requests
    val start = System.currentTimeMillis
    for (i <- 1 until 100000) {
      println("Requested " + i)
      DownloadActor ! Message(i, "Requested " + i)
    }
    // ask them to stop
    DownloadActor ! StopMessage
    // wait for actors to stop
    latch.await
    println("elapsed = " + (System.currentTimeMillis - start))
  }
}

case class Message(id: Int, payload: String)
// a case object is a single value that can be sent with ! and matched directly
case object StopMessage

object DownloadActor extends Actor {
  def act() {
    while (true) {
      receive {
        case Message(id, payload) => {
          println("Downloaded " + id)
          IndexActor ! 
            Message(id, payload.replaceFirst("Requested ", "Downloaded "))
        }
        case StopMessage => {
          println("Stopping download")
          IndexActor ! StopMessage
          ActorManager.decrementLatch
          exit
        }
      }
    }
  }
}

object IndexActor extends Actor {
  def act() {
    while (true) {
      receive {
        case Message(id, payload) => {
          println("Indexed " + id)
          WriteActor ! 
            Message(id, payload.replaceFirst("Downloaded ", "Indexed "))
        }
        case StopMessage => {
          println("Stopping Index")
          WriteActor ! StopMessage
          ActorManager.decrementLatch
          exit
        }
      }
    }
  }
}

object WriteActor extends Actor {
  def act() {
    while (true) {
      receive {
        case Message(id, payload) => {
          println("Wrote " + id)
        }
        case StopMessage => {
          println("Stopping Write")
          ActorManager.decrementLatch
          exit
        }
      }
    }
  }
}

This works as expected, interleaving the output of the various actors. The poison pill handling is similar to the previous examples, and, as in the Jetlang example, a CountDownLatch lets the manager wait until all the actors have finished processing and terminated. The code is compiled and run using the following commands.

prompt$ scalac ActorManager.scala
prompt$ scala -cp . myjob.ActorManager

Scala also allows actors to work using an event-based model. Simply changing "while (true) { receive { ... } }" to "loop { react { ... } }" (and importing scala.actors.Actor._ for its loop function) switches to the event-based model. The times are comparable, although the react version runs in slightly less time at the larger task counts in my application. I ran the code above with a progressively increasing number of tasks and charted the elapsed wall-clock times, similar to the Jetlang and Kilim examples from my previous two posts. Here are the numbers, including the Kilim and Jetlang numbers from my previous posts for comparison. (In the accompanying graph, Scala-receive is blue, Scala-react is cyan, Kilim is red and Jetlang is green.)

#-Tasks   Kilim   Jetlang   Scala (receive)   Scala (react)
1         9       10        47                81
10        26      49        92                105
100       71      105       172               176
1000      605     630       894               1007
10000     4648    5997      3344              3317
100000    37076   46426     24314             20951

Update - 2009-01-02

As you can see from the comments, the numbers shown above are skewed by the console I/O in the code. Removing the println() calls results in completely different performance numbers; specifically, Jetlang and Kilim perform much faster than Scala. You can see the updated code and performance numbers here.

I am no language expert, but Scala seems to have some nice things going for it. Things I liked about it are its Java-like (to begin with, at least) syntax, and that it is a general-purpose language that can run on either the JVM or .NET's CLR. It also allows one to program in either an object-oriented or a functional style, depending on need. While I don't see myself migrating from Java to Scala just yet, it may make sense to use it to build parts of applications and integrate them with the main Java application. I also notice a trend of treating Scala as a sandbox for new Java ideas. If I am right about the trend, Java will likely look more and more like Scala in the future, so learning Scala would pay off in three ways: new functional idioms to use in Java, the ability to program in Scala itself, and a head start on Scala-like Java features as they become available.

The ability to add what look like new keywords and operators (both are really just methods) to the base language, somewhat like LISP's macros, is a great feature, although it can be a bit confusing when starting out. For example, the Actor framework's receive and send (!) operators are not part of the base language but part of the Actor library. But it's quite powerful once you get used to the idea.

Wednesday, December 24, 2008

Java Concurrency with Jetlang Actors

Last week, I wrote about my experiments with Kilim, a Java actor framework. This week, I take the same toy example and rewrite it using Jetlang, another Java actor framework. Jetlang is a Java port of the Retlang actor framework for C#/.NET, which is inspired by Erlang actors. From what I can see from reading about Retlang, Jetlang is still missing some functionality, but it's still in its early stages (version 0.1.6 at the time of writing).

Like Kilim, Jetlang makes concurrency safer using a shared-nothing approach. Unlike Kilim, message communication happens using publish-subscribe. Actor threads are modeled as Fibers and mailboxes are modeled as Channels. Actors subscribe to Channels with callbacks (in my code, a Callback whose onMessage() method is invoked for each incoming message), and they perform the action defined by act() in these callbacks. An actor then publishes its result to its outgoing Channel from within the same callback.
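The pub/sub idea can be illustrated with a small plain-Java sketch. This is not Jetlang's API: SimpleFiber and SimpleChannel are hypothetical stand-ins. Here a "fiber" is a single-threaded executor that runs its callbacks one at a time, in order, and a channel hands every published message to every subscriber's fiber.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PubSubDemo {

  // Hypothetical fiber: one thread that runs submitted callbacks sequentially.
  static final class SimpleFiber {
    private final ExecutorService thread = Executors.newSingleThreadExecutor();

    void execute(Runnable task) {
      thread.execute(task);
    }

    // Stop accepting work and wait for already-queued callbacks to finish.
    void dispose() {
      thread.shutdown();
      try {
        thread.awaitTermination(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  // Hypothetical channel: publish() delivers the message to every subscriber.
  static final class SimpleChannel<M> {
    private final List<Consumer<M>> subscribers = new CopyOnWriteArrayList<>();

    // The callback always runs on the subscriber's own fiber.
    void subscribe(SimpleFiber fiber, Consumer<M> callback) {
      subscribers.add(message -> fiber.execute(() -> callback.accept(message)));
    }

    void publish(M message) {
      for (Consumer<M> deliver : subscribers) {
        deliver.accept(message);
      }
    }
  }

  // Publishes n messages to two subscribers and returns what each one received.
  static List<List<String>> run(int n) {
    SimpleChannel<String> channel = new SimpleChannel<>();
    SimpleFiber firstFiber = new SimpleFiber();
    SimpleFiber secondFiber = new SimpleFiber();
    List<String> first = new CopyOnWriteArrayList<>();
    List<String> second = new CopyOnWriteArrayList<>();
    channel.subscribe(firstFiber, first::add);
    channel.subscribe(secondFiber, second::add);
    for (int i = 0; i < n; i++) {
      channel.publish("Requested " + i);
    }
    firstFiber.dispose();
    secondFiber.dispose();
    return Arrays.asList(first, second);
  }

  public static void main(String[] args) {
    List<List<String>> seen = run(5);
    System.out.println("first: " + seen.get(0) + "\nsecond: " + seen.get(1));
  }
}
```

Each subscriber sees every message, in publication order, on its own thread; that is the main behavioral difference from a point-to-point mailbox, where each message is taken by a single receiver.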

If you have read my previous post, and the previous paragraph, there is really not that much to the code. Like my Kilim example, a lot of it is adapted from the PingPong example in either distribution. There is an ActorManager which is responsible for setting everything up, which you can see below:

// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/ActorManager.java
package com.mycompany.myapp.concurrent.jetlang;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jetlang.channels.Channel;
import org.jetlang.channels.MemoryChannel;
import org.jetlang.core.Disposable;
import org.jetlang.fibers.Fiber;
import org.jetlang.fibers.PoolFiberFactory;

import com.mycompany.myapp.concurrent.kilim.Message;

public class ActorManager {

  public final static int NUM_ACTORS = 3;
  public final static String STOP = "__STOP__";
  
  public static void main(String[] args) {
    ExecutorService exec = Executors.newCachedThreadPool();
    PoolFiberFactory factory = new PoolFiberFactory(exec);

    // when the poison pill is received, the fiber.dispose() call will
    // call this and decrement the countdown latch. The onstop.await()
    // will block until the latch is zero, so that way the manager waits
    // for all the actors to complete before exiting
    final CountDownLatch onstop = new CountDownLatch(NUM_ACTORS);
    Disposable dispose = new Disposable() {
      public void dispose() {
        onstop.countDown();
      }
    };
    
    Fiber downloadFiber = factory.create();
    downloadFiber.add(dispose);
    DownloadActor downloadActor =  
      new DownloadActor(Channels.downloadChannel, Channels.indexChannel, 
      downloadFiber);
    
    Fiber indexFiber = factory.create();
    indexFiber.add(dispose);
    IndexActor indexActor = 
      new IndexActor(Channels.indexChannel, Channels.writeChannel, 
      indexFiber);
    
    Fiber writeFiber = factory.create();
    writeFiber.add(dispose);
    WriteActor writeActor = 
      new WriteActor(Channels.writeChannel, (Channel<Message>) null, 
      writeFiber);

    downloadActor.start();
    indexActor.start();
    writeActor.start();
    
    // seed the incoming channel with 10,000 requests
    for (int i = 0; i < 10000; i++) {
      String payload = "Requested " + i;
      System.out.println(payload);
      Channels.downloadChannel.publish(new Message(i, payload));
    }
    // send the poison pill to stop processing
    Channels.downloadChannel.publish(new Message(0, ActorManager.STOP));
    
    try { 
      onstop.await(); 
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    exec.shutdown();
  }
}

To stop the ActorManager, I rely on a poison pill approach similar to the one I used for the Kilim example. Each actor receives the poison pill, places it on its outbox, and then terminates its Fiber by calling dispose(). This in turn calls the dispose() method of the Disposable registered with the Fiber, which decrements the count on the onstop CountDownLatch. The ActorManager waits on the CountDownLatch before exiting.

The Channels class is simply a holder for final static instances of the channels. In my case, this is no different from instantiating them within ActorManager.main(), but it seemed a good way to keep them together, since they are the "shared" resources here.

// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/Channels.java
package com.mycompany.myapp.concurrent.jetlang;

import org.jetlang.channels.Channel;
import org.jetlang.channels.MemoryChannel;

import com.mycompany.myapp.concurrent.kilim.Message;

public class Channels {

  public static final Channel<Message> downloadChannel = 
    new MemoryChannel<Message>();
  public static final Channel<Message> indexChannel = 
    new MemoryChannel<Message>();
  public static final Channel<Message> writeChannel = 
    new MemoryChannel<Message>();
}

Next up is an abstract Actor class built out of a Fiber and the Channels representing the Actor's inbox and outbox. It declares an abstract act() method that application-specific Actor subclasses implement.

// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/Actor.java
package com.mycompany.myapp.concurrent.jetlang;

import org.jetlang.channels.Channel;
import org.jetlang.core.Callback;
import org.jetlang.fibers.Fiber;

import com.mycompany.myapp.concurrent.kilim.Message;

public abstract class Actor {

  private Channel<Message> inChannel;
  private Channel<Message> outChannel;
  private Fiber fiber;
  
  public Actor(Channel<Message> inChannel, 
               Channel<Message> outChannel, 
               Fiber fiber) {
    this.inChannel = inChannel;
    this.outChannel = outChannel;
    this.fiber = fiber;
  }
  
  public void start() {
    // set up subscription listener
    Callback<Message> onReceive = new Callback<Message>() {
      public void onMessage(Message message) {
        act(message);
        if (outChannel != null) {
          outChannel.publish(message);
        }
        // process poison pill, dispose current actor and pass the message
        // on to the next actor in the chain (above)
        if (message.payload instanceof String &&
            ActorManager.STOP.equals(message.payload)) {
          fiber.dispose();
        }
      }
    };
    // subscribe to incoming channel
    inChannel.subscribe(fiber, onReceive);
    // start the fiber
    fiber.start();
  }
  
  public abstract void act(Message message);
}

As before, the three actors are pretty trivial and pointless, so I just put them in here without much explanation.

// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/DownloadActor.java
package com.mycompany.myapp.concurrent.jetlang;

import org.jetlang.channels.Channel;
import org.jetlang.fibers.Fiber;

import com.mycompany.myapp.concurrent.kilim.Message;

public class DownloadActor extends Actor {

  public DownloadActor(Channel<Message> inChannel, 
                       Channel<Message> outChannel,
                       Fiber fiber) {
    super(inChannel, outChannel, fiber);
  }

  @Override
  public void act(Message message) {
    String payload = (String) message.payload;
    payload = payload.replaceFirst("Requested ", "Downloaded ");
    System.out.println(payload);
    message.payload = payload;
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/IndexActor.java
package com.mycompany.myapp.concurrent.jetlang;

import org.jetlang.channels.Channel;
import org.jetlang.fibers.Fiber;

import com.mycompany.myapp.concurrent.kilim.Message;

public class IndexActor extends Actor {

  public IndexActor(Channel<Message> inChannel, 
                    Channel<Message> outChannel,
                    Fiber fiber) {
    super(inChannel, outChannel, fiber);
  }

  @Override
  public void act(Message message) {
    String payload = (String) message.payload;
    payload = payload.replaceFirst("Downloaded ", "Indexed ");
    System.out.println(payload);
    message.payload = payload;
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/jetlang/WriteActor.java
package com.mycompany.myapp.concurrent.jetlang;

import org.jetlang.channels.Channel;
import org.jetlang.fibers.Fiber;

import com.mycompany.myapp.concurrent.kilim.Message;

public class WriteActor extends Actor {

  public WriteActor(Channel<Message> inChannel, 
                    Channel<Message> outChannel,
                    Fiber fiber) {
    super(inChannel, null, fiber);
  }

  @Override
  public void act(Message message) {
    String payload = (String) message.payload;
    payload = payload.replaceFirst("Indexed ", "Wrote ");
    System.out.println(payload);
    message.payload = payload;
  }
}

Running the ActorManager produces the expected interleaved output, showing that the Actors are acting asynchronously.

I was curious about the assertion that events are a bad idea for high-concurrency servers (PDF), so now that I had two trivial versions of my application that did nothing substantial, I figured it would be interesting to see whether Kilim's wait-loop approach performed better than Jetlang's pub/sub approach. Here are the numbers; note that the times are just wall-clock times taken under very uncontrolled conditions. In the graph, the red line is for Kilim and the green line is for Jetlang. So it looks like (at least in my test) the message-passing overhead for Jetlang is slightly higher than Kilim's, and it increases with higher loads. However, what counts as an acceptable message-passing overhead depends a lot on the application; it would probably matter less as the running times of the actors increase.

#-Tasks   Kilim   Jetlang
1         9       10
10        26      49
100       71      105
1000      605     630
10000     4648    5997
100000    37076   46426

Update - 2009-01-02

The performance numbers shown above are skewed by the console I/O in the code. While real-world actors will likely do something more meaningful than replacing one string with another, possibly blocking in the same way as the System.out.println() calls above, here we want to compare the frameworks themselves. Numbers with the println() calls removed are provided in my post here.

Personally, now that I understand Jetlang a bit better than I did last week, here is some feedback. I initially found Jetlang slightly harder to understand than Kilim, but simpler to use, mainly because there is no bytecode manipulation. I haven't used Jetlang enough to praise or blame it from a programming perspective, but here is a review of Retlang from a C#/Python developer that may be useful. Mike Rettig (the original author of Retlang and Jetlang) also posts interesting information on Jetlang on his blog.

Merry Christmas, everyone! Have a great holiday season.

Friday, December 19, 2008

Concurrency, Actors and Kilim

Motivation

I don't do too much multithreaded Java programming, and what I do (or have done) is quite basic. Most of it involves either spawning an asynchronous thread to do some work in the background, or breaking up large batch processes to run in multiple threads so they can use multiple CPUs and complete faster.

Some weeks ago, I attended a presentation (organized by EBig) given by some folks from Azul Systems, where they demoed their 108-core (2x54) RISC chip with a JDK optimized to work with the chip. These chips are designed for running very large concurrent Java apps. Not having anything large or complex enough to run on such a beast, I was not really the target market, so for me the talk was in the "nice to know" category.

The day after, I attended a (free) community event hosted by QCon, where people were asking whether Actors would be included in Java 7, since they have already been implemented in Scala, which is apparently regarded as the unofficial sandbox for new Java features. Not knowing anything about Actors, and hearing all kinds of vague but good things about how they can simplify concurrent programming, I made a mental note to look into them once I got out.

It turns out that I had quite a bit of reading to do. It's something of a recursive problem: once I learned about the Actor framework, I wanted to take a look at Erlang. Along the way, I learned the difference between concurrent and parallel programming, and was led to Linda and JavaSpaces. I also read a few theoretical papers on concurrent and parallel programming, which outline some very interesting design patterns (listed below in References) that can help in building "safer" concurrent systems. Finally, I decided I also wanted to brush up on the Java 5 concurrent data structures, so I ended up reading Java Concurrency in Practice once again.

It also turns out that a bunch of really smart people are building various flavors of the Actor framework in Java, which removes, or at least postpones, the need for (not so smart) people like me to learn Scala or Erlang in order to program using Actors. The three I found are Functional Java, Kilim and Jetlang. Of the three, I liked Kilim the best because of the simplicity of its API, although it uses bytecode enhancement to retain this simplicity, which may be a turn-off for some folks. Since I learn best by doing, I decided to code up a simple hypothetical system that made sense to me (rather than try to apply a concurrent Fibonacci generator or a PingPong example to my world). I describe it in today's post.

Background

Kilim's design is based on the message-passing concurrency approach popularized by Erlang. The application is broken up into completely independent processes (Actors), each of which has its own thread pool, and which communicate with each other only through messages. This eliminates shared state, and with it the data corruption and lock-related deadlocks that shared mutable state invites.

Below is a quote from the Side Words blog that provides a very nice and succinct definition of what an Actor is and how Actor-based programming differs from object-oriented programming.

At first sight, both may look similar. However, they have profound differences. Conceptually, objects are just “dead” data with methods. On the opposite, actors are “alive” independent processes with transactional memory. An actor is basically like a server. It receives and sends messages, and can also start/stop other agents. By “transactional state”, we mean that state is associated to a “conversation”; not to the actor itself, as opposed to objects having a global shared state. As a consequence actors are free of concurrency issues.

My Test Application

My test application downloads pages from the World Wide Web, indexes them to build up a term vector for each document (described elsewhere on my blog), and then writes the vector into a database table. As a single-threaded application, it would simply run these three jobs one after the other for each URL. Each of these components has a well-defined interface: the Download job takes a URL and produces a Java object with the title and body parsed out into member variables, the Index job takes this Java object and produces a Map of terms and counts, and the Write job takes this Map and writes it to the database.
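A minimal sketch of the single-threaded version, under some loudly stated assumptions: Document, Downloader, Indexer and TermVectorWriter are hypothetical names for the three interfaces described above, the download step is a stub that fabricates a page instead of fetching one, term counting is a naive whitespace split, and an in-memory map stands in for the database table.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SequentialPipeline {

  // Hypothetical output of the Download job: title and body parsed out of a page.
  static final class Document {
    final String url;
    final String title;
    final String body;

    Document(String url, String title, String body) {
      this.url = url;
      this.title = title;
      this.body = body;
    }
  }

  interface Downloader { Document download(String url); }
  interface Indexer { Map<String, Integer> index(Document doc); }
  interface TermVectorWriter { void write(String url, Map<String, Integer> vector); }

  // Stub: a real Download job would fetch the URL and parse the HTML.
  static Document fakeDownload(String url) {
    return new Document(url, "Title of " + url, "actors pass messages and actors share nothing");
  }

  // Naive term counting over lower-cased, whitespace-separated tokens.
  static Map<String, Integer> countTerms(Document doc) {
    Map<String, Integer> counts = new HashMap<>();
    for (String term : doc.body.toLowerCase().split("\\s+")) {
      if (!term.isEmpty()) {
        counts.merge(term, 1, Integer::sum);
      }
    }
    return counts;
  }

  // Runs the whole pipe for each URL in turn, on the calling thread.
  static Map<String, Map<String, Integer>> run(List<String> urls) {
    Map<String, Map<String, Integer>> table = new HashMap<>(); // stands in for the DB table
    Downloader downloader = SequentialPipeline::fakeDownload;
    Indexer indexer = SequentialPipeline::countTerms;
    TermVectorWriter writer = table::put;
    for (String url : urls) {
      writer.write(url, indexer.index(downloader.download(url)));
    }
    return table;
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList("http://example.com/a", "http://example.com/b")));
  }
}
```

Keeping the three jobs behind separate interfaces is what makes the later parallel versions possible: each stage can be handed to a different thread or pool without changing its code.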

My first cut at parallelizing this to run on a multi-CPU box would be to simply split this up horizontally into a bunch of threads, each of which runs the entire pipe. The first two processes should be read-only with respect to shared data, and the last one inserts into the database within transactions. So assuming that I've taken standard precautions about making the read-only shared data explicitly immutable, it should be safe to parallelize.
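A sketch of this horizontal split, assuming stubbed jobs (the download and index methods here are hypothetical placeholders): a fixed-size pool in which every task runs the entire pipe for one URL. A ConcurrentHashMap stands in for the transactional database insert, so concurrent writes from different tasks are safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class HorizontalPipeline {

  // Stub Download job: returns a fake page body for the URL.
  static String download(String url) {
    return "page about " + url + " and actors";
  }

  // Naive Index job: counts whitespace-separated terms.
  static Map<String, Integer> index(String body) {
    Map<String, Integer> counts = new HashMap<>();
    for (String term : body.split("\\s+")) {
      counts.merge(term, 1, Integer::sum);
    }
    return counts;
  }

  // Every task runs download, index and write for a single URL.
  static Map<String, Map<String, Integer>> run(List<String> urls, int numThreads) {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    Map<String, Map<String, Integer>> table = new ConcurrentHashMap<>(); // stands in for the DB
    List<Future<?>> results = new ArrayList<>();
    for (String url : urls) {
      results.add(pool.submit(() -> {
        table.put(url, index(download(url)));
      }));
    }
    try {
      for (Future<?> f : results) {
        f.get(); // wait for completion and surface any worker failure
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      pool.shutdown();
    }
    return table;
  }

  public static void main(String[] args) {
    List<String> urls = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      urls.add("http://example.com/" + i);
    }
    System.out.println("processed " + run(urls, 8).size() + " URLs");
  }
}
```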

My next step is to optimize this a bit further. Given that my Index task is likely to be almost completely CPU-bound, my Write task almost completely IO-bound, and the Download task perhaps a combination of the two, and that I have 8 CPUs on my machine, it may make sense to allocate 3 of them to Download, 4 to Index and 1 to Write. We could do this by splitting the process into three actors, Download, Index and Write, which would have pools of 3, 4 and 1 threads respectively.
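One way to sketch the 3/4/1 split without an actor library is to give each stage its own fixed-size pool and chain the stages with CompletableFuture (Java 8). This is a swapped-in technique, not what Kilim does, and the stub download and index methods are hypothetical. Because the write stage has a single thread, the plain HashMap standing in for the database is only ever mutated by that one thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StagedPipeline {

  // Stub Download job (a mix of CPU and IO in the real application).
  static String download(String url) {
    return "page about " + url + " and actors";
  }

  // Naive Index job (CPU-bound in the real application).
  static Map<String, Integer> index(String body) {
    Map<String, Integer> counts = new HashMap<>();
    for (String term : body.split("\\s+")) {
      counts.merge(term, 1, Integer::sum);
    }
    return counts;
  }

  static Map<String, Map<String, Integer>> run(List<String> urls) {
    ExecutorService downloadPool = Executors.newFixedThreadPool(3);
    ExecutorService indexPool = Executors.newFixedThreadPool(4);
    ExecutorService writePool = Executors.newFixedThreadPool(1);
    // Only the single write thread mutates this map, so it needs no locking.
    Map<String, Map<String, Integer>> table = new HashMap<>();
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    for (String url : urls) {
      pending.add(CompletableFuture
          .supplyAsync(() -> download(url), downloadPool)
          .thenApplyAsync(StagedPipeline::index, indexPool)
          .thenAcceptAsync(vector -> table.put(url, vector), writePool));
    }
    try {
      // Wait for every URL to make it through all three stages.
      CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    } finally {
      downloadPool.shutdown();
      indexPool.shutdown();
      writePool.shutdown();
    }
    return table;
  }

  public static void main(String[] args) {
    List<String> urls = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      urls.add("http://example.com/" + i);
    }
    System.out.println("wrote " + run(urls).size() + " term vectors");
  }
}
```

The pool sizes carry the tuning decision: changing 3/4/1 only touches the three newFixedThreadPool calls, while each stage's code stays the same.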

The code

Here is the code using Kilim. An ActorManager is responsible for setting up and starting the Actors, and feeding them with requests. Once done, it sends a poison pill request STOP, which causes the Actors to pass the message on to the next Actor and terminate themselves. The last Actor will terminate and send back an event to the ActorManager on the callback Mailbox, which will cause the ActorManager itself to terminate.

The ActorManager instantiates and passes in Mailboxes to the Actors. Each actor gets a reference to its inbox and a reference to its neighbor's inbox (its outbox). The Write Actor does not need to forward the results of its computation, so it has a null Mailbox for its outbox.

// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/ActorManager.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.ExitMsg;
import kilim.Mailbox;

public class ActorManager {

  public static final String STOP = "__STOP__";
  private static final int ACTOR_THREAD_POOL_SIZE = 2;
  
  public static void main(String[] args) {
    Mailbox<Message> mb0 = new Mailbox<Message>();
    Mailbox<Message> mb1 = new Mailbox<Message>();
    Mailbox<Message> mb2 = new Mailbox<Message>();
    Mailbox<ExitMsg> callback = new Mailbox<ExitMsg>();
    
    // instantiate actors
    DownloadActor downloadActor = new DownloadActor(
      ACTOR_THREAD_POOL_SIZE, mb0, mb1);
    IndexActor indexActor = new IndexActor(
      ACTOR_THREAD_POOL_SIZE, mb1, mb2);
    WriteActor writeActor = new WriteActor(
      ACTOR_THREAD_POOL_SIZE, mb2, null);
    
    // start the actors
    downloadActor.start();
    indexActor.start();
    writeActor.start();
    writeActor.informOnExit(callback);
    
    for (int i = 0; i < 10000; i++) {
      String req = "Requested " + i;
      mb0.putnb(new Message(i, req));
      System.out.println(req);
    }
    
    // poison pill to stop the actors
    mb0.putnb(new Message(Integer.MAX_VALUE, ActorManager.STOP));
    // block till the last actor has informed the manager that it exited
    callback.getb();
    System.exit(0);
  }
}

The base class for an Actor in Kilim is the Task, which provides a @pausable execute() method that subclasses override and implement. Read the Kilim docs for more details. I have abstracted the boilerplate code for the Task.execute() method into the abstract Actor class, which requires subclasses to implement the act(Message)::void method. Here is the code for my Actor abstract class. Its execute() method is an infinite read-eval-write loop that exits only when it receives the poison pill.

Notice that the Actor class I have is tailored to my application, in the sense that it can take only two Mailbox objects during construction, and the poison pill handling is not generic. The Kilim docs mention that the "type-system" is not available in this release (0.5.1) because it is being rewritten, so I am guessing that it will also contain a more generic Actor class.

// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/Actor.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.Mailbox;
import kilim.Scheduler;
import kilim.Task;
import kilim.pausable;

public abstract class Actor extends Task {

  private Mailbox<Message> inbox;
  private Mailbox<Message> outbox;
  
  public Actor(int numThreads, Mailbox<Message> inbox,
      Mailbox<Message> outbox) {
    this.inbox = inbox;
    this.outbox = outbox;
    setScheduler(new Scheduler(numThreads));
  }

  public @pausable void execute() {
    for (;;) {
      Message request = inbox.get();
      // this is custom poison pill handling code for our application
      if (request.payload instanceof String &&
          ((String) request.payload).equals(ActorManager.STOP)) {
        if (outbox != null) {
          outbox.put(request);
        }
        break;
      }
      // end of poison pill handling
      act(request);
      if (outbox != null) {
        outbox.put(request);
      }
    }
  }
  
  public abstract void act(Message request);
}

The Message class is just a holder that the various Actors update as it passes through them. It has an id to identify it for logging purposes and such, but the important part is the payload. Each actor reads the payload and writes out another payload object for the next actor to consume.

// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/Message.java
package com.mycompany.myapp.concurrent.kilim;

public class Message {

  public int id;
  public Object payload;
  
  public Message(int id, Object payload) {
    this.id = id;
    this.payload = payload;
  }
}

And here are the various Actor implementations. They are currently stubs that just rewrite the message payload and pass the message on (the Index and Write actors also print it out). There is not much to describe here, so I just show them all together.

// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/DownloadActor.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.Mailbox;

public class DownloadActor extends Actor {

  public DownloadActor(int numThreads, Mailbox<Message> inbox,
      Mailbox<Message> outbox) {
    super(numThreads, inbox, outbox);
  }

  @Override
  public void act(Message request) {
    Object payload = request.payload;
    if (payload instanceof String) {
      String responsePayload = new String((String) payload);
      responsePayload = responsePayload.replaceFirst(
        "Requested ", "Downloaded ");
      request.payload = responsePayload;
    }
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/IndexActor.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.Mailbox;

public class IndexActor extends Actor {

  public IndexActor(int numThreads, Mailbox<Message> inbox,
      Mailbox<Message> outbox) {
    super(numThreads, inbox, outbox);
  }

  @Override
  public void act(Message request) {
    Object payload = request.payload;
    if (payload instanceof String) {
      String responsePayload = new String((String) payload);
      responsePayload = responsePayload.replaceFirst(
        "Downloaded ", "Indexed ");
      System.out.println(responsePayload);
      request.payload = responsePayload;
    }
  }
}
// Source: src/main/java/com/mycompany/myapp/concurrent/kilim/WriteActor.java
package com.mycompany.myapp.concurrent.kilim;

import kilim.Mailbox;

public class WriteActor extends Actor {

  public WriteActor(int numThreads, Mailbox<Message> inbox,
      Mailbox<Message> outbox) {
    super(numThreads, inbox, outbox);
  }

  @Override
  public void act(Message request) {
    Object payload = request.payload;
    if (payload instanceof String) {
      String responsePayload = new String((String) payload);
      responsePayload = responsePayload.replaceFirst("Indexed ", "Wrote ");
      System.out.println(responsePayload);
      request.payload = responsePayload;
    }
  }
}
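For comparison, here is a rough plain-Java sketch of the same inbox/outbox chain without bytecode weaving. It is not Kilim's API: ChainActor is a hypothetical class that uses one ordinary thread per actor (rather than a Kilim Scheduler with a thread pool) and java.util.concurrent.BlockingQueue instances as mailboxes. The poison pill is forwarded down the chain, and the last actor reports its exit on a callback queue, much as writeActor.informOnExit(callback) does in the ActorManager above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.UnaryOperator;

public class PlainActorChain {

  static final String STOP = "__STOP__";

  // Hypothetical thread-backed actor with an inbox and an optional outbox.
  static final class ChainActor extends Thread {
    private final BlockingQueue<String> inbox;
    private final BlockingQueue<String> outbox; // null for the last actor
    private final BlockingQueue<String> onExit; // null unless someone wants to know
    private final UnaryOperator<String> act;

    ChainActor(String name, BlockingQueue<String> inbox, BlockingQueue<String> outbox,
               BlockingQueue<String> onExit, UnaryOperator<String> act) {
      super(name);
      this.inbox = inbox;
      this.outbox = outbox;
      this.onExit = onExit;
      this.act = act;
    }

    @Override
    public void run() {
      try {
        while (true) {
          String message = inbox.take();
          if (STOP.equals(message)) {
            // poison pill: pass it on to the next actor, then stop
            if (outbox != null) {
              outbox.put(message);
            }
            break;
          }
          String result = act.apply(message);
          if (outbox != null) {
            outbox.put(result);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        if (onExit != null) {
          onExit.offer(getName()); // tell the manager this actor has exited
        }
      }
    }
  }

  // Feeds n requests through Download, Index and Write; returns what was "written".
  static List<String> run(int n) {
    BlockingQueue<String> mb0 = new LinkedBlockingQueue<>();
    BlockingQueue<String> mb1 = new LinkedBlockingQueue<>();
    BlockingQueue<String> mb2 = new LinkedBlockingQueue<>();
    BlockingQueue<String> callback = new LinkedBlockingQueue<>();
    List<String> written = Collections.synchronizedList(new ArrayList<>());

    new ChainActor("download", mb0, mb1, null,
        s -> s.replaceFirst("Requested ", "Downloaded ")).start();
    new ChainActor("index", mb1, mb2, null,
        s -> s.replaceFirst("Downloaded ", "Indexed ")).start();
    new ChainActor("write", mb2, null, callback, s -> {
      String w = s.replaceFirst("Indexed ", "Wrote ");
      written.add(w);
      return w;
    }).start();

    try {
      for (int i = 0; i < n; i++) {
        mb0.put("Requested " + i);
      }
      mb0.put(STOP);
      callback.take(); // block until the last actor reports that it exited
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    return written;
  }

  public static void main(String[] args) {
    List<String> written = run(10000);
    System.out.println(written.size() + " written, last = " + written.get(written.size() - 1));
  }
}
```

Since each actor here has a single thread and the queues are FIFO, messages come out of the Write actor in the order they were requested; with multi-threaded pools per actor, as in the Kilim version, that ordering would not be guaranteed.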

Compiling and Weaving

Kilim enhances the bytecode created by the compiler, in a process called weaving, using ASM to do the enhancement. Since I use Maven, I could either run the Weaver from the command line using java, write a Maven plugin, or generate an Ant build.xml from Maven, update it, and use that going forward. I chose the last approach. In case there are any other Maven users out there, I ran mvn ant:ant to generate the build.xml, then added this snippet to the "compile" target. It's almost a cut-and-paste of the target in the Kilim distribution's build.xml file.

  <target name="compile" depends="get-deps" description="Compile the code">
    ...
    <java classname="kilim.tools.Weaver" fork="yes">
      <classpath refid="weave.classpath"/>
      <assertions>
        <enable/>
      </assertions>
      <arg value="-x"/><!-- skip classes whose names match this regex -->
      <arg value="ExInvalid|test"/>
      <arg value="-d"/>
      <arg value="${maven.build.output}"/>
      <arg line="${maven.build.output}"/>
    </java>
  </target>

I also added this "run" target so I can use Ant to run the ActorManager. I invoke it with ant run -Drun=full.classname.

  <target name="run" depends="compile" description="Run named class">
    <java classname="${run}" fork="yes">
      <classpath>
        <path refid="build.classpath"/>
        <pathelement location="${maven.build.output}"/>
      </classpath>
    </java>
  </target>

The code processes 10,000 URLs, so the run creates 30,000 tasks (10,000 for each of the 3 Actors), using 3 thread pools (one per Actor) of 2 threads each. The entire run completes in about 12 seconds on my laptop (an AMD Turion dual-core CPU).
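
As a rough cross-check of the numbers above, here is a minimal sketch of the same three-stage topology written with plain java.util.concurrent instead of Kilim: one fixed pool of 2 threads per stage, with each message handed to the next stage's pool once the current stage is done with it. It is not the Kilim code (there are no Mailboxes and no weaving), the class name PipelineSketch and the example.com URLs are made up, and the "download" stage just fabricates a string rather than fetching anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Plain java.util.concurrent sketch (no Kilim, no weaving): three stages,
// each backed by its own fixed thread pool, each message hopping stage to stage.
public class PipelineSketch {

  public static final class Result {
    public final int tasks;            // total tasks executed across all stages
    public final List<String> output;  // payloads that reached the last stage
    Result(int tasks, List<String> output) {
      this.tasks = tasks;
      this.output = output;
    }
  }

  public static Result run(int numUrls, int threadsPerStage) {
    ExecutorService download = Executors.newFixedThreadPool(threadsPerStage);
    ExecutorService index = Executors.newFixedThreadPool(threadsPerStage);
    ExecutorService write = Executors.newFixedThreadPool(threadsPerStage);
    AtomicInteger tasks = new AtomicInteger();
    ConcurrentLinkedQueue<String> written = new ConcurrentLinkedQueue<>();
    CountDownLatch done = new CountDownLatch(numUrls);

    for (int i = 0; i < numUrls; i++) {
      String url = "http://www.example.com/page" + i; // hypothetical URLs
      download.execute(() -> {
        tasks.incrementAndGet();
        String downloaded = "Downloaded " + url;
        index.execute(() -> {
          tasks.incrementAndGet();
          String indexed = downloaded.replaceFirst("Downloaded ", "Indexed ");
          write.execute(() -> {
            tasks.incrementAndGet();
            written.add(indexed.replaceFirst("Indexed ", "Wrote "));
            done.countDown();
          });
        });
      });
    }
    try {
      done.await(); // wait until every message has left the last stage
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      download.shutdown();
      index.shutdown();
      write.shutdown();
    }
    return new Result(tasks.get(), new ArrayList<>(written));
  }
}
```

Calling PipelineSketch.run(10000, 2) should report 30,000 tasks, matching the 10,000-per-Actor count. Unlike Kilim's lightweight tasks, every stage here occupies a real thread while it works, so its timings are not directly comparable.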

Random Observations

Similarity with HTTP request-response

The Actor framework could be seen as an inter-thread version of a web application. A web request may ultimately be served by some sort of servlet, but it can go through multiple proxies to get there. At each stage, the proxies would probably add or process some attributes of the request. So essentially, the request (message) is the only thing that changes as it goes through the different proxies (actors).
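
To make the analogy concrete, here is a minimal, single-threaded sketch in plain Java where each "proxy" is a stateless function and the request is a map of attributes. The class name, attribute names and values are all hypothetical; in the Kilim program the stages would be Actors and the hand-off would go through Mailboxes rather than a for loop.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Sketch of the analogy: stateless "proxies" that each add an attribute.
// The request (message) is the only thing that changes along the way.
public class ProxyChainSketch {

  // A stage that returns a copy of the request with one attribute added.
  public static UnaryOperator<Map<String, String>> addAttribute(String key, String value) {
    return request -> {
      Map<String, String> copy = new LinkedHashMap<>(request);
      copy.put(key, value);
      return Collections.unmodifiableMap(copy);
    };
  }

  // Pass the request through each stage in order, like a chain of proxies.
  public static Map<String, String> handle(Map<String, String> request,
      List<UnaryOperator<Map<String, String>>> stages) {
    Map<String, String> current = request;
    for (UnaryOperator<Map<String, String>> stage : stages) {
      current = stage.apply(current);
    }
    return current;
  }

  public static Map<String, String> demo() {
    List<UnaryOperator<Map<String, String>>> stages = Arrays.asList(
        addAttribute("X-Forwarded-For", "10.0.0.1"), // hypothetical front proxy
        addAttribute("user", "guest"),               // hypothetical auth proxy
        addAttribute("status", "served"));           // the "servlet" at the end
    return handle(Collections.singletonMap("url", "http://www.example.com/"), stages);
  }
}
```

Because no stage keeps any state of its own, the stages could run on different threads without sharing anything except the message being handed along.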

Similarity with JMS

Message-passing concurrency looks very similar to a JMS application. In JMS terms, Kilim can be thought of as using a point-to-point architecture, whereas Jetlang uses a publish-subscribe architecture. I will probably try building something similar with Jetlang and write about it in an upcoming post.
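
The practical difference between the two JMS styles is who gets a given message. Here is a minimal single-threaded sketch using hypothetical class names (this is not the JMS, Kilim or Jetlang API): a point-to-point channel hands each message to exactly one receiver, while a topic hands a copy to every subscriber. The round-robin policy is just one assumed way of picking the receiver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-process, single-threaded sketch of the two JMS delivery styles.
// These are hypothetical classes, not the JMS, Kilim or Jetlang APIs.
public class DeliverySketch {

  // Point-to-point (like a JMS queue): each message reaches exactly one receiver.
  public static class PointToPoint {
    private final List<Consumer<String>> receivers = new ArrayList<>();
    private int next = 0;

    public void addReceiver(Consumer<String> receiver) {
      receivers.add(receiver);
    }

    public void send(String message) {
      receivers.get(next).accept(message); // round-robin is one possible policy
      next = (next + 1) % receivers.size();
    }
  }

  // Publish-subscribe (like a JMS topic): every subscriber sees every message.
  public static class Topic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    public void subscribe(Consumer<String> subscriber) {
      subscribers.add(subscriber);
    }

    public void publish(String message) {
      for (Consumer<String> subscriber : subscribers) {
        subscriber.accept(message);
      }
    }
  }
}
```

With two receivers and four messages, PointToPoint gives each receiver two messages, while Topic gives each subscriber all four. Several Kilim Tasks calling get() on the same Mailbox should behave like the PointToPoint receivers, since each message is removed by whichever Task takes it.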

Differences from Erlang Actors

From the Erlang examples I've seen, an Erlang actor comes preloaded with its own incoming mailbox. An Erlang actor exposes two operations, receive and send. The receive implicitly reads the actor's own inbox and matches each message against a set of patterns. When a pattern matches, the corresponding code runs, and if the message needs to go somewhere else, the destination actor is named in the send operation. In Java (pseudo-code), an Erlang-style actor would look something like this:

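  public abstract class Actor {
    private final Mailbox<Message> inbox;

    public Actor() {
      inbox = new Mailbox<Message>();
    }

    // receive loop: wait for the next message in our own inbox and act on it
    public void receive() {
      for (;;) {
        Message message = inbox.get();
        act(message);
      }
    }

    // other actors call send() to drop a message into this actor's inbox
    public void send(Message message) {
      inbox.put(message);
    }

    public abstract void act(Message message);
  }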

A subclass implementation of act(Message) would look something like this:

  public class MyActor extends Actor {
    @Override
    public void act(Message message) {
      if (message instanceof Foo) {
        message = doPrefooOperation(message);
        actors.get(Foo.class).send(message);
      } else if (message instanceof Bar) {
        message = doPrebarOperation(message);
        actors.get(Bar.class).send(message);
      } else {
        actors.get(Baz.class).send(message);
      }
    }
  }
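
For comparison, here is a minimal runnable sketch of the Erlang-style pseudo-code above on plain Java threads, with a LinkedBlockingQueue standing in for each actor's Mailbox. It assumes one thread per actor (so none of Kilim's lightweight tasks), and the registry keys, the Router class and the doubling/upper-casing steps are hypothetical stand-ins for Foo, Bar, Baz, doPrefooOperation and doPrebarOperation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Erlang-style actors on plain threads (not Kilim): each actor owns its inbox,
// and the destination is looked up in a shared registry at send time.
public class ErlangStyleSketch {

  // The extra shared resource: a registry mapping names to actors.
  public static final Map<String, Actor> actors = new ConcurrentHashMap<>();

  public abstract static class Actor {
    private final BlockingQueue<Object> inbox = new LinkedBlockingQueue<>();

    // Start this actor's receive loop on its own daemon thread.
    public Actor start() {
      Thread thread = new Thread(this::receive);
      thread.setDaemon(true);
      thread.start();
      return this;
    }

    public void send(Object message) {
      inbox.add(message);
    }

    // Implicit receive: block on our own inbox and hand each message to act().
    private void receive() {
      try {
        while (true) {
          act(inbox.take());
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

    protected abstract void act(Object message);
  }

  // A leaf actor that just runs a callback on every message.
  public static Actor sink(Consumer<Object> handler) {
    return new Actor() {
      @Override
      protected void act(Object message) {
        handler.accept(message);
      }
    };
  }

  // The router picks the destination inside act(), like an Erlang receive clause.
  public static class Router extends Actor {
    @Override
    protected void act(Object message) {
      if (message instanceof Integer) {
        actors.get("ints").send((Integer) message * 2); // stand-in pre-processing
      } else if (message instanceof String) {
        actors.get("strings").send(((String) message).toUpperCase());
      } else {
        actors.get("other").send(message);
      }
    }
  }

  public static List<String> demo() {
    BlockingQueue<String> out = new LinkedBlockingQueue<>();
    actors.put("ints", sink(m -> out.add("int:" + m)).start());
    actors.put("strings", sink(m -> out.add("str:" + m)).start());
    actors.put("other", sink(m -> out.add("other:" + m)).start());
    Actor router = new Router().start();
    router.send(21);
    router.send("hi");
    router.send(3.5);
    List<String> results = new ArrayList<>();
    try {
      for (int i = 0; i < 3; i++) {
        String result = out.poll(5, TimeUnit.SECONDS); // null on timeout
        if (result != null) {
          results.add(result);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    Collections.sort(results); // arrival order across actors is not deterministic
    return results;
  }
}
```

The static actors map is the shared registry: every Router looks destinations up in it at send time, so it has to be a thread-safe map.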

The Erlang-style design requires splitting the @pausable execute method into three methods to make the API look pretty, and it doesn't work with Kilim. I tried it, and to satisfy the Weaver's complaints I ended up making almost everything @pausable, which caused other issues, so I gave up. It also introduces an additional shared resource, the actors registry, which is used to look up the target Actor by class. However, the same functionality can be achieved with Kilim slightly differently. Since we know that our MyActor class talks to three different Actors, we can configure it at construction time with references to their incoming mailboxes. Either way, we are telling the code where to send the processed message; the only difference is that in Erlang you tell it in the receive block, and in Kilim you tell it during construction. The Kilim version looks something like this:

  public class MyActor extends Task {
    public Mailbox<Message> inbox;
    public Mailbox<Message> fooInbox;
    public Mailbox<Message> barInbox;
    public Mailbox<Message> bazInbox;

    public MyActor(Mailbox<Message> inbox,
                   Mailbox<Message> fooInbox,
                   Mailbox<Message> barInbox,
                   Mailbox<Message> bazInbox) {
      this.inbox = inbox;
      this.fooInbox = fooInbox;
      this.barInbox = barInbox;
      this.bazInbox = bazInbox;
    }

    public @pausable void execute() {
      for (;;) {
        Message message = inbox.get();
        if (message instanceof Foo) {
          message = doPrefooOperation(message);
          fooInbox.put(message);
        } else if (message instanceof Bar) {
          message = doPrebarOperation(message);
          barInbox.put(message);
        } else {
          bazInbox.put(message);
        }
      }
    }
  }
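
The same constructor-wiring idea carries over to plain Java, which may be handy for experimenting without the Weaver. This is a minimal sketch, assuming a LinkedBlockingQueue in place of Kilim's Mailbox, one ordinary thread in place of a Kilim Task, and a made-up STOP message so the loop can end; the routing on Integer and String is a hypothetical stand-in for Foo, Bar and Baz.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Constructor-wired routing on plain threads (not Kilim): the router is given
// its destination inboxes up front, so no shared registry is needed.
public class WiredRouterSketch implements Runnable {

  // Hypothetical "poison pill" that tells the loop to stop.
  public static final Object STOP = new Object();

  private final BlockingQueue<Object> inbox;
  private final BlockingQueue<Object> intInbox;
  private final BlockingQueue<Object> stringInbox;
  private final BlockingQueue<Object> otherInbox;

  public WiredRouterSketch(BlockingQueue<Object> inbox, BlockingQueue<Object> intInbox,
      BlockingQueue<Object> stringInbox, BlockingQueue<Object> otherInbox) {
    this.inbox = inbox;
    this.intInbox = intInbox;
    this.stringInbox = stringInbox;
    this.otherInbox = otherInbox;
  }

  @Override
  public void run() {
    try {
      for (;;) {
        Object message = inbox.take();
        if (message == STOP) {
          return;
        }
        if (message instanceof Integer) {
          intInbox.put((Integer) message * 2);               // stand-in pre-processing
        } else if (message instanceof String) {
          stringInbox.put(((String) message).toUpperCase()); // stand-in pre-processing
        } else {
          otherInbox.put(message);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Routes three messages, then returns the head of each destination inbox.
  public static List<Object> demo() {
    BlockingQueue<Object> in = new LinkedBlockingQueue<>();
    BlockingQueue<Object> ints = new LinkedBlockingQueue<>();
    BlockingQueue<Object> strings = new LinkedBlockingQueue<>();
    BlockingQueue<Object> others = new LinkedBlockingQueue<>();
    Thread router = new Thread(new WiredRouterSketch(in, ints, strings, others));
    router.start();
    in.add(21);
    in.add("hi");
    in.add(3.5);
    in.add(STOP);
    try {
      router.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return Arrays.asList(ints.poll(), strings.poll(), others.poll());
  }
}
```

Compared with the registry version, this router only knows about the three queues it was given. That keeps the wiring explicit and removes the shared lookup, at the cost of fixing the topology when the actor is constructed.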

References

  • Java Concurrency in Practice. If you are going to do much concurrent Java programming, you may want to read this book. It has lots of tips on writing safe concurrent programs, as well as in-depth descriptions of the concurrency classes in Java 5.
  • Concurrency tips, from the Java Specialists site, written by Dr Heinz M Kabutz. A set of 10 commandments for programmers who write (or will write) multithreaded Java programs.
  • The Problem with Threads (PDF) by Edward A. Lee. This is also available from the Kilim site. The article argues for identifying shared structures in concurrent code, then isolating them from the rest of the code via synchronized data structures.
  • How to Write Parallel Programs (PDF) by Nicholas Carriero and David Gelernter. It describes various ways to partition your code into concurrent/parallel modules which can be run simultaneously.

In addition, there is a list of interesting concurrency articles maintained by Alex Miller that you may also want to take a look at.