Join Each

Overview

The JoinEach Component is designed to work with the Each Component. It collects incoming messages until a specified count is reached, then emits them as a single batch. If some messages do not arrive within the timeout period, it emits the partial batch together with the expected and actual message counts.

Usage

Input Ports

| Port | Description |
|------|-------------|
| in | Receives individual messages to be aggregated. Each message must contain a correlationId. |
| count | Specifies the total number of messages expected. Must be provided before aggregation begins. |

Output Ports

| Port | Description |
|------|-------------|
| out | Fires when all expected messages arrive, emitting an array of collected messages. |
| timeout | Fires when the timeout is reached before all expected messages arrive, providing the partial batch and counts. |

Output Data for out Port

| Field | Description |
|-------|-------------|
| items | The array of collected messages. |

Output Data for timeout Port

| Field | Description |
|-------|-------------|
| items | The array of messages that arrived before the timeout. |
| count | The number of expected messages. |
| arrived | The number of messages that actually arrived. |
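
To make the two output shapes concrete, the following Python sketch models them as plain dictionaries. Only the field names items, count, and arrived come from the descriptions above. The message contents (id, status) are invented for illustration, and missing_count is a hypothetical helper, not part of the component.

```python
# Illustrative payloads only: the contents of each item are made up.
out_payload = {
    "items": [
        {"id": 1, "status": "ok"},
        {"id": 2, "status": "ok"},
        {"id": 3, "status": "ok"},
    ],
}

timeout_payload = {
    "items": [
        {"id": 1, "status": "ok"},
        {"id": 2, "status": "ok"},
    ],
    "count": 3,    # number of messages that were expected
    "arrived": 2,  # number of messages that actually arrived
}


def missing_count(payload: dict) -> int:
    """Hypothetical helper: how many expected messages never arrived."""
    return payload["count"] - payload["arrived"]


print(missing_count(timeout_payload))
```

A downstream step connected to the timeout port could use a calculation like this to decide whether to retry, log, or continue with the partial batch.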

Instructions

  1. Set Up with Each Component

    • Ensure messages sent to in contain a correlationId matching the one from the Each component.

  2. Define Expected Count

    • Send a message to the count port specifying how many messages the component should wait for.

  3. Processing Messages

    • The component collects messages based on the correlationId.

    • When all expected messages arrive, they are sent as an array to the out port.

    • If not all messages arrive before the timeout, the timeout port emits the partial batch.
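
The steps above can be sketched in Python as a small in-memory aggregator. This is a minimal illustration under stated assumptions, not the component's actual implementation: the class JoinEachSketch, its method names, the 60-second default timeout, and the injectable clock are all hypothetical. The sketch also buffers messages that arrive before the count, which may differ from the real component.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class _Batch:
    """State kept for one correlationId (hypothetical structure)."""
    items: list = field(default_factory=list)
    expected: Optional[int] = None   # set when the count message arrives
    deadline: float = 0.0            # set when the batch is first seen


class JoinEachSketch:
    def __init__(self, timeout_seconds=60.0, clock=time.monotonic):
        self.timeout_seconds = timeout_seconds  # assumed default, not from the docs
        self.clock = clock
        self.batches = {}

    def _batch(self, correlation_id):
        # Create the batch (and start its timeout) on first contact.
        if correlation_id not in self.batches:
            deadline = self.clock() + self.timeout_seconds
            self.batches[correlation_id] = _Batch(deadline=deadline)
        return self.batches[correlation_id]

    def on_count(self, correlation_id, count):
        """Models a message on the count port."""
        self._batch(correlation_id).expected = count
        return self._emit_if_complete(correlation_id)

    def on_in(self, correlation_id, message):
        """Models a message on the in port."""
        self._batch(correlation_id).items.append(message)
        return self._emit_if_complete(correlation_id)

    def _emit_if_complete(self, correlation_id):
        batch = self.batches[correlation_id]
        if batch.expected is not None and len(batch.items) >= batch.expected:
            del self.batches[correlation_id]
            return ("out", {"items": batch.items})
        return None

    def expire(self):
        """Emit a timeout payload for every batch whose deadline has passed."""
        now = self.clock()
        overdue = [cid for cid, b in self.batches.items() if b.deadline <= now]
        fired = []
        for cid in overdue:
            batch = self.batches.pop(cid)
            fired.append(("timeout", {
                "items": batch.items,
                "count": batch.expected,
                "arrived": len(batch.items),
            }))
        return fired


# Usage with a fake clock so the timeout can be triggered deterministically.
fake_now = [0.0]
join = JoinEachSketch(timeout_seconds=5.0, clock=lambda: fake_now[0])

join.on_count("batch-a", 2)
join.on_in("batch-a", {"n": 1})
print(join.on_in("batch-a", {"n": 2}))  # second message completes the batch

join.on_count("batch-b", 3)
join.on_in("batch-b", {"n": 1})
fake_now[0] = 10.0                      # move past the 5-second deadline
print(join.expire())
```

Keying the state by correlationId is what keeps messages from concurrent batches apart. Passing the clock in as a parameter is only a convenience for demonstrating the timeout path without waiting.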

Notes

  • Works with Correlation IDs: Ensures messages from the same batch are correctly joined.

  • Timeout Handling: Prevents indefinite waiting if some messages fail to arrive.

  • Batch Processing: Useful for grouping messages into structured outputs for further processing.
