Использование Camel DSL
Общие сведения
Apache Camel — открытый кроссплатформенный java-фреймворк, который позволяет проводить интеграцию приложений в простой и понятной форме.
Camel использует доменные языки (Domain Specific Language - DSL) для описания проектных шаблонов интеграции или маршрутов.
В ECOS используется Yaml DSL для описания маршрутов в формате YAML и XML DSL.
Camel-контекст – главная сущность Camel. Контекст является контейнером среды выполнения Camel. Контекст предоставляет много полезных сервисов, наиболее значимыми являются маршруты, компоненты, языки, конверторы типов, реестр, endpointы и форматы данных.
Маршрут – определение интеграционного потока Например, для объединения двух систем маршрут определяет как именно эти системы взаимодействуют.
Компоненты - подробно описано по ссылке
Bean – для вызова методов Java-бинов, хранящихся в реестре;
Direct – вызывает другой endpoint из того же контекста синхронно;
Direct VM - вызывает другой endpoint из любого контекста на той же JVM синхронно;
File – читает и записывает файлы;
Timer – генерирует сообщения с определенным интервалом, используя java.util.Timer;
JDBC – предоставляет доступ к базам данных через JDBC;
Jetty – предоставляет endpoint на основе HTTP для получения и отправки HTTP запросов.
Пример контекста с маршрутом:
Примечание
Атрибут = свойство = поле
Целевая БД = БД назначения = целевой источник данных – куда данные помещаются
Исходная БД = исходный источник данных – откуда данные берутся
Выборка из БД
Для выборки данных из БД необходимо:
Создать “Credentials” для подключения:
Главное меню: Инструменты администратора -> Инструменты
Неосновное меню: Интеграция -> Credentials
Создать “Источник данных” DB Data Source, в результате источник будет с типом db.
Главное меню: Инструменты администратора -> Инструменты
Неосновное меню: Интеграция -> Источники данных
Создать “Camel DSL”
Главное меню: Инструменты администратора -> Инструменты
Неосновное меню: Интеграция -> Camel DSL
Контекст Camel DSL должен содержать маршрут выборки из БД. Например:
- route:
from:
uri: "timer:start?delay=-1&repeatCount=1"
steps:
- setBody:
constant: "select * from actions"
- to: "jdbc:datasource"
- split:
simple: "${body}"
steps:
- to: "stream:out"
где
datasource – имя источника данных, созданного в п.2 при его использовании в маршруте нужно добавлять префикс «jdbc:»;
actions – имя таблицы БД, из которой делается выборка;
timer – таймер, который запускает маршрут delay=-1 - немедленно при старте контекста и только один раз repeatCount=1;
блок split разделяет результат выборки на строки, которые выводятся в трассу stream:out
Для выполнения содержимого контекста нужно изменить состояние Camel DSL на Started
Подключение RecordsDaoEndpoint
Для записи данных в RecordsDao в содержании контекста Camel DSL нужно описать RecordsDaoEndpoint
. Для этого до маршрутов описывается секция beans. Например:
- beans:
- name: "recordsDaoEndpoint"
type: ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
properties:
sourceId: testDao
pkProp: id
columnMap:
name: content
state: currentState
type: type
valueConvertMap: |
{"type": {"*": "YAML"}, "state": {"1":"STARTED", "*": "STOPPED"}}
- route:
from:
uri: "timer:start?delay=-1&repeatCount=1"
steps:
- setBody:
constant: "select * from actions"
- to: "jdbc:datasource"
- split:
simple: "${body}"
steps:
- to: "bean:recordsDaoEndpoint"
Где
recordsDaoEndpoint – имя
RecordsDaoEndpoint
, при его использовании в маршруте нужно добавлять префикс «bean:»;type – класс бина, всегда указывается ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
в секции properties описываются настройки
RecordsDaoEndpoint
:appName - целевой идентификатор приложения, например alfresco:
sourceId - целевой идентификатор источника данных, куда будут помещаться данные. Обязательное свойство;
pkProp – атрибут исходного источника, который является первичным ключом;
columnMap – соответствие атрибутов исходного источника и атрибутов назначения. В приведенном примере значение атрибута name из источника будет перекладываться в атрибут content назначения, state в currentState, type в type. Общий вид карты:
sourcePropName1: targetPropName1 sourcePropName2: targetPropName2 … sourcePropNameN: targetPropNameN чтоБерем: кудаКладем
valueConvertMap – карта преобразований исходных значений перед записью их в БД назначения. Карта пишется в формате JSON, символ „*“ означает любое значение атрибута. В приведенном примере перед записью в атрибут currentState значение поля state будет заменено на STARTED, если оно равно 1, и на STOPPED во всех других случаях. Таким образом, атрибут currentState в результирующей таблице будет содержать только два значения: STARTED или STOPPED. Общий вид карты:
{“sourcePropName1”: {“value1”:”resultValue1”, “value2”:”resultValue2”, … “valueN”:”resultValueN”}, “sourcePropName2”: {“value21”:”resultValue21”, “value22”:”resultValue22”, … “value2N”:”resultValue2N”}, … “sourcePropNameM”: {“valueM1”:”resultValueM1”, “valueM2”:”resultValueM2”, … “valueMN”:”resultValueMN”}}
Так как valueConvertMap многострочное свойство, то перед значением необходимо указать символ «|».
В одном контексте может быть описано несколько RecordsDaoEndpoint
.
- beans:
- name: "recordsTestDaoEndpoint"
type: ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
properties:
sourceId: recordsTestDao
pkProp: id
- name: "testDaoEndpoint"
type: ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
properties:
sourceId: testDao
pkProp: id
columnMap:
name: content
state: currentState
type: type
valueConvertMap: |
{"type": {"*": "YAML"}}
- name: "…"
…
RecordsDaoEndpoint
также может обрабатывать данные полученные из XML-файла, CSV-файла или текстового файла, содержащего строковые представления Map.
Пример контекста, содержащего маршруты для обработки RecordsDaoEndpoint
данных из файлов:
- beans:
- name: "recordsDaoEndpoint"
type: ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
properties:
sourceId: testDao
pkProp: id
columnMap:
name: content
state: currentState
delimiter: ","
- route:
id: "fromXmlFileToDb"
from:
uri: "direct:fromXmlFileToDb"
steps:
- split:
xpath: "//someObject"
steps:
- to: "bean:recordsDaoEndpoint"
- route:
id: "fromTxtFileToDb"
from:
uri: "direct:fromTxtFileToDb"
steps:
- split:
tokenize: "\n"
steps:
- to: "bean:recordsDaoEndpoint"
Маршрут fromXmlFileToDb делит входной XML-поток из файла на элементы someObject и передает их в RecordsDaoEndpoint
.
Пример входного XML-файла:
<?xml version="1.0" encoding="UTF-8"?>
<massages>
<someObject id="50" usage ="Additional">
<name>Test route name James</name>
<purpose>Test endpoint</purpose>
</someObject>
<someObject id="210" usage ="Standard">
<name>Route 61</name>
<purpose>Test</purpose>
<city>Moscow</city>
</someObject>
</massages>
В приведенном примере для установки значений доступны атрибуты записи id, usage, name и purpose.
Маршрут fromTxtFileToDb делит входной текстовый поток из файла на строки. Пример CSV-файла:
id,name,value
10,SomeName,
908,- route:,additional
77,,
Пример файла со строковыми представлениями Map:
id=15, name=Test
id=64, name=Route, value=null
id=48, name=Open route, value=null
Для работы со строковыми данными используются настройки RecordsDaoEndpoint
delimiter и keyValueSeparator.
delimiter – определяет строку-разделитель значений в строке для CSV-файла и пар ключ-значение для строкового представления Map, по умолчанию значение «,»
keyValueSeparator – определяет строку-разделитель ключа и значения в строковом представлении Map, по умолчанию значение «=»
Удаление данных из БД
Для удаления данных из БД необходимо создать Credentials, Источник данных и Camel DSL как указано в пункте «Выборка из БД». При этом, содержимое маршрута должно включать в себя SQL-запрос на удаление данных.
Например, следующий маршрут clearValues удаляет все записи из таблицы simple источника данных datasource, кроме тех у которых атрибут id равен „1“ или „2“.
- route:
id: "clearValues"
from:
uri: "timer:start?delay=-1&repeatCount=1"
steps:
- setBody:
constant: "delete from simple where id not in ('1','2')"
- to: "jdbc:datasource"
Пример контекста, который берет данные из источника данных todb, обрабатывает их через R`RecordsDaoEndpoint`` daoEndpoint и очищает таблицу simple, из которой взял данные:
- beans:
- name: "daoEndpoint"
type: ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
properties:
sourceId: testDao
pkProp: id
columnMap:
name: content
state: currentState
type: type
- route:
id: "getValues"
from:
uri: "timer:start?delay=-1&repeatCount=1"
steps:
- setBody:
constant: "select * from simple"
- to: "jdbc:todb"
- split:
simple: "${body}"
steps:
- to: "bean:daoEndpoint"
- to: "direct:clearValues"
- route:
id: "clearValues"
from:
uri: "direct:clearValues"
steps:
- setBody:
constant: "delete from simple"
- to: "jdbc:todb"
Примечание
Особенности контекста: Содержимое constant переводится в нижний регистр. Например, выборка «select * from simple order by COMPANY_ID» приводит к ошибке ERROR: column «company_id» does not exist
Получение сообщений из RabbitMQ и отправка события ECOS
Пример чтения из rabbitmq и отправка события ECOS:
Создаем новый секрет для подключения к RMQ
Создаем новый endpoint с id „rabbitmq-endpoint“ (можно любой id, но в camel конфиге мы на него ссылаемся) для подключения к RMQ и устанавливаем секрет из п.1 в него
Заходим в журнал Camel DSL и создаем новый контекст со следующим конфигом:
- beans:
- name: rabbitConnectionFactory
type: org.springframework.amqp.rabbit.connection.CachingConnectionFactory
properties:
uri: '{{ecos-endpoint:rabbitmq-endpoint/url}}'
username: '{{ecos-endpoint:rabbitmq-endpoint/credentials/username}}'
password: '{{ecos-endpoint:rabbitmq-endpoint/credentials/password}}'
- route:
from:
uri: spring-rabbitmq:default # default здесь -это дефолтный exchange в RMQ. Обычно он обозначается пустой строкой, но в camel endpoint'е вместо этого пишется "default"
parameters:
connectionFactory: '#bean:rabbitConnectionFactory'
queues: test-queue
steps:
- removeHeaders: # если в дальнейшем предполагается переотправка сообщения в RMQ, то лучше удалить заголовки, которые относятся к RMQ. Здесь этот этап просто для примера.
pattern: "CamelRabbitmq*" #"CamelRabbitmqRoutingKey"
- to: log:rmq-test # вывод в лог. Можно убрать
- to: ecos-event:test-event-type # отправка события с типом "test-event-type". В теле отправляется DataValue.of(exchange.message.body)
Подписка на событие ECOS
- route:
from:
uri: 'ecos-event:record-created' # подписываемся на событие "Запись создана"
parameters:
attributes:
recordId: 'record?id' # указываем какие атрибуты нам нужны из события
filter: # устанавливаем фильтр
t: not-eq
a: conditionField
v: true
steps:
- to: log:record-was-created
Чтение из RabbitMQ -> роутинг по jsonPath -> переотправка в Ecos Event + Dead Letter Queue
- beans:
- name: myRabbitConnectionFactory
type: org.springframework.amqp.rabbit.connection.CachingConnectionFactory
properties:
uri: '{{ecos-endpoint:my-rabbitmq-endpoint/url}}'
username: '{{ecos-endpoint:my-rabbitmq-endpoint/credentials/username}}'
password: '{{ecos-endpoint:my-rabbitmq-endpoint/credentials/password}}'
- route:
from:
uri: "spring-rabbitmq:income-test-data"
parameters:
connectionFactory: '#bean:myRabbitConnectionFactory'
queues: test-data-queue
autoDeclare: true
deadLetterExchange: income-test-data
deadLetterQueue: test-data-queue-dlq
deadLetterRoutingKey: deadLetterTestData
retryDelay: 5000
arg.queue.durable: true
arg.queue.autoDelete: false
steps:
- to:
uri: "log:income?level=INFO&showAll=true"
- choice:
when:
- jsonpath:
expression: "$.[?(@.operation == 'CREATE')]"
steps:
- to: "ecos-event:test-data-create"
- jsonpath:
expression: "$.[?(@.operation == 'UPDATE')]"
steps:
- to: "ecos-event:test-data-update"
otherwise:
steps:
- throwException:
exceptionType: "java.lang.IllegalArgumentException"
message: "Unsupported operation. Only CREATE and UPDATE are supported."
EcosRecordsSync camel component
EcosRecordsSyncComponent - компонент camel, созданный для перебора/обновления записей через RecordsAPI. Ключ для использования компонента в camel-контексте: ecos-records-sync
Компонент включает в себя как потребителя EcosRecordsSyncConsumer, так и производителя EcosRecordsSyncProducer по терминологии camel
Ниже будут примеры регистрации компонента в yaml формате, например, при регистрации через Camel DSL.
EcosRecordsSyncConsumer. Расширяет стандартный ScheduledBatchPollingConsumer, реализует перебор записей по ecos типу + sourceId. Возможные настройки для ecos-records-sync консьюмера:
Key |
Value |
---|---|
syncId |
уникальное значение в рамках инстанса приложения, на котором запускаются camel контексты. На основе этого значения создается стейт для периодического пуллинга из sourceId
см: journalId=ecos-sync-state
|
syncMode |
DEFAULT | CREATE | UPDATE
DEFAULT, UPDATE - перебор записей по дате обновления
CREATE - перебор записей по дате создания
|
sourceId |
sourceId типа |
typeRef |
ecos тип |
batchSize |
размер батча при пуллинге |
Пример использования:
- route:
from:
uri: ecos-records-sync:testEcosRecordsSync
parameters:
delay: 15000
sourceId: emodel/test-type-mig-from
typeRef: emodel/type@test-type-mig-from
batchSize: 5
steps:
- to: log:ers-test
EcosRecordsSyncProducer Расширяет DefaultProducer, реализует обновление записи через RecordsAPI. Данные для обновления берется из тела сообщения (id из тела из проперти сообщения - CamelEcosRecordsSyncEntityRef). Возможные настройки для ecos-records-sync продюсера:
Key |
Value |
---|---|
syncId |
любое значение, скорее информационное |
sourceId |
sourceId типа |
Пример использования:
- route:
from:
uri: .....
steps:
- to:
uri: ecos-records-sync:test-type-mig-to
parameters:
sourceId: emodel/test-type-mig-to