Merge pull request #424 from rmoe/reflector-improvements
Breaking change / performance: don't make kubernetes-client deserialize k8s events into objects
yuvipanda authored Sep 2, 2020
2 parents d2ee601 + 680a22c commit b209ce1
Showing 2 changed files with 39 additions and 40 deletions.
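The change hinges on the _preload_content=False flag of the generated kubernetes-client API methods: when it is set, the client returns the raw HTTP response instead of deserializing the body into model objects (V1PodList, V1Pod, V1Event), and the caller parses the JSON itself. That covers both halves of the commit message: skipping model deserialization is the performance win, and plain dicts in place of objects (key access rather than attribute access) is the breaking change for anything reading the reflector's resources. A minimal sketch of the idea, assuming the kubernetes Python client; the namespace and call site are illustrative rather than kubespawner's actual code:

import json
from kubernetes import client, config

config.load_kube_config()  # illustrative; kubespawner configures the client itself
v1 = client.CoreV1Api()

# Old style: the client deserializes the response into V1PodList/V1Pod objects.
pod_list = v1.list_namespaced_pod("default")
names_old = [p.metadata.name for p in pod_list.items]            # attribute access

# New style: skip deserialization and parse the raw JSON body ourselves.
resp = v1.list_namespaced_pod("default", _preload_content=False)
pod_list = json.loads(resp.read())
names_new = [p["metadata"]["name"] for p in pod_list["items"]]   # dict access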
15 changes: 10 additions & 5 deletions kubespawner/reflector.py
@@ -2,6 +2,8 @@
# asyncio Futures cannot be used across threads
from concurrent.futures import Future

+ from functools import partial
+ import json
import time
import threading

@@ -155,11 +157,13 @@ def _list_and_update(self):
label_selector=self.label_selector,
field_selector=self.field_selector,
_request_timeout=self.request_timeout,
+ _preload_content=False,
)
# This is an atomic operation on the dictionary!
- self.resources = {p.metadata.name: p for p in initial_resources.items}
+ initial_resources = json.loads(initial_resources.read())
+ self.resources = {p["metadata"]["name"]: p for p in initial_resources["items"]}
# return the resource version so we can hook up a watch
- return initial_resources.metadata.resource_version
+ return initial_resources["metadata"]["resourceVersion"]

def _watch_and_update(self):
"""
@@ -219,10 +223,11 @@ def _watch_and_update(self):
if self.timeout_seconds:
# set watch timeout
watch_args['timeout_seconds'] = self.timeout_seconds
+ method = partial(getattr(self.api, self.list_method_name), _preload_content=False)
# in case of timeout_seconds, the w.stream just exits (no exception thrown)
# -> we stop the watcher and start a new one
for watch_event in w.stream(
- getattr(self.api, self.list_method_name),
+ method,
**watch_args
):
# Remember that these events are k8s api related WatchEvents
@@ -236,10 +241,10 @@ def _watch_and_update(self):
resource = watch_event['object']
if watch_event['type'] == 'DELETED':
# This is an atomic delete operation on the dictionary!
- self.resources.pop(resource.metadata.name, None)
+ self.resources.pop(resource["metadata"]["name"], None)
else:
# This is an atomic operation on the dictionary!
- self.resources[resource.metadata.name] = resource
+ self.resources[resource["metadata"]["name"]] = resource
if self._stop_event.is_set():
self.log.info("%s watcher stopped", self.kind)
break
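On the watch side, the list method is wrapped in functools.partial with _preload_content=False bound before being handed to watch.Watch().stream(), so the events the stream yields carry the parsed JSON dict in watch_event['object'] instead of a deserialized model. A rough sketch of the same pattern outside kubespawner (namespace, selector, and timeout are illustrative; exact behavior depends on the kubernetes-client version in use):

from functools import partial
from kubernetes import client, config, watch

config.load_kube_config()
v1 = client.CoreV1Api()

# Bind _preload_content=False so watch events come back as plain dicts,
# mirroring what the reflector does with its list_method_name.
method = partial(v1.list_namespaced_pod, _preload_content=False)

w = watch.Watch()
for watch_event in w.stream(method, namespace="default", timeout_seconds=10):
    resource = watch_event["object"]          # a dict: resource["metadata"]["name"], ...
    if watch_event["type"] == "DELETED":
        print("deleted", resource["metadata"]["name"])
    else:
        print(watch_event["type"], resource["metadata"]["name"])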
64 changes: 29 additions & 35 deletions kubespawner/spawner.py
@@ -63,9 +63,9 @@ class PodReflector(NamespacedResourceReflector):
@property
def pods(self):
"""
- A dictionary of the python kubernetes client's representation of pods
- for the namespace. The dictionary keys are the pod ids and the values
- are the actual pod resource representations.
+ A dictionary of pods for the namespace as returned by the Kubernetes
+ API. The dictionary keys are the pod ids and the values are
+ dictionaries of the actual pod resource values.
ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.16/#pod-v1-core
"""
@@ -85,7 +85,7 @@ class EventReflector(NamespacedResourceReflector):
@property
def events(self):
"""
- Returns list of the python kubernetes client's representation of k8s
+ Returns list of dictionaries representing the k8s
events within the namespace, sorted by the latest event.
ref: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.16/#event-v1-core
@@ -98,12 +98,13 @@ def events(self):
# suddenly refreshes itself entirely. We should not assume a call to
# this dictionary's values will result in a consistently ordered list,
# so we sort it to get it somewhat more structured.
- # - We either seem to get only event.last_timestamp or event.event_time,
- #   both fields serve the same role but the former is a low resolution
- #   timestamp without and the other is a higher resolution timestamp.
+ # - We either seem to get only event['lastTimestamp'] or
+ #   event['eventTime'], both fields serve the same role but the former
+ #   is a low resolution timestamp without and the other is a higher
+ #   resolution timestamp.
return sorted(
self.resources.values(),
- key=lambda event: event.last_timestamp or event.event_time,
+ key=lambda event: event["lastTimestamp"] or event["eventTime"],
)
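As the comment above notes, an event may carry only one of the two timestamp fields, which is why the sort key falls back from lastTimestamp to eventTime. With plain dicts the values are RFC3339 strings (or None) rather than datetimes, and strings in that format still sort chronologically for practical purposes. A toy illustration with hand-written events, not real API output:

events = [
    {"message": "pulled image",  "lastTimestamp": None, "eventTime": "2020-09-02T10:00:00.000001Z"},
    {"message": "scheduled pod", "lastTimestamp": "2020-09-02T09:59:58Z", "eventTime": None},
]

ordered = sorted(events, key=lambda event: event["lastTimestamp"] or event["eventTime"])
print([e["message"] for e in ordered])   # oldest first: ['scheduled pod', 'pulled image']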


@@ -1499,10 +1500,10 @@ def is_pod_running(self, pod):
# FIXME: Validate if this is really the best way
is_running = (
pod is not None and
- pod.status.phase == 'Running' and
- pod.status.pod_ip is not None and
- pod.metadata.deletion_timestamp is None and
- all([cs.ready for cs in pod.status.container_statuses])
+ pod["status"]["phase"] == 'Running' and
+ pod["status"]["podIP"] is not None and
+ "deletionTimestamp" not in pod["metadata"] and
+ all([cs["ready"] for cs in pod["status"]["containerStatuses"]])
)
return is_running
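One subtlety of the dict form: a field the object API exposed as a None attribute (deletion_timestamp) may simply be absent from the JSON, hence the "deletionTimestamp" not in pod["metadata"] check. A hand-written, heavily trimmed pod dict run through the same conditions:

pod = {
    "metadata": {"name": "jupyter-alice"},   # no "deletionTimestamp" key at all
    "status": {
        "phase": "Running",
        "podIP": "10.0.0.12",
        "containerStatuses": [{"name": "notebook", "ready": True}],
    },
}

is_running = (
    pod is not None and
    pod["status"]["phase"] == 'Running' and
    pod["status"]["podIP"] is not None and
    "deletionTimestamp" not in pod["metadata"] and
    all([cs["ready"] for cs in pod["status"]["containerStatuses"]])
)
print(is_running)   # True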

@@ -1566,20 +1567,20 @@ def poll(self):
yield self.pod_reflector.first_load_future
data = self.pod_reflector.pods.get(self.pod_name, None)
if data is not None:
- if data.status.phase == 'Pending':
+ if data["status"]["phase"] == 'Pending':
return None
- ctr_stat = data.status.container_statuses
+ ctr_stat = data["status"]["containerStatuses"]
if ctr_stat is None: # No status, no container (we hope)
# This seems to happen when a pod is idle-culled.
return 1
for c in ctr_stat:
# return exit code if notebook container has terminated
- if c.name == 'notebook':
+ if c["name"] == 'notebook':
- if c.state.terminated:
+ if "terminated" in c["state"]:
# call self.stop to delete the pod
if self.delete_stopped_pods:
yield self.stop(now=True)
- return c.state.terminated.exit_code
+ return c["state"]["terminated"]["exitCode"]
break
# None means pod is running or starting up
return None
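The same pattern shows up in the terminated-container check: a container's state dict typically only carries the key for its current state (waiting, running, or terminated), so presence of "terminated" replaces the old truthiness test on c.state.terminated. A trimmed, hand-written status list for illustration:

ctr_stat = [
    {"name": "notebook", "state": {"terminated": {"exitCode": 0, "reason": "Completed"}}},
]

exit_code = None
for c in ctr_stat:
    if c["name"] == 'notebook':
        if "terminated" in c["state"]:
            exit_code = c["state"]["terminated"]["exitCode"]
        break
print(exit_code)   # 0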
@@ -1603,11 +1604,11 @@ def events(self):

events = []
for event in self.event_reflector.events:
- if event.involved_object.name != self.pod_name:
+ if event["involvedObject"]["name"] != self.pod_name:
# only consider events for my pod name
continue

- if self._last_event and event.metadata.uid == self._last_event:
+ if self._last_event and event["metadata"]["uid"] == self._last_event:
# saw last_event marker, ignore any previous events
# and only consider future events
# only include events *after* our _last_event marker
@@ -1649,7 +1650,7 @@ async def progress(self):
# pod_id may change if a previous pod is being stopped
# before starting a new one
# use the uid of the latest event to identify 'current'
- pod_id = events[-1].involved_object.uid
+ pod_id = events[-1]["involvedObject"]["uid"]
for i in range(next_event, len_events):
event = events[i]
# move the progress bar.
@@ -1659,20 +1660,13 @@
# 30 50 63 72 78 82 84 86 87 88 88 89
progress += (90 - progress) / 3

- # V1Event isn't serializable, and neither is the datetime
- # objects within it, and we need what we pass back to be
- # serializable to it can be sent back from JupyterHub to
- # a browser wanting to display progress.
- serializable_event = json.loads(
-     json.dumps(event.to_dict(), default=datetime.isoformat)
- )
await yield_({
'progress': int(progress),
- 'raw_event': serializable_event,
+ 'raw_event': event,
'message': "%s [%s] %s" % (
- event.last_timestamp or event.event_time,
- event.type,
- event.message,
+ event["lastTimestamp"] or event["eventTime"],
+ event["type"],
+ event["message"],
)
})
next_event = len_events
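The deleted serialization dance (event.to_dict() plus a datetime-aware json.dumps) is no longer needed because each event is already a plain dict parsed from the API's JSON: its timestamps are strings, so the whole structure can be sent to the browser as-is. A one-line sanity check with a hand-written event:

import json

raw_event = {"type": "Normal", "message": "Started container notebook",
             "lastTimestamp": "2020-09-02T10:00:03Z", "eventTime": None}
json.dumps({"progress": 72, "raw_event": raw_event})   # serializes cleanly, no custom default needed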
@@ -1777,7 +1771,7 @@ def _start(self):
# pod if it's part of this spawn process
events = self.events
if events:
- self._last_event = events[-1].metadata.uid
+ self._last_event = events[-1]["metadata"]["uid"]

if self.storage_pvc_ensure:
# Try and create the pvc. If it succeeds we are good. If
@@ -1871,19 +1865,19 @@ def _start(self):
raise

pod = self.pod_reflector.pods[self.pod_name]
- self.pod_id = pod.metadata.uid
+ self.pod_id = pod["metadata"]["uid"]
if self.event_reflector:
self.log.debug(
'pod %s events before launch: %s',
self.pod_name,
"\n".join(
[
"%s [%s] %s" % (event.last_timestamp or event.event_time, event.type, event.message)
"%s [%s] %s" % (event["lastTimestamp"] or event["eventTime"], event["type"], event["message"])
for event in self.events
]
),
)
- return (pod.status.pod_ip, self.port)
+ return (pod["status"]["podIP"], self.port)

@gen.coroutine
def stop(self, now=False):
