From bc8615043f524259e5aaa127dcb0475d2cb1f6a7 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Fri, 21 Jun 2019 11:09:13 -0700 Subject: [PATCH 01/15] fix watch stream 403 --- .../lib/sumologic/kubernetes/connector.rb | 6 +++--- fluent-plugin-enhance-k8s-metadata/test/helper.rb | 12 ++++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/connector.rb b/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/connector.rb index efcd0d783b..513fa17e58 100644 --- a/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/connector.rb +++ b/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/connector.rb @@ -29,11 +29,11 @@ def group_clients end def create_client(base, ver) - url = "#{@kubernetes_url}/#{base}/#{ver}" - log.info "create client with URL: #{url}" + url = "#{@kubernetes_url}/#{base}" + log.info "create client with URL: #{url} and apiVersion: #{ver}" client = Kubeclient::Client.new( url, - '', + ver, ssl_options: ssl_options, auth_options: auth_options, as: :parsed diff --git a/fluent-plugin-enhance-k8s-metadata/test/helper.rb b/fluent-plugin-enhance-k8s-metadata/test/helper.rb index 1e2a33ec18..aef4c7ece6 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/helper.rb +++ b/fluent-plugin-enhance-k8s-metadata/test/helper.rb @@ -15,6 +15,18 @@ def test_resource(name) def stub_apis init_globals + stub_request(:any, %r{/api$}) + .to_return( + 'body' => { + 'versions' => ['v1'] + }.to_json + ) + stub_request(:any, %r{/apis$}) + .to_return( + 'body' => { + 'versions' => ['apps/v1', 'extensions/v1beta1'] + }.to_json + ) stub_request(:get, %r{/api/v1$}) .to_return(body: test_resource('api_list_core_v1.json'), status: 200) stub_request(:get, %r{/apis/apps/v1$}) From 8ed34f54df8d3c2b77e32ccdc9aceb09caf497f4 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Mon, 24 Jun 2019 13:51:31 -0700 Subject: [PATCH 02/15] add service endpoints watcher thread to get pods for service --- deploy/docker/Dockerfile | 1 + deploy/kubernetes/fluentd-sumologic.yaml | 1 + ...fluent-plugin-enhance-k8s-metadata.gemspec | 1 + .../plugin/filter_enhance_k8s_metadata.rb | 108 ++++++++++++++++++ 4 files changed, 111 insertions(+) diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index e22e916aa1..982604747d 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -10,6 +10,7 @@ RUN apk add --no-cache libexecinfo libexecinfo-dev RUN apk add --no-cache snappy g++ snappy-dev RUN apk add --no-cache --update --virtual .build-deps sudo build-base ruby-dev \ + && gem install concurrent-ruby \ && gem install google-protobuf \ && gem install kubeclient \ && gem install lru_redux \ diff --git a/deploy/kubernetes/fluentd-sumologic.yaml b/deploy/kubernetes/fluentd-sumologic.yaml index 7799fdacb6..3195a94891 100644 --- a/deploy/kubernetes/fluentd-sumologic.yaml +++ b/deploy/kubernetes/fluentd-sumologic.yaml @@ -24,6 +24,7 @@ rules: - configmaps - daemonsets - deployments + - endpoints - events - namespaces - nodes diff --git a/fluent-plugin-enhance-k8s-metadata/fluent-plugin-enhance-k8s-metadata.gemspec b/fluent-plugin-enhance-k8s-metadata/fluent-plugin-enhance-k8s-metadata.gemspec index 1e8b786b45..32d7c4f5e8 100644 --- a/fluent-plugin-enhance-k8s-metadata/fluent-plugin-enhance-k8s-metadata.gemspec +++ b/fluent-plugin-enhance-k8s-metadata/fluent-plugin-enhance-k8s-metadata.gemspec @@ -19,6 +19,7 @@ Gem::Specification.new do |spec| spec.test_files = test_files spec.require_paths = ['lib'] + spec.add_runtime_dependency 'concurrent-ruby', '~> 1.1' spec.add_runtime_dependency 'fluentd', ['>= 0.14.10', '< 2'] spec.add_runtime_dependency 'kubeclient', '~> 4.4.0' spec.add_runtime_dependency 'lru_redux', '~> 1.1.0' diff --git a/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb b/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb index bd66d6e5ba..0f3f7c9ab3 100644 --- a/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb +++ b/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb @@ -1,3 +1,4 @@ +require 'concurrent' require 'fluent/plugin/filter' module Fluent @@ -9,6 +10,7 @@ class EnhanceK8sMetadataFilter < Fluent::Plugin::Filter require_relative '../../sumologic/kubernetes/cache_strategy.rb' helpers :record_accessor + helpers :thread include SumoLogic::Kubernetes::Connector include SumoLogic::Kubernetes::Reader include SumoLogic::Kubernetes::CacheStrategy @@ -44,6 +46,11 @@ def configure(conf) @in_pod_ac = @in_pod_path.map { |path| record_accessor_create(path) } end + def start + super + start_service_monitor + end + def filter(tag, time, record) decorate_record(record) record @@ -92,6 +99,107 @@ def normalize_param @out_root = 'kubernetes' if @out_root.nil? || @out_root.empty? log.info "out_root: #{@out_root}" end + + def start_service_monitor + log.info "Starting watching for service changes" + + @pods_to_services = Concurrent::Map.new + @watch_service_interval_seconds = 300 + @configmap_update_interval_seconds = 10 + + last_recreated = Time.now.to_i + log.debug "last_recreated initialized to #{last_recreated}" + + while true do + # Periodically restart watcher connection by checking if enough time has passed since + # last time watcher thread was recreated or if the watcher thread has been stopped. + now = Time.now.to_i + watcher_exists = Thread.list.select {|thread| thread.object_id == @watcher_id && thread.alive?}.count > 0 + if now - last_recreated >= @watch_service_interval_seconds || !watcher_exists + + log.debug "Recreating service watcher thread" + @watch_stream.finish if @watch_stream + + start_service_watcher_thread + last_recreated = now + log.debug "last_recreated updated to #{last_recreated}" + end + + sleep(@configmap_update_interval_seconds) + end + end + + def start_service_watcher_thread + log.debug "Starting service endpoints watcher thread" + params = Hash.new + params[:as] = :raw + params[:resource_version] = get_current_service_snapshot_resource_version + params[:timeout_seconds] = @watch_service_interval_seconds + 60 + + @watcher = @clients['v1'].public_send("watch_endpoints", params).tap do |watcher| + thread_create(:"watch_endpoints") do + @watch_stream = watcher + @watcher_id = Thread.current.object_id + log.debug "New thread to watch service endpoints #{@watcher_id} from resource version #{params[:resource_version]}" + + watcher.each do |entity| + begin + endpoint = JSON.parse(entity)['object'] + service = endpoint['metadata']['name'] + get_pods_for_service(endpoint).each do |pod| + @pods_to_services[pod] = service + end + rescue => e + log.error "Got exception #{e} parsing entity #{entity}. Skipping." + end + end + log.info "Closing watch stream" + end + end + end + + def get_current_service_snapshot_resource_version + log.debug "Getting current service snapshot" + begin + params = Hash.new + params[:as] = :raw + response = @clients['v1'].public_send "get_endpoints", params + result = JSON.parse(response) + result['items'].each do |endpoint| + service = endpoint['metadata']['name'] + get_pods_for_service(endpoint).each do |pod| + @pods_to_services[pod] = service + end + end + result['metadata']['resourceVersion'] + rescue => e + log.error "Got exception #{e} getting current service snapshot and corresponding resource version." + 0 + end + end + + def get_pods_for_service(endpoint) + log.debug "Getting pods for service #{endpoint['metadata']['name']}" + pods = [] + if endpoint.key? 'subsets' + endpoint['subsets'].each do |subset| + ['addresses', 'notReadyAddresses'].each do |key| + if subset.key? key + subset[key].each do |object| + if object.key? 'targetRef' + if object['targetRef']['kind'] == 'Pod' + pod = object['targetRef']['name'] + log.debug "Found Pod: #{pod}" + pods << pod + end + end + end + end + end + end + end + pods + end end end end From b60ae6b9d532dad08b11469275e3a0edca70e704 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Mon, 24 Jun 2019 14:01:54 -0700 Subject: [PATCH 03/15] remove reference to configmap --- .../lib/fluent/plugin/filter_enhance_k8s_metadata.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb b/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb index 0f3f7c9ab3..2f0df428f3 100644 --- a/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb +++ b/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb @@ -105,7 +105,6 @@ def start_service_monitor @pods_to_services = Concurrent::Map.new @watch_service_interval_seconds = 300 - @configmap_update_interval_seconds = 10 last_recreated = Time.now.to_i log.debug "last_recreated initialized to #{last_recreated}" @@ -125,7 +124,7 @@ def start_service_monitor log.debug "last_recreated updated to #{last_recreated}" end - sleep(@configmap_update_interval_seconds) + sleep(10) end end From 66cdd0811aec86388470377ce08d52388c5ed999 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Tue, 25 Jun 2019 13:25:24 -0700 Subject: [PATCH 04/15] correctly handle changes to service in pods to service hash --- .../plugin/filter_enhance_k8s_metadata.rb | 44 +++++++++++++------ 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb b/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb index 2f0df428f3..f6ce8955bc 100644 --- a/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb +++ b/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb @@ -103,7 +103,6 @@ def normalize_param def start_service_monitor log.info "Starting watching for service changes" - @pods_to_services = Concurrent::Map.new @watch_service_interval_seconds = 300 last_recreated = Time.now.to_i @@ -123,7 +122,6 @@ def start_service_monitor last_recreated = now log.debug "last_recreated updated to #{last_recreated}" end - sleep(10) end end @@ -141,13 +139,10 @@ def start_service_watcher_thread @watcher_id = Thread.current.object_id log.debug "New thread to watch service endpoints #{@watcher_id} from resource version #{params[:resource_version]}" - watcher.each do |entity| + watcher.each do |event| begin - endpoint = JSON.parse(entity)['object'] - service = endpoint['metadata']['name'] - get_pods_for_service(endpoint).each do |pod| - @pods_to_services[pod] = service - end + event = JSON.parse(event) + handle_service_event(event) rescue => e log.error "Got exception #{e} parsing entity #{entity}. Skipping." end @@ -164,12 +159,14 @@ def get_current_service_snapshot_resource_version params[:as] = :raw response = @clients['v1'].public_send "get_endpoints", params result = JSON.parse(response) + new_snapshot_pods_to_services = Concurrent::Map.new {|h, k| h[k] = []} + result['items'].each do |endpoint| service = endpoint['metadata']['name'] - get_pods_for_service(endpoint).each do |pod| - @pods_to_services[pod] = service - end + get_pods_for_service(endpoint).each {|pod| new_snapshot_pods_to_services[pod] << service} end + + @pods_to_services = new_snapshot_pods_to_services result['metadata']['resourceVersion'] rescue => e log.error "Got exception #{e} getting current service snapshot and corresponding resource version." @@ -178,7 +175,6 @@ def get_current_service_snapshot_resource_version end def get_pods_for_service(endpoint) - log.debug "Getting pods for service #{endpoint['metadata']['name']}" pods = [] if endpoint.key? 'subsets' endpoint['subsets'].each do |subset| @@ -188,7 +184,7 @@ def get_pods_for_service(endpoint) if object.key? 'targetRef' if object['targetRef']['kind'] == 'Pod' pod = object['targetRef']['name'] - log.debug "Found Pod: #{pod}" + log.debug "Found Pod #{pod} for Service #{endpoint['metadata']['name']}" pods << pod end end @@ -199,6 +195,28 @@ def get_pods_for_service(endpoint) end pods end + + def handle_service_event(event) + type = event['type'] + endpoint = event['object'] + service = endpoint['metadata']['name'] + case type + when 'ADDED' + get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod] << service} + when 'MODIFIED' + desired_pods = get_pods_for_service(endpoint) + @pods_to_services.each do |pod, services| + if services.include? service + services.delete service unless desired_pods.include? pod + end + end + desired_pods.each {|pod| @pods_to_services[pod] |= [service]} + when 'DELETED' + get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod].delete service} + else + log.error "Unknown type for watch endpoint event #{type}" + end + end end end end From b96f077fa88e27b0f99aae79ae9be2f38de53546 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Thu, 27 Jun 2019 10:55:39 -0700 Subject: [PATCH 05/15] move service_monitor out to separate file --- .../plugin/filter_enhance_k8s_metadata.rb | 121 +------ .../sumologic/kubernetes/service_monitor.rb | 128 +++++++ .../test/helper.rb | 2 + .../test/resources/endpoints_list.json | 324 ++++++++++++++++++ 4 files changed, 456 insertions(+), 119 deletions(-) create mode 100644 fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb create mode 100644 fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_list.json diff --git a/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb b/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb index f6ce8955bc..3ca610bbd9 100644 --- a/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb +++ b/fluent-plugin-enhance-k8s-metadata/lib/fluent/plugin/filter_enhance_k8s_metadata.rb @@ -1,4 +1,3 @@ -require 'concurrent' require 'fluent/plugin/filter' module Fluent @@ -8,12 +7,14 @@ class EnhanceK8sMetadataFilter < Fluent::Plugin::Filter Fluent::Plugin.register_filter('enhance_k8s_metadata', self) require_relative '../../sumologic/kubernetes/cache_strategy.rb' + require_relative '../../sumologic/kubernetes/service_monitor.rb' helpers :record_accessor helpers :thread include SumoLogic::Kubernetes::Connector include SumoLogic::Kubernetes::Reader include SumoLogic::Kubernetes::CacheStrategy + include SumoLogic::Kubernetes::ServiceMonitor # parameters for read/write record config_param :in_namespace_path, :array, default: ['$.namespace'] @@ -99,124 +100,6 @@ def normalize_param @out_root = 'kubernetes' if @out_root.nil? || @out_root.empty? log.info "out_root: #{@out_root}" end - - def start_service_monitor - log.info "Starting watching for service changes" - - @watch_service_interval_seconds = 300 - - last_recreated = Time.now.to_i - log.debug "last_recreated initialized to #{last_recreated}" - - while true do - # Periodically restart watcher connection by checking if enough time has passed since - # last time watcher thread was recreated or if the watcher thread has been stopped. - now = Time.now.to_i - watcher_exists = Thread.list.select {|thread| thread.object_id == @watcher_id && thread.alive?}.count > 0 - if now - last_recreated >= @watch_service_interval_seconds || !watcher_exists - - log.debug "Recreating service watcher thread" - @watch_stream.finish if @watch_stream - - start_service_watcher_thread - last_recreated = now - log.debug "last_recreated updated to #{last_recreated}" - end - sleep(10) - end - end - - def start_service_watcher_thread - log.debug "Starting service endpoints watcher thread" - params = Hash.new - params[:as] = :raw - params[:resource_version] = get_current_service_snapshot_resource_version - params[:timeout_seconds] = @watch_service_interval_seconds + 60 - - @watcher = @clients['v1'].public_send("watch_endpoints", params).tap do |watcher| - thread_create(:"watch_endpoints") do - @watch_stream = watcher - @watcher_id = Thread.current.object_id - log.debug "New thread to watch service endpoints #{@watcher_id} from resource version #{params[:resource_version]}" - - watcher.each do |event| - begin - event = JSON.parse(event) - handle_service_event(event) - rescue => e - log.error "Got exception #{e} parsing entity #{entity}. Skipping." - end - end - log.info "Closing watch stream" - end - end - end - - def get_current_service_snapshot_resource_version - log.debug "Getting current service snapshot" - begin - params = Hash.new - params[:as] = :raw - response = @clients['v1'].public_send "get_endpoints", params - result = JSON.parse(response) - new_snapshot_pods_to_services = Concurrent::Map.new {|h, k| h[k] = []} - - result['items'].each do |endpoint| - service = endpoint['metadata']['name'] - get_pods_for_service(endpoint).each {|pod| new_snapshot_pods_to_services[pod] << service} - end - - @pods_to_services = new_snapshot_pods_to_services - result['metadata']['resourceVersion'] - rescue => e - log.error "Got exception #{e} getting current service snapshot and corresponding resource version." - 0 - end - end - - def get_pods_for_service(endpoint) - pods = [] - if endpoint.key? 'subsets' - endpoint['subsets'].each do |subset| - ['addresses', 'notReadyAddresses'].each do |key| - if subset.key? key - subset[key].each do |object| - if object.key? 'targetRef' - if object['targetRef']['kind'] == 'Pod' - pod = object['targetRef']['name'] - log.debug "Found Pod #{pod} for Service #{endpoint['metadata']['name']}" - pods << pod - end - end - end - end - end - end - end - pods - end - - def handle_service_event(event) - type = event['type'] - endpoint = event['object'] - service = endpoint['metadata']['name'] - case type - when 'ADDED' - get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod] << service} - when 'MODIFIED' - desired_pods = get_pods_for_service(endpoint) - @pods_to_services.each do |pod, services| - if services.include? service - services.delete service unless desired_pods.include? pod - end - end - desired_pods.each {|pod| @pods_to_services[pod] |= [service]} - when 'DELETED' - get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod].delete service} - else - log.error "Unknown type for watch endpoint event #{type}" - end - end end end end diff --git a/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb new file mode 100644 index 0000000000..30ebe0ead9 --- /dev/null +++ b/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb @@ -0,0 +1,128 @@ +require 'concurrent' + +module SumoLogic + module Kubernetes + # module for watching changes to services + module ServiceMonitor + require_relative 'connector.rb' + + def start_service_monitor + log.info "Starting watching for service changes" + + @watch_service_interval_seconds = 300 + + last_recreated = Time.now.to_i + log.debug "last_recreated initialized to #{last_recreated}" + + while true do + # Periodically restart watcher connection by checking if enough time has passed since + # last time watcher thread was recreated or if the watcher thread has been stopped. + now = Time.now.to_i + watcher_exists = Thread.list.select {|thread| thread.object_id == @watcher_id && thread.alive?}.count > 0 + if now - last_recreated >= @watch_service_interval_seconds || !watcher_exists + + log.debug "Recreating service watcher thread" + @watch_stream.finish if @watch_stream + + start_service_watcher_thread + last_recreated = now + log.debug "last_recreated updated to #{last_recreated}" + end + sleep(10) + end + end + + def start_service_watcher_thread + log.debug "Starting service endpoints watcher thread" + params = Hash.new + params[:as] = :raw + params[:resource_version] = get_current_service_snapshot_resource_version + params[:timeout_seconds] = @watch_service_interval_seconds + 60 + + @watcher = @clients['v1'].public_send("watch_endpoints", params).tap do |watcher| + thread_create(:"watch_endpoints") do + @watch_stream = watcher + @watcher_id = Thread.current.object_id + log.debug "New thread to watch service endpoints #{@watcher_id} from resource version #{params[:resource_version]}" + + watcher.each do |event| + begin + event = JSON.parse(event) + handle_service_event(event) + rescue => e + log.error "Got exception #{e} parsing entity #{entity}. Skipping." + end + end + log.info "Closing watch stream" + end + end + end + + def get_current_service_snapshot_resource_version + log.debug "Getting current service snapshot" + begin + params = Hash.new + params[:as] = :raw + response = @clients['v1'].public_send "get_endpoints", params + result = JSON.parse(response) + new_snapshot_pods_to_services = Concurrent::Map.new {|h, k| h[k] = []} + + result['items'].each do |endpoint| + service = endpoint['metadata']['name'] + get_pods_for_service(endpoint).each {|pod| new_snapshot_pods_to_services[pod] << service} + end + + @pods_to_services = new_snapshot_pods_to_services + result['metadata']['resourceVersion'] + rescue => e + log.error "Got exception #{e} getting current service snapshot and corresponding resource version." + 0 + end + end + + def get_pods_for_service(endpoint) + pods = [] + if endpoint.key? 'subsets' + endpoint['subsets'].each do |subset| + ['addresses', 'notReadyAddresses'].each do |key| + if subset.key? key + subset[key].each do |object| + if object.key? 'targetRef' + if object['targetRef']['kind'] == 'Pod' + pod = object['targetRef']['name'] + log.debug "Found Pod #{pod} for Service #{endpoint['metadata']['name']}" + pods << pod + end + end + end + end + end + end + end + pods + end + + def handle_service_event(event) + type = event['type'] + endpoint = event['object'] + service = endpoint['metadata']['name'] + case type + when 'ADDED' + get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod] << service} + when 'MODIFIED' + desired_pods = get_pods_for_service(endpoint) + @pods_to_services.each do |pod, services| + if services.include? service + services.delete service unless desired_pods.include? pod + end + end + desired_pods.each {|pod| @pods_to_services[pod] |= [service]} + when 'DELETED' + get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod].delete service} + else + log.error "Unknown type for watch endpoint event #{type}" + end + end + end + end +end \ No newline at end of file diff --git a/fluent-plugin-enhance-k8s-metadata/test/helper.rb b/fluent-plugin-enhance-k8s-metadata/test/helper.rb index aef4c7ece6..ddb307b4b9 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/helper.rb +++ b/fluent-plugin-enhance-k8s-metadata/test/helper.rb @@ -33,6 +33,8 @@ def stub_apis .to_return(body: test_resource('api_list_apps_v1.json'), status: 200) stub_request(:get, %r{/apis/extensions/v1beta1$}) .to_return(body: test_resource('api_list_extensions_v1beta1.json'), status: 200) + stub_request(:get, %r{/api/v1/endpoints$}) + .to_return(body: test_resource('endpoints_list.json'), status: 200) stub_request(:get, %r{/api/v1/namespaces/sumologic/pods}) .to_return(body: test_resource('pod_sumologic.json'), status: 200) stub_request(:get, %r{/apis/extensions/v1beta1/namespaces/sumologic/replicasets}) diff --git a/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_list.json b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_list.json new file mode 100644 index 0000000000..2aab2824c6 --- /dev/null +++ b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_list.json @@ -0,0 +1,324 @@ +{ + "apiVersion": "v1", + "items": [ + { + "apiVersion": "v1", + "kind": "Endpoints", + "metadata": { + "creationTimestamp": "2019-04-29T21:16:33Z", + "name": "kubernetes", + "namespace": "default", + "resourceVersion": "10425414", + "selfLink": "/api/v1/namespaces/default/endpoints/kubernetes", + "uid": "10526daf-6ac4-11e9-950a-028777db66fe" + }, + "subsets": [ + { + "addresses": [ + { + "ip": "172.20.38.183" + }, + { + "ip": "172.20.48.207" + }, + { + "ip": "172.20.78.16" + } + ], + "ports": [ + { + "name": "https", + "port": 443, + "protocol": "TCP" + } + ] + } + ] + }, + { + "apiVersion": "v1", + "kind": "Endpoints", + "metadata": { + "annotations": { + "control-plane.alpha.kubernetes.io/leader": "{\"holderIdentity\":\"ip-172-20-48-207_4f02f76e-6b95-11e9-a1a7-06604569b78c\",\"leaseDurationSeconds\":15,\"acquireTime\":\"2019-04-30T22:18:10Z\",\"renewTime\":\"2019-06-25T20:29:47Z\",\"leaderTransitions\":3}" + }, + "creationTimestamp": "2019-02-27T18:59:29Z", + "name": "kube-controller-manager", + "namespace": "kube-system", + "resourceVersion": "19601594", + "selfLink": "/api/v1/namespaces/kube-system/endpoints/kube-controller-manager", + "uid": "cf49c821-3ac1-11e9-85d5-062f31d2c4ec" + } + }, + { + "apiVersion": "v1", + "kind": "Endpoints", + "metadata": { + "creationTimestamp": "2019-02-27T18:59:55Z", + "labels": { + "k8s-addon": "kube-dns.addons.k8s.io", + "k8s-app": "kube-dns", + "kubernetes.io/cluster-service": "true", + "kubernetes.io/name": "KubeDNS" + }, + "name": "kube-dns", + "namespace": "kube-system", + "resourceVersion": "17788387", + "selfLink": "/api/v1/namespaces/kube-system/endpoints/kube-dns", + "uid": "dedc02ed-3ac1-11e9-85d5-062f31d2c4ec" + }, + "subsets": [ + { + "addresses": [ + { + "ip": "100.114.10.95", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "name": "kube-dns-6b4f4b544c-gzl2r", + "namespace": "kube-system", + "resourceVersion": "17788386", + "uid": "6c47f296-8ed8-11e9-b605-06604569b78c" + } + }, + { + "ip": "100.115.226.65", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "name": "kube-dns-6b4f4b544c-98jxv", + "namespace": "kube-system", + "resourceVersion": "17309602", + "uid": "f4a54748-8c8f-11e9-b605-06604569b78c" + } + } + ], + "ports": [ + { + "name": "dns", + "port": 53, + "protocol": "UDP" + }, + { + "name": "dns-tcp", + "port": 53, + "protocol": "TCP" + } + ] + } + ] + }, + { + "apiVersion": "v1", + "kind": "Endpoints", + "metadata": { + "annotations": { + "control-plane.alpha.kubernetes.io/leader": "{\"holderIdentity\":\"ip-172-20-48-207_43b66f71-6b95-11e9-b8ef-06604569b78c\",\"leaseDurationSeconds\":15,\"acquireTime\":\"2019-04-30T22:25:55Z\",\"renewTime\":\"2019-06-25T20:29:47Z\",\"leaderTransitions\":2}" + }, + "creationTimestamp": "2019-02-27T18:59:29Z", + "name": "kube-scheduler", + "namespace": "kube-system", + "resourceVersion": "19601595", + "selfLink": "/api/v1/namespaces/kube-system/endpoints/kube-scheduler", + "uid": "cf52af6e-3ac1-11e9-85d5-062f31d2c4ec" + } + }, + { + "apiVersion": "v1", + "kind": "Endpoints", + "metadata": { + "creationTimestamp": "2019-04-29T22:13:59Z", + "labels": { + "k8s-app": "kubelet" + }, + "name": "prometheus-operator-kubelet", + "namespace": "kube-system", + "resourceVersion": "17638823", + "selfLink": "/api/v1/namespaces/kube-system/endpoints/prometheus-operator-kubelet", + "uid": "16c80c7f-6acc-11e9-950a-028777db66fe" + }, + "subsets": [ + { + "addresses": [ + { + "ip": "172.20.38.183", + "targetRef": { + "kind": "Node", + "name": "ip-172-20-38-183.us-west-1.compute.internal", + "uid": "837d73a3-6b96-11e9-b250-06f6400abec8" + } + }, + { + "ip": "172.20.48.207", + "targetRef": { + "kind": "Node", + "name": "ip-172-20-48-207.us-west-1.compute.internal", + "uid": "774bd6d1-6b95-11e9-b605-06604569b78c" + } + }, + { + "ip": "172.20.54.182", + "targetRef": { + "kind": "Node", + "name": "ip-172-20-54-182.us-west-1.compute.internal", + "uid": "4c695b72-8c90-11e9-b605-06604569b78c" + } + }, + { + "ip": "172.20.60.118", + "targetRef": { + "kind": "Node", + "name": "ip-172-20-60-118.us-west-1.compute.internal", + "uid": "37e50e12-8c8f-11e9-b250-06f6400abec8" + } + }, + { + "ip": "172.20.74.252", + "targetRef": { + "kind": "Node", + "name": "ip-172-20-74-252.us-west-1.compute.internal", + "uid": "e65c1035-8e1b-11e9-b250-06f6400abec8" + } + }, + { + "ip": "172.20.78.16", + "targetRef": { + "kind": "Node", + "name": "ip-172-20-78-16.us-west-1.compute.internal", + "uid": "779ef345-6b97-11e9-aeec-0202bfa61b70" + } + } + ], + "ports": [ + { + "name": "http-metrics", + "port": 10255, + "protocol": "TCP" + }, + { + "name": "cadvisor", + "port": 4194, + "protocol": "TCP" + }, + { + "name": "https-metrics", + "port": 10250, + "protocol": "TCP" + } + ] + } + ] + }, + { + "apiVersion": "v1", + "kind": "Endpoints", + "metadata": { + "creationTimestamp": "2019-03-01T21:51:03Z", + "labels": { + "app": "helm", + "name": "tiller" + }, + "name": "tiller-deploy", + "namespace": "kube-system", + "resourceVersion": "17309596", + "selfLink": "/api/v1/namespaces/kube-system/endpoints/tiller-deploy", + "uid": "1be02498-3c6c-11e9-85d5-062f31d2c4ec" + }, + "subsets": [ + { + "addresses": [ + { + "ip": "100.115.226.66", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "name": "tiller-deploy-69458576b-27mp8", + "namespace": "kube-system", + "resourceVersion": "17309595", + "uid": "f4bff1fc-8c8f-11e9-b605-06604569b78c" + } + } + ], + "ports": [ + { + "name": "tiller", + "port": 44134, + "protocol": "TCP" + } + ] + } + ] + }, + { + "apiVersion": "v1", + "kind": "Endpoints", + "metadata": { + "creationTimestamp": "2019-06-25T20:29:04Z", + "labels": { + "k8s-app": "fluentd-sumologic" + }, + "name": "fluentd", + "namespace": "sumologic", + "resourceVersion": "19601538", + "selfLink": "/api/v1/namespaces/sumologic/endpoints/fluentd", + "uid": "e0395a93-9787-11e9-b605-06604569b78c" + }, + "subsets": [ + { + "notReadyAddresses": [ + { + "ip": "100.108.94.43", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "name": "fluentd-59d9c9656d-cg5m4", + "namespace": "sumologic", + "resourceVersion": "19601526", + "uid": "e01c6c20-9787-11e9-b605-06604569b78c" + } + }, + { + "ip": "100.114.10.110", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "name": "fluentd-59d9c9656d-5pwjg", + "namespace": "sumologic", + "resourceVersion": "19601532", + "uid": "e02af0b6-9787-11e9-b605-06604569b78c" + } + }, + { + "ip": "100.115.226.103", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "name": "fluentd-59d9c9656d-zlhjh", + "namespace": "sumologic", + "resourceVersion": "19601537", + "uid": "e02ad64d-9787-11e9-b605-06604569b78c" + } + } + ], + "ports": [ + { + "name": "prom-write", + "port": 9888, + "protocol": "TCP" + }, + { + "name": "fluent-bit", + "port": 24321, + "protocol": "TCP" + } + ] + } + ] + } + ], + "kind": "List", + "metadata": { + "resourceVersion": "", + "selfLink": "" + } +} From 1b5355029cf34b86d5036d56ed99c69c9678fd7d Mon Sep 17 00:00:00 2001 From: Sam Song Date: Thu, 27 Jun 2019 16:57:03 -0700 Subject: [PATCH 06/15] add get_pods_for_service UTs --- .../kubernetes/test_service_monitor.rb | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb diff --git a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb new file mode 100644 index 0000000000..aba394fd37 --- /dev/null +++ b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb @@ -0,0 +1,54 @@ +require 'helper' +require 'sumologic/kubernetes/service_monitor.rb' +require 'fluent/test/log' + +class ServiceMonitorTest < Test::Unit::TestCase + include SumoLogic::Kubernetes::Connector + include SumoLogic::Kubernetes::ServiceMonitor + + def setup + # runs before each test + stub_apis + connect_kubernetes + end + + def teardown + # runs after each test + end + + def log + Fluent::Test::TestLogger.new + end + + def get_test_endpoint + JSON.parse(File.read("test/resources/endpoints_list.json"))['items'] + end + + sub_test_case 'get_pods_for_service' do + test 'endpoint with no subsets' do + input = get_test_endpoint[1] + assert_equal 0, get_pods_for_service(input).length + end + + test 'endpoint with subsets and addresses with no targetRef' do + input = get_test_endpoint[0] + assert_equal 0, get_pods_for_service(input).length + end + + test 'endpoint with subsets and addresses with targetRef but type is not Pods' do + input = get_test_endpoint[4] + assert_equal 0, get_pods_for_service(input).length + end + + test 'endpoint with subsets and addresses with targetRef and type is Pods' do + input = get_test_endpoint[2] + expected = ['kube-dns-6b4f4b544c-gzl2r', 'kube-dns-6b4f4b544c-98jxv'] + assert_equal expected, get_pods_for_service(input) + end + + test 'endpoint with subsets and notReadyAddresses with targetRef and type is Pods' do + input = get_test_endpoint[6] + expected = ['fluentd-59d9c9656d-cg5m4', 'fluentd-59d9c9656d-5pwjg', 'fluentd-59d9c9656d-zlhjh'] + end + end +end From ca15fe474f311e47b31d49cd466a592aaff5903c Mon Sep 17 00:00:00 2001 From: Sam Song Date: Fri, 28 Jun 2019 11:25:56 -0700 Subject: [PATCH 07/15] add get_current_service_snapshot_resource_version UTs --- .../test/resources/endpoints_list.json | 2 +- .../kubernetes/test_service_monitor.rb | 22 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_list.json b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_list.json index 2aab2824c6..cbdef3f7b3 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_list.json +++ b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_list.json @@ -318,7 +318,7 @@ ], "kind": "List", "metadata": { - "resourceVersion": "", + "resourceVersion": "123456789", "selfLink": "" } } diff --git a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb index aba394fd37..0cfa0002af 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb +++ b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb @@ -51,4 +51,26 @@ def get_test_endpoint expected = ['fluentd-59d9c9656d-cg5m4', 'fluentd-59d9c9656d-5pwjg', 'fluentd-59d9c9656d-zlhjh'] end end + + sub_test_case 'get_current_service_snapshot_resource_version' do + test 'get_current_service_snapshot_resource_version' do + resource_version = get_current_service_snapshot_resource_version + assert_equal "123456789", resource_version + + expected = { + "kube-dns-6b4f4b544c-gzl2r": ["kube-dns"], + "kube-dns-6b4f4b544c-98jxv": ["kube-dns"], + "tiller-deploy-69458576b-27mp8": ["tiller-deploy"], + "fluentd-59d9c9656d-cg5m4": ["fluentd"], + "fluentd-59d9c9656d-5pwjg": ["fluentd"], + "fluentd-59d9c9656d-zlhjh": ["fluentd"] + } + + @pods_to_services.each do |k,v| + assert_equal expected[k.to_sym], v + end + + assert_equal 6, expected.keys.length + end + end end From df1fbdbd0b210b7ecd3849a8b28a839feb216019 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Mon, 1 Jul 2019 12:58:03 -0700 Subject: [PATCH 08/15] ensure service not already in hash for ADDED event --- .../sumologic/kubernetes/service_monitor.rb | 2 +- .../test/resources/endpoints_events.json | 102 ++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json diff --git a/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb index 30ebe0ead9..79b2e0c7df 100644 --- a/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb +++ b/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb @@ -108,7 +108,7 @@ def handle_service_event(event) service = endpoint['metadata']['name'] case type when 'ADDED' - get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod] << service} + get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod] << service unless @pods_to_services[pod].include? service} when 'MODIFIED' desired_pods = get_pods_for_service(endpoint) @pods_to_services.each do |pod, services| diff --git a/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json new file mode 100644 index 0000000000..59b06cb3f5 --- /dev/null +++ b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json @@ -0,0 +1,102 @@ +{ + "items": [ + { + "type": "ADDED", + "object": { + "kind": "Endpoints", + "apiVersion": "v1", + "metadata": { + "name": "kube-scheduler", + "namespace": "kube-system", + "selfLink": "/api/v1/namespaces/kube-system/endpoints/kube-scheduler", + "uid": "cf52af6e-3ac1-11e9-85d5-062f31d2c4ec", + "resourceVersion": "20568244", + "creationTimestamp": "2019-02-27T18:59:29Z", + "annotations": { + "control-plane.alpha.kubernetes.io/leader": "{\"holderIdentity\":\"ip-172-20-48-207_43b66f71-6b95-11e9-b8ef-06604569b78c\",\"leaseDurationSeconds\":15,\"acquireTime\":\"2019-04-30T22:25:55Z\",\"renewTime\":\"2019-07-01T17:51:18Z\",\"leaderTransitions\":2}" + } + } + } + }, + { + "type": "ADDED", + "object": { + "kind": "Endpoints", + "apiVersion": "v1", + "metadata": { + "name": "fluentd", + "namespace": "sumologic", + "selfLink": "/api/v1/namespaces/sumologic/endpoints/fluentd", + "uid": "fadbb0e2-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568383", + "creationTimestamp": "2019-07-01T17:52:23Z", + "labels": { + "k8s-app": "fluentd-sumologic" + } + }, + "subsets": [ + { + "addresses": [ + { + "ip": "100.108.94.61", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-gvhxz", + "uid": "cb766b43-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568330" + } + }, + { + "ip": "100.114.10.126", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-events-76c68bc596-5clcp", + "uid": "cb9e9211-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568226" + } + }, + { + "ip": "100.114.10.127", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-rtp7d", + "uid": "cb805ca4-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568354" + } + }, + { + "ip": "100.115.226.113", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-nvhkg", + "uid": "cb80cba6-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568336" + } + } + ], + "ports": [ + { + "name": "prom-write", + "port": 9888, + "protocol": "TCP" + }, + { + "name": "fluent-bit", + "port": 24321, + "protocol": "TCP" + } + ] + } + ] + } + } + ] +} \ No newline at end of file From 305b931406731d6f15927a737f8266a91ab8e89c Mon Sep 17 00:00:00 2001 From: Sam Song Date: Mon, 1 Jul 2019 12:59:16 -0700 Subject: [PATCH 09/15] fix get_current_service_snapshot_resource_version UT --- .../test/sumologic/kubernetes/test_service_monitor.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb index 0cfa0002af..f0891a62d9 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb +++ b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb @@ -65,12 +65,10 @@ def get_test_endpoint "fluentd-59d9c9656d-5pwjg": ["fluentd"], "fluentd-59d9c9656d-zlhjh": ["fluentd"] } - + assert_equal expected.keys.length, @pods_to_services.keys.length @pods_to_services.each do |k,v| assert_equal expected[k.to_sym], v end - - assert_equal 6, expected.keys.length end end end From baf80c079cb04ec22aca5e767f4efd068c52fb99 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Mon, 1 Jul 2019 13:19:08 -0700 Subject: [PATCH 10/15] add handle_service_event UT's for ADDED case --- .../test/resources/endpoints_events.json | 80 +++++++++++++++++++ .../kubernetes/test_service_monitor.rb | 74 +++++++++++++++++ 2 files changed, 154 insertions(+) diff --git a/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json index 59b06cb3f5..902c79e1e1 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json +++ b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json @@ -97,6 +97,86 @@ } ] } + }, + { + "type": "ADDED", + "object": { + "kind": "Endpoints", + "apiVersion": "v1", + "metadata": { + "name": "fluentd-2", + "namespace": "sumologic", + "selfLink": "/api/v1/namespaces/sumologic/endpoints/fluentd-2", + "uid": "fadbb0e2-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568383", + "creationTimestamp": "2019-07-01T17:52:23Z", + "labels": { + "k8s-app": "fluentd-sumologic" + } + }, + "subsets": [ + { + "addresses": [ + { + "ip": "100.108.94.61", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-gvhxz", + "uid": "cb766b43-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568330" + } + }, + { + "ip": "100.114.10.126", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-events-76c68bc596-5clcp", + "uid": "cb9e9211-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568226" + } + }, + { + "ip": "100.114.10.127", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-rtp7d", + "uid": "cb805ca4-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568354" + } + }, + { + "ip": "100.115.226.113", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-nvhkg", + "uid": "cb80cba6-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568336" + } + } + ], + "ports": [ + { + "name": "prom-write", + "port": 9888, + "protocol": "TCP" + }, + { + "name": "fluent-bit", + "port": 24321, + "protocol": "TCP" + } + ] + } + ] + } } ] } \ No newline at end of file diff --git a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb index f0891a62d9..fd47fdef8b 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb +++ b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb @@ -10,6 +10,7 @@ def setup # runs before each test stub_apis connect_kubernetes + @pods_to_services = Concurrent::Map.new {|h, k| h[k] = []} end def teardown @@ -24,6 +25,10 @@ def get_test_endpoint JSON.parse(File.read("test/resources/endpoints_list.json"))['items'] end + def get_test_endpoint_event + JSON.parse(File.read("test/resources/endpoints_events.json"))['items'] + end + sub_test_case 'get_pods_for_service' do test 'endpoint with no subsets' do input = get_test_endpoint[1] @@ -71,4 +76,73 @@ def get_test_endpoint end end end + + sub_test_case 'handle_service_event' do + test 'ADDED event with no pods' do + event = get_test_endpoint_event[0] + handle_service_event(event) + assert_equal 0, @pods_to_services.keys.length + end + + test 'ADDED event with new pods' do + event = get_test_endpoint_event[1] + handle_service_event(event) + + expected = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + assert_equal expected.keys.length, @pods_to_services.keys.length + @pods_to_services.each do |k,v| + assert_equal expected[k.to_sym], v + end + end + + test 'ADDED event with existing service on existing pods' do # shouldn't happen but check anyway + current_state = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + current_state.each do |k,v| + @pods_to_services[k.to_s] = v + end + + event = get_test_endpoint_event[1] + handle_service_event(event) + expected = current_state + assert_equal expected.keys.length, @pods_to_services.keys.length + @pods_to_services.each do |k,v| + assert_equal expected[k.to_sym], v + end + end + + test 'ADDED event with new service on existing pods' do + current_state = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + current_state.each do |k,v| + @pods_to_services[k.to_s] = v + end + + event = get_test_endpoint_event[2] + handle_service_event(event) + expected = { + "fluentd-59d9c9656d-gvhxz": ["fluentd", "fluentd-2"], + "fluentd-59d9c9656d-rtp7d": ["fluentd", "fluentd-2"], + "fluentd-59d9c9656d-nvhkg": ["fluentd", "fluentd-2"], + "fluentd-events-76c68bc596-5clcp": ["fluentd", "fluentd-2"] + } + assert_equal expected.keys.length, @pods_to_services.keys.length + @pods_to_services.each do |k,v| + assert_equal expected[k.to_sym], v + end + end + end end From 68b73c6964e536aaaf449edbec05fe4f2ba58c65 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Mon, 1 Jul 2019 13:33:29 -0700 Subject: [PATCH 11/15] refactor populate and assert state of hash --- .../kubernetes/test_service_monitor.rb | 41 +++++++++---------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb index fd47fdef8b..b012f45712 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb +++ b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb @@ -29,6 +29,19 @@ def get_test_endpoint_event JSON.parse(File.read("test/resources/endpoints_events.json"))['items'] end + def populate_current_state(current_state) + current_state.each do |k,v| + @pods_to_services[k.to_s] = v + end + end + + def assert_expected_state(expected) + assert_equal expected.keys.length, @pods_to_services.keys.length + @pods_to_services.each do |k,v| + assert_equal expected[k.to_sym], v + end + end + sub_test_case 'get_pods_for_service' do test 'endpoint with no subsets' do input = get_test_endpoint[1] @@ -70,10 +83,7 @@ def get_test_endpoint_event "fluentd-59d9c9656d-5pwjg": ["fluentd"], "fluentd-59d9c9656d-zlhjh": ["fluentd"] } - assert_equal expected.keys.length, @pods_to_services.keys.length - @pods_to_services.each do |k,v| - assert_equal expected[k.to_sym], v - end + assert_expected_state(expected) end end @@ -94,10 +104,7 @@ def get_test_endpoint_event "fluentd-59d9c9656d-nvhkg": ["fluentd"], "fluentd-events-76c68bc596-5clcp": ["fluentd"] } - assert_equal expected.keys.length, @pods_to_services.keys.length - @pods_to_services.each do |k,v| - assert_equal expected[k.to_sym], v - end + assert_expected_state(expected) end test 'ADDED event with existing service on existing pods' do # shouldn't happen but check anyway @@ -107,17 +114,12 @@ def get_test_endpoint_event "fluentd-59d9c9656d-nvhkg": ["fluentd"], "fluentd-events-76c68bc596-5clcp": ["fluentd"] } - current_state.each do |k,v| - @pods_to_services[k.to_s] = v - end + populate_current_state(current_state) event = get_test_endpoint_event[1] handle_service_event(event) expected = current_state - assert_equal expected.keys.length, @pods_to_services.keys.length - @pods_to_services.each do |k,v| - assert_equal expected[k.to_sym], v - end + assert_expected_state(expected) end test 'ADDED event with new service on existing pods' do @@ -127,9 +129,7 @@ def get_test_endpoint_event "fluentd-59d9c9656d-nvhkg": ["fluentd"], "fluentd-events-76c68bc596-5clcp": ["fluentd"] } - current_state.each do |k,v| - @pods_to_services[k.to_s] = v - end + populate_current_state(current_state) event = get_test_endpoint_event[2] handle_service_event(event) @@ -139,10 +139,7 @@ def get_test_endpoint_event "fluentd-59d9c9656d-nvhkg": ["fluentd", "fluentd-2"], "fluentd-events-76c68bc596-5clcp": ["fluentd", "fluentd-2"] } - assert_equal expected.keys.length, @pods_to_services.keys.length - @pods_to_services.each do |k,v| - assert_equal expected[k.to_sym], v - end + assert_expected_state(expected) end end end From 8017fe15e6d0266dc80b8b49569ec3508024a5ea Mon Sep 17 00:00:00 2001 From: Sam Song Date: Mon, 1 Jul 2019 13:48:45 -0700 Subject: [PATCH 12/15] fix MODIFIED case to delete pod if services is empty --- .../lib/sumologic/kubernetes/service_monitor.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb index 79b2e0c7df..9226d05fd9 100644 --- a/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb +++ b/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb @@ -111,12 +111,13 @@ def handle_service_event(event) get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod] << service unless @pods_to_services[pod].include? service} when 'MODIFIED' desired_pods = get_pods_for_service(endpoint) + desired_pods.each {|pod| @pods_to_services[pod] |= [service]} @pods_to_services.each do |pod, services| if services.include? service services.delete service unless desired_pods.include? pod end + @pods_to_services.delete pod if services.length == 0 end - desired_pods.each {|pod| @pods_to_services[pod] |= [service]} when 'DELETED' get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod].delete service} else From 12799cddd2ef356f431f0e8269b94fe5c3880350 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Mon, 1 Jul 2019 13:49:11 -0700 Subject: [PATCH 13/15] add handle_service_event UT for MODIFIED case --- .../test/resources/endpoints_events.json | 284 ++++++++++++++++++ .../kubernetes/test_service_monitor.rb | 116 +++++++ 2 files changed, 400 insertions(+) diff --git a/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json index 902c79e1e1..424a079208 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json +++ b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json @@ -177,6 +177,290 @@ } ] } + }, + { + "type": "MODIFIED", + "object": { + "kind": "Endpoints", + "apiVersion": "v1", + "metadata": { + "name": "kube-scheduler", + "namespace": "kube-system", + "selfLink": "/api/v1/namespaces/kube-system/endpoints/kube-scheduler", + "uid": "cf52af6e-3ac1-11e9-85d5-062f31d2c4ec", + "resourceVersion": "20568244", + "creationTimestamp": "2019-02-27T18:59:29Z", + "annotations": { + "control-plane.alpha.kubernetes.io/leader": "{\"holderIdentity\":\"ip-172-20-48-207_43b66f71-6b95-11e9-b8ef-06604569b78c\",\"leaseDurationSeconds\":15,\"acquireTime\":\"2019-04-30T22:25:55Z\",\"renewTime\":\"2019-07-01T17:51:18Z\",\"leaderTransitions\":2}" + } + } + } + }, + { + "type": "MODIFIED", + "object": { + "kind": "Endpoints", + "apiVersion": "v1", + "metadata": { + "name": "fluentd", + "namespace": "sumologic", + "selfLink": "/api/v1/namespaces/sumologic/endpoints/fluentd", + "uid": "cbc1b62d-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568252", + "creationTimestamp": "2019-07-01T17:51:04Z", + "labels": { + "k8s-app": "fluentd-sumologic" + } + }, + "subsets": [ + { + "addresses": [ + { + "ip": "100.114.10.126", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-events-76c68bc596-5clcp", + "uid": "cb9e9211-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568226" + } + } + ], + "notReadyAddresses": [ + { + "ip": "100.108.94.61", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-gvhxz", + "uid": "cb766b43-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568230" + } + }, + { + "ip": "100.114.10.127", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-rtp7d", + "uid": "cb805ca4-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568251" + } + }, + { + "ip": "100.115.226.113", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-nvhkg", + "uid": "cb80cba6-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568241" + } + } + ], + "ports": [ + { + "name": "prom-write", + "port": 9888, + "protocol": "TCP" + }, + { + "name": "fluent-bit", + "port": 24321, + "protocol": "TCP" + } + ] + } + ] + } + }, + { + "type": "MODIFIED", + "object": { + "kind": "Endpoints", + "apiVersion": "v1", + "metadata": { + "name": "fluentd", + "namespace": "sumologic", + "selfLink": "/api/v1/namespaces/sumologic/endpoints/fluentd", + "uid": "cbc1b62d-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568469", + "creationTimestamp": "2019-07-01T17:51:04Z", + "labels": { + "k8s-app": "fluentd-sumologic" + } + }, + "subsets": [ + { + "addresses": [ + { + "ip": "100.108.94.61", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-gvhxz", + "uid": "cb766b43-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568330" + } + }, + { + "ip": "100.114.10.126", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-events-76c68bc596-5clcp", + "uid": "cb9e9211-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568226" + } + }, + { + "ip": "100.114.10.127", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-rtp7d", + "uid": "cb805ca4-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568354" + } + }, + { + "ip": "100.115.226.113", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-nvhkg", + "uid": "cb80cba6-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568336" + } + } + ], + "notReadyAddresses": [ + { + "ip": "100.108.94.62", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-z8hxn", + "uid": "099a7e3b-9c29-11e9-b605-06604569b78c", + "resourceVersion": "20568465" + } + }, + { + "ip": "100.108.94.63", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-drqph", + "uid": "0996ed8f-9c29-11e9-b605-06604569b78c", + "resourceVersion": "20568468" + } + }, + { + "ip": "100.115.226.114", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-kfkkj", + "uid": "0999cd47-9c29-11e9-b605-06604569b78c", + "resourceVersion": "20568461" + } + } + ], + "ports": [ + { + "name": "prom-write", + "port": 9888, + "protocol": "TCP" + }, + { + "name": "fluent-bit", + "port": 24321, + "protocol": "TCP" + } + ] + } + ] + } + }, + { + "type": "MODIFIED", + "object": { + "kind": "Endpoints", + "apiVersion": "v1", + "metadata": { + "name": "fluentd", + "namespace": "sumologic", + "selfLink": "/api/v1/namespaces/sumologic/endpoints/fluentd", + "uid": "cbc1b62d-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568552", + "creationTimestamp": "2019-07-01T17:51:04Z", + "labels": { + "k8s-app": "fluentd-sumologic" + } + }, + "subsets": [ + { + "addresses": [ + { + "ip": "100.108.94.61", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-gvhxz", + "uid": "cb766b43-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568330" + } + }, + { + "ip": "100.114.10.126", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-events-76c68bc596-5clcp", + "uid": "cb9e9211-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568226" + } + }, + { + "ip": "100.115.226.113", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-nvhkg", + "uid": "cb80cba6-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568336" + } + } + ], + "ports": [ + { + "name": "prom-write", + "port": 9888, + "protocol": "TCP" + }, + { + "name": "fluent-bit", + "port": 24321, + "protocol": "TCP" + } + ] + } + ] + } } ] } \ No newline at end of file diff --git a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb index b012f45712..56a1efc51b 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb +++ b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb @@ -141,5 +141,121 @@ def assert_expected_state(expected) } assert_expected_state(expected) end + + test 'MODIFIED event with no pods, empty hash' do + event = get_test_endpoint_event[3] + handle_service_event(event) + assert_equal 0, @pods_to_services.keys.length + end + + test 'MODIFIED event with no pods, hash populated' do + current_state = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + populate_current_state(current_state) + + event = get_test_endpoint_event[3] + handle_service_event(event) + expected = current_state + assert_expected_state(expected) + end + + test 'MODIFIED event with ready and not ready pods that are already in hash' do + current_state = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + populate_current_state(current_state) + + event = get_test_endpoint_event[4] + handle_service_event(event) + expected = current_state + assert_expected_state(expected) + end + + test 'MODIFIED event with new ready and not ready pods' do + event = get_test_endpoint_event[4] + handle_service_event(event) + expected = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + assert_expected_state(expected) + end + + test 'MODIFIED event with increased replicas' do + current_state = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + populate_current_state(current_state) + + event = get_test_endpoint_event[5] + handle_service_event(event) + expected = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-59d9c9656d-z8hxn": ["fluentd"], + "fluentd-59d9c9656d-drqph": ["fluentd"], + "fluentd-59d9c9656d-kfkkj": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + assert_expected_state(expected) + end + + test 'MODIFIED event with increased replicas multiple service' do + current_state = { + "fluentd-59d9c9656d-gvhxz": ["fluentd-2"], + "fluentd-59d9c9656d-rtp7d": ["fluentd-2"], + "fluentd-59d9c9656d-nvhkg": ["fluentd-2"], + "fluentd-events-76c68bc596-5clcp": ["fluentd-2"] + } + populate_current_state(current_state) + + event = get_test_endpoint_event[5] + handle_service_event(event) + expected = { + "fluentd-59d9c9656d-gvhxz": ["fluentd-2", "fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd-2", "fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd-2", "fluentd"], + "fluentd-59d9c9656d-z8hxn": ["fluentd"], + "fluentd-59d9c9656d-drqph": ["fluentd"], + "fluentd-59d9c9656d-kfkkj": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd-2", "fluentd"] + } + assert_expected_state(expected) + end + + test 'MODIFIED event with deleted replicas' do + current_state = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-59d9c9656d-z8hxn": ["fluentd"], + "fluentd-59d9c9656d-drqph": ["fluentd"], + "fluentd-59d9c9656d-kfkkj": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + populate_current_state(current_state) + + event = get_test_endpoint_event[6] + handle_service_event(event) + expected = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + assert_expected_state(expected) + end end end From 8f5d54a38f6be21f4e72dfb5319e90e146a916d7 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Mon, 1 Jul 2019 13:56:21 -0700 Subject: [PATCH 14/15] fix DELETED case, add handle_service_event UT for DELETED case --- .../sumologic/kubernetes/service_monitor.rb | 5 +- .../test/resources/endpoints_events.json | 115 ++++++++++++++++++ .../kubernetes/test_service_monitor.rb | 45 ++++++- 3 files changed, 163 insertions(+), 2 deletions(-) diff --git a/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb index 9226d05fd9..d6377f76a8 100644 --- a/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb +++ b/fluent-plugin-enhance-k8s-metadata/lib/sumologic/kubernetes/service_monitor.rb @@ -119,7 +119,10 @@ def handle_service_event(event) @pods_to_services.delete pod if services.length == 0 end when 'DELETED' - get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod].delete service} + get_pods_for_service(endpoint).each do |pod| + @pods_to_services[pod].delete service + @pods_to_services.delete pod if @pods_to_services[pod].length == 0 + end else log.error "Unknown type for watch endpoint event #{type}" end diff --git a/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json index 424a079208..d3f13b82a2 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json +++ b/fluent-plugin-enhance-k8s-metadata/test/resources/endpoints_events.json @@ -461,6 +461,121 @@ } ] } + }, + { + "type": "DELETED", + "object": { + "kind": "Endpoints", + "apiVersion": "v1", + "metadata": { + "name": "fluentd-2", + "namespace": "sumologic", + "selfLink": "/api/v1/namespaces/sumologic/endpoints/fluentd-2", + "uid": "fadbb0e2-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568514", + "creationTimestamp": "2019-07-01T17:52:23Z", + "labels": { + "k8s-app": "fluentd-sumologic" + } + }, + "subsets": [ + { + "addresses": [ + { + "ip": "100.108.94.61", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-gvhxz", + "uid": "cb766b43-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568330" + } + }, + { + "ip": "100.114.10.126", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-events-76c68bc596-5clcp", + "uid": "cb9e9211-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568226" + } + }, + { + "ip": "100.114.10.127", + "nodeName": "ip-172-20-54-182.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-rtp7d", + "uid": "cb805ca4-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568354" + } + }, + { + "ip": "100.115.226.113", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-nvhkg", + "uid": "cb80cba6-9c28-11e9-b605-06604569b78c", + "resourceVersion": "20568336" + } + } + ], + "notReadyAddresses": [ + { + "ip": "100.108.94.62", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-z8hxn", + "uid": "099a7e3b-9c29-11e9-b605-06604569b78c", + "resourceVersion": "20568465" + } + }, + { + "ip": "100.108.94.63", + "nodeName": "ip-172-20-74-252.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-drqph", + "uid": "0996ed8f-9c29-11e9-b605-06604569b78c", + "resourceVersion": "20568468" + } + }, + { + "ip": "100.115.226.114", + "nodeName": "ip-172-20-60-118.us-west-1.compute.internal", + "targetRef": { + "kind": "Pod", + "namespace": "sumologic", + "name": "fluentd-59d9c9656d-kfkkj", + "uid": "0999cd47-9c29-11e9-b605-06604569b78c", + "resourceVersion": "20568461" + } + } + ], + "ports": [ + { + "name": "prom-write", + "port": 9888, + "protocol": "TCP" + }, + { + "name": "fluent-bit", + "port": 24321, + "protocol": "TCP" + } + ] + } + ] + } } ] } \ No newline at end of file diff --git a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb index 56a1efc51b..7253369f4f 100644 --- a/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb +++ b/fluent-plugin-enhance-k8s-metadata/test/sumologic/kubernetes/test_service_monitor.rb @@ -213,7 +213,7 @@ def assert_expected_state(expected) assert_expected_state(expected) end - test 'MODIFIED event with increased replicas multiple service' do + test 'MODIFIED event with increased replicas new service' do current_state = { "fluentd-59d9c9656d-gvhxz": ["fluentd-2"], "fluentd-59d9c9656d-rtp7d": ["fluentd-2"], @@ -257,5 +257,48 @@ def assert_expected_state(expected) } assert_expected_state(expected) end + + test 'DELETED event only one service' do + current_state = { + "fluentd-59d9c9656d-gvhxz": ["fluentd-2"], + "fluentd-59d9c9656d-rtp7d": ["fluentd-2"], + "fluentd-59d9c9656d-nvhkg": ["fluentd-2"], + "fluentd-59d9c9656d-z8hxn": ["fluentd-2"], + "fluentd-59d9c9656d-drqph": ["fluentd-2"], + "fluentd-59d9c9656d-kfkkj": ["fluentd-2"], + "fluentd-events-76c68bc596-5clcp": ["fluentd-2"] + } + populate_current_state(current_state) + + event = get_test_endpoint_event[7] + handle_service_event(event) + assert_equal 0, @pods_to_services.keys.length + end + + test 'DELETED event 2nd service' do + current_state = { + "fluentd-59d9c9656d-gvhxz": ["fluentd-2", "fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd-2", "fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd-2", "fluentd"], + "fluentd-59d9c9656d-z8hxn": ["fluentd-2", "fluentd"], + "fluentd-59d9c9656d-drqph": ["fluentd-2", "fluentd"], + "fluentd-59d9c9656d-kfkkj": ["fluentd-2", "fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd-2", "fluentd"] + } + populate_current_state(current_state) + + event = get_test_endpoint_event[7] + handle_service_event(event) + expected = { + "fluentd-59d9c9656d-gvhxz": ["fluentd"], + "fluentd-59d9c9656d-rtp7d": ["fluentd"], + "fluentd-59d9c9656d-nvhkg": ["fluentd"], + "fluentd-59d9c9656d-z8hxn": ["fluentd"], + "fluentd-59d9c9656d-drqph": ["fluentd"], + "fluentd-59d9c9656d-kfkkj": ["fluentd"], + "fluentd-events-76c68bc596-5clcp": ["fluentd"] + } + assert_expected_state(expected) + end end end From 35002edeff5f9ea7ff306579ba98a5f04e93651e Mon Sep 17 00:00:00 2001 From: Sam Song Date: Mon, 1 Jul 2019 16:44:29 -0700 Subject: [PATCH 15/15] remove dummy plugin from docker test --- deploy/docker/fluent.conf | 3 --- 1 file changed, 3 deletions(-) diff --git a/deploy/docker/fluent.conf b/deploy/docker/fluent.conf index e87abf16fd..52c6ed2a27 100644 --- a/deploy/docker/fluent.conf +++ b/deploy/docker/fluent.conf @@ -20,9 +20,6 @@ @type prometheus_format - - @type enhance_k8s_metadata - @type kubernetes_metadata