Skip to content

Commit

Permalink
Merge pull request #136 from OlehPalanskyi/main
Browse files Browse the repository at this point in the history
Added a retry logic and a service availability check function for high availability.
  • Loading branch information
cosmo0920 authored Apr 30, 2024
2 parents 0762b3f + 3abd9eb commit 89fe756
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 3 deletions.
92 changes: 92 additions & 0 deletions README.OpenSearchInput.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@
+ [docinfo_fields](#docinfo_fields)
+ [docinfo_target](#docinfo_target)
+ [docinfo](#docinfo)
+ [check_connection](#check_connection)
+ [retry_forever](#retry_forever)
+ [retry_timeout](#retry_timeout)
+ [retry_max_times](#retry_max_times)
+ [retry_type](#retry_type)
+ [retry_wait](#retry_wait)
+ [retry_exponential_backoff_base](#retry_exponential_backoff_base)
+ [retry_max_interval](#retry_max_interval)
+ [retry_randomize](#retry_randomize)

* [Advanced Usage](#advanced-usage)

## Usage
Expand Down Expand Up @@ -274,6 +284,88 @@ This parameter specifies whether docinfo information including or not. The defau
docinfo false
```

### check_connection

The parameter for checking on connection availability with Elasticsearch or Opensearch hosts. The default value is `true`.

```
check_connection true
```
### retry_forever

The parameter If true, plugin will ignore retry_timeout and retry_max_times options and retry forever. The default value is `true`.

```
retry_forever true
```

### retry_timeout

The parameter maximum time (seconds) to retry again the failed try, until the plugin discards the retry.
If the next retry is going to exceed this time limit, the last retry will be made at exactly this time limit..
The default value is `72h`.
72hours == 17 times with exponential backoff (not to change default behavior)

```
retry_timeout 72 * 60 * 60
```

### retry_max_times

The parameter maximum number of times to retry the failed try. The default value is `5`

```
retry_max_times 5
```

### retry_type

The parameter needs for how long need to wait (time in seconds) to retry again:
`exponential_backoff`: wait in seconds will become large exponentially per failure,
`periodic`: plugin will retry periodically with fixed intervals (configured via retry_wait). The default value is `:exponential_backoff`
Periodic -> fixed :retry_wait
Exponential backoff: k is number of retry times
c: constant factor, @retry_wait
b: base factor, @retry_exponential_backoff_base
k: times
total retry time: c + c * b^1 + (...) + c*b^k = c*b^(k+1) - 1

```
retry_type exponential_backoff
```

### retry_wait

The parameter needs for wait in seconds before the next retry to again or constant factor of exponential backoff. The default value is `5`

```
retry_wait 5
```

### retry_exponential_backoff_base

The parameter The base number of exponential backoff for retries. The default value is `2`

```
retry_exponential_backoff_base 2
```

### retry_max_interval

The parameter maximum interval (seconds) for exponential backoff between retries while failing. The default value is `nil`

```
retry_max_interval nil
```

### retry_randomize

The parameter If true, the plugin will retry after randomized interval not to do burst retries. The default value is `false`

```
retry_randomize false
```

## Advanced Usage

OpenSearch Input plugin and OpenSearch output plugin can combine to transfer records into another cluster.
Expand Down
81 changes: 78 additions & 3 deletions lib/fluent/plugin/in_opensearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
require 'faraday/excon'
require 'fluent/log-ext'
require 'fluent/plugin/input'
require 'fluent/plugin_helper'
require_relative 'opensearch_constants'

module Fluent::Plugin
Expand All @@ -39,7 +40,7 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end
DEFAULT_STORAGE_TYPE = 'local'
METADATA = "@metadata".freeze

helpers :timer, :thread
helpers :timer, :thread, :retry_state

Fluent::Plugin.register_input('opensearch', self)

Expand Down Expand Up @@ -80,6 +81,23 @@ class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end
config_param :docinfo_fields, :array, :default => ['_index', '_type', '_id']
config_param :docinfo_target, :string, :default => METADATA
config_param :docinfo, :bool, :default => false
config_param :check_connection, :bool, :default => true
config_param :retry_forever, :bool, default: true, desc: 'If true, plugin will ignore retry_timeout and retry_max_times options and retry forever.'
config_param :retry_timeout, :time, default: 72 * 60 * 60, desc: 'The maximum seconds to retry'
# 72hours == 17 times with exponential backoff (not to change default behavior)
config_param :retry_max_times, :integer, default: 5, desc: 'The maximum number of times to retry'
# exponential backoff sequence will be initialized at the time of this threshold
config_param :retry_type, :enum, list: [:exponential_backoff, :periodic], default: :exponential_backoff
### Periodic -> fixed :retry_wait
### Exponential backoff: k is number of retry times
# c: constant factor, @retry_wait
# b: base factor, @retry_exponential_backoff_base
# k: times
# total retry time: c + c * b^1 + (...) + c*b^k = c*b^(k+1) - 1
config_param :retry_wait, :time, default: 5, desc: 'Seconds to wait before next retry , or constant factor of exponential backoff.'
config_param :retry_exponential_backoff_base, :float, default: 2, desc: 'The base number of exponential backoff for retries.'
config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.'
config_param :retry_randomize, :bool, default: false, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.'

include Fluent::Plugin::OpenSearchConstants

Expand All @@ -92,6 +110,7 @@ def configure(conf)

@timestamp_parser = create_time_parser
@backend_options = backend_options
@retry = nil

raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

Expand Down Expand Up @@ -138,6 +157,15 @@ def backend_options
raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{ex}"
end

def retry_state(randomize)
retry_state_create(
:input_retries, @retry_type, @retry_wait, @retry_timeout,
forever: @retry_forever, max_steps: @retry_max_times,
max_interval: @retry_max_interval, backoff_base: @retry_exponential_backoff_base,
randomize: randomize
)
end

def get_escaped_userinfo(host_str)
if m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
m["scheme"] +
Expand Down Expand Up @@ -176,12 +204,29 @@ def get_connection_options(con_host=nil)
host.merge!(user: @user, password: @password) if !host[:user] && @user
host.merge!(path: @path) if !host[:path] && @path
end

live_hosts = @check_connection ? hosts.select { |host| reachable_host?(host) } : hosts
{
hosts: hosts
hosts: live_hosts
}
end

def reachable_host?(host)
client = OpenSearch::Client.new(
host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"],
user: host[:user],
password: host[:password],
reload_connections: @reload_connections,
request_timeout: @request_timeout,
resurrect_after: @resurrect_after,
reload_on_failure: @reload_on_failure,
transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } }
)
client.ping
rescue => e
log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}: #{e.message}"
false
end

def emit_error_label_event(&block)
# If `emit_error_label_event` is specified as false, error event emittions are not occurred.
if emit_error_label_event
Expand Down Expand Up @@ -292,6 +337,25 @@ def is_existing_connection(host)
return true
end

def update_retry_state(error=nil)
if error
unless @retry
@retry = retry_state(@retry_randomize)
end
@retry.step
#Raise error if the retry limit has been reached
raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{error.message}" if @retry.limit?
#Retry if the limit hasn't been reached
log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: error.message)
sleep(@retry.next_time - Time.now)
else
unless @retry.nil?
log.debug("retry succeeded.")
@retry = nil
end
end
end

def run
return run_slice if @num_slices <= 1

Expand All @@ -302,6 +366,9 @@ def run
run_slice(slice_id)
end
end
rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => error
update_retry_state(error)
retry
end

def run_slice(slice_id=nil)
Expand All @@ -321,7 +388,15 @@ def run_slice(slice_id=nil)
end

router.emit_stream(@tag, es)
clear_scroll(scroll_id)
update_retry_state
end

def clear_scroll(scroll_id)
client.clear_scroll(scroll_id: scroll_id) if scroll_id
rescue => e
# ignore & log any clear_scroll errors
log.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class)
end

def process_scroll_request(scroll_id)
Expand Down
7 changes: 7 additions & 0 deletions test/plugin/test_in_opensearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class OpenSearchInputTest < Test::Unit::TestCase
CONFIG = %[
tag raw.opensearch
interval 2
check_connection false
]

def setup
Expand Down Expand Up @@ -190,6 +191,7 @@ def test_configure
user john
password doe
tag raw.opensearch
check_connection false
}
instance = driver(config).instance

Expand Down Expand Up @@ -228,6 +230,7 @@ def test_single_host_params_and_defaults
user john
password doe
tag raw.opensearch
check_connection false
}
instance = driver(config).instance

Expand All @@ -249,6 +252,7 @@ def test_single_host_params_and_defaults_with_escape_placeholders
user %{j+hn}
password %{d@e}
tag raw.opensearch
check_connection false
}
instance = driver(config).instance

Expand All @@ -271,6 +275,7 @@ def test_legacy_hosts_list
path /es/
port 123
tag raw.opensearch
check_connection false
}
instance = driver(config).instance

Expand All @@ -295,6 +300,7 @@ def test_hosts_list
user default_user
password default_password
tag raw.opensearch
check_connection false
}
instance = driver(config).instance

Expand Down Expand Up @@ -323,6 +329,7 @@ def test_hosts_list_with_escape_placeholders
user default_user
password default_password
tag raw.opensearch
check_connection false
}
instance = driver(config).instance

Expand Down

0 comments on commit 89fe756

Please sign in to comment.