Использование Camel DSL

Общие сведения

Apache Camel — открытый кроссплатформенный java-фреймворк, который позволяет проводить интеграцию приложений в простой и понятной форме.

Camel использует доменные языки (Domain Specific Language - DSL) для описания проектных шаблонов интеграции или маршрутов.

В ECOS используется Yaml DSL для описания маршрутов в формате YAML и XML DSL.

Camel-контекст – главная сущность Camel. Контекст является контейнером среды выполнения Camel. Контекст предоставляет много полезных сервисов, наиболее значимыми являются маршруты, компоненты, языки, конверторы типов, реестр, endpointы и форматы данных.

Маршрут – определение интеграционного потока Например, для объединения двух систем маршрут определяет как именно эти системы взаимодействуют.

Компоненты - подробно описано по ссылке

  • Bean – для вызова методов Java-бинов, хранящихся в реестре;

  • Direct – вызывает другой endpoint из того же контекста синхронно;

  • Direct VM - вызывает другой endpoint из любого контекста на той же JVM синхронно;

  • File – читает и записывает файлы;

  • Timer – генерирует сообщения с определенным интервалом, используя java.util.Timer;

  • JDBC – предоставляет доступ к базам данных через JDBC;

  • Jetty – предоставляет endpoint на основе HTTP для получения и отправки HTTP запросов.

Пример контекста с маршрутом:

../_images/Camel_1.png

Примечание

Атрибут = свойство = поле

Целевая БД = БД назначения = целевой источник данных – куда данные помещаются

Исходная БД = исходный источник данных – откуда данные берутся

Выборка из БД

Для выборки данных из БД необходимо:

  1. Создать “Credentials” для подключения:

Главное меню: Инструменты администратора -> Инструменты

../_images/Camel_2.png

Неосновное меню: Интеграция -> Credentials

../_images/Camel_3.png
  1. Создать “Источник данных” DB Data Source, в результате источник будет с типом db.

Главное меню: Инструменты администратора -> Инструменты

Неосновное меню: Интеграция -> Источники данных

../_images/Camel_4.png ../_images/Camel_5.png
  1. Создать “Camel DSL”

Главное меню: Инструменты администратора -> Инструменты

Неосновное меню: Интеграция -> Camel DSL

../_images/Camel_6.png

Контекст Camel DSL должен содержать маршрут выборки из БД. Например:

- route:
    from:
      uri: "timer:start?delay=-1&repeatCount=1"
      steps:
        - setBody:
            constant: "select * from actions"
        - to: "jdbc:datasource"
        - split:
            simple: "${body}"
            steps:
              - to: "stream:out"

где

  • datasource – имя источника данных, созданного в п.2 при его использовании в маршруте нужно добавлять префикс «jdbc:»;

  • actions – имя таблицы БД, из которой делается выборка;

  • timer – таймер, который запускает маршрут delay=-1 - немедленно при старте контекста и только один раз repeatCount=1;

  • блок split разделяет результат выборки на строки, которые выводятся в трассу stream:out

  1. Для выполнения содержимого контекста нужно изменить состояние Camel DSL на Started

Подключение RecordsDaoEndpoint

Для записи данных в RecordsDao в содержании контекста Camel DSL нужно описать RecordsDaoEndpoint. Для этого до маршрутов описывается секция beans. Например:

- beans:
    - name: "recordsDaoEndpoint"
      type: ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
      properties:
        sourceId: testDao
        pkProp: id
        columnMap:
        name: content
        state: currentState
        type: type
        valueConvertMap: |
          {"type": {"*": "YAML"}, "state": {"1":"STARTED", "*": "STOPPED"}}
- route:
    from:
      uri: "timer:start?delay=-1&repeatCount=1"
      steps:
        - setBody:
            constant: "select * from actions"
        - to: "jdbc:datasource"
        - split:
            simple: "${body}"
            steps:
              - to: "bean:recordsDaoEndpoint"

Где

  • recordsDaoEndpoint – имя RecordsDaoEndpoint, при его использовании в маршруте нужно добавлять префикс «bean:»;

  • type – класс бина, всегда указывается ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint

  • в секции properties описываются настройки RecordsDaoEndpoint:

  • appName - целевой идентификатор приложения, например alfresco:

  • sourceId - целевой идентификатор источника данных, куда будут помещаться данные. Обязательное свойство;

  • pkProp – атрибут исходного источника, который является первичным ключом;

  • columnMap – соответствие атрибутов исходного источника и атрибутов назначения. В приведенном примере значение атрибута name из источника будет перекладываться в атрибут content назначения, state в currentState, type в type. Общий вид карты:

sourcePropName1: targetPropName1
sourcePropName2: targetPropName2
…
sourcePropNameN: targetPropNameN
чтоБерем: кудаКладем
  • valueConvertMap – карта преобразований исходных значений перед записью их в БД назначения. Карта пишется в формате JSON, символ „*“ означает любое значение атрибута. В приведенном примере перед записью в атрибут currentState значение поля state будет заменено на STARTED, если оно равно 1, и на STOPPED во всех других случаях. Таким образом, атрибут currentState в результирующей таблице будет содержать только два значения: STARTED или STOPPED. Общий вид карты:

    {“sourcePropName1”:
    {“value1”:”resultValue1”,
        “value2”:”resultValue2”,
        …
        “valueN”:”resultValueN”},
    “sourcePropName2”:
    {“value21”:”resultValue21”,
        “value22”:”resultValue22”,
        …
        “value2N”:”resultValue2N”},
    …
    “sourcePropNameM”:
    {“valueM1”:”resultValueM1”,
        “valueM2”:”resultValueM2”,
        …
        “valueMN”:”resultValueMN”}}
    

Так как valueConvertMap многострочное свойство, то перед значением необходимо указать символ «|».

В одном контексте может быть описано несколько RecordsDaoEndpoint.

- beans:
  - name: "recordsTestDaoEndpoint"
    type: ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
    properties:
      sourceId: recordsTestDao
      pkProp: id
  - name: "testDaoEndpoint"
    type: ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
    properties:
      sourceId: testDao
      pkProp: id
      columnMap:
      name: content
      state: currentState
      type: type
      valueConvertMap: |
        {"type": {"*": "YAML"}}
  - name: "…"
    

RecordsDaoEndpoint также может обрабатывать данные полученные из XML-файла, CSV-файла или текстового файла, содержащего строковые представления Map.

Пример контекста, содержащего маршруты для обработки RecordsDaoEndpoint данных из файлов:

- beans:
    - name: "recordsDaoEndpoint"
      type: ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
      properties:
        sourceId: testDao
        pkProp: id
        columnMap:
          name: content
          state: currentState
        delimiter: ","
- route:
    id: "fromXmlFileToDb"
    from:
      uri: "direct:fromXmlFileToDb"
      steps:
        - split:
            xpath: "//someObject"
            steps:
              - to: "bean:recordsDaoEndpoint"
- route:
    id: "fromTxtFileToDb"
    from:
      uri: "direct:fromTxtFileToDb"
      steps:
        - split:
            tokenize: "\n"
            steps:
              - to: "bean:recordsDaoEndpoint"

Маршрут fromXmlFileToDb делит входной XML-поток из файла на элементы someObject и передает их в RecordsDaoEndpoint.

Пример входного XML-файла:

<?xml version="1.0" encoding="UTF-8"?>
 <massages>
   <someObject id="50" usage ="Additional">
     <name>Test route name James</name>
     <purpose>Test endpoint</purpose>
   </someObject>
   <someObject id="210" usage ="Standard">
     <name>Route 61</name>
     <purpose>Test</purpose>
     <city>Moscow</city>
   </someObject>
 </massages>

В приведенном примере для установки значений доступны атрибуты записи id, usage, name и purpose.

Маршрут fromTxtFileToDb делит входной текстовый поток из файла на строки. Пример CSV-файла:

id,name,value
10,SomeName,
908,- route:,additional
77,,

Пример файла со строковыми представлениями Map:

id=15, name=Test
id=64, name=Route, value=null
id=48, name=Open route, value=null

Для работы со строковыми данными используются настройки RecordsDaoEndpoint delimiter и keyValueSeparator.

  • delimiter – определяет строку-разделитель значений в строке для CSV-файла и пар ключ-значение для строкового представления Map, по умолчанию значение «,»

  • keyValueSeparator – определяет строку-разделитель ключа и значения в строковом представлении Map, по умолчанию значение «=»

Удаление данных из БД

Для удаления данных из БД необходимо создать Credentials, Источник данных и Camel DSL как указано в пункте «Выборка из БД». При этом, содержимое маршрута должно включать в себя SQL-запрос на удаление данных.

Например, следующий маршрут clearValues удаляет все записи из таблицы simple источника данных datasource, кроме тех у которых атрибут id равен „1“ или „2“.

- route:
    id: "clearValues"
    from:
      uri: "timer:start?delay=-1&repeatCount=1"
      steps:
        - setBody:
            constant: "delete from simple where id not in ('1','2')"
        - to: "jdbc:datasource"

Пример контекста, который берет данные из источника данных todb, обрабатывает их через R`RecordsDaoEndpoint`` daoEndpoint и очищает таблицу simple, из которой взял данные:

- beans:
    - name: "daoEndpoint"
      type: ru.citeck.ecos.integrations.domain.cameldsl.service.RecordsDaoEndpoint
      properties:
        sourceId: testDao
        pkProp: id
        columnMap:
          name: content
          state: currentState
          type: type
- route:
    id: "getValues"
    from:
      uri: "timer:start?delay=-1&repeatCount=1"
      steps:
        - setBody:
            constant: "select * from simple"
        - to: "jdbc:todb"
        - split:
            simple: "${body}"
            steps:
              - to: "bean:daoEndpoint"
              - to: "direct:clearValues"
- route:
  id: "clearValues"
  from:
    uri: "direct:clearValues"
    steps:
      - setBody:
          constant: "delete from simple"
      - to: "jdbc:todb"

Примечание

Особенности контекста: Содержимое constant переводится в нижний регистр. Например, выборка «select * from simple order by COMPANY_ID» приводит к ошибке ERROR: column «company_id» does not exist

Получение сообщений из RabbitMQ и отправка события ECOS

Пример чтения из rabbitmq и отправка события ECOS:

  1. Создаем новый секрет для подключения к RMQ

  2. Создаем новый endpoint с id „rabbitmq-endpoint“ (можно любой id, но в camel конфиге мы на него ссылаемся) для подключения к RMQ и устанавливаем секрет из п.1 в него

  3. Заходим в журнал Camel DSL и создаем новый контекст со следующим конфигом:

- beans:
    - name: rabbitConnectionFactory
      type: org.springframework.amqp.rabbit.connection.CachingConnectionFactory
      properties:
        uri: '{{ecos-endpoint:rabbitmq-endpoint/url}}'
        username: '{{ecos-endpoint:rabbitmq-endpoint/credentials/username}}'
        password: '{{ecos-endpoint:rabbitmq-endpoint/credentials/password}}'
- route:
    from:
      uri: spring-rabbitmq:default # default здесь -это дефолтный exchange в RMQ. Обычно он обозначается пустой строкой, но в camel endpoint'е вместо этого пишется "default"
      parameters:
        connectionFactory: '#bean:rabbitConnectionFactory'
        queues: test-queue
      steps:
        - removeHeaders: # если в дальнейшем предполагается переотправка сообщения в RMQ, то лучше удалить заголовки, которые относятся к RMQ. Здесь этот этап просто для примера.
            pattern: "CamelRabbitmq*" #"CamelRabbitmqRoutingKey"
        - to: log:rmq-test # вывод в лог. Можно убрать
        - to: ecos-event:test-event-type # отправка события с типом "test-event-type". В теле отправляется DataValue.of(exchange.message.body)

Подписка на событие ECOS

- route:
    from:
      uri: 'ecos-event:record-created' # подписываемся на событие "Запись создана"
      parameters:
        attributes:
          recordId: 'record?id' # указываем какие атрибуты нам нужны из события
        filter: # устанавливаем фильтр
          t: not-eq
          a: conditionField
          v: true
      steps:
        - to: log:record-was-created

Чтение из RabbitMQ -> роутинг по jsonPath -> переотправка в Ecos Event + Dead Letter Queue

- beans:
    - name: myRabbitConnectionFactory
      type: org.springframework.amqp.rabbit.connection.CachingConnectionFactory
      properties:
        uri: '{{ecos-endpoint:my-rabbitmq-endpoint/url}}'
        username: '{{ecos-endpoint:my-rabbitmq-endpoint/credentials/username}}'
        password: '{{ecos-endpoint:my-rabbitmq-endpoint/credentials/password}}'
- route:
    from:
      uri: "spring-rabbitmq:income-test-data"
      parameters:
        connectionFactory: '#bean:myRabbitConnectionFactory'
        queues: test-data-queue
        autoDeclare: true
        deadLetterExchange: income-test-data
        deadLetterQueue: test-data-queue-dlq
        deadLetterRoutingKey: deadLetterTestData
        retryDelay: 5000
        arg.queue.durable: true
        arg.queue.autoDelete: false
      steps:
        - to:
            uri: "log:income?level=INFO&showAll=true"
        - choice:
            when:
              - jsonpath:
                  expression: "$.[?(@.operation == 'CREATE')]"
                steps:
                  - to: "ecos-event:test-data-create"
              - jsonpath:
                  expression: "$.[?(@.operation == 'UPDATE')]"
                steps:
                  - to: "ecos-event:test-data-update"
            otherwise:
              steps:
                - throwException:
                    exceptionType: "java.lang.IllegalArgumentException"
                    message: "Unsupported operation. Only CREATE and UPDATE are supported."

EcosRecordsSync camel component

EcosRecordsSyncComponent - компонент camel, созданный для перебора/обновления записей через RecordsAPI. Ключ для использования компонента в camel-контексте: ecos-records-sync

Компонент включает в себя как потребителя EcosRecordsSyncConsumer, так и производителя EcosRecordsSyncProducer по терминологии camel

Ниже будут примеры регистрации компонента в yaml формате, например, при регистрации через Camel DSL.

  1. EcosRecordsSyncConsumer. Расширяет стандартный ScheduledBatchPollingConsumer, реализует перебор записей по ecos типу + sourceId. Возможные настройки для ecos-records-sync консьюмера:

Key

Value

syncId

уникальное значение в рамках инстанса приложения, на котором запускаются camel контексты. На основе этого значения создается стейт для периодического пуллинга из sourceId
см: journalId=ecos-sync-state

syncMode

DEFAULT | CREATE | UPDATE
DEFAULT, UPDATE - перебор записей по дате обновления
CREATE - перебор записей по дате создания

sourceId

sourceId типа

typeRef

ecos тип

batchSize

размер батча при пуллинге

Пример использования:

- route:
    from:
      uri: ecos-records-sync:testEcosRecordsSync
      parameters:
        delay: 15000
        sourceId: emodel/test-type-mig-from
        typeRef: emodel/type@test-type-mig-from
        batchSize: 5
      steps:
      - to: log:ers-test
  1. EcosRecordsSyncProducer Расширяет DefaultProducer, реализует обновление записи через RecordsAPI. Данные для обновления берется из тела сообщения (id из тела из проперти сообщения - CamelEcosRecordsSyncEntityRef). Возможные настройки для ecos-records-sync продюсера:

Key

Value

syncId

любое значение, скорее информационное

sourceId

sourceId типа

Пример использования:

- route:
    from:
      uri: .....
      steps:
      - to:
          uri: ecos-records-sync:test-type-mig-to
          parameters:
            sourceId: emodel/test-type-mig-to