Flow Starter

1. Flow Concept

Flow is a more general concept than a business process (BP).

Relationship: Flow > BP = Workflow > BPMN

An expression engine, such as QLExpress, mainly solves data computation problems and acts as a calculation engine. BPMN scenarios are better suited to workflow or rule engines such as Flowable, which integrates JUEL for simple expression parsing. DMN rule engines are better suited to true/false decisions, for example the sequenceFlow routing conditions in a BP. Drools focuses more on rule configuration and management.

Example scenarios:

  • Import: better suited to BPMN. Each action is a data update node; actions have an order but can be processed asynchronously.
  • Approval: BPMN + DMN. BPMN handles the approval process and path. DMN handles complex approval routing. DQL solves complex approver queries.

2. Flow Design

Flow Starter provides event-driven process definition and execution.

Supported flow types:

  • automated flow, cron flow, form flow, validation flow, onchange event flow, AI Agent flow

Supported node types:

  • create data, update data, delete data, query data, compute data, decision gateway, generate report, query AI, send message
  • validate data, WebHook, async task, subflow

Supported event types:

  • create, update, delete, changed (create/update/delete), onchange, API, cron, subflow, message
  • button events can be implemented via API events

Supports synchronous and asynchronous flows, and flow versioning.

3. Supported Scenarios

  1. Batch import validation flows and post-import business processing flows
  2. Data validation is synchronous; other business processing is asynchronous
  3. Messages and emails are produced by message nodes in the flow (for example, onboarding email)
  4. Business process configuration and cron tasks

Overview

Flow Starter provides a configurable flow engine for Softa. Flows are defined by FlowConfig, FlowTrigger, FlowNode, and FlowEdge records. A flow can be triggered by ChangeLog events (create/update/delete), API events, cron events, or subflow calls. Flow execution supports synchronous validation (before transaction commit) and asynchronous processing (via MQ).

Dependency

<dependency>
    <groupId>io.softa</groupId>
    <artifactId>flow-starter</artifactId>
    <version>${softa.version}</version>
</dependency>

Requirements

  • Database contains flow metadata tables: FlowConfig, FlowTrigger, FlowNode, FlowEdge, FlowStage, FlowEvent, FlowInstance, FlowDebugHistory.
  • Pulsar is required for async flow events and async task execution.
  • ChangeLog and Cron integration are optional. Enable if you use those trigger sources.

Configuration

Enable flow

enable:
  flow: true

MQ topics

mq:
  topics:
    change-log:
      topic: dev_demo_change_log
      flow-sub: dev_demo_change_log_flow_sub
    cron-task:
      topic: dev_demo_cron_task
      flow-sub: dev_demo_cron_task_flow_sub
    flow-async-task:
      topic: dev_demo_flow_async_task
      sub: dev_demo_flow_async_task_sub
    flow-event:
      topic: dev_demo_flow_event
      sub: dev_demo_flow_event_sub

Key Concepts

  • FlowConfig: the flow definition. Key fields include name, flowType, sync, rollbackOnFail, debugMode, active.
  • FlowTrigger: defines the event that triggers a flow, including eventType, sourceModel, sourceFields, triggerCondition, and cronId.
  • FlowNode: the execution unit. Each node has nodeType, nodeParams, nodeCondition, and optional exceptionPolicy.
  • FlowEdge: edge between nodes (mainly for layout/visualization).
  • FlowStage: optional grouping for nodes.

Execution Model

Trigger sources:

  • ChangeLog events: generated by ORM change log. Synchronous flows run BEFORE_COMMIT for validation; async flows are sent to MQ.
  • API events: POST /automation/apiEvent triggers a flow from the TriggerEventDTO request body.
  • Cron events: consumed from the cron-task MQ topic and routed by cronId.
  • Subflow events: TriggerSubflow node calls another flow by triggerId.

Trigger conditions:

  • FlowTrigger.triggerCondition is evaluated against the triggerParams map. For ChangeLog events, triggerParams is dataAfterChange for CREATE/UPDATE and dataBeforeChange for DELETE. For API events, triggerParams is TriggerEventDTO.eventParams.
  • For UPDATE events, sourceFields is used to filter triggers. If sourceFields is empty, any update matches.
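
The two rules above can be sketched in plain Java. This is a minimal illustration, not the starter's code: TriggerMatchSketch, EventType, and both method names are hypothetical, and treating sourceFields as "matches when it overlaps the changed fields" is an assumption about how the filter is applied.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of trigger matching; these are not the starter's classes. */
final class TriggerMatchSketch {

    enum EventType { CREATE, UPDATE, DELETE }

    /** Picks the map that triggerCondition is evaluated against for a ChangeLog event. */
    static Map<String, Object> triggerParams(EventType type,
                                             Map<String, Object> dataBeforeChange,
                                             Map<String, Object> dataAfterChange) {
        // DELETE has no "after" state, so the pre-change row is used.
        return type == EventType.DELETE ? dataBeforeChange : dataAfterChange;
    }

    /** Assumption: an UPDATE matches when sourceFields is empty or overlaps the changed fields. */
    static boolean matchesSourceFields(EventType type,
                                       List<String> sourceFields,
                                       Set<String> changedFields) {
        if (type != EventType.UPDATE || sourceFields == null || sourceFields.isEmpty()) {
            return true;
        }
        return sourceFields.stream().anyMatch(changedFields::contains);
    }

    public static void main(String[] args) {
        Set<String> changed = Set.of("status");
        System.out.println(matchesSourceFields(EventType.UPDATE, List.of("status", "totalAmount"), changed));
        System.out.println(matchesSourceFields(EventType.UPDATE, List.of("remark"), changed));
        System.out.println(triggerParams(EventType.DELETE, Map.of("id", 1001), Map.of()));
    }
}
```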

Sync vs async:

  • FlowConfig.sync = true: executes in-process. If rollbackOnFail = true, the flow is wrapped in a transaction.
  • FlowConfig.sync = false: FlowEventMessage is sent to MQ and executed by FlowEventConsumer.
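
The sync flag can be read as a routing decision. The sketch below is an illustration only: Config, Route, and dispatch are hypothetical names, the Queue stands in for the MQ topic, and transaction handling is reduced to a comment.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of sync/async routing; not the starter's FlowManager. */
final class FlowDispatchSketch {

    enum Route { IN_PROCESS_TRANSACTIONAL, IN_PROCESS, SENT_TO_MQ }

    /** Stand-in for the two FlowConfig flags discussed above. */
    static final class Config {
        final boolean sync;
        final boolean rollbackOnFail;

        Config(boolean sync, boolean rollbackOnFail) {
            this.sync = sync;
            this.rollbackOnFail = rollbackOnFail;
        }
    }

    static Route dispatch(Config config, String eventMessage, Runnable flow, Queue<String> mq) {
        if (!config.sync) {
            // Async: only enqueue; a consumer would execute the flow later.
            mq.add(eventMessage);
            return Route.SENT_TO_MQ;
        }
        // Sync: run in-process. Real code would open a transaction here
        // when rollbackOnFail is true; this sketch only reports the route.
        flow.run();
        return config.rollbackOnFail ? Route.IN_PROCESS_TRANSACTIONAL : Route.IN_PROCESS;
    }

    public static void main(String[] args) {
        Queue<String> mq = new ArrayDeque<>();
        System.out.println(dispatch(new Config(false, false), "order-1001", () -> { }, mq));
        System.out.println(dispatch(new Config(true, true), "order-1001",
                () -> System.out.println("running flow"), mq));
        System.out.println("queued messages: " + mq.size());
    }
}
```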

Important behavior notes:

  • FlowManager loads all FlowTrigger and FlowConfig records once at startup and caches them in memory. Updating flow metadata requires a restart or a manual reload.
  • Validation flows run BEFORE_COMMIT. Avoid mutating data inside these flows.
  • FlowConstant.EXCLUDE_TRIGGER_MODELS excludes FlowInstance and FlowEvent from triggering flows to avoid loops.

Context and Expressions

The flow engine passes a NodeContext across nodes. It includes:

  • FlowEnv variables: NOW, TODAY, YESTERDAY.
  • TriggerParams: the row data that triggered the flow.
  • SourceRowId: the id of the triggering record.

Template expression syntax (all templates use {{ expr }}):

  • Variables and expressions: {{ TriggerParams.id }}, {{ price * qty }}, {{ NOW }}.
  • Reserved field references in filters: {{ @fieldName }}, {{ @parent.fieldName }}.

Node results:

  • Many nodes store their output in NodeContext using the node id as the key. Use {{ <nodeId> }} or {{ <nodeId>.field }} in templates and filters.
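
To make the placeholder lookup concrete, here is a minimal sketch of rendering {{ path }} templates against a nested map. It is not the starter's expression engine: TemplateSketch, resolve, and render are hypothetical names, context keys (including node ids) are assumed to be strings, and only dotted paths are handled, so computed expressions such as {{ price * qty }} are out of scope.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of {{ path }} substitution; handles dotted paths only. */
final class TemplateSketch {

    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\{\\{\\s*([^{}]+?)\\s*\\}\\}");

    /** Resolves a dotted path such as "TriggerParams.id" or "101.status" against nested maps. */
    static Object resolve(String path, Map<String, Object> context) {
        Object current = context;
        for (String key : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // path goes deeper than the data does
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    /** Replaces every {{ path }} with the string form of the resolved value. */
    static String render(String template, Map<String, Object> context) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            Object value = resolve(matcher.group(1), context);
            matcher.appendReplacement(out, Matcher.quoteReplacement(String.valueOf(value)));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> context = Map.of(
                "TriggerParams", Map.of("id", 1001),
                "101", Map.of("status", "PAID")); // "101" plays the role of a node id
        System.out.println(render("Order {{ TriggerParams.id }} is {{ 101.status }}", context));
    }
}
```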

Node exception policy:

  • FlowNode.exceptionPolicy supports NodeExceptionPolicy to handle node result exceptions. It can detect empty/false results and emit signals such as EndFlow or ThrowException.
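
As a rough illustration of the idea, the sketch below maps an empty or false node result to a signal. The signal names EndFlow and ThrowException come from the text above; everything else (ExceptionPolicySketch, isEmptyOrFalse, evaluate, and the exact definition of "empty") is an assumption rather than the real NodeExceptionPolicy.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of an exception policy check; not the starter's NodeExceptionPolicy. */
final class ExceptionPolicySketch {

    enum Signal { EndFlow, ThrowException }

    /** Assumption: null, false, blank text, and empty collections or maps count as exceptional. */
    static boolean isEmptyOrFalse(Object result) {
        if (result == null) {
            return true;
        }
        if (result instanceof Boolean) {
            return !((Boolean) result);
        }
        if (result instanceof CharSequence) {
            return result.toString().trim().isEmpty();
        }
        if (result instanceof Collection) {
            return ((Collection<?>) result).isEmpty();
        }
        if (result instanceof Map) {
            return ((Map<?, ?>) result).isEmpty();
        }
        return false;
    }

    /** Returns the configured signal when the node result is exceptional, otherwise nothing. */
    static Optional<Signal> evaluate(Object nodeResult, Signal onEmptyOrFalse) {
        return isEmptyOrFalse(nodeResult) ? Optional.of(onEmptyOrFalse) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(evaluate(List.of(), Signal.EndFlow));
        System.out.println(evaluate(Boolean.FALSE, Signal.ThrowException));
        System.out.println(evaluate("row found", Signal.EndFlow));
    }
}
```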

Node Types (Implemented)

  • ValidateData: validate with expression, throws BusinessException on failure.
  • GetData: query data; supports single/multi row, field value(s), exists, count.
  • ComputeData: compute expression result.
  • CreateData, UpdateData, DeleteData: CRUD operations based on templates and filters.
  • ExtractTransform: extract a field from a collection into a Set.
  • Condition: evaluate passCondition and emit the configured exception signal when it is not met.
  • TriggerSubflow: trigger another flow by triggerId.
  • AsyncTask: send an async task message to MQ.
  • QueryAi: query AI and store the reply content.
  • ReturnData: set the flow return payload.
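
Of the nodes listed, ExtractTransform is the easiest to picture in code. The sketch below shows the transformation only, under stated assumptions: ExtractTransformSketch and extract are hypothetical names, rows are modeled as maps, and skipping rows without the key is a choice made for this example.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of what an ExtractTransform step computes. */
final class ExtractTransformSketch {

    /** Collects the distinct values of itemKey from a collection of rows, in encounter order. */
    static Set<Object> extract(Collection<Map<String, Object>> rows, String itemKey) {
        Set<Object> values = new LinkedHashSet<>();
        for (Map<String, Object> row : rows) {
            Object value = row.get(itemKey);
            if (value != null) { // assumption: rows without the key are skipped
                values.add(value);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
                Map.of("id", 1, "status", "PAID"),
                Map.of("id", 2, "status", "PENDING"),
                Map.of("id", 1, "status", "PAID"));
        System.out.println(extract(rows, "id"));
    }
}
```

In the nodeParams shown later, collectionVariable would supply the rows (for example the output of a GetData node) and itemKey would be the field name.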

Node Types (Stubs / No-Op)

These nodes have empty processors and do nothing until you implement them:

  • BranchGateway
  • LoopByDataset
  • LoopByPage
  • TransferStage
  • GenerateReport
  • SendMessage
  • WebHook
  • ApprovalNode

Note: FlowNodeService contains loop iteration logic for LoopByDataset and LoopByPage, but the starter does not provide NodeProcessor implementations for them. Add custom processors to make them executable.
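
A custom LoopByDataset processor would need to do roughly the following. This sketch deliberately does not implement the starter's NodeProcessor interface, whose signature is not shown in this document. LoopByDatasetSketch and loop are hypothetical, and the per-item child context is an assumption; the parameter names dataSetParam and loopItemNaming are taken from the nodeParams example in this document.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of dataset iteration; not a NodeProcessor implementation. */
final class LoopByDatasetSketch {

    /**
     * Runs body once per dataset item. Each run sees the parent context plus
     * the current item under the name given by loopItemNaming.
     */
    static void loop(List<?> dataset,
                     String loopItemNaming,
                     Map<String, Object> context,
                     Consumer<Map<String, Object>> body) {
        for (Object item : dataset) {
            // Copy the context so the loop variable does not leak into the parent.
            Map<String, Object> child = new HashMap<>(context);
            child.put(loopItemNaming, item);
            body.accept(child);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        context.put("TriggerParams", Map.of("id", 1001));
        loop(List.of("item-A", "item-B"), "orderItem", context,
                child -> System.out.println("processing " + child.get("orderItem")));
        System.out.println("parent has orderItem: " + context.containsKey("orderItem"));
    }
}
```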

4. Flow Key Elements

The following parameters are aligned with the current code. Field values support:

  • constants
  • expressions {{ expr }} (variables, map access, computed values), e.g. {{ TriggerParams.status }}, {{ price * qty }}, {{ NOW }}
  • reserved field references in Filters: {{ @fieldName }}, {{ @parent.fieldName }}

NodeGetDataType options:

  • MultiRows, SingleRow, OneFieldValue, OneFieldValues, Exist, Count

ValueType options:

  • String, Integer, Long, Double, BigDecimal, Boolean, Date, DateTime, Time

| Node Type | Params Object | Param | Required | Description |
| --- | --- | --- | --- | --- |
| Compute Data | ComputeDataParams | expression | Required | Computation expression, multi-line text |
| | | valueType | Required | Result value type, ValueType |
| Create Data | CreateDataParams | modelName | Required | Model name to create |
| | | rowTemplate | Required | Data template; values support constants and {{ expr }} |
| Delete Data | DeleteDataParams | modelName | Required | Model name to delete |
| | | pkVariable | Either (pkVariable or filters) | PK variable name, single/multi {{ var }} |
| | | filters | Either (pkVariable or filters) | Composite filters; values support constants, {{ expr }}, {{ @fieldName }} |
| Get Data | GetDataParams | modelName | Required | Model name to query |
| | | getDataType | Required | Return type, NodeGetDataType |
| | | fields | | Field list; empty defaults to id only |
| | | filters | | Composite filters; values support constants, {{ expr }}, {{ @fieldName }} |
| | | orders | | Order conditions, Orders array |
| | | acrossTimeline | | Whether to query across timeline |
| | | limitSize | | Max rows, up to 10000 |
| Update Data | UpdateDataParams | modelName | Required | Model name to update |
| | | pkVariable | Either (pkVariable or filters) | PK variable name, single/multi {{ var }} |
| | | filters | Either (pkVariable or filters) | Composite filters; values support constants, {{ expr }}, {{ @fieldName }} |
| | | rowTemplate | Required | Data template; values support constants and {{ expr }} |
| Validate Data | ValidateDataParams | expression | Required | Validation expression, multi-line text |
| | | exceptionMsg | Required | Exception message on failure, supports {{ expr }} |
| Async Task | AsyncTaskParams | asyncTaskHandlerCode | Required | Async task handler |
| | | dataTemplate | | Task params template; values support constants and {{ expr }} |
| Extract Transform | ExtractTransformParams | collectionVariable | Required | Collection variable, {{ var }} |
| | | itemKey | Required | Field name to extract |
| Return Data | ReturnDataParams | dataTemplate | Required | Return data template; values support constants and {{ expr }} |
| Condition | ConditionParams | passCondition | Required | Condition expression |
| | | exceptionSignal | Required | NodeExceptionSignal |
| | | exceptionMessage | | Exception message, supports {{ expr }} |
| Branch Gateway | BranchGatewayParams | serialGateway | | Serial gateway flag, default parallel |
| Query AI | QueryAiParams | robotId | Required | |
| | | conversationId | Required | |
| | | queryContent | Required | Supports {{ expr }} interpolation |
| Trigger Subflow | TriggerSubflowParams | subflowTriggerId | Required | Subflow trigger ID |
| | | dataTemplate | | Subflow params template |
| WebHook | WebHookParams | | | No params yet |
| Generate Report | GenerateReportParams | | | No params yet |
| Send Message | SendMessageParams | message | Required | Message content |
| | | recipient | Required | Recipient |
| | | sender | Required | Sender |
| Transfer Stage | TransferStageParams | | | No params yet |

REST APIs

  • POST /automation/apiEvent
  • POST /automation/onchange (currently returns empty Map)
  • POST /automation/simulateEvent (non-prod only)
  • GET /FlowConfig/getByModel
  • GET /FlowConfig/getFlowById

Entity CRUD endpoints are also available for FlowConfig, FlowTrigger, FlowNode, FlowEdge, FlowStage, FlowInstance, FlowEvent, and FlowDebugHistory via EntityController.

Examples

API event request body:

{
  "sourceModel": "Order",
  "sourceRowId": 1001,
  "triggerId": 2001,
  "eventParams": {
    "id": 1001,
    "status": "PAID",
    "totalAmount": 199.99,
    "updatedBy": "system"
  }
}

API event:

curl -X POST http://localhost:8080/automation/apiEvent \
  -H 'Content-Type: application/json' \
  -d @- <<'JSON'
{
  "sourceModel": "Order",
  "sourceRowId": 1001,
  "triggerId": 2001,
  "eventParams": {
    "id": 1001,
    "status": "PAID",
    "totalAmount": 199.99,
    "updatedBy": "system"
  }
}
JSON
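
The same call can be made from Java with the JDK's built-in java.net.http client. This is a minimal sketch: ApiEventRequestSketch and build are hypothetical names, the base URL is the local one used in the curl example, and the request is only built and printed so the example runs without a server.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch: builds the POST /automation/apiEvent request with the JDK HTTP client. */
final class ApiEventRequestSketch {

    static HttpRequest build(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/automation/apiEvent"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        // Same fields as the request body above, trimmed for brevity.
        String body = "{\"sourceModel\":\"Order\",\"sourceRowId\":1001,\"triggerId\":2001,"
                + "\"eventParams\":{\"id\":1001,\"status\":\"PAID\"}}";
        HttpRequest request = build("http://localhost:8080", body);
        System.out.println(request.method() + " " + request.uri());
        // To send it against a running instance:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```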

Simulate flow event request body (non-prod only):

{
  "flowId": 3001,
  "flowNodeId": null,
  "rollbackOnFail": true,
  "triggerId": 2001,
  "sourceModel": "Order",
  "sourceRowId": 1001,
  "triggerParams": {
    "id": 1001,
    "status": "PAID",
    "totalAmount": 199.99
  }
}

Simulate flow event (non-prod only):

curl -X POST http://localhost:8080/automation/simulateEvent \
  -H 'Content-Type: application/json' \
  -d @- <<'JSON'
{
  "flowId": 3001,
  "flowNodeId": null,
  "rollbackOnFail": true,
  "triggerId": 2001,
  "sourceModel": "Order",
  "sourceRowId": 1001,
  "triggerParams": {
    "id": 1001,
    "status": "PAID",
    "totalAmount": 199.99
  }
}
JSON

Node parameters reference (common nodeParams templates):

{
  "validateData": {
    "expression": "TriggerParams.totalAmount > 0",
    "exceptionMsg": "totalAmount must be greater than 0 for order {{ TriggerParams.id }}"
  },
  "getData": {
    "modelName": "Order",
    "getDataType": "MultiRows",
    "fields": ["id", "status", "totalAmount"],
    "filters": ["status", "=", "PENDING"],
    "orders": ["createdTime", "DESC"],
    "limitSize": 100
  },
  "extractTransform": {
    "collectionVariable": "{{ 101 }}",
    "itemKey": "id"
  },
  "computeData": {
    "expression": "1 + 2",
    "valueType": "Integer"
  },
  "updateData": {
    "modelName": "Order",
    "pkVariable": "{{ 102 }}",
    "rowTemplate": {
      "status": "PROCESSING",
      "updatedAt": "{{ NOW }}"
    }
  },
  "deleteData": {
    "modelName": "Order",
    "filters": ["status", "=", "CANCELLED"]
  },
  "condition": {
    "passCondition": "TriggerParams.status == 'PAID'",
    "exceptionSignal": "EndFlow",
    "exceptionMessage": "status is not PAID, flow ended"
  },
  "returnData": {
    "dataTemplate": {
      "orderId": "{{ TriggerParams.id }}",
      "status": "{{ TriggerParams.status }}"
    }
  },
  "asyncTask": {
    "asyncTaskHandlerCode": "OrderNotify",
    "dataTemplate": {
      "orderId": "{{ TriggerParams.id }}",
      "status": "{{ TriggerParams.status }}"
    }
  },
  "triggerSubflow": {
    "subflowTriggerId": 4001,
    "dataTemplate": {
      "orderId": "{{ TriggerParams.id }}",
      "totalAmount": "{{ TriggerParams.totalAmount }}"
    }
  },
  "loopByDataset": {
    "dataSetParam": "{{ 101 }}",
    "loopItemNaming": "orderItem"
  },
  "loopByPage": {
    "model": "Order",
    "fields": ["id", "status"],
    "filters": ["status", "=", "PENDING"],
    "pageSize": 50,
    "pageParamNaming": "pageRows"
  },
  "queryAi": {
    "robotId": 1,
    "conversationId": 1,
    "queryContent": "Summarize order {{ TriggerParams.id }}"
  }
}