程序员社区

工作流框架引擎之消息事件流详解

自定义扩展

  • BPMN 2.0标准对于各方都是一个好东西
    • 用户不用担心会绑死在供应商提供的专有解决方案上
    • 框架,特别是activiti这样的开源框架,可以提供相同功能,甚至是更好的实现,足以和大的供应商媲美
    • 按照BPMN 2.0标准,从大供应商的解决方案迁移到activiti只要经过一个简单而平滑的过程
  • BPMN 2.0标准不好的一点是
    • 它常常是不同公司之间大量讨论和妥协的结果
    • 作为开发者去阅读流程定义的BPMN 2.0xml时,有时会感觉用这种结构和方法去做事太麻烦了
    • 因此activiti把简化开发作为最优先的事情,使用一些被称为Activiti BPMN扩展的功能,这些扩展是新的结构或方法来简化对应的结构,并不属于BPMN 2.0规范
  • 根据BPMN 2.0标准开发自定义扩展的注意点:

    • 自定义扩展的前提是总有简单的方法转换成标准方法. 所以使用自定义扩展时,可以及时撤销自定义扩展
    • 当使用自定义扩展时 ,总会清楚的指明使用了新的XML元素,属性... 比如会使用activiti:命名空间前缀
    • 扩展的目标是最终加入到下一版本的BPMN规范,或者至少可以引起对特定BPMN结构的讨论

事件

  • 事件用来表明流程的生命周期中发生了什么事. 事件总是画成一个圆圈
  • 在BPMN 2.0中,事件有两大分类:捕获(catching)事件触发(throwing)事件:
    • 捕获(catching): 当流程执行到事件,会等待被触发.触发的类型是由内部图表或XML中的类型声明来决定的.捕获事件与触发事件在显示方面是根据内部图表是否被填充来区分的(白色)
    • 触发(throwing): 当流程执行到事件,会触发一个事件.触发的类型是由内部图表或XML中的类型声明来决定的.触发事件与捕获事件在显示方面是根据内部图表是否被填充来区分的(黑色)

事件定义

  • 事件定义决定了事件的语义. 如果没有事件定义,这个事件就不做什么特别的事情.没有设置事件定义的开始事件不会在启动流程时做任何事情
  • 如果给开始事件添加了一个事件定义(比如定时器事件定义)我们就声明了开始流程的事件类型(这时定时器事件监听器会在某个时间被触发)

定时器事件定义

  • 定时器事件是根据指定的时间触发的事件
  • 定时器事件可以用于开始事件,中间事件和边界事件
  • ==定时器定义元素:==
  • timeDate: 触发事件的时间. 使用ISO8601格式指定的一个确定的时间:
<timerEventDefinition>
    <timeDate>2011-03-11T12:13:14</timeDate>
</timerEventDefinition>
  • timeDuration: 指定定时器之前要等待多长时间. timeDuration可以设置为timerEventDefinition的子元素,使用ISO8601规定的格式
<timerEventDefinition>
    <!--等待10天-->
    <timeDuration>P10D</timeDuration>
</timerEventDefinition>
  • timeCycle: 指定重复执行的间隔. 可以用来定期启动流程实例,或为超时时间发送多个提醒.timeCycle元素可以使用两种格式:

    • ISO8601标准的格式
    • cron表达式
<timerEventDefinition>
    <!--重复3次,每次间隔10小时-->
    <timeCycle>R3/PT10H</timeCycle>
</timerEventDefinition>
  • 从整点开始,每5分钟执行一次:
0 0/5 * * * ?
  • ==注意:==
    • 第一个数字表示秒, 而不是像通常Unix cron中那样表示分钟
    • 重复的时间周期能更好的处理相对时间, 可以计算一些特定的时间点:用户任务的开始时间
    • cron表达式可以处理绝对时间, 这对定时启动事件特别有用
  • 在定时器事件定义中使用表达式,可以通过流程变量来影响那个定时器定义: 流程定义必须包含ISO8601(或者cron)格式的字符串,以匹配对应的时间类型
  <boundaryEvent id="escalationTimer" cancelActivity="true" attachedToRef="firstLineSupport">
     <timerEventDefinition>
        <timeDuration>${duration}</timeDuration>
     </timerEventDefinition>
  </boundaryEvent>

只有启用job执行器之后,定时器才会被触发.activiti.cfg.xml中的jobExecutorActivate需要设置为true, 默认job执行器是关闭的

错误事件定义

  • 错误事件是由指定错误触发的
  • ==注意:==
    • BPMN错误与Java异常完全不一样:
      • BPMN错误事件是为了对业务异常建模
      • Java异常是要用特定方式处理
  • 错误事件定义会引用一个error元素,引用相同error元素的错误事件处理器会捕获这个错误
<endEvent id="myErrorEndEvent">
    <!--引用一个错误声明-->
  <errorEventDefinition errorRef="myError" />
</endEvent>

信号事件定义

  • 信号事件会引用一个已命名的信号
  • 信号全局范围的事件(广播语义).会发送给所有激活的处理器
  • 信号事件定义使用signalEventDefinition元素 .signalRef属性会引用definitions根节点里定义的signal子元素(signalEventDefinition引用相同的signal元素)
<!--流程实例,其中会抛出一个信号,并被中间事件捕获-->
<definitions... >
        <!-- declaration of the signal -->
        <signal id="alertSignal" name="alert" />

        <process id="catchSignal">
                <intermediateThrowEvent id="throwSignalEvent" name="Alert">
                        <!-- signal event definition -->
                        <signalEventDefinition signalRef="alertSignal" />
                </intermediateThrowEvent>
                ...
                <intermediateCatchEvent id="catchSignalEvent" name="On Alert">
                        <!-- signal event definition -->
                        <signalEventDefinition signalRef="alertSignal" />
                </intermediateCatchEvent>
                ...
        </process>
</definitions>
触发信号事件
  • 可以通过bpmn节点由流程实例触发一个信号.也可以通过API触发
  • org.activiti.engine.RuntimeService中的方法可以用来手工触发一个信号:
RuntimeService.signalEventReceived(String signalName);
RuntimeService.signalEventReceived(String signalName, String executionId);
  • signalEventReceived(String signalName): 把信号发送给全局所有订阅的处理(广播语义)
  • signalEventReceived(String signalName, String executionId): 把信号发送给指定的执行
捕获信号事件
  • 信号事件可以被中间信号事件或边界信息事件捕获
查询信号事件的订阅
  • 查询所有订阅特定信号事件的执行
 List<Execution> executions = runtimeService.createExecutionQuery()
      .signalEventSubscriptionName("alert")
      .list();
  • 使用signalEventReceived(String signalName, String executionId) 把信号发送给这些执行
信号事件范围
  • 默认情况下,信号会在流程引擎范围内进行广播: 在一个流程实例中抛出一个信号事件,其他不同流程定义的流程实例都可以监听到这个事件
  • 有时只要在同一个流程实例中响应这个信号事件:流程实例中的同步机制,如果两个或更多活动是互斥的
  • 要想限制信号事件的范围, 可以使用信号事件定义的scope属性:
<signal id="alertSignal" name="alert" activiti:scope"processInstance"/>

默认情况下,scope的属性为global

信号事件实例
  • 不同流程使用信号交互:
  • 流程在保险规则更新或改变时启动.在修改被参与者处理时,会触发一个信号,通知规则改变:

    工作流框架引擎之消息事件流详解插图
    在这里插入图片描述

    这个事件会被所有相关的流程实例捕获

  • 订阅这个事件的流程实例:

    工作流框架引擎之消息事件流详解插图1
    -
  • 信号事件是广播给所有激活的处理器的
  • 在上面的例子中,所有流程实例都会接收到这个事件,这就是我们想要的.
  • 然而,有的情况下并不想要这种广播行为,考虑下面的流程:

    工作流框架引擎之消息事件流详解插图2
    在这里插入图片描述

    上述流程描述的模式activiti并不支持.这种想法是:

    • 执行[do something]任务时出现的错误
    • 被边界错误事件捕获
    • 然后使用信号传播给并发路径上的分支
    • 进而中断[do something inparallel]任务
  • 目前,activiti实际运行的结果与期望一致.信号会传播给边界事件并中断任务.但是,根据信号的广播含义,也会传播给所有其他订阅了信号事件的流程实例.所以,这就不是我们想要的结果
  • ==注意:==
    • 信号事件不会执行任何与特定流程实例的联系
    • 如果只想把一个信息发给指定的流程实例,需要手工关联,再使用 signalEventReceived(String signalName, String executionId) 和对应的查询机制

消息事件定义

  • 消息事件会引用一个命名的消息,每个消息都有名称和内容
  • 消息事件总会直接发送给一个接受者
  • 消息事件定义使用messageEventDefinition元素.messageRef属性引用了definitions根节点下的一个message子元素:
<!--使用两个消息事件的流程例子,开始事件和中间捕获事件分别声明和引用了两个消息事件-->
<definitions id="definitions"
  xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
  xmlns:activiti="http://activiti.org/bpmn"
  targetNamespace="Examples"
  xmlns:tns="Examples">

  <message id="newInvoice" name="newInvoiceMessage" />
  <message id="payment" name="paymentMessage" />

  <process id="invoiceProcess">

    <startEvent id="messageStart" >
        <messageEventDefinition messageRef="newInvoice" />
    </startEvent>
    ...
    <intermediateCatchEvent id="paymentEvt" >
        <messageEventDefinition messageRef="payment" />
    </intermediateCatchEvent>
    ...
  </process>

</definitions>
触发消息事件
  • 作为一个嵌入式的流程引擎,activiti不能真正接收一个消息
    • 这些环境相关,与平台相关的活动:比如连接到JMS(Java消息服务)队列或主题或执行WebService或REST请求. 这个消息的接收是你要在应用或架构的一层实现的,流程引擎则内嵌其中
  • 在应用接收一个消息之后,必须决定如何处理它:
  • 如果消息应该触发启动一个新流程实例,在下面的RuntimeService的两个方法中选择一个执行:
ProcessInstance startProcessInstanceByMessage(String messageName);
ProcessInstance startProcessInstanceByMessage(String messageName, Map<String, Object> processVariables);
ProcessInstance startProcessInstanceByMessage(String messageName, String businessKey, Map<String, Object> processVariables);  

这些方法允许使用对应的消息系统流程实例

  • 如果消息需要被运行中的流程实例处理:
    • 首先要根据消息找到对应的流程实例
    • 然后触发这个等待中的流程
  • RuntimeService提供了可以基于消息事件的订阅来触发流程继续执行:
void messageEventReceived(String messageName, String executionId);
void messageEventReceived(String messageName, String executionId, HashMap<String, Object> processVariables);   
查询消息事件订阅
  • Activiti支持开始消息事件中间消息事件
  • 消息开始事件: 消息事件订阅分配给一个特定的process definition. 这个消息订阅可以使用ProcessDefinitionQuery查询到
ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery()
      .messageEventSubscription("newCallCenterBooking")
      .singleResult();

因为同时只能有一个流程定义关联到消息的订阅点,查询总是返回0或一个结果.如果流程定义更新了,那么只有最新版本的流程定义会订阅到消息事件上

  • 中间捕获消息事件: 消息事件订阅会分配给特定的执行,这个消息事件订阅可以使用ExecutionQuery查询到:
Execution execution = runtimeService.createExecutionQuery()
      .messageEventSubscriptionName("paymentReceived")
      .variableValueEquals("orderId", message.getOrderId())
      .singleResult();

这个查询可以调用对应的查询,通常是流程相关的信息 :最多只能有一个流程实例对应着orderId

消息事件实例
  • 使用两个不同消息启动的流程实例:

    工作流框架引擎之消息事件流详解插图3
    在这里插入图片描述
  • 消息事件可以用于流程需要不同的方式来区分开始事件时,最终会进入同样的路径
赞(0) 打赏
未经允许不得转载:IDEA激活码 » 工作流框架引擎之消息事件流详解

一个分享Java & Python知识的社区