AsyncAPI Call Task (`call: asyncapi`)
Purpose
Section titled “Purpose”The AsyncAPI Call task enables workflows to interact with message brokers and event-driven services described by an AsyncAPI specification document.
This allows workflows to publish messages to channels or subscribe to messages from channels defined in the AsyncAPI document.
Usage Examples
Section titled “Usage Examples”Example: Publishing a Message
Section titled “Example: Publishing a Message”document: dsl: '1.0.0' # Assuming alpha5 or later based on reference example namespace: test name: asyncapi-publish-example version: '0.1.0'do: - publishGreeting: call: asyncapi with: # Reference to the AsyncAPI document document: endpoint: https://broker.example.com/docs/asyncapi.json operation: sendGreeting server: name: productionBroker variables: environment: prod # Define the message to publish message: payload: greeting: "Hello from workflow ${ $workflow.id }" headers: traceId: "${ $context.traceId }" # Output typically confirms publish success/failure, specifics vary - afterPublish: # ...
Example: Subscribing to Messages
Section titled “Example: Subscribing to Messages”document: dsl: '1.0.0' # Workflow DSL version namespace: test # Namespace for the workflow name: asyncapi-subscribe-example # Name of the workflow version: '0.1.0' # Version of this workflowdo: - subscribeToChatRoom: # Task name for the subscription call: asyncapi # Use the AsyncAPI task type with: # Reference to the AsyncAPI specification document document: endpoint: https://chat.example.com/api/asyncapi.yaml
# Operation ID for subscribing (must match one defined in the AsyncAPI doc) operation: receiveChatMessages
# Specify protocol to select appropriate server protocol: ws # WebSocket protocol
subscription: # Optional: Filter messages based on payload content filter: '${ .roomId == $context.targetRoomId }'
# Define consumption limits consume: amount: 10 # Max 10 messages for: { minutes: 5 } # Or max 5 minutes, whichever comes first
# Process each consumed message foreach: item: msg # Variable name for the current message do: # Log each order update - logUpdate: call: logMessage with: message: "Received order update: ${ .msg.payload.orderId }"
# Conditionally notify shipping if status is SHIPPED - checkStatus: call: notifyShipping if: "${ .msg.payload.status == 'SHIPPED' }"
# Define the output of the foreach loop (and thus the task) output: as: processedCount: "${ count(.) }" # Total messages processed lastOrderId: "${ .[-1]?.msg.payload.orderId }" # Last order ID
- afterSubscription: # ...
Configuration Options
Section titled “Configuration Options”The configuration for an AsyncAPI call is provided within the with
property of the call: asyncapi
task.
with
(Object, Required)
Section titled “with (Object, Required)”document
(Object, Required): Defines the location of the AsyncAPI specification document (JSON or YAML). Contains:endpoint
(Object, Required): Specifies the location withuri
(String | Object, Required) and optionalauthentication
(String | Object).
operation
(String, Required): The operation (publish or subscribe) to invoke, as defined within the AsyncAPIdocument
.server
(Object, Optional): Configuration for connecting to a specific server defined in the AsyncAPI document. If omitted, the runtime selects a suitable server based on the operation andprotocol
. Contains:name
(String, Required): The name of the server (must match a server name defined in the AsyncAPI document under the specified operation/channel).variables
(Object, Optional): A key/value map to override Server Variables defined in the AsyncAPI document for the selected server.
protocol
(String, Optional): The protocol to use, helping select the target server ifserver
is not specified or if multiple servers support the operation. Supported values include:amqp
,amqp1
,anypointmq
,googlepubsub
,http
,ibmmq
,jms
,kafka
,mercure
,mqtt
,mqtt5
,nats
,pulsar
,redis
,sns
,solace
,sqs
,stomp
,ws
.message
(Object, Conditionally Required): Defines the message to be published. Required if theoperation
represents a publish action. Contains details matching the AsyncAPI Message Object, such as:payload
(Any, Optional): The main content/body of the message.headers
(Object, Optional): Application-specific headers for the message.correlationId
(String, Optional): ID used for message correlation.- (Other properties like
contentType
,name
,title
,summary
,description
,tags
,externalDocs
,bindings
,examples
,traits
may be supported depending on runtime capabilities).
subscription
(Object, Conditionally Required): Defines how to subscribe to and consume messages. Required if theoperation
represents a subscribe action. Contains:- `