Saturday, March 29, 2008

Crawling web pages with WebSPHINX

Some proof-of-concept projects I have worked on in the past involved crawling pages off public web sites. Since these are one-time crawls involving a few hundred pages at most, it's generally impractical to set up a full-blown crawl using your massively distributed parallel computing crawling juggernaut of choice. I recently discovered WebSPHINX, a really cool little GUI tool and Java class library that allows you to do this fairly easily and quickly. This blog post describes a few of these use cases (with code) that show how easy it is to use the class library. In other words, it describes a programmer's hack to crawl a few hundred pages off the web once in a while for one-time use.

The GUI tool is nice, and allows you to experiment with some basic crawls. However, to actually do something with the pages you crawled inline with the crawl, you need to get the Javascript library that comes bundled with Netscape. Not surprising, since the last release of WebSPHINX was in 2002, and Netscape was the browser technology leader at the time. I use Firefox now, however, and I didn't want to download Netscape just to get at its embedded JAR file. In any case, the class library is simple enough to use, so I just went with that.

At its most basic, all you have to do to build your own crawler is to subclass the WebSPHINX Crawler and override its visit(Page) method to specify what your crawler should do with each Page it visits. This is where you would put the parsing or persistence logic for the Page in question. There are some other methods you can override as well, such as the shouldVisit(Link) method, which allows you to weed out URLs you don't want to crawl even before they are fetched, so you incur less overhead.

I created a base crawler class MyCrawler which all my other crawlers extend. The MyCrawler class contains a few things that make the subclasses a little more well-behaved, such as obeying the robots.txt exclusion file, waiting 1s between page visits, and including a UserAgent string that tells the webmaster of my target site who I am and how I can be contacted if necessary. Subclasses implement its abstract doVisit(Page) method instead of overriding visit(Page) directly. Here is the code for this:

// MyCrawler.java
package com.mycompany.mycrawler;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import websphinx.Crawler;
import websphinx.DownloadParameters;
import websphinx.Page;

public abstract class MyCrawler extends Crawler {

  private static final long serialVersionUID = 2383514014091378008L;

  protected final Log log = LogFactory.getLog(getClass());

  public MyCrawler() {
    super();
    DownloadParameters dp = new DownloadParameters();
    dp.changeObeyRobotExclusion(true);
    dp.changeUserAgent("MyCrawler Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.8.1.4) " + 
      "WebSPHINX 0.5 contact me_at_mycompany_dot_com");
    setDownloadParameters(dp);
    setDomain(Crawler.SUBTREE);
    setLinkType(Crawler.HYPERLINKS);
  }
  
  @Override
  public void visit(Page page) {
    doVisit(page);
    try {
      Thread.sleep(1000L);
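    } catch (InterruptedException e) {
      // restore the interrupt flag instead of silently swallowing it
      Thread.currentThread().interrupt();
    }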
  }
  
  /**
   * Extend me, not visit(Page)!
   */
  protected abstract void doVisit(Page page);
}

Downloading a small site

This class hits a small public site and downloads it to your local disk. The URL structure determines the file and directory names on the local disk. You may need to tweak the logic that maps the URL path to the local file path; it worked for my test, but may not work for an arbitrary site. The init() method contains application logic and is called from the main() method.

// SiteDownloadingCrawler.java
package com.mycompany.mycrawler;

import java.io.File;
import java.net.URL;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;

import websphinx.Link;
import websphinx.Page;

public class SiteDownloadingCrawler extends MyCrawler {

  private static final long serialVersionUID = 64989986095789110L;

  private String targetDir;
  
  public void setTargetDir(String targetDir) {
    this.targetDir = targetDir;
  }
  
  private void init() throws Exception {
    File targetDirFile = new File(targetDir);
    if (targetDirFile.exists()) {
      FileUtils.forceDelete(targetDirFile);
    }
    FileUtils.forceMkdir(targetDirFile);
  }
  
  @Override
  protected void doVisit(Page page) {
    URL url = page.getURL();
    try {
      String path = url.getPath().replaceFirst("/", "");
      if (StringUtils.isNotEmpty(path)) {
        String targetPathName = FilenameUtils.concat(targetDir, path);
        File targetFile = new File(targetPathName);
        File targetPath = new File(FilenameUtils.getPath(targetPathName));
        if (! targetPath.exists()) {
          FileUtils.forceMkdir(targetPath);
        }
        FileUtils.writeByteArrayToFile(targetFile, page.getContentBytes());
      }
    } catch (Exception e) {
      log.error("Could not download url:" + url.toString(), e);
    }
  }
  
  /**
   * This is how we are called.
   * @param argv command line args.
   */
  public static void main(String[] argv) {
    SiteDownloadingCrawler crawler = new SiteDownloadingCrawler();
    try {
      crawler.setTargetDir("/tmp/some-public-site");
      crawler.init();
      crawler.setRoot(new Link(new URL("http://www.some-public-site.com")));
      crawler.run();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
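
The URL-to-file mapping is the part most likely to need tweaking. In the listing above, a page whose URL path is empty is skipped, and a path ending in a slash would name a directory rather than a file. The following is only a sketch of one possible mapping, using the JDK alone. The class name, the toLocalPath helper and the index.html default are my assumptions for illustration; they are not part of WebSPHINX or of the crawler above.

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

public class UrlToPathSketch {

  /**
   * Maps a URL to a file under targetDir (hypothetical helper).
   * Directory-style URLs are given the file name index.html, which is
   * an assumption, not something the target site guarantees.
   */
  public static Path toLocalPath(String targetDir, String url) {
    String path = URI.create(url).getPath();
    if (path == null || path.isEmpty()) {
      path = "/";
    }
    if (path.endsWith("/")) {
      path = path + "index.html";
    }
    Path base = Paths.get(targetDir).normalize();
    // Drop the leading slash so the path resolves relative to targetDir.
    Path resolved = base.resolve(path.substring(1)).normalize();
    // Refuse paths containing ../ segments that would escape targetDir.
    if (!resolved.startsWith(base)) {
      throw new IllegalArgumentException("URL escapes target dir: " + url);
    }
    return resolved;
  }

  public static void main(String[] args) {
    System.out.println(toLocalPath("/tmp/site", "http://www.example.com/"));
    System.out.println(toLocalPath("/tmp/site", "http://www.example.com/docs/a.html"));
  }
}
```

The check against the base directory is there because the path comes from a remote site, so it is safer not to trust it when building local file names.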

Downloading from a site protected by basic authentication

Sometimes you may need to download files off a remote but non-public site. The site is protected by Basic HTTP Authentication, so the contents are only available to trusted clients (of which you are one). I used JavaWorld's Java Tip 46 to figure out how to do this. The actual persistence work is identical to that of the SiteDownloadingCrawler, so we just use the superclass's doVisit(Page) method here.

// RemoteFileHarvester.java
package com.mycompany.mycrawler;

import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.URL;

import websphinx.Link;

public class RemoteFileHarvester extends SiteDownloadingCrawler {

  private static final long serialVersionUID = 3466884716433043917L;
  
  /**
   * This is how we are called.
   * @param argv command line args.
   */
  public static void main(String[] argv) {
    RemoteFileHarvester crawler = new RemoteFileHarvester();
    try {
      crawler.setTargetDir("/tmp/private-remote-site");
      URL rootUrl = new URL("http://private.site.com/protected/");
      Authenticator.setDefault(new Authenticator() {
        protected PasswordAuthentication getPasswordAuthentication() {
          return new PasswordAuthentication("myuser", "mypassword".toCharArray());
        }
      });
      crawler.setRoot(new Link(rootUrl));
      crawler.run();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
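
One thing to be aware of with the anonymous Authenticator above is that it hands the same credentials to any server that asks for them. If the crawl can follow links to other hosts, it may be worth scoping the credentials to a single host. The sketch below shows one way to do that with the JDK's Authenticator API; the class names are hypothetical and I have not run it against WebSPHINX itself.

```java
import java.net.Authenticator;
import java.net.PasswordAuthentication;

public class ScopedAuthSketch {

  /** Answers credential requests for one host only (hypothetical class). */
  public static class SingleHostAuthenticator extends Authenticator {
    private final String host;
    private final String user;
    private final char[] password;

    public SingleHostAuthenticator(String host, String user, char[] password) {
      this.host = host;
      this.user = user;
      this.password = password.clone();
    }

    @Override
    protected PasswordAuthentication getPasswordAuthentication() {
      // getRequestingHost() names the server asking for credentials;
      // it may be null, in which case no credentials are returned.
      if (host.equalsIgnoreCase(getRequestingHost())) {
        return new PasswordAuthentication(user, password);
      }
      return null;
    }
  }

  public static void main(String[] args) {
    Authenticator.setDefault(new SingleHostAuthenticator(
        "private.site.com", "myuser", "mypassword".toCharArray()));
  }
}
```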

Harvesting URLs from a public site

Sometimes, all you want is a bunch of URLs of pages from a public site. The URLs could be used as input to another process. In most cases, the URLs you are interested in have a distinct structure which you can exploit to reduce both the I/O your crawler is doing and the load on the public site. We override the shouldVisit(Link) method here to tell the crawler not to even bother visiting pages whose URLs don't match the pattern. Additionally, we have application-level init() and destroy() methods that open and close the handle to the output file.

// UrlHarvestingCrawler.java
package com.mycompany.mycrawler;

import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URL;

import websphinx.Crawler;
import websphinx.Link;
import websphinx.Page;

public class UrlHarvestingCrawler extends MyCrawler {

  private static final long serialVersionUID = 9015164947202781853L;
  private static final String URL_PATTERN = "some_pattern";

  private PrintWriter output;
  
  protected void init() throws Exception {
    output = new PrintWriter(new OutputStreamWriter(
      new FileOutputStream("/tmp/urls-from-public-site.txt")), true);
  }
  
  protected void destroy() {
    // output is still null if init() failed before opening the file
    if (output != null) {
      output.flush();
      output.close();
    }
  }
  
  @Override
  protected void doVisit(Page page) {
    URL url = page.getURL();
    output.println(url.toString());
  }
  
  @Override
  public boolean shouldVisit(Link link) {
    URL linkUrl = link.getURL();
    return (linkUrl.toString().contains(URL_PATTERN));
  }
  
  /**
   * This is how we are called.
   * @param argv command line args.
   */
  public static void main(String[] argv) {
    UrlHarvestingCrawler crawler = new UrlHarvestingCrawler();
    try {
      crawler.init();
      crawler.setRoot(new Link(new URL("http://www.public-site.com/page")));
      crawler.setDomain(Crawler.SERVER); // reset this since we are interested in siblings
      crawler.setMaxDepth(2); // only crawl 2 levels deep, default 5
      crawler.run();
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      crawler.destroy();
    }
  }
}
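
The contains() check in shouldVisit(Link) matches the pattern anywhere in the URL, including the query string. If that turns out to be too loose for your site, a stricter filter can match on the host and path only. The sketch below uses only the JDK; the class name, the accept helper and the /articles/NNN.html pattern are made up for illustration and would need to be replaced by whatever structure your target URLs have.

```java
import java.net.URI;
import java.util.regex.Pattern;

public class UrlFilterSketch {

  // Hypothetical pattern: article pages such as /articles/1234.html
  private static final Pattern ARTICLE =
      Pattern.compile("^/articles/\\d+\\.html$");

  /** Returns true if the URL is on the expected host and its path matches. */
  public static boolean accept(String url, String expectedHost) {
    URI uri;
    try {
      uri = URI.create(url);
    } catch (IllegalArgumentException e) {
      return false; // malformed URL: do not visit
    }
    String host = uri.getHost();
    String path = uri.getPath();
    return host != null && host.equalsIgnoreCase(expectedHost)
        && path != null && ARTICLE.matcher(path).matches();
  }

  public static void main(String[] args) {
    String host = "www.public-site.com";
    System.out.println(accept("http://www.public-site.com/articles/42.html", host));
    System.out.println(accept("http://www.public-site.com/about?ref=/articles/42.html", host));
  }
}
```

Inside a crawler, shouldVisit(Link) could delegate to such a helper by passing link.getURL().toString().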

Obviously, there are many more interesting use cases WebSPHINX can be applied to. Also, you can probably do a lot of this stuff using a well-crafted wget call, or by wrapping one or more such calls inside a shell script. However, WebSPHINX offers a neat way to do this with Java, which, at least in my opinion, is more maintainable.

A few cautions. If you are going to run a crawler frequently (possibly from your home account), be aware that your ISP may come down hard on you for excessive bandwidth use. But this probably doesn't apply if you are going to use WebSPHINX for the type of one-off situations I have described above.

A few other cautions to keep in mind when actually running this (or any) crawler. Crawling other people's web sites is an intrusion on them. Sure, they put up their sites to be viewed, so in a sense it's public property, but they put it up to be viewed by humans, and they usually have advertising and other stuff on the page from which they hope to make some money. Not only is your crawler not helping them do this, but it is consuming bandwidth they reserve for their human visitors, so be as nice and non-intrusive as you can. Don't hit them too hard, don't crawl parts of their site they tell you not to, and let them know who you are, so they can get back to you with a cease-and-desist order (and obey the cease-and-desist if they hand you one).

As you have seen from the article, the MyCrawler subclass of the WebSPHINX Crawler applies these checks so my crawls are as well behaved as I know how to make them. Obviously, I am not a crawling guru, so if you have suggestions that will make them better behaved, by all means let me know, and I will be happy to add in your suggestions.

Saturday, March 22, 2008

More Workflow - Events and Asynchronous WorkItems

Last week, I expanded upon my original post where I modeled a workflow as a Petri Net, adding in usability features such as an XML configuration and integration with WorkItems defined in Spring configuration. My goal, however, is to have a network of batch jobs be triggered in response to a state change in my primary datasource. The only dependency between the jobs is the order in which they must execute, and the order is specified in the workflow.

Previously, I had methods to traverse the network in a depth-first or breadth-first manner. However, the policy can change depending on the mix of jobs, so an event-driven approach is preferable. Specifically, the caller starts the workflow traversal by calling traverse() on the Workflow object. The Workflow then calls the nearest fireable Transitions (as defined by the Edge weights), which spawn batch jobs in background threads through the mapped AsynchronousWorkItem. Once a batch job is complete, it sends an event back to the WorkflowFactory, which causes the WorkflowFactory to call workflow.traverse() on the nearest neighbors (reachable Places) of the Transition, and so on, until a stop Place is reached. This is illustrated in the diagram below:

I decided to use the event handling built into Spring. For the event publishers, all I needed to do was have WorkItem implement ApplicationContextAware, which gives all its subclasses access to getApplicationContext().publishEvent(ApplicationEvent). The WorkflowFactory is the single event subscriber, and it does this by implementing ApplicationListener, which mandates an onApplicationEvent(ApplicationEvent) method. This method is called whenever the ApplicationContext has an event published into it.

In the interests of not boring you to death, I reproduce only the code that is relevant to this post. If you have been reading my previous posts on this subject, I had to do some minor refactoring, such as the addition of a stop attribute in the XML to specify the end Place(s) in a network, and the removal of the old Workflow.traverse(boolean) method in favor of the event driven ones. You can probably make these connections yourself. In case you haven't read the previous posts, you probably only care about the event-handling stuff anyway, so the rest is unimportant to you.

The first change is the introduction of the WorkItem layer to insulate the actual job from the Transition. This ensures that the programmer who writes the batch job does not have to worry about whether the job will run synchronously or asynchronously, and in any case, it's a common concern, so it should be factored out into one place.

// WorkItem.java
package com.mycompany.myapp.workflow;

import java.util.Map;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public abstract class WorkItem implements ApplicationContextAware {

  private String name;
  private Workflow workflow;
  private String transitionName;
  private IExecutable executable;
  private ApplicationContext applicationContext;

  public WorkItem() {
    super();
  }
  
  public ApplicationContext getApplicationContext() {
    return applicationContext;
  }
  
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public String getTransitionName() {
    return transitionName;
  }

  public void setTransitionName(String transitionName) {
    this.transitionName = transitionName;
  }

  public Workflow getWorkflow() {
    return workflow;
  }

  public void setWorkflow(Workflow workflow) {
    this.workflow = workflow;
  }

  public IExecutable getExecutable() {
    return executable;
  }

  public void setExecutable(IExecutable executable) {
    this.executable = executable;
  }

  public abstract void execute();
}

WorkItem has two implementations, the SynchronousWorkItem (which can be used for testing the workflow itself, among other uses), and the AsynchronousWorkItem (which would probably be the one a client would mostly use).

// SynchronousWorkItem.java
package com.mycompany.myapp.workflow.workitems;

import java.util.Map;

import com.mycompany.myapp.workflow.IExecutable;
import com.mycompany.myapp.workflow.Transition;
import com.mycompany.myapp.workflow.WorkItem;
import com.mycompany.myapp.workflow.Workflow;
import com.mycompany.myapp.workflow.events.TransitionEventSource;
import com.mycompany.myapp.workflow.events.TransitionFailedEvent;
import com.mycompany.myapp.workflow.events.TransitionEvent;

public class SynchronousWorkItem extends WorkItem {

  public SynchronousWorkItem() {
    super();
  }
  
  @Override
  public void execute() {
    Workflow workflow = getWorkflow();
    Transition transition = workflow.getTransitionByName(getTransitionName());
    TransitionEventSource source = new TransitionEventSource();
    source.setWorkflow(workflow);
    source.setCurrentTransition(transition);
    try {
      Map<String,Boolean> attributes = 
        getExecutable().execute(getWorkflow().getAttributes());
      workflow.setAttributes(attributes);
      source.setWorkflow(workflow);
      source.setTransitionStatus(true);
      getApplicationContext().publishEvent(new TransitionEvent(source));
    } catch (Exception e) {
      source.setTransitionStatus(false);
      getApplicationContext().publishEvent(new TransitionEvent(source));
    }
  }
}

// AsynchronousWorkItem.java
package com.mycompany.myapp.workflow.workitems;

import java.util.Map;

import com.mycompany.myapp.workflow.Transition;
import com.mycompany.myapp.workflow.Workflow;
import com.mycompany.myapp.workflow.events.TransitionEventSource;
import com.mycompany.myapp.workflow.events.TransitionFailedEvent;
import com.mycompany.myapp.workflow.events.TransitionEvent;

/**
 * Allows a WorkItem to be fired asynchronously, and send an event back to
 * the Workflow on completion.
 */
public class AsynchronousWorkItem extends SynchronousWorkItem {

  public AsynchronousWorkItem() {
    super();
  }
  
  @Override
  public void execute() {
    Runnable r = new Runnable() {
      public void run() {
        Workflow workflow = getWorkflow();
        Transition transition = workflow.getTransitionByName(getTransitionName());
        TransitionEventSource source = new TransitionEventSource();
        source.setWorkflow(workflow);
        source.setCurrentTransition(transition);
        try {
          Map<String,Boolean> attributes = 
            getExecutable().execute(getWorkflow().getAttributes());
          workflow.setAttributes(attributes);
          source.setWorkflow(workflow);
          source.setTransitionStatus(true);
          getApplicationContext().publishEvent(new TransitionEvent(source));
        } catch (Exception e) {
          source.setTransitionStatus(false);
          getApplicationContext().publishEvent(new TransitionEvent(source));
        }
      }
    };
    new Thread(r).start();
  }
}
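
The listing above starts a new Thread for every work item, which is fine for a handful of batch jobs. If the number of jobs grows, a bounded thread pool is one alternative. The sketch below shows the same run-then-notify shape using an ExecutorService from the JDK. It needs Java 8 or later for the lambdas, and the class name, the submit method and the Consumer callback (standing in for the publishEvent() call) are my assumptions rather than part of the workflow code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PooledWorkSketch {

  private final ExecutorService pool;

  public PooledWorkSketch(int threads) {
    this.pool = Executors.newFixedThreadPool(threads);
  }

  /**
   * Runs the job on the pool, then reports success (true) or failure
   * (false) to onDone, much as the work items publish a TransitionEvent.
   */
  public void submit(Runnable job, Consumer<Boolean> onDone) {
    pool.execute(() -> {
      boolean ok;
      try {
        job.run();
        ok = true;
      } catch (RuntimeException e) {
        ok = false;
      }
      onDone.accept(ok);
    });
  }

  /** Stops accepting work and waits for submitted jobs to finish. */
  public boolean shutdownAndWait(long seconds) {
    pool.shutdown();
    try {
      return pool.awaitTermination(seconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    PooledWorkSketch work = new PooledWorkSketch(2);
    work.submit(() -> System.out.println("Executing:t01"),
        ok -> System.out.println("t01 ok=" + ok));
    work.shutdownAndWait(5);
  }
}
```

A pool also gives the application a single place to shut down cleanly, which raw threads do not.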

A user of the workflow only needs to provide implementations of the IExecutable interface. It has a single method, execute(), which takes in the map of workflow attributes and returns the updated map of workflow attributes. The implementation must make sure that the correct Javascript variables are updated before returning the attributes.

// IExecutable.java
package com.mycompany.myapp.workflow;

import java.util.Map;

/**
 * Generic interface that all WorkItem Executors must implement.
 */
public interface IExecutable {
  public Map<String,Boolean> execute(Map<String,Boolean> attributes) throws Exception;
}

An example IExecutable implementation is the MockExecutable I use for my tests. The functionality is similar to the MockClosure I have been using in previous iterations.

// MockExecutable.java
package com.mycompany.myapp.workflow.executables;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

import com.mycompany.myapp.workflow.IExecutable;

/**
 * Mock Executor that just prints a message and updates the _OK variable.
 * This executor could (or not) be used as a model for building executors
 * which wrap existing legacy classes.
 */
public class MockExecutable implements IExecutable {

  private String name;

  public String getName() {
    return name;
  }
  
  public void setName(String name) {
    this.name = name;
  }

  public Map<String,Boolean> execute(Map<String,Boolean> attributes) 
      throws Exception {
    System.out.println("Executing:" + name);
    attributes.put(StringUtils.upperCase(name) + "_OK", Boolean.TRUE);
    return attributes;
  }
}

And here is how I hook all these things into the Spring Application Context. In the interest of brevity, only one definition is shown; the others are similar. The workflow-conf.xml file does not need to change.

<beans ..>
  ...
  <bean id="workItem_t01" 
      class="com.mycompany.myapp.workflow.workitems.AsynchronousWorkItem">
    <property name="name" value="t01"/>
    <property name="executable">
      <bean class="com.mycompany.myapp.workflow.executables.MockExecutable">
        <property name="name" value="t01"/>
      </bean>
    </property>
  </bean>
  ...
</beans>

As you can see, both WorkItem implementations extract the current attributes from the Workflow object, pass them into the mapped IExecutable, write the returned attributes back into the Workflow, then call ApplicationContext.publishEvent() to publish the changes. We need an Event class that the WorkflowFactory will listen for (so we don't react to other events Spring may be publishing). We also need an Event source object, which will contain the information to be passed from the Event publisher to the subscriber. Both are shown below:

// TransitionEvent.java
package com.mycompany.myapp.workflow.events;

import org.springframework.context.ApplicationEvent;

public class TransitionEvent extends ApplicationEvent {

  private static final long serialVersionUID = 2221611924011056575L;

  public TransitionEvent(Object source) {
    super(source);
  }
}

// TransitionEventSource.java
package com.mycompany.myapp.workflow.events;

import com.mycompany.myapp.workflow.Transition;
import com.mycompany.myapp.workflow.Workflow;

public class TransitionEventSource {
  
  private Workflow workflow;
  private Transition currentTransition;
  private boolean transitionStatus;
  
  public Workflow getWorkflow() {
    return workflow;
  }
  
  public void setWorkflow(Workflow workflow) {
    this.workflow = workflow;
  }
  
  public Transition getCurrentTransition() {
    return currentTransition;
  }
  
  public void setCurrentTransition(Transition currentTransition) {
    this.currentTransition = currentTransition;
  }
  
  public boolean isTransitionStatus() {
    return transitionStatus;
  }

  public void setTransitionStatus(boolean transitionStatus) {
    this.transitionStatus = transitionStatus;
  }
}

And finally, here is the listener, the onApplicationEvent() method of WorkflowFactory.

// WorkflowFactory.java
package com.mycompany.myapp.workflow;

public class WorkflowFactory implements BeanFactoryAware, InitializingBean, ApplicationListener {
  ...
  public void onApplicationEvent(ApplicationEvent event) {
    try {
      if (TransitionEvent.class.isInstance(event)) {
        TransitionEventSource source = TransitionEventSource.class.cast(event.getSource());
        if (source.isTransitionStatus()) {
          Workflow workflow = source.getWorkflow();
          Transition transition = source.getCurrentTransition();
          List<Place> reachablePlaces = workflow.getNextReachablePlaces(transition);
          for (Place reachablePlace : reachablePlaces) {
            workflow.traverse(reachablePlace);
          }
        } else {
          Workflow workflow = source.getWorkflow();
          Transition transition = source.getCurrentTransition();
          log.error("Workflow:[" + workflow.getName() + 
            "] failed Transition:[" + transition.getName() + 
            "], current attributes:" + workflow.getAttributes());
        }
      }
    } catch (Exception e) {
      log.error("Error in Event listener", e);
    }
  }
}

Unlike the caller, who calls the no-args version of the traverse() method on the Workflow object, the onApplicationEvent() method will find the nearest reachable neighbors of the workflow given the current transition and the attributes, and call the overloaded version of the traverse() method. These methods are shown below:

// Workflow.java
package com.mycompany.myapp.workflow;

public class Workflow {

  ...
  public void traverse() throws Exception {
    traverse(places.get(startPlaceName));
  }
  
  public void traverse(Place place) throws Exception {
    if (place.isStopPlace()) {
      return;
    }
    List<Transition> fireableTransitions = getNextFireableTransitions(place);
    for (Transition fireableTransition : fireableTransitions) {
      if (alreadyFiredTransitions.contains(fireableTransition)) {
        continue;
      }
      WorkItem workItem = fireableTransition.getWorkItem();
      workItem.execute();
      alreadyFiredTransitions.add(fireableTransition);
    }
  }
  ...
}
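
With asynchronous work items, traverse(Place) may end up being called from more than one thread, since completion events are published from the worker threads. The declaration of alreadyFiredTransitions is not shown here, so the following is only a sketch under the assumption that it is a plain collection: a concurrent set lets the check and the mark happen in one atomic step, so two threads cannot both fire the same Transition. The class and method names are hypothetical, and Java 8 or later is assumed for ConcurrentHashMap.newKeySet().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class FiredTransitionsSketch {

  // Thread-safe set; add() returns false if the name is already present.
  private final Set<String> fired = ConcurrentHashMap.newKeySet();

  /** Returns true only for the first caller to claim the transition. */
  public boolean claim(String transitionName) {
    return fired.add(transitionName);
  }

  public static void main(String[] args) {
    FiredTransitionsSketch sketch = new FiredTransitionsSketch();
    System.out.println(sketch.claim("t23")); // first claim succeeds
    System.out.println(sketch.claim("t23")); // repeat claim is rejected
  }
}
```

In traverse(Place), this would mean claiming the Transition before calling execute() on its WorkItem, and skipping it if the claim fails.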

Finally, to tie all this together (and to show how a typical client would call this thing), I have the following JUnit test, which does the event driven traversal.

// WorkflowFactoryTest.java
package com.mycompany.myapp.workflow;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class WorkflowFactoryTest {
  
  private final Log log = LogFactory.getLog(getClass());
  
  private static WorkflowFactory factory;
  private Workflow workflow;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ApplicationContext context = new ClassPathXmlApplicationContext(
      "classpath:applicationContext.xml");
    factory = (WorkflowFactory) context.getBean("workflowFactory");
  }
  
  @Test
  public void testEventDrivenTraversalWithSyncWorkItem() throws Exception {
    workflow = factory.take("myapp-sync-wf");
    log.debug("Event driven workflow traversal with synchronous work items");
    workflow.traverse();
    factory.give("myapp-sync-wf");
  }
  
  @Test
  public void testEventDrivenTraversalWithAsyncItems() throws Exception {
    workflow = factory.take("myapp-async-wf");
    log.debug("Event driven workflow traversal with asynchronous work items");
    workflow.traverse();
    factory.give("myapp-async-wf");
  }
}

In this case, both tests return the same results (similar to the results from depth first traversal in my previous version of this application). However, if I create a SleepyMockExecutable class like so:

// SleepyMockExecutable.java
package com.mycompany.myapp.workflow.executables;

import java.util.Map;

import org.apache.commons.lang.StringUtils;

public class SleepyMockExecutable extends MockExecutable {

  @Override
  public Map<String,Boolean> execute(Map<String,Boolean> attributes) 
      throws Exception {
    System.out.println("Executing:" + getName());
    Thread.sleep(10000L);
    attributes.put(StringUtils.upperCase(getName()) + "_OK", Boolean.TRUE);
    return attributes;
  }
}

and plug this into Transition t23 like so, thus simulating a batch job that runs slower than the other batch jobs:

<beans ...>
  ...
  <bean id="async_workItem_t23" 
      class="com.mycompany.myapp.workflow.workitems.AsynchronousWorkItem">
    <property name="name" value="t23"/>
    <property name="executable">
      <bean class="com.mycompany.myapp.workflow.executables.SleepyMockExecutable">
        <property name="name" value="t23"/>
      </bean>
    </property>
  </bean>
  ...
</beans>

The traversal changes as expected. Since the traversal is now event-driven, there is no longer any need to tell the workflow which traversal algorithm to use; the order is dictated by the completion events and the edge weights. The outputs (before and after introducing SleepyMockExecutable into the chain) are shown below.

// all executables are MockExecutable
Executing:t01
Executing:t12
Executing:t23
Executing:t24
Executing:t345
Executing:t16
Executing:t67
Executing:t578
// t23 is a SleepyMockExecutable, all others MockExecutable
Executing:t01
Executing:t12
Executing:t16
Executing:t23
Executing:t24
Executing:t67
Executing:t345
Executing:t578

As I mentioned before, I used the built-in Spring event handling because I was using Spring anyway. In case you are not using Spring, a very good alternative is EventBus, which provides an event handling mechanism similar to Spring's and is annotation-driven as well. Some sample code that I tried using EventBus is shown below; you can adapt it for use in the application above if you want.

// EventBusBasedPublisher.java
package com.mycompany.myapp.events;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bushe.swing.event.EventBus;
import org.bushe.swing.event.annotation.AnnotationProcessor;
import org.bushe.swing.event.annotation.EventPublisher;

@EventPublisher
public class EventBusBasedPublisher {

  private final Log log = LogFactory.getLog(getClass());
  
  private String name;
  private Map<String,Boolean> attributes = new HashMap<String,Boolean>();
  
  public EventBusBasedPublisher() {
    super();
    AnnotationProcessor.process(this);
  }
  
  public void setName(String name) {
    this.name = name;
  }
  
  public void setAttributes(Map<String,Boolean> attributes) {
    this.attributes = attributes;
  }
  
  public void execute() throws Exception {
    log.debug("Executing:" + name);
    attributes.put(name, Boolean.TRUE);
    EventBus.publish(attributes);
  }
}

// EventBusBasedSubscriber.java
package com.mycompany.myapp.events;

import java.util.Map;

import org.bushe.swing.event.annotation.AnnotationProcessor;
import org.bushe.swing.event.annotation.EventSubscriber;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class EventBusBasedSubscriber {

  private final Log log = LogFactory.getLog(getClass());
  
  public EventBusBasedSubscriber() {
    super();
    AnnotationProcessor.process(this);
  }
  
  @EventSubscriber
  public void onEvent(Map<String,Boolean> attributes) {
    log.debug("Caught an event");
    log.debug(attributes.toString());
  }
}

And to test this, the following code:

// EventBusEventsTest.java
package com.mycompany.myapp.events;

import org.bushe.swing.event.EventBus;
import org.junit.Test;

public class EventBusEventsTest {

  @Test
  public void testEvents() throws Exception {
    EventBusBasedSubscriber subscriber = new EventBusBasedSubscriber();
    EventBusBasedPublisher publisher = new EventBusBasedPublisher();
    for (int i = 0; i < 10; i++) {
      publisher.setName("PUB_" + i);
      publisher.execute();
    }
  }
}

So what's next for me in the workflow world? Surprisingly, not much. I was talking with a colleague, and he expressed a concern (a valid one, IMO) that I was burning cycles building something for which stable, mature implementations are already widely available in the open source world, and that figuring out how to adapt one of these to my application would probably make more sense in the long run. I agree with his assessment and will do as he suggests. Still, it was a fun three weeks, and I came out of it with a better understanding of how workflow engines work and the issues they have to deal with. Hopefully this will help me select the right workflow engine for my application.

Saturday, March 15, 2008

More Workflow - XML and Spring Integration

Last week, I described how I modeled a Workflow as a Petri Net, an idea I got from the Bossa project. This week, I describe some more improvements that enable this abstraction to be consumed easily by the application it is going to be embedded in. Specifically, they are:

  • XML configuration of workflow(s).
  • Add support for multiple workflows in the same application.
  • Address Workflow contention issues.
  • Access work items defined as Spring beans from the workflow.
  • Replace BSF with Java 1.6's built-in scripting.

To do this, I had to refactor the code a bit, so I will provide all of the code once again to make it easy to follow along. There was also a bug in the breadth-first traversal which caused certain transitions to be fired twice, so that is fixed in the new code too.
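
The double-firing fix boils down to remembering which transitions have already fired during the breadth-first walk. Here is a minimal, self-contained sketch of that idea. The FireOnceBfs class and its plain string graph are illustrative stand-ins (assumptions for this example), not the Workflow classes from this post, and the graph ignores the edge-weight guards entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FireOnceBfs {

  /**
   * Walks the graph breadth first from start and returns the nodes in the
   * order they were "fired". A node reachable along several paths is fired
   * only once, because Set.add() returns false for a node already seen.
   */
  public static List<String> traverse(Map<String, List<String>> graph, String start) {
    List<String> order = new ArrayList<>();
    Set<String> fired = new HashSet<>();
    Deque<String> queue = new ArrayDeque<>();
    fired.add(start);
    queue.add(start);
    while (!queue.isEmpty()) {
      String node = queue.remove();
      order.add(node);
      List<String> successors = graph.get(node);
      if (successors == null) {
        continue; // terminal node, nothing follows it
      }
      for (String next : successors) {
        if (fired.add(next)) {
          queue.add(next);
        }
      }
    }
    return order;
  }

  /** Transition-to-transition view of the sample workflow (places collapsed). */
  public static Map<String, List<String>> sampleGraph() {
    Map<String, List<String>> graph = new HashMap<>();
    graph.put("t01", Arrays.asList("t12", "t16"));
    graph.put("t12", Arrays.asList("t23", "t24"));
    graph.put("t23", Arrays.asList("t345"));
    graph.put("t24", Arrays.asList("t345"));
    graph.put("t16", Arrays.asList("t67"));
    graph.put("t345", Arrays.asList("t578"));
    graph.put("t67", Arrays.asList("t578"));
    return graph;
  }

  public static void main(String[] args) {
    System.out.println(traverse(sampleGraph(), "t01"));
  }
}
```

Both t345 and t578 have two predecessors here, so without the fired set each of them would be visited twice.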

XML Configuration and Multiple Workflows

Bossa provides a GUI tool that lets you draw a Petri Net representing a workflow and generates an XML configuration file from it, so if I had used Bossa, I wouldn't have had to write this part. Starting from XML is definitely a step forward, but it is by no means a panacea. I cannot over-emphasize the importance of drawing the Petri Net first. Some configuration issues are quite hard to debug by looking at the API calls or the XML, but are readily apparent when you look at the diagram.

The XML equivalent of the Workflow described in my last post would look something like this:

<?xml version="1.0" encoding="UTF-8"?>
<workflows>
  <workflow name="myapp-wf1">
    <places>
      <place name="p0" start="true"/>
      <place name="p1"/>
      <place name="p2"/>
      <place name="p3"/>
      <place name="p4"/>
      <place name="p5"/>
      <place name="p6"/>
      <place name="p7"/>
      <place name="p8"/>
    </places>
    <transitions>
      <transition name="t01" workitem="workItem_t01"/>
      <transition name="t12" workitem="workItem_t12"/>
      <transition name="t23" workitem="workItem_t23"/>
      <transition name="t24" workitem="workItem_t24"/>
      <transition name="t345" workitem="workItem_t345"/>
      <transition name="t16" workitem="workItem_t16"/>
      <transition name="t67" workitem="workItem_t67"/>
      <transition name="t578" workitem="workItem_t578"/>
    </transitions>
    <edges>
      <edge transition="t01" place="p0" type="input" weight="1"/>
      <edge transition="t01" place="p1" type="output" weight="T01_OK"/>
      <edge transition="t12" place="p1" type="input" weight="1"/>
      <edge transition="t12" place="p2" type="output" weight="T12_OK"/>
      <edge transition="t23" place="p2" type="input" weight="1"/>
      <edge transition="t23" place="p3" type="output" weight="T23_OK"/>
      <edge transition="t24" place="p2" type="input" weight="1"/>
      <edge transition="t24" place="p4" type="output" weight="T24_OK"/>
      <edge transition="t345" place="p3" type="input" weight="T23_OK &amp;&amp; T24_OK"/>
      <edge transition="t345" place="p4" type="input" weight="T23_OK &amp;&amp; T24_OK"/>
      <edge transition="t345" place="p5" type="output" weight="T345_OK"/>
      <edge transition="t16" place="p1" type="input" weight="1"/>
      <edge transition="t16" place="p6" type="output" weight="T16_OK"/>
      <edge transition="t67" place="p6" type="input" weight="1"/>
      <edge transition="t67" place="p7" type="output" weight="T67_OK"/>
      <edge transition="t578" place="p5" type="input" weight="T345_OK &amp;&amp; T67_OK"/>
      <edge transition="t578" place="p7" type="input" weight="T345_OK &amp;&amp; T67_OK"/>
      <edge transition="t578" place="p8" type="output" weight="T578_OK"/>
    </edges>
    <attribs>
      <attrib key="T01_OK" value="false"/>
      <attrib key="T12_OK" value="false"/>
      <attrib key="T23_OK" value="false"/>
      <attrib key="T24_OK" value="false"/>
      <attrib key="T345_OK" value="false"/>
      <attrib key="T16_OK" value="false"/>
      <attrib key="T67_OK" value="false"/>
      <attrib key="T578_OK" value="false"/>
    </attribs>
  </workflow>
</workflows>

As you can see, multiple workflows are now supported from within a single XML configuration file. The XML configuration file is used to build a WorkflowFactory object. It started out as a standard Singleton Factory object, but since I converted this to Spring (see below), I just put the XML parsing code into the afterPropertiesSet() method, which is called by the Spring BeanFactory on startup.
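
To get a feel for what the factory has to pull out of this file, here is a small sketch that reads the same layout with the JDK's built-in DOM parser rather than JDOM. The WorkflowConfigPeek class and its edgesByWorkflow() and sampleXml() helpers are made up for this illustration and are not part of the code in this post.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class WorkflowConfigPeek {

  /** Returns "transition type place [weight]" strings keyed by workflow name. */
  public static Map<String, List<String>> edgesByWorkflow(String xml) {
    try {
      DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
      Document doc = builder.parse(
          new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
      Map<String, List<String>> result = new LinkedHashMap<>();
      NodeList workflows = doc.getDocumentElement().getElementsByTagName("workflow");
      for (int i = 0; i < workflows.getLength(); i++) {
        Element workflow = (Element) workflows.item(i);
        List<String> edges = new ArrayList<>();
        NodeList edgeNodes = workflow.getElementsByTagName("edge");
        for (int j = 0; j < edgeNodes.getLength(); j++) {
          Element edge = (Element) edgeNodes.item(j);
          edges.add(edge.getAttribute("transition") + " " + edge.getAttribute("type")
              + " " + edge.getAttribute("place")
              + " [" + edge.getAttribute("weight") + "]");
        }
        result.put(workflow.getAttribute("name"), edges);
      }
      return result;
    } catch (Exception e) {
      // Wrap parser and I/O failures so callers need no checked exceptions
      throw new IllegalArgumentException("Could not parse workflow XML", e);
    }
  }

  /** A two-edge excerpt in the same layout as the configuration file. */
  public static String sampleXml() {
    return "<workflows>"
        + "<workflow name=\"myapp-wf1\"><edges>"
        + "<edge transition=\"t01\" place=\"p0\" type=\"input\" weight=\"1\"/>"
        + "<edge transition=\"t345\" place=\"p3\" type=\"input\""
        + " weight=\"T23_OK &amp;&amp; T24_OK\"/>"
        + "</edges></workflow>"
        + "</workflows>";
  }

  public static void main(String[] args) {
    System.out.println(edgesByWorkflow(sampleXml()));
  }
}
```

Note that an ampersand inside an XML attribute value has to be written as &amp;amp; in the file; the parser hands the weight expression back with plain && in it, which is what the script engine needs.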

I renamed the PetriNet class to Workflow, since that is what is being represented. A WorkflowFactory is responsible for giving out Workflow objects to the enclosing application. The code for the WorkflowFactory is shown below:

// WorkflowFactory.java
package com.mycompany.myapp.workflow;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;

/**
 * Factory that is initialized by the XML configuration file workflow-conf.xml
 * in the application's classpath. Factory returns Workflow objects.
 */
public class WorkflowFactory implements BeanFactoryAware, InitializingBean {

  private BeanFactory beanFactory;
  private Resource xmlConfig;
  private Map<String,Workflow> workflowMap = null;
  private Map<String,Boolean> taken = null;
  private Map<String,Map<String,Boolean>> originalState = null;

  public void setXmlConfig(Resource xmlConfig) {
    this.xmlConfig = xmlConfig;
  }
  
  public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
    this.beanFactory = beanFactory;
  }

  /**
   * Allows a client to take a configured workflow object. Once taken, the
   * Workflow is unavailable to other processes until the client gives it
   * back to the factory.
   * @param name the name of the Workflow to take.
   * @return a Workflow object, or null if the Workflow is unavailable or
   * does not exist. Client must check for null.
   * @throws Exception if one is thrown.
   */
  public Workflow take(String name) throws Exception {
    // null-safe check: an unknown name has no entry in the taken map
    if (Boolean.TRUE.equals(taken.get(name))) {
      // workflow is in use by another process
      return null;
    }
    Workflow workflow = workflowMap.get(name);
    if (workflow != null) {
      taken.put(name, Boolean.TRUE);
      return workflow;
    }
    return null;
  }
  
  /**
   * Allows a client to return a Workflow object back to the factory. The 
   * factory will take care of resetting the state of the Workflow, so it
   * can be used without re-initialization by the next client.
   * @param name the name of the Workflow to return to the factory.
   * @throws Exception if one is thrown.
   */
  public void give(String name) throws Exception {
    Workflow workflow = workflowMap.get(name);
    if (workflow != null) {
      Map<String,Boolean> originalAttributes = originalState.get(name);
      Map<String,Boolean> workflowAttributes = workflow.getAttributes();
      workflowAttributes.clear();
      for (String key : originalAttributes.keySet()) {
        workflowAttributes.put(key, originalAttributes.get(key));
      }
      workflow.setAttributes(workflowAttributes);
      taken.put(name, Boolean.FALSE);
    }
  }

  /**
   * Since this bean has been declared as an InitializingBean to Spring, the
   * Bean Factory will automatically run the afterPropertiesSet() method on
   * startup.
   * @throws Exception if one is thrown.
   */
  public void afterPropertiesSet() throws Exception {
    workflowMap = new HashMap<String,Workflow>();
    taken = new HashMap<String,Boolean>();
    originalState = new HashMap<String,Map<String,Boolean>>();
    SAXBuilder builder = new SAXBuilder();
    Document doc = builder.build(xmlConfig.getInputStream());
    Element workflowsElement = doc.getRootElement();
    List<Element> workflows = workflowsElement.getChildren();
    for (Element workflowElement : workflows) {
      Workflow workflow = new Workflow();
      String workflowName = workflowElement.getAttributeValue("name");
      workflow.setName(workflowName);
      // grab the attribs since we will need them later
      List<Element> attribElements = workflowElement.getChild("attribs").getChildren();
      for (Element attribElement : attribElements) {
        String key = attribElement.getAttributeValue("key");
        String value = attribElement.getAttributeValue("value");
        workflow.setAttributeValue(key, Boolean.valueOf(value));
      }
      workflowMap.put(workflowName, workflow);
      taken.put(workflowName, Boolean.FALSE);
      // make a copy of the original state for repeated use
      Map<String,Boolean> attribCopy = new HashMap<String,Boolean>();
      for (String key : workflow.getAttributes().keySet()) {
        attribCopy.put(key, workflow.getAttributes().get(key));
      }
      originalState.put(workflowName, attribCopy);
      // places
      Map<String,Place> placeMap = new HashMap<String,Place>();
      List<Element> placeElements = workflowElement.getChild("places").getChildren();
      for (Element placeElement : placeElements) {
        String placeName = placeElement.getAttributeValue("name");
        String isStartPlace = placeElement.getAttributeValue("start");
        Place place = new Place(placeName);
        if (isStartPlace != null) {
          workflow.addPlace(place, Boolean.valueOf(isStartPlace));
        } else {
          workflow.addPlace(place);
        }
        placeMap.put(placeName, place);
      }
      // transitions
      Map<String,Transition> transitionMap = new HashMap<String,Transition>();
      List<Element> transitionElements = 
        workflowElement.getChild("transitions").getChildren();
      for (Element transitionElement : transitionElements) {
        String transitionName = transitionElement.getAttributeValue("name");
        String workItemName = transitionElement.getAttributeValue("workitem");
        WorkItem workItem = (WorkItem) beanFactory.getBean(workItemName);
        workItem.setAttribs(workflow.getAttributes());
        Transition transition = new Transition(transitionName, workItem); 
        workflow.addTransition(transition);
        transitionMap.put(transitionName, transition);
      }
      // edges
      List<Element> edgeElements = workflowElement.getChild("edges").getChildren();
      for (Element edgeElement : edgeElements) {
        String transitionName = edgeElement.getAttributeValue("transition");
        String placeName = edgeElement.getAttributeValue("place");
        String type = edgeElement.getAttributeValue("type");
        String weight = edgeElement.getAttributeValue("weight");
        if (type.equals("input")) {
          transitionMap.get(transitionName).addInput(placeMap.get(placeName), weight);
        } else {
          transitionMap.get(transitionName).addOutput(placeMap.get(placeName), weight);
        }
      }
    }
  }
}

Since I also made quite a few changes to the other classes (PetriNet, now called Workflow, plus Place, Transition and Edge) to accommodate the changes to the application, they are also shown below in their current incarnation.

// Workflow.java
package com.mycompany.myapp.workflow;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Models a workflow as a Petri Net. See the Bossa Manifesto for details about
 * what a Petri Net is and how it can be used to model a workflow. In a nutshell,
 * a Petri Net is a graph of Places and Transitions connected by Edges with
 * numeric weights. In this implementation, edge weights are represented by 0 and
 * 1 (for false and true respectively) or Javascript expressions that evaluate to
 * true or false.
 * @see <a href="http://www.bigbross.com/bossa/overview.shtml">Bossa overview</a>
 */
public class Workflow {

  private String name;
  private Map<String,Place> places = new HashMap<String,Place>();
  private List<Transition> transitions = new ArrayList<Transition>();
  private String initialPlaceName;
  private Map<String,Boolean> attributes = new HashMap<String,Boolean>();

  public Workflow() { super(); }
  
  public void setName(String name) {
    this.name = name;
  }
  
  public String getName() {
    return name;
  }
  
  /**
   * Add a Place object to the Petri net. 
   * @param place the Place to be added.
   * @return the Place that was added.
   */
  public Place addPlace(Place place) {
    places.put(place.getName(), place);
    return place;
  }

  /**
   * Overloaded version of {@link #addPlace(Place)} to specify the
   * initial Place object. A Petri net can have only a single start place.
   * This is the Place from which the traversal will be started.
   * @param place the Place to be added.
   * @param isStartPlace true if this is the start Place.
   * @return the Place that was added.
   */
  public Place addPlace(Place place, boolean isStartPlace) {
    if (isStartPlace) {
      if (initialPlaceName != null) {
        throw new IllegalArgumentException("Initial Place is already set");
      }
      initialPlaceName = place.getName();
    }
    return addPlace(place);
  }
  
  /**
   * Return a List of Place objects in the Petri Net.
   * @return a List of Place objects.
   */
  public List<Place> getPlaces() {
    return new ArrayList<Place>(places.values());
  }

  /**
   * Add a Transition object to the Petri Net.
   * @param transition the Transition to add.
   * @return the Transition that was added.
   */
  public Transition addTransition(Transition transition) {
    transitions.add(transition);
    return transition;
  }
  
  /**
   * Returns a List of Transitions mapped into the Petri net.
   * @return a List of all Transition objects.
   */
  public List<Transition> getTransitions() {
    return transitions;
  }

  /**
   * Allows setting initial values for Javascript variables that are reset
   * during the traversal of the Petri net by the WorkItems mapped to the 
   * Transition objects.
   * @param variable the name of the Javascript variable.
   * @param value the initial value (usually false).
   */
  public void setAttributeValue(String variable, boolean value) {
    attributes.put(variable, value);
  }
  
  /**
   * Sets all the attributes in a single method call.
   * @param attributes the attributes to set.
   */
  public void setAttributes(Map<String,Boolean> attributes) {
    this.attributes = attributes;
  }
  
  /**
   * Returns the Javascript variables and their current values in the Petri Net.
   * @return the Map of Javascript variable names and their current values.
   */
  public Map<String,Boolean> getAttributes() {
    return attributes;
  }

  /**
   * Traverse a Petri Net in either a depth first or breadth first order. Sometimes
   * it may make sense to order the transitions so that the larger jobs get completed
   * first, so this parameter could be used to influence the ordering of the jobs.
   * When a Transition is encountered, the WorkItem associated with the Transition 
   * will be executed, so a traversal will end up running all the jobs. If you wish
   * to test without actually running any jobs, consider using a mock WorkItem.
   * This method delegates to the recursive traverseDfs_r() or traverseBfs_r().
   * @param depthFirst true or false. If true, the Petri Net will be traversed depth
   * first, and if false, it will be traversed breadth first.
   * @throws Exception if one is thrown.
   */
  public void traverse(boolean depthFirst) throws Exception {
    if (depthFirst) {
      traverseDfs_r(null);
    } else {
      Set<Transition> firedTransitions = new HashSet<Transition>();
      traverseBfs_r(null, firedTransitions);
    }
  }

  /**
   * Returns a List of Transitions that may be reached from the specified Place
   * on the Petri net. All output Transitions that are immediate neighbors of the
   * specified Place are considered, and reachability is determined by evaluating
   * the weight of the edge separating the Place and the Transition.
   * @param place the Place from which to determine next fireable Transitions.
   * @return a List of Transition objects that can be fired from the Place.
   * @throws Exception if one is thrown.
   */
  protected List<Transition> getNextFireableTransitions(Place place) throws Exception {
    List<Transition> fireableTransitions = new ArrayList<Transition>();
    if (place == null) {
      place = places.get(initialPlaceName);
    }
    List<Transition> outputTransitions = place.getOutputTransitions();
    for (Transition outputTransition : outputTransitions) {
      if (outputTransition.isFireable(attributes)) {
        fireableTransitions.add(outputTransition);
      }
    }
    return fireableTransitions;
  }
  
  /**
   * Returns a List of Places which can be reached from the specified Transition.
   * All output edges of the specified Transition are considered, and reachability
   * is determined by evaluating the weights of the Edge connecting the Transition
   * and the neighboring Place objects.
   * @param transition the Transition from which to determine reachable Places.
   * @return a List of reachable Places from the Transition.
   * @throws Exception if one is thrown.
   */
  protected List<Place> getNextReachablePlaces(Transition transition) throws Exception {
    List<Place> reachablePlaces = new ArrayList<Place>();
    if (transition == null) {
      return reachablePlaces;
    }
    List<Edge> outputEdges = transition.getOutputs();
    for (Edge outputEdge : outputEdges) {
      Place place = outputEdge.getPlace();
      if (transition.canReachTo(place, outputEdge.getEdgeWeightExpr(), attributes)) {
        reachablePlaces.add(place);
      }
    }
    return reachablePlaces;
  }
  
  /**
   * Implements a breadth first traversal of the Petri Net, so that each transition
   * is not fired more than once.
   * @param place the starting Place from which to fire.
   * @param alreadyFiredTransitions a Set of already fired transitions.
   * @throws Exception if one is thrown.
   */
  private void traverseBfs_r(Place place, Set<Transition> alreadyFiredTransitions) 
      throws Exception {
    List<Transition> transitions = getNextFireableTransitions(place);
    if (transitions.size() == 0) {
      return;
    }
    Set<Place> reachablePlaces = new HashSet<Place>();
    for (Transition transition : transitions) {
      if (alreadyFiredTransitions.contains(transition)) {
        continue;
      }
      alreadyFiredTransitions.add(transition);
      WorkItem workItem = transition.getWorkItem();
      workItem.execute();
      reachablePlaces.addAll(getNextReachablePlaces(transition));
    }
    for (Place reachablePlace : reachablePlaces) {
      traverseBfs_r(reachablePlace, alreadyFiredTransitions);
    }
  }
  
  /**
   * Implements a depth first traversal of the Petri Net.
   * @param place the starting Place.
   * @throws Exception if one is thrown.
   */
  private void traverseDfs_r(Place place) throws Exception {
    List<Transition> transitions = getNextFireableTransitions(place);
    if (transitions.size() == 0) {
      return;
    }
    for (Transition transition : transitions) {
      WorkItem workItem = transition.getWorkItem();
      workItem.execute();
      List<Place> reachablePlaces = getNextReachablePlaces(transition);
      for (Place reachablePlace : reachablePlaces) {
        traverseDfs_r(reachablePlace);
      }
    }
  }
}
// Place.java
package com.mycompany.myapp.workflow;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

/**
 * Models a Place node in a Petri Net.
 */
public class Place {

  private String name;
  private List<Transition> inputTransitions = new ArrayList<Transition>();
  private List<Transition> outputTransitions = new ArrayList<Transition>();

  public Place(String name) {
    setName(name);
  }
  
  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public List<Transition> getInputTransitions() {
    return inputTransitions;
  }

  public void addInputTransition(Transition transition) {
    inputTransitions.add(transition);
  }

  public List<Transition> getOutputTransitions() {
    return outputTransitions;
  }

  public void addOutputTransition(Transition transition) {
    outputTransitions.add(transition);
  }

  @Override
  public int hashCode() {
    return name.hashCode();
  }
  
  @Override
  public boolean equals(Object obj) {
    if (Place.class.isInstance(obj)) {
      Place that = Place.class.cast(obj);
      return (this.getName().equals(that.getName()));
    }
    return false;
  }
  
  @Override
  public String toString() {
    return ReflectionToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);
  }
}
// Transition.java
package com.mycompany.myapp.workflow;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.commons.lang.math.NumberUtils;

/**
 * Models a Transition node in a Petri Net.
 */
public class Transition {

  private String name;
  private WorkItem workItem;
  private List<Edge> inputs = new ArrayList<Edge>();
  private List<Edge> outputs = new ArrayList<Edge>();
  
  public Transition(String name, WorkItem workItem) {
    this.name = name;
    this.workItem = workItem;
  }

  public String getName() {
    return name;
  }
  
  public WorkItem getWorkItem() {
    return workItem;
  }
  
  public void addInput(Place place, String weightExpr) {
    place.addOutputTransition(this);
    Edge edge = new Edge();
    edge.setPlace(place);
    edge.setEdgeWeightExpr(weightExpr);
    inputs.add(edge);
  }

  public List<Edge> getInputs() {
    return inputs;
  }
  
  public void addOutput(Place place, String weightExpr) {
    place.addInputTransition(this);
    Edge edge = new Edge();
    edge.setPlace(place);
    edge.setEdgeWeightExpr(weightExpr);
    outputs.add(edge);
  }
  
  public List<Edge> getOutputs() {
    return outputs;
  }
  
  public boolean isFireable(Map<String,Boolean> attributes) throws Exception {
    // fireable only if every input edge allows it
    boolean fireable = true;
    for (Edge edge : inputs) {
      String edgeWeightExpr = edge.getEdgeWeightExpr();
      if (NumberUtils.isNumber(edgeWeightExpr)) {
        // accumulate instead of overwriting, so an earlier false is not lost
        fireable = fireable && edgeWeightExpr.equals("1");
      } else {
        Boolean canFire = evaluate(attributes, edgeWeightExpr);
        fireable = fireable && canFire;
      }
    }
    return fireable;
  }
  
  public boolean canReachTo(Place place, String weightExpr, Map<String,Boolean> attributes)
      throws Exception {
    if (NumberUtils.isNumber(weightExpr)) {
      return (weightExpr.equals("1") ? true : false);
    } else {
      Boolean canReach = evaluate(attributes, weightExpr);
      return canReach;
    }
  }
  
  @Override
  public int hashCode() {
    return name.hashCode();
  }
  
  @Override
  public boolean equals(Object obj) {
    if (Transition.class.isInstance(obj)) {
      Transition that = Transition.class.cast(obj);
      return (this.getName().equals(that.getName()));
    }
    return false;
  }
  
  @Override
  public String toString() {
    return ReflectionToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);
  }
  
  private Boolean evaluate(Map<String,Boolean> attributes, String edgeWeightExpr) 
      throws Exception {
    ScriptEngineManager scriptEngineManager = new ScriptEngineManager();
    ScriptEngine engine = scriptEngineManager.getEngineByName("js");
    for (String key : attributes.keySet()) {
      engine.put(key, attributes.get(key));
    }
    Boolean result = (Boolean) engine.eval(edgeWeightExpr);
    return result;
  }
}

I don't think Edge has changed, but here it is again for completeness.

// Edge.java
package com.mycompany.myapp.workflow;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.commons.lang.math.NumberUtils;

/**
 * Models an Edge connecting a Place and Transition in a Petri Net.
 */
public class Edge {

  private Place place;
  private String edgeWeightExpr;
  
  public Place getPlace() {
    return place;
  }
  
  public void setPlace(Place place) {
    this.place = place;
  }
  
  public String getEdgeWeightExpr() {
    return edgeWeightExpr;
  }
  
  public void setEdgeWeightExpr(String edgeWeightExpr) {
    if (NumberUtils.isNumber(edgeWeightExpr)) {
      if (!edgeWeightExpr.equals("1") && !edgeWeightExpr.equals("0")) {
        throw new IllegalArgumentException("Numeric edge weights can only be 0 or 1");
      }
    }
    this.edgeWeightExpr = edgeWeightExpr;
  }
  
  @Override
  public String toString() {
    return ReflectionToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);
  }
}

Workflow contention issues

Since a Workflow is "checked out" for a period of time, application code should not be able to get hold of a Workflow while another component (or another part of the same component) is using it. The WorkflowFactory therefore exposes a take() method, which returns null if the Workflow does not exist or is already taken, and a give() method, which resets the Workflow to its initial state and makes it available to the next take() call. Client code must check for a null result from take().
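
If several threads may call take() and give() at the same time, the check and the update need to happen as one atomic step. The following is only a sketch of that idea, assuming a hypothetical LeaseRegistry class that tracks workflow names rather than the actual Workflow objects. It is not part of the WorkflowFactory shown earlier.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class LeaseRegistry {

  private final Set<String> known;
  // Concurrent set: add() and remove() are atomic per element
  private final Set<String> taken = ConcurrentHashMap.newKeySet();

  public LeaseRegistry(Set<String> knownNames) {
    this.known = Collections.unmodifiableSet(new HashSet<>(knownNames));
  }

  /**
   * Returns true if the caller now holds the named workflow, false if the
   * name is unknown or somebody else already holds it.
   */
  public boolean take(String name) {
    // add() returns false when the name is already in the set
    return known.contains(name) && taken.add(name);
  }

  /** Releases the named workflow. Harmless if it was not taken. */
  public void give(String name) {
    taken.remove(name);
  }

  public static void main(String[] args) {
    LeaseRegistry registry = new LeaseRegistry(Collections.singleton("myapp-wf1"));
    if (registry.take("myapp-wf1")) {
      try {
        System.out.println("running myapp-wf1");
      } finally {
        // always hand the workflow back, even if the run throws
        registry.give("myapp-wf1");
      }
    }
  }
}
```

The try/finally in main() is the part worth copying into client code: a workflow that is taken but never given back stays unavailable for the life of the application.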

Replacing BSF with built-in ScriptEngine

The Bossa project used an unreleased version of Apache BSF, so I did the same, pulling in the nightly snapshot when I set it up. Java 1.6 already bundles a scripting framework, so I switched to that instead. That way users of this code don't have to depend on an unreleased version of a project. You can see the code in the Transition.evaluate() method.
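
As a standalone illustration of the javax.script approach, here is a small sketch that evaluates an edge-weight expression against an attribute map. The EdgeWeightEval class is an assumption made for this example. It also guards against getEngineByName("js") returning null, because JDK 15 and later no longer bundle a JavaScript engine; on such a JVM the sketch returns an empty Optional unless an engine has been added to the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

public class EdgeWeightEval {

  /**
   * Evaluates a boolean expression such as "T23_OK && T24_OK" with the
   * attribute map bound as script variables. Returns Optional.empty() when
   * no engine is registered under the name "js" on this JVM.
   */
  public static Optional<Boolean> evaluate(String expr, Map<String, Boolean> attributes) {
    ScriptEngine engine = new ScriptEngineManager().getEngineByName("js");
    if (engine == null) {
      return Optional.empty();
    }
    for (Map.Entry<String, Boolean> entry : attributes.entrySet()) {
      engine.put(entry.getKey(), entry.getValue());
    }
    try {
      Object result = engine.eval(expr);
      // Treat anything other than Boolean true as false
      return Optional.of(Boolean.TRUE.equals(result));
    } catch (ScriptException e) {
      throw new IllegalArgumentException("Bad edge weight expression: " + expr, e);
    }
  }

  public static void main(String[] args) {
    Map<String, Boolean> attributes = new HashMap<>();
    attributes.put("T23_OK", Boolean.TRUE);
    attributes.put("T24_OK", Boolean.FALSE);
    System.out.println(evaluate("T23_OK && T24_OK", attributes));
  }
}
```

Creating a ScriptEngineManager on every call keeps the sketch short. In code that evaluates many expressions, holding on to one engine instance would likely be cheaper.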

Spring Integration

Our original design mapped a self-contained piece of code (modeled with a Closure) to each Transition. The Closure would operate on the state of the Workflow represented by its attributes map. This turned out to be too limiting, so we now map a WorkItem object to each Transition. The WorkItem is an abstract class (containing about as much state as our original Closure), but now can be extended as needed by the application.

// WorkItem.java
package com.mycompany.myapp.workflow;

import java.util.Map;

/**
 * Holder class to model the work that happens for a Transition. This class
 * contains the minimal information necessary to execute a piece of work
 * during a transition. Clients are expected to extend this class to meet
 * the requirements of their workflow. 
 */
public abstract class WorkItem {

  private String name;
  private Map<String,Boolean> attribs;

  public WorkItem() {
    // :NOOP:
  }
  
  public WorkItem(String name, Map<String,Boolean> attribs) {
    setName(name);
    setAttribs(attribs);
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public Map<String, Boolean> getAttribs() {
    return attribs;
  }

  public void setAttribs(Map<String, Boolean> attribs) {
    this.attribs = attribs;
  }
  
  public void execute() throws Exception {
    // :NOOP:
  }
}

I plan on providing two template extensions of WorkItem, a SynchronousWorkItem and an AsynchronousWorkItem. The SynchronousWorkItem is shown below:

// SynchronousWorkItem.java
package com.mycompany.myapp.workflow.workitems;

import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.mycompany.myapp.workflow.WorkItem;

/**
 * A simple synchronous WorkItem which is configured with the process name, prints
 * the name of the process in its execute() method, and updates the ${processName}_OK 
 * attribute to true to indicate to the Workflow that it completed successfully.
 */
public class SynchronousWorkItem extends WorkItem {

  public SynchronousWorkItem() {
    super();
  }
  
  public SynchronousWorkItem(String name, Map<String,Boolean> attribs) {
    super(name, attribs);
  }

  @Override
  public void execute() throws Exception {
    System.out.println("Executing " + getName());
    String key = StringUtils.upperCase(getName()) + "_OK";
    getAttribs().put(key, Boolean.TRUE);
  }
}
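
To make the effect of execute() concrete, here is a minimal sketch of running one such work item against an attribute map and reading back the completion flag. The SketchWorkItem, SketchSyncItem and SketchSyncDemo classes are hypothetical, trimmed stand-ins for the classes above so that the block compiles on its own, and String.toUpperCase replaces the commons-lang StringUtils.upperCase call; how the real Workflow supplies the map is not shown here.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical, trimmed stand-in for WorkItem so the sketch compiles alone.
abstract class SketchWorkItem {
  private final String name;
  private final Map<String, Boolean> attribs;

  SketchWorkItem(String name, Map<String, Boolean> attribs) {
    this.name = name;
    this.attribs = attribs;
  }

  String getName() { return name; }

  Map<String, Boolean> getAttribs() { return attribs; }

  abstract void execute() throws Exception;
}

// Mirrors SynchronousWorkItem: sets the <NAME>_OK attribute to true.
class SketchSyncItem extends SketchWorkItem {
  SketchSyncItem(String name, Map<String, Boolean> attribs) {
    super(name, attribs);
  }

  @Override
  void execute() {
    String key = getName().toUpperCase(Locale.ROOT) + "_OK";
    getAttribs().put(key, Boolean.TRUE);
  }
}

class SketchSyncDemo {
  // Runs one item against a fresh attribute map and returns that map.
  static Map<String, Boolean> run(String name) {
    Map<String, Boolean> attribs = new HashMap<String, Boolean>();
    new SketchSyncItem(name, attribs).execute();
    return attribs;
  }

  public static void main(String[] args) {
    System.out.println(run("t01")); // prints {T01_OK=true}
  }
}
```

The map is shared between the caller and the work item, so whoever drives the work items can inspect the T01_OK flag after execute() returns.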

Our applicationContext.xml file now contains the bean definition for WorkflowFactory, and beans that are referenced by id from the workflow-conf.xml file. It is shown below:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://www.springframework.org/schema/util 
       http://www.springframework.org/schema/util/spring-util-2.0.xsd">

  <bean 
    class="org.springframework.beans.factory.annotation.RequiredAnnotationBeanPostProcessor"/>

  <bean id="workflowFactory" class="com.mycompany.myapp.workflow.WorkflowFactory">
    <property name="xmlConfig" value="classpath:workflow-conf.xml"/>
  </bean>
  
  <bean id="workItem_t01" 
      class="com.mycompany.myapp.workflow.workitems.SynchronousWorkItem">
    <property name="name" value="t01"/>
  </bean>

  <bean id="workItem_t12" 
      class="com.mycompany.myapp.workflow.workitems.SynchronousWorkItem">
    <property name="name" value="t12"/>
  </bean>
  
  <bean id="workItem_t23" 
      class="com.mycompany.myapp.workflow.workitems.SynchronousWorkItem">
    <property name="name" value="t23"/>
  </bean>
  
  <bean id="workItem_t24" 
      class="com.mycompany.myapp.workflow.workitems.SynchronousWorkItem">
    <property name="name" value="t24"/>
  </bean>
  
  <bean id="workItem_t345" 
      class="com.mycompany.myapp.workflow.workitems.SynchronousWorkItem">
    <property name="name" value="t345"/>
  </bean>
  
  <bean id="workItem_t16" 
      class="com.mycompany.myapp.workflow.workitems.SynchronousWorkItem">
    <property name="name" value="t16"/>
  </bean>
  
  <bean id="workItem_t67" 
      class="com.mycompany.myapp.workflow.workitems.SynchronousWorkItem">
    <property name="name" value="t67"/>
  </bean>
  
  <bean id="workItem_t578" 
      class="com.mycompany.myapp.workflow.workitems.SynchronousWorkItem">
    <property name="name" value="t578"/>
  </bean>

</beans>

Testing these changes

Our code for testing these changes is a standard JUnit test as before, except it now uses the WorkflowFactory to get instances of a Workflow from the XML configuration. Here it is:

// WorkflowFactoryTest.java
package com.mycompany.myapp.workflow;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Tests the XML configuration of a collection of workflows.
 */
public class WorkflowFactoryTest {
  
  private static WorkflowFactory factory;
  private Workflow workflow;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ApplicationContext context = new ClassPathXmlApplicationContext(
      "classpath:applicationContext.xml");
    factory = (WorkflowFactory) context.getBean("workflowFactory");
  }
  
  @Before
  public void setUp() throws Exception {
    workflow = factory.take("myapp-wf1");
  }

  @After
  public void tearDown() throws Exception {
    factory.give("myapp-wf1");
  }
  
  @Test
  public void testTraversePetriDepthFirst() throws Exception {
    System.out.println("Depth first traversal");
    workflow.traverse(true);
  }

  @Test
  public void testTraversePetriBreadthFirst() throws Exception {
    System.out.println("Breadth first traversal");
    workflow.traverse(false);
  }
}

The output of this test, as expected, is similar to that of the previous post's test. However, this code includes the bug fix that checks whether the process associated with a Transition has already been fired, and if so, does not fire it again.

Depth first traversal
Executing t01
Executing t12
Executing t23
Executing t24
Executing t345
Executing t16
Executing t67
Executing t578

Breadth first traversal
Executing t01
Executing t12
Executing t16
Executing t67
Executing t23
Executing t24
Executing t345
Executing t578
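
The "do not fire twice" check mentioned above can be sketched roughly as follows. This is not the actual Workflow code (which is not shown in this post), just one simple way to get that behavior: remember the names of transitions that have already fired in a Set, and skip any repeat. The FireOnceGuard class and its fireOnce() method are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: remembers which transitions have already fired.
class FireOnceGuard {
  private final Set<String> fired = new HashSet<String>();
  private final List<String> log = new ArrayList<String>();

  // Runs the work only the first time a transition name is seen.
  // Returns true if the work ran, false if it was skipped as a repeat.
  boolean fireOnce(String transitionName, Runnable work) {
    if (!fired.add(transitionName)) {
      return false; // Set.add returns false when the name is already present
    }
    work.run();
    log.add(transitionName);
    return true;
  }

  // Names of the transitions that actually ran, in firing order.
  List<String> getLog() { return log; }

  public static void main(String[] args) {
    FireOnceGuard guard = new FireOnceGuard();
    // Simulates a traversal that reaches t345 along two different paths.
    String[] visits = {"t01", "t12", "t345", "t345"};
    for (final String name : visits) {
      guard.fireOnce(name, new Runnable() {
        public void run() {
          System.out.println("Executing " + name);
        }
      });
    }
  }
}
```

Note that this sketch marks a transition as fired before running its work, so a work item that throws would not be retried; whether that is the right choice depends on how failures should be handled in the workflow.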

What's next?

I am working on adding support for asynchronous WorkItem templates, but the code is not quite there yet, so I will defer the discussion of that till next week.