Endpoints

Endpoints are used for reading or writing data in a specific source.

More details - https://camel.apache.org/manual/endpoint.html

FileFromCamelDslEndpoint

This component allows working with a file added to the Camel DSL configuration form:

../../_images/Proc_01.png

Key for using the component in a Camel context: file-from-camel-dsl

FileFromCamelDslConsumer reads data from the file as a ByteArray and places it in the exchange for further processing

Example:

- route:
    from:
      uri: "file-from-camel-dsl:randomName"
      steps:
        - process:
            ref: csvToListOfDataProcessor
        - split:
            simple: "${body}"
            steps:
              - to: ecos-records-mutate:?sourceId=emodel/camel-example-employee

EcosRecordsSyncConsumer

Endpoint for sequential extraction of any number of records from the specified data source.

URI

ecos-records-sync-consumer:syncName

ecos-records-sync-consumer - constant

syncName - synchronization name. Can be any and is used for saving and accessing the state. I.e., if you change syncName, the consumer will work “from scratch”.

Output Data

Data Type: List<DataValue>

Description: A list of DataValue objects with attributes that were loaded for the records. The global record identifier is not included in the attributes. If needed, it should be explicitly specified in the attributes:

attributes:
  ?id: ?id

Parameters

Parameter

Type

Default Value

Mandatory

Description

sourceId

String?

null

No

Identifier of the data source from which we will load records.
Can be omitted if ecosType is specified.
Examples: emodel/source0, emodel/source1

ecosType

String?

null

No

Local type identifier.
If sourceId is not specified, it is taken from the type.
Examples: contract, attorney

predicate

Predicate?

Always True

No

Record search criteria.
Examples: {“t”: “eq”, “a”: “_type”, “v”: “emodel/type@case”}

batchSize

Int

100

No

Size of the batch of records processed simultaneously

attributes

Map<String, String>

Yes

Attributes to load for records

addAuditAttributes

Boolean

true

No

Add audit attributes (_created,_creator,_modified,_modifier) to the list of attributes for loading

delay

Long

500

No

Number of milliseconds between processing batches of records

greedy

Boolean

false

No

If true and the number of processed records is greater than zero, do not wait for delay before the next processing, but immediately call the next poll

initialDelay

Long

1000

No

Delay before the first record processing

Note

Additional parameters can be viewed in the source code of the org.apache.camel.support.ScheduledPollEndpoint class

Usage Example

- route:
    from:
      uri: ecos-records-sync-consumer:legalEntity-mgr-from
      parameters:
        delay: 30000
        sourceId: emodel/source1
        predicate:
          t: eq
          a: type
          v: legalEntity
        addAuditAttributes: true
        attributes:
          ?id: ?id
          id: ?localId
          title: title
          name: name
      steps:
        - to: log:ecos-records-sync

Operating Principle

Every {delay} milliseconds, a request is made to the data source {sourceId}, which is either explicitly set or loaded from {ecosType}. A batch of records of size <= {batchSize} is loaded from the data source. The loaded batch is sent for processing to the steps specified in the route.

The state is updated only if the batch of records is processed successfully. In case of an error, the state remains old and upon the next trigger, loading will continue from the previous state.

Individual Record Processing

If individual record processing is required, you can split the list elements and process each one separately:

- route:
    from:
      uri: ecos-records-sync-consumer:routeStage-mgr-from
      parameters:
        initialDelay: 10000
        delay: 15000
        sourceId: emodel/source1
        predicate:
          t: eq
          a: type
          v: routeStage
        batchSize: 30
        addAuditAttributes: true
        attributes:
          id: ?localId
          title: title
          name: name
      steps:
        - split:
            simple: "${body}"
            steps:
              - to: log:result # в этих шагах каждый элемент будет обработан отдельно

If you need to process records individually in some way and then collect them back into a single batch, you can use the aggregation strategy:

- beans:
    - name: customJsonPatch
      type: ru.citeck.ecos.camel.processor.data.JsonPatchOperationsProcessor

    - name: collectToListStrategy
      type: org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy
- route:
    from:
      uri: ecos-records-sync-consumer:routeStage-mgr-from
      parameters:
        initialDelay: 10000
        delay: 15000
        sourceId: emodel/source1
        predicate:
          t: eq
          a: type
          v: routeStage
        batchSize: 30
        addAuditAttributes: true
        attributes:
          id: ?localId
          title: title
          name: name
      steps:
        - split:
            simple: "${body}"
            aggregationStrategy: collectToListStrategy # это ключевое отличие
            steps:
              - setHeader:
                  name: JsonPatchOperations
                  constant:
                    - op: set
                      path: "_parentAtt"
                      value: templateRouteApprovingStages
              - process:
                  ref: customJsonPatch
        - to: log:result # после split мы будем обрабатывать лист, который собрался после индивидуальной обработки записей

Strategies

Iteration is performed by several strategies. Each time a poll is triggered (processing the next batch of records), a request for the next batch of records is made through one of the strategies described below. The first non-empty result is used, and further strategy iteration is not performed. Each strategy has a state that stores data to continue iteration from the last processed point.

1. By Creation Date

Iteration is performed by the _created attribute from the start of the epoch (1970-01-01T00:00:00Z)

State:

Property

Type

Default

Description

totalCount

Long

-1

Expected total number of all records for synchronization.
Filled at the beginning and not updated during iteration

lastCreated

Instant

Instant.EPOCH

Date of the last created node we processed

lastRef

EntityRef

EntityRef.EMPTY

Reference to the last processed record

skipCount

Int

0

Number of elements to skip in the next request.
Used for processing records that have the same creation date.

processedCount

Long

0

Number of processed records

lastCreatedCounter

Int

0

Counter of records with the same creation date.
Used to adjust the creation date in the resulting data by adding lastCreatedCounter microseconds to it.
This is necessary so that after loading this data into another database, the order is preserved when sorting by the _created field.

Strategy Features:

Order preservation - if the exported data has records with the same creation date, the first one will have the original date, and all subsequent ones will have a creation date increased by N microseconds. The number of microseconds increases with each new record having the same creation date.

2. By Modification Date

Iteration is performed by the _modified attribute from the synchronization start date.

State:

Property

Type

Default

Description

lastModified

Instant

Synchronization start date

Date of the last modified record we processed

lastRef

EntityRef

EntityRef.EMPTY

Reference to the last processed record

skipCount

Int

0

Number of elements to skip in the next request.
Used for processing records that have the same modification date.

processedCount

Long

0

Number of processed records

Strategy Features:

Synchronization is performed only for those records that were created before the creation date of the last synchronized record from the first strategy.

EcosRecordsMutateEndpoint

Endpoint for performing mutation via RecordsAPI

URI

ecos-records-mutate:?sourceId={sourceId}

  • ecos-records-mutate - constant

  • sourceId - identifier of the data source where the mutation request should be sent. For example: ‘emodel/contract’, ‘emodel/custom-type’

Input Data

The endpoint accepts either a single map-like value, or a list.

A map-like value is Map<String, Any?>, DataValue, ObjectData, and others that can be converted to Map<String, Any?>.

A map-like value structurally contains a list of attributes that will be used for mutation.

If among the attributes there is “?id” with the full ref of the mutated entity, then the mutation will be performed specifically for that entity.

If among the attributes there is “id” (local entity id), then the mutation will either update an existing entity or create a new one if it does not exist yet.