Add service metadata monitoring #57
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ rules: | |
- configmaps | ||
- daemonsets | ||
- deployments | ||
- endpoints | ||
- events | ||
- namespaces | ||
- nodes | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
require 'concurrent' | ||
|
||
module SumoLogic | ||
module Kubernetes | ||
# module for watching changes to services | ||
module ServiceMonitor | ||
require_relative 'connector.rb' | ||
|
||
def start_service_monitor | ||
log.info "Starting watching for service changes" | ||
|
||
@watch_service_interval_seconds = 300 | ||
|
||
last_recreated = Time.now.to_i | ||
log.debug "last_recreated initialized to #{last_recreated}" | ||
|
||
while true do | ||
# Periodically restart watcher connection by checking if enough time has passed since | ||
# last time watcher thread was recreated or if the watcher thread has been stopped. | ||
now = Time.now.to_i | ||
watcher_exists = Thread.list.select {|thread| thread.object_id == @watcher_id && thread.alive?}.count > 0 | ||
if now - last_recreated >= @watch_service_interval_seconds || !watcher_exists | ||
|
||
log.debug "Recreating service watcher thread" | ||
@watch_stream.finish if @watch_stream | ||
|
||
start_service_watcher_thread | ||
last_recreated = now | ||
log.debug "last_recreated updated to #{last_recreated}" | ||
end | ||
sleep(10) | ||
end | ||
end | ||
|
||
def start_service_watcher_thread | ||
log.debug "Starting service endpoints watcher thread" | ||
params = Hash.new | ||
params[:as] = :raw | ||
params[:resource_version] = get_current_service_snapshot_resource_version | ||
params[:timeout_seconds] = @watch_service_interval_seconds + 60 | ||
|
||
@watcher = @clients['v1'].public_send("watch_endpoints", params).tap do |watcher| | ||
thread_create(:"watch_endpoints") do | ||
@watch_stream = watcher | ||
@watcher_id = Thread.current.object_id | ||
log.debug "New thread to watch service endpoints #{@watcher_id} from resource version #{params[:resource_version]}" | ||
|
||
watcher.each do |event| | ||
begin | ||
event = JSON.parse(event) | ||
handle_service_event(event) | ||
rescue => e | ||
log.error "Got exception #{e} parsing event #{event}. Skipping." | ||
end | ||
end | ||
log.info "Closing watch stream" | ||
end | ||
end | ||
end | ||
|
||
def get_current_service_snapshot_resource_version | ||
log.debug "Getting current service snapshot" | ||
begin | ||
params = Hash.new | ||
params[:as] = :raw | ||
response = @clients['v1'].public_send "get_endpoints", params | ||
result = JSON.parse(response) | ||
new_snapshot_pods_to_services = Concurrent::Map.new {|h, k| h[k] = []} | ||
|
||
result['items'].each do |endpoint| | ||
service = endpoint['metadata']['name'] | ||
get_pods_for_service(endpoint).each {|pod| new_snapshot_pods_to_services[pod] << service} | ||
end | ||
|
||
@pods_to_services = new_snapshot_pods_to_services | ||
result['metadata']['resourceVersion'] | ||
rescue => e | ||
log.error "Got exception #{e} getting current service snapshot and corresponding resource version." | ||
0 | ||
end | ||
end | ||
|
||
def get_pods_for_service(endpoint) | ||
pods = [] | ||
if endpoint.key? 'subsets' | ||
endpoint['subsets'].each do |subset| | ||
['addresses', 'notReadyAddresses'].each do |key| | ||
if subset.key? key | ||
subset[key].each do |object| | ||
if object.key? 'targetRef' | ||
if object['targetRef']['kind'] == 'Pod' | ||
pod = object['targetRef']['name'] | ||
log.debug "Found Pod #{pod} for Service #{endpoint['metadata']['name']}" | ||
pods << pod | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end | ||
pods | ||
end | ||
|
||
def handle_service_event(event) | ||
type = event['type'] | ||
endpoint = event['object'] | ||
service = endpoint['metadata']['name'] | ||
case type | ||
when 'ADDED' | ||
get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod] << service unless @pods_to_services[pod].include? service} | ||
when 'MODIFIED' | ||
desired_pods = get_pods_for_service(endpoint) | ||
desired_pods.each {|pod| @pods_to_services[pod] |= [service]} | ||
@pods_to_services.each do |pod, services| | ||
if services.include? service | ||
services.delete service unless desired_pods.include? pod | ||
does deleting
Yes, I believe |
||
end | ||
@pods_to_services.delete pod if services.length == 0 | ||
end | ||
when 'DELETED' | ||
get_pods_for_service(endpoint).each do |pod| | ||
@pods_to_services[pod].delete service | ||
@pods_to_services.delete pod if @pods_to_services[pod].length == 0 | ||
end | ||
else | ||
log.error "Unknown type for watch endpoint event #{type}" | ||
end | ||
end | ||
end | ||
end | ||
end |
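For readers unfamiliar with the Endpoints payload, here is a minimal standalone sketch of how the snapshot step maps pods to services. The sample JSON is made up for illustration, `pods_for_endpoint` and `build_snapshot` are hypothetical stand-ins for the methods in the diff, and a plain `Hash` replaces `Concurrent::Map` so the sketch runs without the concurrent-ruby gem.

```ruby
require 'json'

# Hypothetical list-endpoints response; names and values are made up.
SAMPLE = <<~JSON
  {
    "metadata": { "resourceVersion": "1234" },
    "items": [
      {
        "metadata": { "name": "web" },
        "subsets": [
          {
            "addresses": [
              { "ip": "10.0.0.1", "targetRef": { "kind": "Pod", "name": "web-1" } }
            ],
            "notReadyAddresses": [
              { "ip": "10.0.0.2", "targetRef": { "kind": "Pod", "name": "web-2" } }
            ]
          }
        ]
      },
      {
        "metadata": { "name": "metrics" },
        "subsets": [
          { "addresses": [ { "ip": "10.0.0.1", "targetRef": { "kind": "Pod", "name": "web-1" } } ] }
        ]
      },
      { "metadata": { "name": "external" } }
    ]
  }
JSON

# Stand-in for get_pods_for_service: collect pod names from both
# ready and not-ready addresses, skipping targets that are not Pods.
def pods_for_endpoint(endpoint)
  (endpoint['subsets'] || []).flat_map do |subset|
    %w[addresses notReadyAddresses].flat_map do |key|
      (subset[key] || [])
        .select { |address| address.dig('targetRef', 'kind') == 'Pod' }
        .map { |address| address['targetRef']['name'] }
    end
  end
end

# Build the pod => [services] map from a parsed list response.
def build_snapshot(list_response)
  pods_to_services = Hash.new { |hash, pod| hash[pod] = [] }
  list_response['items'].each do |endpoint|
    service = endpoint['metadata']['name']
    pods_for_endpoint(endpoint).each { |pod| pods_to_services[pod] << service }
  end
  pods_to_services
end

result = JSON.parse(SAMPLE)
snapshot = build_snapshot(result)
resource_version = result['metadata']['resourceVersion']
```

In this sample, `web-1` backs two services and the `external` endpoint has no subsets, so it contributes no pods. The resource version read from the list response is what the watch in the diff resumes from.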
Not sure if I'm following the `MODIFIED` case, can you give some explanation?
Yeah, the `MODIFIED` case was the hardest part. There are two cases to cover:
- [...] `@pods_to_services[pod]` unless it already exists.
- [...] `MODIFIED` event. Then if there are no services for that pod in the map, we can remove that key from the map.

I offhandedly asked Chris about it at some point, since the "looking through the entire map for mentions of the service" part really irked me... but he seemed to think it wasn't a big enough issue to be worth "over-optimizing prematurely", since this case likely will not happen often. I'm open to any suggestions 😅
thanks for the detailed explanation! I'm fine with current approach
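The `MODIFIED` handling discussed in this thread can be sketched in isolation as below. This is a simplified illustration rather than the PR's code: `apply_modified` is a hypothetical helper name, a plain `Hash` stands in for the `Concurrent::Map`, and the pod and service names are made up.

```ruby
# Reconcile the pod => [services] map after a MODIFIED event for one service.
# desired_pods is the list of pods the event says currently back the service.
def apply_modified(pods_to_services, service, desired_pods)
  # Case 1: pods now backing the service gain it, unless already recorded.
  desired_pods.each do |pod|
    pods_to_services[pod] << service unless pods_to_services[pod].include?(service)
  end

  # Case 2: pods that no longer back the service lose it. This scans every
  # pod in the map. Iterating over a copy of the keys keeps deletion safe
  # with a plain Hash.
  pods_to_services.keys.each do |pod|
    next if desired_pods.include?(pod)

    services = pods_to_services[pod]
    services.delete(service)
    # Drop pods that have no services left.
    pods_to_services.delete(pod) if services.empty?
  end

  pods_to_services
end

pods_to_services = Hash.new { |hash, pod| hash[pod] = [] }
pods_to_services['web-1'] = ['web', 'metrics']
pods_to_services['web-2'] = ['web']

# The 'web' service now targets web-1 and web-3; web-2 was removed.
apply_modified(pods_to_services, 'web', ['web-1', 'web-3'])
```

After the call, `web-3` is added with `['web']`, `web-1` is unchanged, and `web-2` is dropped because it has no services left. The scan in the second step costs one pass over all pods per `MODIFIED` event, which is the trade-off accepted in the thread.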