Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate kubernetes_sumologic filter plugin #167

Merged
merged 3 commits into from
Aug 30, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deploy/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ RUN gem install fluent-plugin-systemd -v 1.0.2 \
&& gem install fluent-plugin-sumologic_output -v 1.5.0 \
&& gem install fluent-plugin-concat -v 2.4.0 \
&& gem install fluent-plugin-rewrite-tag-filter -v 2.2.0 \
&& gem install fluent-plugin-prometheus -v 1.5.0 \
&& gem install fluent-plugin-kubernetes_sumologic -v 2.4.2
&& gem install fluent-plugin-prometheus -v 1.5.0

# FluentD plugins from this repository
RUN gem install --local fluent-plugin-prometheus-format \
&& gem install --local fluent-plugin-kubernetes-sumologic \
&& gem install --local fluent-plugin-enhance-k8s-metadata \
&& gem install --local fluent-plugin-datapoint \
&& gem install --local fluent-plugin-protobuf \
Expand Down
9 changes: 9 additions & 0 deletions fluent-plugin-kubernetes-sumologic/Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
source 'https://rubygems.org'

group :test do
gem 'codecov'
gem 'simplecov'
gem 'webmock'
end

gemspec
5 changes: 5 additions & 0 deletions fluent-plugin-kubernetes-sumologic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# fluent-plugin-kubernetes-sumologic

[Fluentd](https://fluentd.org/) filter plugin to enrich logs with Sumo Logic specific metadata.

This README is a WIP. For detailed documentation please go [here](https://github.com/SumoLogic/fluentd-kubernetes-sumologic).
13 changes: 13 additions & 0 deletions fluent-plugin-kubernetes-sumologic/Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
require "bundler"
Bundler::GemHelper.install_tasks

require "rake/testtask"

Rake::TestTask.new(:test) do |t|
t.libs.push("lib", "test")
t.test_files = FileList["test/**/test_*.rb"]
t.verbose = true
t.warning = true
end

task default: [:test]
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# coding: utf-8
lib = File.expand_path('../lib', __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

Gem::Specification.new do |spec|
spec.name = "fluent-plugin-kubernetes-sumologic"
spec.version = "0.0.0"
spec.authors = ["Sumo Logic"]
spec.email = ["collection@sumologic.com"]
spec.description = %q{FluentD plugin to enrich logs with Sumo Logic specific metadata.}
spec.summary = %q{FluentD plugin to enrich logs with Sumo Logic specific metadata.}
spec.homepage = "https://github.com/SumoLogic/sumologic-kubernetes-collection"
spec.license = "Apache-2.0"

test_files, files = `git ls-files -z`.split("\x0").partition do |f|
f.match(%r{^(test|spec|features)/})
end
spec.files = files
spec.executables = files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = test_files
spec.require_paths = ["lib"]

spec.required_ruby_version = '>= 2.0.0'

spec.add_development_dependency "bundler", "~> 2"
spec.add_development_dependency "rake"
spec.add_development_dependency 'test-unit', '~> 3.1.0'
spec.add_development_dependency "codecov", ">= 0.1.10"
spec.add_runtime_dependency "fluentd", [">= 0.14.10", "< 2"]
spec.add_runtime_dependency 'httpclient', '~> 2.8.0'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
require "fluent/filter"

module Fluent::Plugin
class SumoContainerOutput < Fluent::Plugin::Filter
# Register type
Fluent::Plugin.register_filter("kubernetes_sumologic", self)

config_param :kubernetes_meta, :bool, :default => true
config_param :kubernetes_meta_reduce, :bool, :default => false
config_param :source_category, :string, :default => "%{namespace}/%{pod_name}"
config_param :source_category_replace_dash, :string, :default => "/"
config_param :source_category_prefix, :string, :default => "kubernetes/"
config_param :source_name, :string, :default => "%{namespace}.%{pod}.%{container}"
config_param :log_format, :string, :default => "json"
config_param :source_host, :string, :default => ""
config_param :exclude_container_regex, :string, :default => ""
config_param :exclude_facility_regex, :string, :default => ""
config_param :exclude_host_regex, :string, :default => ""
config_param :exclude_namespace_regex, :string, :default => ""
config_param :exclude_pod_regex, :string, :default => ""
config_param :exclude_priority_regex, :string, :default => ""
config_param :exclude_unit_regex, :string, :default => ""
config_param :add_stream, :bool, :default => true
config_param :add_time, :bool, :default => true

def configure(conf)
super
end

def is_number?(string)
true if Float(string) rescue false
end

def sanitize_pod_name(k8s_metadata)
# Strip out dynamic bits from pod name.
# NOTE: Kubernetes deployments append a template hash.
# At the moment this can be in 3 different forms:
# 1) pre-1.8: numeric in pod_template_hash and pod_parts[-2]
# 2) 1.8-1.11: numeric in pod_template_hash, hash in pod_parts[-2]
# 3) post-1.11: hash in pod_template_hash and pod_parts[-2]

pod_parts = k8s_metadata[:pod].split("-")
pod_template_hash = k8s_metadata[:"label:pod-template-hash"]
if (pod_template_hash == pod_parts[-2] ||
to_hash(pod_template_hash) == pod_parts[-2])
k8s_metadata[:pod_name] = pod_parts[0..-3].join("-")
else
k8s_metadata[:pod_name] = pod_parts[0..-2].join("-")
end
end

def to_hash(pod_template_hash)
# Convert the pod_template_hash to an alphanumeric string using the same logic Kubernetes
# uses at https://github.com/kubernetes/apimachinery/blob/18a5ff3097b4b189511742e39151a153ee16988b/pkg/util/rand/rand.go#L119
alphanums = "bcdfghjklmnpqrstvwxz2456789"
pod_template_hash.each_byte.map { |i| alphanums[i.to_i % alphanums.length] }.join("")
end

def filter(tag, time, record)
# Set the sumo metadata fields
sumo_metadata = record["_sumo_metadata"] || {}
record["_sumo_metadata"] = sumo_metadata
log_fields = {}
sumo_metadata[:log_format] = @log_format
sumo_metadata[:host] = @source_host if @source_host
sumo_metadata[:source] = @source_name if @source_name

unless @source_category.nil?
sumo_metadata[:category] = @source_category.dup
unless @source_category_prefix.nil?
sumo_metadata[:category].prepend(@source_category_prefix)
end
end

if record.key?("_SYSTEMD_UNIT") and not record.fetch("_SYSTEMD_UNIT").nil?
unless @exclude_unit_regex.empty?
if Regexp.compile(@exclude_unit_regex).match(record["_SYSTEMD_UNIT"])
return nil
end
end

unless @exclude_facility_regex.empty?
if Regexp.compile(@exclude_facility_regex).match(record["SYSLOG_FACILITY"])
return nil
end
end

unless @exclude_priority_regex.empty?
if Regexp.compile(@exclude_priority_regex).match(record["PRIORITY"])
return nil
end
end

unless @exclude_host_regex.empty?
if Regexp.compile(@exclude_host_regex).match(record["_HOSTNAME"])
return nil
end
end
end

# Allow fields to be overridden by annotations
if record.key?("kubernetes") and not record.fetch("kubernetes").nil?
# Clone kubernetes hash so we don't override the cache
kubernetes = record["kubernetes"].clone
k8s_metadata = {
:namespace => kubernetes["namespace_name"],
:pod => kubernetes["pod_name"],
:pod_id => kubernetes['pod_id'],
:container => kubernetes["container_name"],
:source_host => kubernetes["host"],
}


if kubernetes.has_key? "labels"
kubernetes["labels"].each { |k, v| k8s_metadata["label:#{k}".to_sym] = v }
end
if kubernetes.has_key? "namespace_labels"
kubernetes["namespace_labels"].each { |k, v| k8s_metadata["namespace_label:#{k}".to_sym] = v }
end
k8s_metadata.default = "undefined"

annotations = kubernetes.fetch("annotations", {})
if annotations["sumologic.com/include"] == "true"
include = true
else
include = false
end

unless @exclude_namespace_regex.empty?
if Regexp.compile(@exclude_namespace_regex).match(k8s_metadata[:namespace]) and not include
return nil
end
end

unless @exclude_pod_regex.empty?
if Regexp.compile(@exclude_pod_regex).match(k8s_metadata[:pod]) and not include
return nil
end
end

unless @exclude_container_regex.empty?
if Regexp.compile(@exclude_container_regex).match(k8s_metadata[:container]) and not include
return nil
end
end

unless @exclude_host_regex.empty?
if Regexp.compile(@exclude_host_regex).match(k8s_metadata[:source_host]) and not include
return nil
end
end

sanitize_pod_name(k8s_metadata)

if annotations["sumologic.com/exclude"] == "true"
return nil
end

sumo_metadata[:log_format] = annotations["sumologic.com/format"] if annotations["sumologic.com/format"]

if annotations["sumologic.com/sourceHost"].nil?
sumo_metadata[:host] = sumo_metadata[:host] % k8s_metadata
else
sumo_metadata[:host] = annotations["sumologic.com/sourceHost"] % k8s_metadata
end

if annotations["sumologic.com/sourceName"].nil?
sumo_metadata[:source] = sumo_metadata[:source] % k8s_metadata
else
sumo_metadata[:source] = annotations["sumologic.com/sourceName"] % k8s_metadata
end

if annotations["sumologic.com/sourceCategory"].nil?
sumo_metadata[:category] = sumo_metadata[:category] % k8s_metadata
else
sumo_metadata[:category] = (annotations["sumologic.com/sourceCategory"] % k8s_metadata).prepend(@source_category_prefix)
end
sumo_metadata[:category].gsub!("-", @source_category_replace_dash)

# Strip kubernetes metadata from json if disabled
if annotations["sumologic.com/kubernetes_meta"] == "false" || !@kubernetes_meta
record.delete("docker")
record.delete("kubernetes")
end
if annotations["sumologic.com/kubernetes_meta_reduce"] == "true" || annotations["sumologic.com/kubernetes_meta_reduce"].nil? && @kubernetes_meta_reduce == true
record.delete("docker")
record["kubernetes"].delete("pod_id")
record["kubernetes"].delete("namespace_id")
record["kubernetes"].delete("labels")
record["kubernetes"].delete("namespace_labels")
record["kubernetes"].delete("master_url")
record["kubernetes"].delete("annotations")
end
if @add_stream == false
record.delete("stream")
end
if @add_time == false
record.delete("time")
end
# Strip sumologic.com annotations
kubernetes.delete("annotations") if annotations

if @log_format == "fields" and record.key?("docker") and not record.fetch("docker").nil?
record["docker"].each {|k, v| log_fields[k] = v}
end

if @log_format == "fields" and record.key?("kubernetes") and not record.fetch("kubernetes").nil?
if kubernetes.has_key? "labels"
kubernetes["labels"].each { |k, v| log_fields["pod_labels_#{k}".to_sym] = v }
end
if kubernetes.has_key? "namespace_labels"
kubernetes["namespace_labels"].each { |k, v| log_fields["namespace_labels_#{k}".to_sym] = v }
end
log_fields["container"] = kubernetes["container_name"] unless kubernetes["container_name"].nil?
log_fields["namespace"] = kubernetes["namespace_name"] unless kubernetes["namespace_name"].nil?
log_fields["pod"] = kubernetes["pod_name"] unless kubernetes["pod_name"].nil?
["pod_id", "host", "master_url", "namespace_id", "service", "deployment", "daemonset", "replicaset", "statefulset"].each do |key|
log_fields[key] = kubernetes[key] unless kubernetes[key].nil?
end
log_fields["node"] = kubernetes["host"] unless kubernetes["host"].nil?
end
end

if @log_format == "fields" and not log_fields.nil?
sumo_metadata[:fields] = log_fields.map{|k,v| "#{k}=#{v}"}.join(',')
record.delete("docker")
record.delete("kubernetes")
end
record
end
end
end
16 changes: 16 additions & 0 deletions fluent-plugin-kubernetes-sumologic/test/helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
require "simplecov"
SimpleCov.start

if ENV["CI"] == "true"
require "codecov"
SimpleCov.formatter = SimpleCov::Formatter::Codecov
end

$LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
require "test-unit"
require "fluent/test"
require "fluent/test/driver/filter"
require "fluent/test/helpers"

Test::Unit::TestCase.include(Fluent::Test::Helpers)
Test::Unit::TestCase.extend(Fluent::Test::Helpers)
Loading