Reading from RabbitMQ -> routing by jsonPath -> forwarding to Citeck Event + Dead Letter Queue
The example demonstrates a typical integration scenario via RabbitMQ:
Connecting to a RabbitMQ queue using credentials from an ECOS Endpoint.
Logging the incoming message.
Routing the message based on the
operationfield value in the JSON body:CREATE→ publishes thetest-data-createevent,UPDATE→ publishes thetest-data-updateevent,other values → throws an exception (the message is sent to the Dead Letter Queue).
Note
For the example to work, create an ECOS Endpoint named my-rabbitmq-endpoint containing url, credentials/username, and credentials/password.
- beans:
# Бин подключения к RabbitMQ на основе данных из ECOS Endpoint
- name: myRabbitConnectionFactory
type: org.springframework.amqp.rabbit.connection.CachingConnectionFactory
properties:
uri: '{{ecos-endpoint:my-rabbitmq-endpoint/url}}'
username: '{{ecos-endpoint:my-rabbitmq-endpoint/credentials/username}}'
password: '{{ecos-endpoint:my-rabbitmq-endpoint/credentials/password}}'
- route:
from:
uri: "spring-rabbitmq:income-test-data"
parameters:
connectionFactory: '#bean:myRabbitConnectionFactory'
queues: test-data-queue
autoDeclare: true
# Dead Letter Queue — принимает сообщения, которые не удалось обработать
deadLetterExchange: income-test-data
deadLetterQueue: test-data-queue-dlq
deadLetterRoutingKey: deadLetterTestData
# Задержка перед повторной попыткой обработки (мс)
retryDelay: 5000
arg.queue.durable: true
arg.queue.autoDelete: false
steps:
# Логирование входящего сообщения
- to:
uri: "log:income?level=INFO&showAll=true"
# Роутинг по полю operation в JSON
- choice:
when:
- jsonpath:
expression: "$.[?(@.operation == 'CREATE')]"
steps:
- to: "ecos-event:test-data-create"
- jsonpath:
expression: "$.[?(@.operation == 'UPDATE')]"
steps:
- to: "ecos-event:test-data-update"
otherwise:
steps:
# Неизвестное значение operation — исключение отправит сообщение в DLQ
- throwException:
exceptionType: "java.lang.IllegalArgumentException"
message: "Unsupported operation. Only CREATE and UPDATE are supported."