update(site): add archives for release v1.3.x
fhussonnois committed Aug 6, 2020
1 parent 9e90720 commit d3dc16a
Showing 24 changed files with 1,638 additions and 3 deletions.
8 changes: 6 additions & 2 deletions site/config.toml
@@ -72,7 +72,7 @@ copyright = "The Kafka Connect File Pulse Authors"
# This menu appears only if you have at least one [params.versions] set.
version_menu = "Releases"
archived_version = false
version = "v1.3.0"
version = "v1.3.x"

# Repository configuration (URLs for in-page links to opening issues and suggesting changes)
github_repo = "https://github.com/streamthoughts/kafka-connect-file-pulse"
@@ -129,7 +129,11 @@ no = 'Sorry to hear that. Please <a href="https://github.com/streamthoughts/kafk

[[params.versions]]
version = "master"
url = "./docs"
url = "/kafka-connect-file-pulse/docs/"

[[params.versions]]
version = "v1.3.x"
url = "/kafka-connect-file-pulse/v1-3/docs"

# As of Hugo 0.60
[markup]
8 changes: 8 additions & 0 deletions site/content/en/docs/Archives/_index.md
@@ -0,0 +1,8 @@
---
title: "Archives"
linkTitle: "Archives"
weight: 100
url: "/archives"
description: >
  The documentation of prior releases.
---
12 changes: 12 additions & 0 deletions site/content/en/docs/Archives/v1.3.x/Developer Guide/_index.md
@@ -0,0 +1,12 @@
---
date: 2020-05-21
title: "Developer Guide"
linkTitle: "Developer Guide"
weight: 20
description: >
Learn about the concepts and the functionalities of the Connect File Pulse Plugin.
---
The Developer Guide section helps you learn about the functionality of the File Pulse Connector and the concepts
File Pulse uses to process and transform your data, helping you gain a deeper understanding of how the File Pulse Connector works.


@@ -0,0 +1,196 @@
---
date: 2020-05-21
title: "Accessing Data and Metadata"
linkTitle: "Accessing Data and Metadata"
weight: 60
description: >
  Learn how to access record data and metadata using the Simple Connect Expression Language.
---

Some filters (e.g., [AppendFilter](#appendfilter)) can be configured using the *Simple Connect Expression Language*.

*Simple Connect Expression Language* (ScEL for short) is an expression language, based on regex, that allows quick access to and manipulation of record fields and metadata.

The syntax to define an expression is of the form: "`{{ <expression string> }}`".

{{% alert title="Note" color="info" %}}
In some situations, the double brackets can be omitted if the expression is used to write a value into a target field.
{{% /alert %}}

ScEL supports the following capabilities:

* **Field Selector**
* **Nested Navigation**
* **String substitution**
* **Functions**

## Field Selector

The expression language can be used to easily select one field from the input record:

"`{{ username }}`"

## Nested Navigation

To navigate down a struct value, just use a period to indicate a nested field value:

"`{{ address.city }}`"

## String substitution

The expression language can be used to easily build a new string field that concatenates multiple ones:

"`{{ <expression one> }}-{{ <expression two>}}`"

## Built-in Functions

ScEL supports a number of predefined functions that can be used to apply a single transformation on a field.

| Function | Description | Syntax |
| ---------------| --------------|-----------|
| `contains` | Returns `true` if an array field's value contains the specified value | `{{ contains(array, value) }}` |
| `converts` | Converts a field's value into the specified type | `{{ converts(field, INTEGER) }}` |
| `ends_with` | Returns `true` if a string field's value ends with the specified string suffix | `{{ ends_with(field, suffix) }}` |
| `equals` | Returns `true` if a string or number field's value equals the specified value | `{{ equals(field, value) }}` |
| `exists` | Returns `true` if the specified field exists | `{{ exists(struct, field) }}` |
| `extract_array`| Returns the element at the specified position of the specified array | `{{ extract_array(array, 0) }}` |
| `is_null` | Returns `true` if a field's value is null | `{{ is_null(field) }}` |
| `length` | Returns the number of elements in an array, or the length of a string field | `{{ length(array) }}` |
| `lowercase` | Converts all of the characters in a string field's value to lower case | `{{ lowercase(field) }}` |
| `matches` | Returns `true` if a field's value matches the specified regex | `{{ matches(field, regex) }}` |
| `nlv` | Sets a default value if a field's value is null | `{{ nlv(field, default) }}` |
| `replace_all` | Replaces every subsequence of the field's value that matches the given pattern with the given replacement string | `{{ replace_all(field, regex, replacement) }}` |
| `starts_with` | Returns `true` if a string field's value starts with the specified string prefix | `{{ starts_with(field, prefix) }}` |
| `trim` | Trims the spaces from the beginning and end of a string | `{{ trim(field) }}` |
| `uppercase` | Converts all of the characters in a string field's value to upper case | `{{ uppercase(field) }}` |


In addition, ScEL supports nested functions.

For example, the following expression replaces all whitespace characters with `-` after transforming the field's value into lowercase.

```
{{ replace_all(lowercase(field), \\s, -) }}
```

{{% alert title="Limitation" color="warning" %}}
Currently, FilePulse does not support user-defined functions (UDFs). So you cannot register your own functions to enrich the expression language.
{{% /alert %}}


## Scopes


In the previous section, we have seen how to use the expression language to select a specific field.
The selected field was part of the current record being processed.

Actually, ScEL allows you to get access to additional fields through the use of scopes.
Basically, a scope defines the root object on which a selector expression must be evaluated.

The syntax to define an expression with a scope is of the form: "`{{ $<scope>.<selector expression string> }}`".

By default, if no scope is defined in the expression, the scope `$value` is implicitly used.

ScEL supports a number of predefined scopes that can be used, for example:

- **To override the output topic.**
- **To define the record key to be used.**
- **To get access to the source file metadata.**
- Etc.

| Scope | Description | Type |
|--- | --- |--- |
| `{{ $headers }}` | The record headers | - |
| `{{ $key }}` | The record key | `string` |
| `{{ $metadata }}` | The file metadata | `struct` |
| `{{ $offset }}` | The offset information of this record in the source file | `struct` |
| `{{ $system }}` | The system environment variables and runtime properties | `struct` |
| `{{ $timestamp }}` | The record timestamp | `long` |
| `{{ $topic }}` | The output topic | `string` |
| `{{ $value }}` | The record value| `struct` |
| `{{ $variables }}` | The contextual filter-chain variables| `map[string, object]` |

Note that in case of failure, additional fields are added to the current filter context (see: [Handling Failures](./handling-failures)).

### Record Headers

The scope `headers` allows defining the headers of the output record.

### Record key

The scope `key` allows defining the key of the output record. Only string keys are currently supported.

### Source Metadata

The scope `metadata` allows read access to information about the file being processed.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $metadata.name }}` | The file name | `string` |
| `{{ $metadata.path }}` | The file directory path | `string` |
| `{{ $metadata.absolutePath }}` | The file absolute path | `string` |
| `{{ $metadata.hash }}` | The file CRC32 hash | `int` |
| `{{ $metadata.lastModified }}` | The file last modified time. | `long` |
| `{{ $metadata.size }}` | The file size | `long` |
| `{{ $metadata.inode }}` | The file Unix inode | `long` |
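
For example, a sketch of an [AppendFilter](#appendfilter) adding the source file name to every output record. The `source_file` target field and the filter alias are illustrative assumptions:

```
filters.AddSourceFile.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.AddSourceFile.field=source_file
filters.AddSourceFile.values={{ $metadata.name }}
```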

### Record Offset

The scope `offset` allows read access to information about the original position of the record in the source file.
The available fields depend on the configured FileInputReader.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $offset.timestamp }}` | The creation time of the record (millisecond) | `long` |

Information only available if `RowFileReader` is configured.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $offset.startPosition }}` | The start position of the record in the source file | `long` |
| `{{ $offset.endPosition }}` | The end position of the record in the source file | `long` |
| `{{ $offset.size }}` | The size of the record in bytes | `long` |
| `{{ $offset.row }}` | The row number of the record in the source file | `long` |

Information only available if `BytesArrayInputReader` is configured.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $offset.startPosition }}` | The start position of the record in the source file (always equal to 0) | `long` |
| `{{ $offset.endPosition }}` | The end position of the record in the source file (equal to the file size) | `long` |

Information only available if `AvroFilterInputReader` is configured.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $offset.blockStart }}` | The start position of the current block | `long` |
| `{{ $offset.position }}` | The position in the current block | `long` |
| `{{ $offset.records }}` | The number of records read from the current block | `long` |

### System

The scope `system` allows read access to system environment variables and runtime properties.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $system.env }}` | The system environment variables. | `map[string, string]` |
| `{{ $system.props }}` | The system runtime properties. | `map[string, string]` |

### Timestamp

The scope `timestamp` allows defining the timestamp of the output record.

### Topic

The scope `topic` allows defining the target topic of the output record.
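
For example, a sketch of overriding the output topic with an [AppendFilter](#appendfilter). The filter alias and topic prefix are illustrative assumptions, and the property names mirror the AppendFilter example in the Conditional Execution guide:

```
filters.SetTopic.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.SetTopic.field=$topic
filters.SetTopic.values=connect-file-pulse-{{ lowercase($metadata.name) }}
```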

### Value

The scope `value` allows defining the fields of the output record.

### Variables

The scope `variables` allows read/write access to a simple key-value map structure.
This scope can be used to share user-defined variables between filters.

Note: variables are not cached between records.
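
For example, a hypothetical sketch of two filters sharing a value through the `$variables` scope. That the AppendFilter can write into this scope this way is an assumption, and the aliases and field names are illustrative:

```
# A first filter stores the source file name into a user-defined variable...
filters.SetVar.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.SetVar.field=$variables.sourceName
filters.SetVar.values={{ lowercase($metadata.name) }}

# ...and a later filter in the chain reads it back into the record value.
filters.UseVar.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.UseVar.field=source
filters.UseVar.values={{ $variables.sourceName }}
```
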
@@ -0,0 +1,64 @@
---
date: 2020-05-21
title: "File Cleanup Policies"
linkTitle: "File Cleanup Policies"
weight: 100
description: >
  Learn how to configure the cleanup policy applied to completed files.
---

The connector can be configured with a specific [FileCleanupPolicy](connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/clean/FileCleanupPolicy.java) implementation.

The cleanup policy can be configured with the connect property below:

| Configuration | Description | Type | Default | Importance |
| --------------| --------------|-----------| --------- | ------------- |
|`fs.cleanup.policy.class` | The fully qualified name of the class which is used to cleanup files | class | *-* | high |


## Available Cleanup Policies

### DeleteCleanPolicy

This policy deletes all files regardless of their final status (completed or failed).

To enable this policy, the property `fs.cleanup.policy.class` must be configured to:

```
io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanPolicy
```

#### Configuration
No configuration is required.

### LogCleanPolicy

This policy prints some information to the logs after file completion.

To enable this policy, the property `fs.cleanup.policy.class` must be configured to:

```
io.streamthoughts.kafka.connect.filepulse.clean.LogCleanPolicy
```

#### Configuration
No configuration is required.

### MoveCleanPolicy

This policy attempts to atomically move files to configurable target directories.

To enable this policy, the property `fs.cleanup.policy.class` must be configured to:

```
io.streamthoughts.kafka.connect.filepulse.clean.MoveCleanPolicy
```

#### Configuration

| Configuration | Description | Type | Default | Importance |
| --------------| --------------|-----------| --------- | ------------- |
|`cleaner.output.failed.path` | Target directory for files processed with failure | string | *.failure* | high |
|`cleaner.output.succeed.path` | Target directory for files processed successfully | string | *.success* | high |
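
For example, a sketch of a complete configuration for this policy (the target directories are illustrative placeholders):

```
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.MoveCleanPolicy
cleaner.output.succeed.path=/tmp/connect-file-pulse/succeed
cleaner.output.failed.path=/tmp/connect-file-pulse/failed
```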

## Implementing your own policy
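
A minimal sketch of a custom policy is shown below. The `onSuccess`/`onFailure` method names, the `configure` callback, and the `SourceFile` type are illustrative assumptions; consult the `FileCleanupPolicy` interface linked above for the actual contract.

```java
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.source.SourceFile;

import java.util.Map;

/**
 * Hypothetical cleanup policy that only logs completed files.
 * Method names are assumptions; check FileCleanupPolicy.java
 * for the actual interface contract.
 */
public class LogOnlyCleanupPolicy implements FileCleanupPolicy {

    @Override
    public void configure(final Map<String, ?> configs) {
        // Read any custom 'cleaner.*' properties here if needed.
    }

    @Override
    public boolean onSuccess(final SourceFile source) {
        System.out.println("Successfully processed: " + source);
        return true; // true means the cleanup itself succeeded
    }

    @Override
    public boolean onFailure(final SourceFile source) {
        System.out.println("Failed to process: " + source);
        return true;
    }
}
```

Once packaged and added to the plugin path, the class would be enabled through `fs.cleanup.policy.class`, like the built-in policies above.
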
@@ -0,0 +1,45 @@
---
date: 2020-05-21
title: "Conditional Execution"
linkTitle: "Conditional Execution"
weight: 60
description: >
Learn how to conditionally execute a transformation filter.
---

A conditional property `if` can be configured on each filter to determine whether that filter should be applied or skipped.
When a filter is skipped, messages flow to the next filter without any modification.

The `if` configuration accepts a Simple Connect Expression that must evaluate to `true` or `false`.
If the configured expression does not evaluate to a boolean value, the filter chain will fail.

The `if` property supports [simple expressions](accessing-data-and-metadata).

The boolean value returned from the filter condition can be inverted by setting the property `invert` to `true`.

For example, the filter below will only be applied to messages having a log message containing "BadCredentialsException":

```
filters.TagSecurityException.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.TagSecurityException.if={{ contains(data.logmessage, BadCredentialsException) }}
filters.TagSecurityException.invert=false
filters.TagSecurityException.field=tags
filters.TagSecurityException.values=SecurityAlert
```

These boolean functions are available for use with the `if` configuration:

| Function | Description | Syntax |
| --------------| --------------|-----------|
| `contains` | Returns `true` if an array field's value contains the specified value | `{% raw %}{{ contains(field, value) }}{% endraw %}` |
| `ends_with` | Returns `true` if a string field's value ends with the specified string suffix | `{% raw %}{{ ends_with(field, suffix) }}{% endraw %}` |
| `equals` | Returns `true` if a string or number field's value equals the specified value | `{% raw %}{{ equals(field, value) }}{% endraw %}` |
| `exists` | Returns `true` if the specified field exists | `{% raw %}{{ exists(struct, field) }}{% endraw %}` |
| `is_null` | Returns `true` if a field's value is null | `{% raw %}{{ is_null(field) }}{% endraw %}` |
| `matches` | Returns `true` if a field's value matches the specified regex | `{% raw %}{{ matches(field, regex) }}{% endraw %}` |
| `starts_with` | Returns `true` if a string field's value starts with the specified string prefix | `{% raw %}{{ starts_with(field, prefix) }}{% endraw %}` |


**Limitations**:
* The `if` property does not support binary operators, so only a single condition can be configured.
* Conditions cannot be used to easily create pipeline branching.
@@ -0,0 +1,34 @@
---
date: 2020-05-25
title: "Basic Configuration"
linkTitle: "Basic Configuration"
weight: 20
description: >
The commons configuration for deploying a File Pulse connector.
---

## Commons configuration

Whatever the kind of files you are processing, a connector should always be configured with the properties below.
These configurations are described in detail in subsequent chapters.

| Configuration | Description | Type | Default | Importance |
| --------------| --------------|-----------| --------- | ------------- |
|`fs.scanner.class` | The fully qualified name of the class which is used to scan the file system | class | *io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker* | medium |
|`fs.cleanup.policy.class` | The fully qualified name of the class which is used to cleanup files | class | *-* | high |
|`fs.scan.directory.path` | The input directory to scan | string | *-* | high |
|`fs.scan.interval.ms` | Time interval (in milliseconds) at which to scan the input directory | long | *10000* | high |
|`fs.scan.filters` | Filters used to list eligible input files | list | *-* | medium |
|`filters` | List of filters aliases to apply on each data (order is important) | list | *-* | medium |
|`internal.kafka.reporter.topic` | Name of the internal topic used by tasks and connector to report and monitor file progression. | class | *connect-file-pulse-status* | high |
|`internal.kafka.reporter.bootstrap.servers` | A list of host/port pairs used by the reporter for establishing the initial connection to the Kafka cluster. | string | *-* | high |
|`task.reader.class` | The fully qualified name of the class which is used by tasks to read input files | class | *io.streamthoughts.kafka.connect.filepulse.reader.RowFileReader* | high |
|`offset.strategy` | The strategy to use for building source offset from an input file; must be one of [name, path, name+hash] | string | *name+hash* | high |
|`topic` | The default output topic to write to | string | *-* | high |
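
For example, a minimal sketch of a connector configuration combining these properties. The directory, topic name, and bootstrap servers are illustrative placeholders, and the connector class should be verified against your installed version:

```
name=connect-file-pulse-example
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
tasks.max=1
topic=connect-file-pulse-example
fs.scan.directory.path=/tmp/connect-data
fs.scan.interval.ms=10000
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanPolicy
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.RowFileReader
offset.strategy=name+hash
internal.kafka.reporter.topic=connect-file-pulse-status
internal.kafka.reporter.bootstrap.servers=localhost:9092
```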


### Prior to Connect FilePulse 1.3.x (deprecated)
| Configuration | Description | Type | Default | Importance |
| --------------| --------------|-----------| --------- | ------------- |
|`internal.kafka.reporter.id` | The reporter identifier to be used by tasks and connector to report and monitor file progression (default null). This property must only be set by users that have run a connector in a version prior to 1.3.x, to ensure backward compatibility (when set, it must be unique for each connect instance). | string | *-* | high |
