Sunday, April 20, 2008

Learning to LISP

This is my third attempt at learning LISP. My first attempt was at college as part of my Artificial Intelligence elective course. While I gained enough competence in the language to pass the test, I did not particularly like it, and since I never had to use it after that, I soon forgot about it. My second attempt was a few years ago, fueled by comments on the blogosphere touting its advantages as a great tool for functional programming. Even though I did not have a good use case for it, I felt it was worth learning, if only to open my mind to think in terms of functional programming rather than in pure OO terms. I also came across the free e-book on Practical Common Lisp by Peter Siebel on the Internet, and I liked the book so much I bought myself a paper copy. I did attempt to make my way through the book, trying out the examples on a trial copy of Allegro-CL, but other things came up, so I gave up.

This time around, I did have something of a use-case for learning LISP. My plan was to have LISP scripts associated with workflow actions (see my last post), which would be sent over to the JMS server to be executed. In retrospect, though, I think I could probably have used Jython or Javascript to do the same thing more elegantly with less code. However, its here as a placeholder, in case I want to come back to it later.

I must admit that while I am in total awe of people who actually manage to write applications in LISP, I still don't like the LISP syntax that well. Or more likely I just don't get it, and the dislike could just be a function of unfamiliarity. However, what I do get is:

  • In LISP, code == data, much more so than in other languages.
  • LISP is probably the most parser-friendly language on the planet.
  • It is possible to extend LISP with custom functions and operations using defmacro, in ways not possible in C/C++ or Java.

Anyway, I started to look for a LISP interpreter that I could embed into my Java code and send it LISP scriptlets to execute. I chose JLisp initially, but soon realized I could not code LISP to save my life, so I had to stop and learn some LISP before I started up again.

To start off learning some basic LISP so you can start working with it, I strongly recommend the CUSP, a LISP plugin for Eclipse from Bitfauna. It comes integrated with SBCL, but I was not able to make SBCL work, so I downloaded clisp and used CUSP as a simple LISP editor, testing out my code on a terminal with clisp. I worked through about the first 10 chapters of Peter Siebel's book to get a basic understanding of LISP.

I also looked at Jatha while I was looking at JLisp, and while Jatha is probably the more mature of the two, JLisp seemed to be more suitable for what I was trying to do. For the example, I created a simple LISP "library" function closexec that will take a LISP program of this form:

1
2
(import com.mycompany.myapp.lisp.ext.closexec)
(closexec (setf x 1) (setf y 2) com.mycompany.myapp.lisp.Foo)

And then instantiate the Java class, set the variables, and call execute on it. Something like this:

1
2
3
4
5
Foo = new Foo();
foo.setX(1);
foo.setY(2);
foo.execute();
return foo.getResult();

To do this, I had to create a Library implementation called closexec (note the lowercase class name, this is a JLisp convention for Java based library extensions). Here is the code:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// closexec.java
package com.mycompany.myapp.lisp.ext;

import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

import net.thinksquared.jlisp.Library;
import net.thinksquared.jlisp.Lisp;
import net.thinksquared.jlisp.Sexp;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.beanutils.MethodUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class closexec implements Library {

  private final Log log = LogFactory.getLog(getClass());
  
  private Sexp CLOSEXEC;
  
  /**
   * @see net.thinksquared.jlisp.Library#apply(net.thinksquared.jlisp.Sexp, net.thinksquared.jlisp.Sexp)
   */
  public Sexp apply(Sexp function, Sexp args) {
    int result = 0;
    if (function.equals(CLOSEXEC)) {
      String className = null;
      Map<String,String> properties = new LinkedHashMap<String,String>(); 
      for (;;) {
        Sexp head = args.head;
        if (head.equals(args)) {
          // nothing more to extract from list
          break;
        }
        if (! head.at) {
          // not an atom, stack the setters
          Sexp name = head.head.tail.head;
          Sexp value = head.head.tail.tail.head;
          String nameVal = name.pname;
          String valueVal = (value.nmb ? value.pname : value.nval.toString());
          properties.put(nameVal, valueVal);
        } else {
          className = head.toS();
        }
        args = args.tail;
      }
      // now instantiate and run the class
      try {
        Object obj = Class.forName(className).newInstance();
        for (String var : properties.keySet()) {
          BeanUtils.setProperty(obj, var, (String) properties.get(var)); 
        }
        MethodUtils.invokeExactMethod(obj, "execute", null);
        result = Integer.valueOf(BeanUtils.getProperty(obj, "result"));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
    Lisp lisp = new Lisp();
    return lisp.atom(new BigDecimal(result));
  }

  /**
   * @see net.thinksquared.jlisp.Library#register(java.util.Map, net.thinksquared.jlisp.Lisp)
   */
  @SuppressWarnings("unchecked")
  public void register(Map registry, Lisp lisp) {
    CLOSEXEC = lisp.atom("closexec");
    registry.put(CLOSEXEC, this);
  }
}

And here is the JUnit test to run this code:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// LispRunnerTest.java
package com.mycompany.myapp.lisp;

import net.thinksquared.jlisp.Lisp;
import net.thinksquared.jlisp.LispThread;
import net.thinksquared.jlisp.Sexp;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;

public class LispRunnerTest {

  private final Log log = LogFactory.getLog(getClass());
  
  private final String prog = 
    "(import com.mycompany.myapp.lisp.ext.closexec) " +
    "(closexec (setf x 1) (setf y 2) com.mycompany.myapp.lisp.Foo)";
  
  @Test
  public void testRunning() throws Exception {
    Lisp lisp = new Lisp();
    Sexp progSexp = new Sexp(prog);
    if (progSexp.bad()) {
      log.error("Lisp code is not well-formed, please check");
    }
    Sexp[] sexps = lisp.execute(prog);
    // first sexp is the import
    log.debug("result=" + sexps[1].toS());
  }
}

And here is the class that is going to be manipulated. Its deliberately kept very simple, since this is just a test.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Foo.java
package com.mycompany.myapp.lisp;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class Foo {

  private final Log log = LogFactory.getLog(getClass());
  
  private int x;
  private int y;

  private int result;
  
  public void setX(String x) {
    this.x = Integer.valueOf(x);
  }
  
  public void setY(String y) {
    this.y = Integer.valueOf(y);
  }
  
  public int getResult() {
    return result;
  }

  public void execute() {
    log.debug("Executing:" + getClass().getName());
    result = x + y;
  }
}

And the output of running the test looks like this:

1
2
Executing:com.healthline.gyncms.lisp.Foo
result=3

Anyway, as mentioned before, I don't plan on using this. All I need to do is pass in plain text scripts to a remote script runner, which I can do with less effort and with greater elegance with scripting languages that have a tighter integration with Java. With these languages, it is possible to instantiate, set and execute the object directly instead of passing it through the lisp interpreter and executing it with my own code. However, it's another way of doing it, even though it may not be the most optimal.

Saturday, April 12, 2008

OSWorkflow meets JMS

The title of this post is probably unfair, since the authors of OSWorkflow provide a built-in JMSMessage action to do Fire-and-Forget style calls to a JMS Queue or Topic. However, what I describe here takes that integration one step further, by marrying event-based OSWorkflow processing with Asynchronous with Callback style JMS calls using Apache ActiveMQ. My application needs to fire long running batch job which are dependent on each other. Rather than have a human operator fire them off in the right sequence, the idea is to build a workflow that captures these dependencies, then submit them asynchronously to a JMS queue. As each job complete, the JMS listener which executes these jobs on the other end sends callbacks to the workflow, which allow it to fire the next job in the dependency graph.

The workflow is fairly complex, it contains two splits and two corresponding joins, as shown below. See my previous post for a more thorough discussion of the workflow itself. I have reproduced the graph below for completeness below:

There are three main components in here. First the WorkflowRunner, which loads the OSWorkflow configuration for the workflow into memory. It is called once from an external client (probably a web user) to kick off the workflow. Each workflow step that is responsible for executing a batch job is tied to a custom JmsAction which writes a message to the request.topic Topic. At the far end of the Topics is a JmsServer component, which basically delegates off to Java processes representing each individual batch job. Once a job is complete, it writes a message to the response.topic Topic. This message is picked up by the correct JmsAction, which then publishes an event into the Spring ApplicationContext. The WorkflowRunner then picks this event up, and issues calls to process() recursively until the workflow is complete. The diagram below illustrates this flow.

I describe below each of the individual components. First, the OSWorkflow workflow definition file. Each of the steps are mapped to a JmsAction instance. This is the only difference from the previous week, when everything was mapped to a MockAction instance.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE workflow PUBLIC 
  "-//OpenSymphony Group//DTD OSWorkflow 2.8//EN"
  "http://www.opensymphony.com/osworkflow/workflow_2_8.dtd">
<workflow>
  <initial-actions>
    <action id="0" name="start">
      <pre-functions>
        <function type="class">
          <arg name="class.name">com.opensymphony.workflow.util.Caller</arg>
        </function>
        <function type="spring">
          <arg name="bean.name">jmsAction</arg>
          <arg name="action.name">t0</arg>
        </function>
      </pre-functions>
      <results>
        <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
          step="1"/>
      </results>
    </action>
  </initial-actions>
  <steps>
    <step id="1" name="p1">
      <actions>
        <action id="1" name="t1">
          <results>
            <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
              step="26"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t1</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="26" name="s26">
      <actions>
        <action id="26" name="split26">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" split="26"/>
          </results>
        </action>
      </actions>
    </step>
    <step id="2" name="p2">
      <actions>
        <action id="2" name="t2">
          <results>
            <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
              step="34"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t2</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="34" name="s34">
      <actions>
        <action id="34" name="split34">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" split="34"/>
          </results>
        </action>
      </actions>
    </step>
    <step id="3" name="p3">
      <actions>
        <action id="3" name="t3">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" join="43"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t3</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="4" name="p4">
      <actions>
        <action id="4" name="t4">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" join="43"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t4</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="5" name="p5">
      <actions>
        <action id="5" name="t5">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" join="75"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t5</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="6" name="p6">
      <actions>
        <action id="6" name="t6">
          <results>
            <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
              step="7"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t6</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="7" name="p7">
      <actions>
        <action id="7" name="t7">
          <results>
            <unconditional-result old-status="Finished" owner="${caller}" join="75"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t7</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="8" name="p8">
      <actions>
        <action id="8" name="t8">
          <results>
            <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
              step="9"/>
          </results>
          <post-functions>
            <function type="spring">
              <arg name="bean.name">jmsAction</arg>
              <arg name="action.name">t8</arg>
            </function>
          </post-functions>
        </action>
      </actions>
    </step>
    <step id="9" name="stop">
      <actions>
        <action id="9" name="t9" finish="true">
          <results>
            <unconditional-result old-status="Finished" status="Complete" 
              owner="${caller}"/>
          </results>
        </action>
      </actions>
    </step>
  </steps>
  <splits>
    <split id="26">
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="2"/>
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="6"/>
    </split>
    <split id="34">
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="3"/>
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="4"/>
    </split>
  </splits>
  <joins>
    <join id="43">
      <conditions type="AND">
        <condition type="beanshell">
          <arg name="script"><![CDATA[
            "Finished".equals(jn.getStep(3).getStatus()) &&
            "Finished".equals(jn.getStep(4).getStatus())
          ]]>
          </arg>
        </condition>
      </conditions>
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="5"/>
    </join>
    <join id="75">
      <conditions type="AND">
        <condition type="beanshell">
          <arg name="script"><![CDATA[
          "Finished".equals(jn.getStep(5).getStatus()) &&
          "Finished".equals(jn.getStep(7).getStatus())
          ]]>
          </arg>
        </condition>
      </conditions>
      <unconditional-result old-status="Finished" status="Queued" owner="${caller}" 
        step="8"/>
    </join>
  </joins>
</workflow>

Next up is the Spring applicationContext.xml file which ties this all together. It also contains some definitions for the JmsAction and JmsServer beans. This is shown below:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://www.springframework.org/schema/util 
       http://www.springframework.org/schema/util/spring-util-2.0.xsd">

  <bean class="org.springframework.beans.factory.annotation.RequiredAnnotationBeanPostProcessor"/>

  <!-- OSWorkflow -->
  <bean id="workflowStore" 
    class="com.opensymphony.workflow.spi.memory.MemoryWorkflowStore"/>
  
  <bean id="workflowFactory" 
      class="com.opensymphony.workflow.spi.hibernate.SpringWorkflowFactory" 
      init-method="init">
    <property name="resource" value="workflow-defs.xml"/>
    <property name="reload" value="true"/>    
  </bean>

  <bean id="workflowConfiguration" 
      class="com.opensymphony.workflow.config.SpringConfiguration">
    <property name="factory" ref="workflowFactory"/>
    <property name="store" ref="workflowStore"/>
  </bean>
  
  <bean id="workflowTypeResolver" class="com.opensymphony.workflow.util.SpringTypeResolver">
    <property name="functions">
      <map>
        <entry key="jmsAction"><ref bean="jmsAction"/></entry>
      </map>
    </property>
  </bean>

  <bean id="workflow" class="com.opensymphony.workflow.basic.BasicWorkflow" 
      scope="prototype">
    <constructor-arg><value>testuser</value></constructor-arg>
    <property name="configuration" ref="workflowConfiguration"/>
    <property name="resolver" ref="workflowTypeResolver"/>
  </bean>
    
  <bean id="workflowRunner" class="com.mycompany.myapp.workflow.WorkflowRunner" 
      scope="prototype">
    <property name="workflow" ref="workflow"/>
  </bean>
  
  <!-- JMS -->
  <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" 
      destroy-method="stop">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
      </bean>
    </property>
  </bean>

  <bean id="jmsAction" class="com.mycompany.myapp.workflow.JmsAction" init-method="init" 
      destroy-method="destroy" scope="prototype">
    <property name="jmsConnectionFactory" ref="jmsConnectionFactory"/>
  </bean>

  <bean id="jmsServer" class="com.mycompany.myapp.workflow.JmsServer">
    <property name="jmsConnectionFactory" ref="jmsConnectionFactory"/>
  </bean>
    
</beans>

As mentioned above, on the OSWorkflow side, we have a WorkflowRunner component. It implements ApplicationListener so it can respond to callbacks from the JmsAction components. Moving through the workflow takes place in the process() method. The onApplicationEvent() recursively calls the process() method based on events received from the JmsAction instances.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// WorkflowRunner.java
package com.mycompany.myapp.workflow;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;

import com.opensymphony.workflow.Workflow;
import com.opensymphony.workflow.WorkflowException;
import com.opensymphony.workflow.spi.Step;

/**
 * This class is called once by the client code, then the events that are
 * sent back from the Actions in the Workflow will move the workflow forward
 * until it ends.
 */
public class WorkflowRunner implements ApplicationListener {

  private final Log log = LogFactory.getLog(getClass());
  
  private String workflowName;
  private Workflow workflow;
  private Map<String,Object> inputs;
  
  private Set<Integer> alreadyExecuted = new HashSet<Integer>();
  
  @Required
  public void setWorkflow(Workflow workflow) {
    this.workflow = workflow;
  }
  
  public void setWorkflowName(String workflowName) {
    this.workflowName = workflowName;
  }
  
  public void setInputs(Map<String,Object> inputs) {
    this.inputs = inputs;
  }
  
  public long init(int initialActionId) throws Exception {
    long workflowId = workflow.initialize(workflowName, initialActionId, inputs);
    return workflowId;
  }
  
  @SuppressWarnings({"unused","unchecked"})
  public void process(long workflowId) {
    List<Step> currentSteps = workflow.getCurrentSteps(workflowId);
    for (Step currentStep : currentSteps) {
      int[] availableActions = workflow.getAvailableActions(workflowId, inputs);
      for (int availableAction : availableActions) {
        if (alreadyExecuted.contains(availableAction)) {
          continue;
        }
        try {
          alreadyExecuted.add(availableAction);
          System.out.println("Sending action.id=" + availableAction + " to JmsServer");
          workflow.doAction(workflowId, availableAction, inputs);
        } catch (WorkflowException e) {
          log.error("Exception in (workflow,action.id)=(" + workflowName + "," + 
            availableAction + "). Workflow stopped", e);
        }
      }
    }
  }
  
  public void onApplicationEvent(ApplicationEvent event) {
    if (event.getSource() instanceof Long) {
      process((Long) event.getSource());
    }
  }
}

Not too many surprises so far for people who have read my previous blog post. The JmsAction is our bridge between OSWorkflow and JMS, so it has to implement multiple interfaces. The FunctionProvider interface is so it can be injected as a function into the workflow XML configuration. ApplicationContextAware is so it can publish events back into Spring's context where it can be picked up by WorkflowRunner, and MessageListener is so it can listen on JMS events. The code is shown below and is fairly self-explanatory, comments are inlined where I felt more explanation may be helpful.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// JmsAction.java
package com.mycompany.myapp.workflow;

import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;

import com.opensymphony.module.propertyset.PropertySet;
import com.opensymphony.workflow.FunctionProvider;
import com.opensymphony.workflow.WorkflowException;
import com.opensymphony.workflow.spi.WorkflowEntry;

/**
 * Publishes a job request to a JMS queue. Listens for a callback from the
 * JMS consumer module and passes the event back to the WorkflowRunner event
 * listener to move the workflow forward.
 */
public class JmsAction implements FunctionProvider, ApplicationContextAware, MessageListener {

  private final Log log = LogFactory.getLog(getClass());
  
  private ConnectionFactory jmsConnectionFactory;
  private String actionName;
  private ApplicationContext applicationContext;
  
  private Connection connection;
  private Session session;
  private MessageProducer messageProducer;
  private MessageConsumer messageConsumer;

  @Required
  public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
    this.jmsConnectionFactory = jmsConnectionFactory; 
  }

  public void setActionName(String actionName) {
    this.actionName = actionName;
  }

  /**
   * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
   */
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }

  /**
   * @see com.opensymphony.workflow.FunctionProvider#execute(java.util.Map, java.util.Map, com.opensymphony.module.propertyset.PropertySet)
   */
  @SuppressWarnings("unchecked")
  public void execute(Map transientVars, Map args, PropertySet ps) throws WorkflowException {
    setActionName((String) args.get("action.name"));
    WorkflowEntry workflowEntry = (WorkflowEntry) transientVars.get("entry");
    final long workflowId = workflowEntry.getId();
    try {
      MapMessage message = session.createMapMessage();
      message.setString("action.name", actionName);
      message.setString("workflow.id", String.valueOf(workflowId));
      message.setString("topic.name", "request.topic");
      messageProducer.send(message);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Listens for a text message back from the JmsSubscriber module about job
   * completion. The text is formatted, contains Success or Failure, followed
   * by workflowId as a comma-separated list.
   * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
   */
  @SuppressWarnings("serial")
  public void onMessage(Message message) {
    if (MapMessage.class.isInstance(message)) {
      MapMessage mapMessage = MapMessage.class.cast(message);
      try {
        String actionName = mapMessage.getString("action.name");
        if (actionName.equals(this.actionName)) {
          // only respond to events meant for this Action
          String status = mapMessage.getString("status");
          String workflowId = mapMessage.getString("workflow.id");
          if (status.equals("Success")) {
            applicationContext.publishEvent(new ApplicationEvent(new Long(workflowId)) {});
          } else {
            log.error("Action " + actionName + " failed, see server error log for details");
          }
        }
      } catch (JMSException e) {
        throw new RuntimeException(e);
      }
    }
  }
  
  protected void init() throws Exception {
    connection = jmsConnectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic requestTopic = session.createTopic("request.topic");
    Topic responseTopic = session.createTopic("response.topic");
    messageProducer = session.createProducer(requestTopic);
    messageConsumer = session.createConsumer(responseTopic);
    messageConsumer.setMessageListener(this);
    connection.start();
  }
  
  protected void destroy() throws Exception {
    session.close();
    connection.close();
  }
}

I started off using Spring's JmsTemplate because I wanted to learn how to use it, but gave up when I could not find a clean way of registering a JmsAction as a listener. If any of you have used JmsTemplate for beans which are both publishers and subscribers, please let me know. Example code (or links to example code) would be greatly appreciated.

One more thing to note is that unlike the WorkflowRunner, which will have only a single instance per workflow, there will be many instances of JmsAction for a given workflow. We specify that its scope is prototype (built every time it is accessed from the Spring context), and that its lifecycle methods are init() and destroy(), all in the Spring configuration above.

Our final component is the JmsServer. All this does currently is to print that it is "executing" something to stdout. In a real application, it would start another Java batch process and wait for it to complete before sending the callback. The code is shown below:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// JmsServer.java
package com.mycompany.myapp.workflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

/**
 * Listens for JMS requests, services them, and returns a callback
 * when the job is completed.
 */
public class JmsServer implements MessageListener {

  private ConnectionFactory jmsConnectionFactory;
  
  private Connection connection;
  private Session session;
  private MessageProducer messageProducer;
  private MessageConsumer messageConsumer;

  public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
    this.jmsConnectionFactory = jmsConnectionFactory;
  }
  
  /**
   * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
   */
  public void onMessage(Message message) {
    if (! MapMessage.class.isInstance(message)) {
      return;
    }
    try {
      MapMessage mapMessage = MapMessage.class.cast(message);
      final String actionName = mapMessage.getString("action.name");
      final String workflowId = mapMessage.getString("workflow.id");
      // this is where we will delegate to some kind of executor in a real app
      System.out.println("Executing job:" + actionName);
      // send the callback after the job is done
      MapMessage responseMessage = session.createMapMessage();
      responseMessage.setString("action.name", actionName);
      responseMessage.setString("workflow.id", workflowId);
      responseMessage.setString("topic.name", "response.topic");
      responseMessage.setString("status", "Success");
      System.out.println("Sending callback from server");
      messageProducer.send(responseMessage);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  }
  
  protected void init() throws Exception {
    System.out.println("Initializing server");
    connection = jmsConnectionFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic responseTopic = session.createTopic("request.topic");
    Topic requestTopic = session.createTopic("response.topic");
    messageProducer = session.createProducer(requestTopic);
    messageConsumer = session.createConsumer(responseTopic);
    messageConsumer.setMessageListener(this);
    System.out.println("Server started");
    connection.start();
  }
  
  protected void destroy() throws Exception {
    System.out.println("Shutting down server");
    session.close();
    connection.close();
    System.out.println("Done");
  }
  
  public void run() throws Exception {
    init();
    try {
      for (;;) {
        Thread.sleep(500L);
      }
    } finally {
      destroy();
    }
  }
}

To test this, we start off the following two unit tests which run forever until they are manually stopped by a Ctrl-C at the command line. The reason I had to do this instead of packaging the whole thing into a single JUnit test as I had done before, was because I noticed that the unit test completed before the workflow had a chance to complete. Since the whole thing is event driven after the first call, there is no way to keep the test running until the workflow is complete. In real-life, this is not a problem because these are likely to be long-lived server processes anyway.

This JUnit test starts the JmsServer end of the setup and loops forever, sleeping for 0.5s between loops. I run this in one console window.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// JmsServerTest.java
package com.mycompany.myapp.workflow;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class JmsServerTest {

  private JmsServer jmsServer;
  
  @Before
  public void setUp() throws Exception {
     ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
     jmsServer = (JmsServer) context.getBean("jmsServer");
  }
  
  @Test
  public void runJmsServer() throws Exception {
    jmsServer.run();
  }
}

This JUnit test starts the OSWorkflow end of the setup and after the first call to process(), also loops forever. I run this in a second console window.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// WorkflowRunnerWithJmsActionsTest.java
package com.mycompany.myapp.workflow;

import java.util.HashMap;
import java.util.Map;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class WorkflowRunnerWithJmsActionsTest {

  private WorkflowRunner runner;
  private Map<String,Object> inputs = new HashMap<String,Object>();
  private long workflowId;

  @Before
  public void setUp() throws Exception {
    ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); 
    runner = (WorkflowRunner) context.getBean("workflowRunner");
    runner.setWorkflowName("workflow-def-1");
    runner.setInputs(inputs);
    workflowId = runner.init(0);
  }
  
  @Test
  public void runWorkflow() throws Exception {
    runner.process(workflowId);
    for (;;) {
      Thread.sleep(500L);
    }
  }
}

Running these produces the following output on the client end, and a similar output on the server end. Looking at the output, it is clear that the setup works, ie the output is what you would expect given the dependencies designed into the workflow.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Sending action.id=1 to JmsServer
Sending action.id=26 to JmsServer
Sending action.id=2 to JmsServer
Sending action.id=6 to JmsServer
Sending action.id=34 to JmsServer
Sending action.id=7 to JmsServer
Sending action.id=3 to JmsServer
Sending action.id=4 to JmsServer
Sending action.id=5 to JmsServer
Sending action.id=8 to JmsServer
Sending action.id=9 to JmsServer

A nice tool that helped me debug my code is ActiveMQ's web-based administrator interface. Here you can see how many messages were received and sent on each of the topics. You can delete messages off the queue, which is very helpful after unsuccessful runs. It is available at 0.0.0.0:8161 of the machine where ApacheMQ is started.

So there you have it folks. A simple, asynchronous, event-driven way to manage dependencies among multiple batch jobs.