Skip to content

Commit

Permalink
Fix update handler, stabilize scaling algo
Browse files Browse the repository at this point in the history
  • Loading branch information
gautamp8 committed Jul 8, 2020
1 parent 93b47b6 commit de584c9
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 52 deletions.
17 changes: 17 additions & 0 deletions constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Native Kubernetes API object kinds; recorded as the 'kind' of each child
# entry in a handler's status.
DEPLOYMENT_KIND = 'Deployment'
SERVICE_KIND = 'Service'

# Celery Worker Constants
# Recorded as the 'type' of child entries that belong to the celery worker.
WORKER_TYPE = 'worker'


# Flower Constants
# Recorded as the 'type' of child entries that belong to flower.
FLOWER_TYPE = 'flower'


# Handler Status
# Values reported under the 'status' key of a handler's result.
STATUS_CREATED = 'CREATED'
STATUS_SUCCESS = 'SUCCESS'
STATUS_UPDATED = 'UPDATED'
STATUS_PATCHED = 'PATCHED'
73 changes: 45 additions & 28 deletions handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import kopf
import kubernetes
import requests
import constants
from math import ceil
from collections import namedtuple

Expand All @@ -12,20 +13,15 @@
)
from update_utils import (
update_all_deployments,
update_celery_deployment,
update_worker_deployment,
update_flower_deployment
)


@kopf.on.create('celeryproject.org', 'v1alpha1', 'celery')
def create_fn(spec, name, namespace, logger, **kwargs):
"""
TODO -
1. Validate the spec for incoming obj
2. Create a config-map for celery
3. Instantiate a celery Deployment with the specified parameters
4. Define and expose a flower Service to keep a watch on those metrics
5. Scale/Downscale on the basis of task queue length
Celery custom resource creation handler
"""
children_count = 0

Expand Down Expand Up @@ -56,53 +52,68 @@ def create_fn(spec, name, namespace, logger, **kwargs):
children = [
{
'name': worker_deployment.metadata.name,
'replicas': worker_deployment.spec.replicas
'replicas': worker_deployment.spec.replicas,
'kind': constants.DEPLOYMENT_KIND,
'type': constants.WORKER_TYPE
},
{
'name': flower_deployment.metadata.name,
'replicas': flower_deployment.spec.replicas
'replicas': flower_deployment.spec.replicas,
'kind': constants.DEPLOYMENT_KIND,
'type': constants.FLOWER_TYPE
},
{
'name': flower_svc.metadata.name,
'spec': flower_svc.spec.to_dict()
'spec': flower_svc.spec.to_dict(),
'kind': constants.SERVICE_KIND,
'type': constants.FLOWER_TYPE
}
]

return {
'children': children,
'children_count': len(children),
'status': "CREATED"
'status': constants.STATUS_CREATED
}


@kopf.on.update('celeryproject.org', 'v1alpha1', 'celery')
def update_fn(spec, status, namespace, logger, **kwargs):
# TODO - app name still cannot be updated(Fix that)
diff = kwargs.get('diff')
modified_spec = get_modified_spec_object(diff)

api = kubernetes.client.CoreV1Api()
apps_api_instance = kubernetes.client.AppsV1Api()
result = status.get('update_fn') or status.get('create_fn')

if modified_spec.common_spec:
# if common spec was updated, need to update all deployments
return update_all_deployments(
api, apps_api_instance, spec, status, namespace
)
else:
result = {}
if modified_spec.worker_spec:
result.update({
'worker_deployment': update_celery_deployment(
apps_api_instance, spec, status, namespace
)
# if worker spec was updated, just update worker deployments
worker_deployment = update_worker_deployment(
apps_api_instance, spec, status, namespace
)
deployment_status = next(child for child in result.get('children') if child['type'] == constants.WORKER_TYPE) # NOQA

deployment_status.update({
'name': worker_deployment.metadata.name,
'replicas': worker_deployment.spec.replicas
})

if modified_spec.flower_spec:
result.update({
'flower_deployment': update_flower_deployment(
apps_api_instance, spec, status, namespace
)
# if flower spec was updated, just update flower deployments
flower_deployment = update_flower_deployment(
apps_api_instance, spec, status, namespace
)
deployment_status = next(child for child in result.get('children') if child['type'] == constants.FLOWER_TYPE) # NOQA

deployment_status.update({
'name': flower_deployment.metadata.name,
'replicas': flower_deployment.spec.replicas
})
return result

Expand Down Expand Up @@ -149,9 +160,9 @@ def check_flower_label(value, spec, **_):


@kopf.timer('celeryproject.org', 'v1alpha1', 'celery',
initial_delay=5, interval=10, idle=10)
initial_delay=50000, interval=100000, idle=10)
def message_queue_length(spec, status, **kwargs):
flower_svc_host = "http://192.168.64.2:31737"
flower_svc_host = "http://192.168.64.2:32289"
url = f"{flower_svc_host}/api/queues/length"
response = requests.get(url=url)
if response.status_code == 200:
Expand All @@ -170,11 +181,11 @@ def get_current_replicas(child_name, status):


def get_current_queue_len(child_name, status):
for queue in status.get('message_queue_length'):
for queue in status.get('message_queue_length', []):
if queue.get('name') == child_name:
return queue.get('messages')

return None
return 0


@kopf.on.field('celeryproject.org', 'v1alpha1', 'celery',
Expand All @@ -192,9 +203,15 @@ def horizontal_autoscale(spec, status, namespace, **kwargs):
queue_name = spec['workerSpec']['queues']
current_queue_length = get_current_queue_len(queue_name, status)
avg_queue_length = scaling_target['metrics'][0].get('target').get('averageValue')
updated_num_of_replicas = ceil(
current_replicas * (current_queue_length / avg_queue_length)
) or min_replicas
updated_num_of_replicas = min(
max(
ceil(
current_replicas * (current_queue_length / avg_queue_length)
),
min_replicas
),
max_replicas
)

patch_body = {
"spec": {
Expand Down
103 changes: 79 additions & 24 deletions update_utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,77 @@
import constants
from models.worker_spec import (
args_list_from_spec_params
)


def update_all_deployments(api, apps_api_instance, spec, status, namespace):
worker_deployment = update_worker_deployment(
apps_api_instance, spec, status, namespace
)

flower_deployment = update_flower_deployment(
apps_api_instance, spec, status, namespace
)

flower_svc = update_flower_service(
api, spec, status, namespace
)

children = [
{
'name': worker_deployment.metadata.name,
'replicas': worker_deployment.spec.replicas,
'kind': constants.DEPLOYMENT_KIND,
'type': constants.WORKER_TYPE
},
{
'name': flower_deployment.metadata.name,
'replicas': flower_deployment.spec.replicas,
'kind': constants.DEPLOYMENT_KIND,
'type': constants.FLOWER_TYPE
},
{
'name': flower_svc.metadata.name,
'spec': flower_svc.spec.to_dict(),
'kind': constants.SERVICE_KIND,
'type': constants.FLOWER_TYPE
}
]

return {
'worker_deployment': update_celery_deployment(
apps_api_instance, spec, status, namespace
),
'flower_deployment': update_flower_deployment(
apps_api_instance, spec, status, namespace
),
'flower_service': update_flower_service(
api, spec, status, namespace
)
'children': children,
'children_count': len(children),
'status': constants.STATUS_UPDATED
}


def update_celery_deployment(apps_api_instance, spec, status, namespace):
def get_curr_deployment_from_handler_status(handler_name, status, child_type):
    """
    Get current deployment name from handler's status

    Scans the 'children' list stored under ``status[handler_name]`` and
    returns the name of the first child whose 'type' equals ``child_type``
    and whose 'kind' is a Deployment.

    @param: handler_name - which handler to get from (key in ``status``)
    @param: status - mapping of handler name to that handler's stored result
    @param: child_type - worker or flower
    @returns: current deployment name, or None when the handler has no
              stored result, has no children, or no child matches
    """
    # The handler entry or its 'children' list may be missing or None;
    # treat both as "no children" instead of raising AttributeError/TypeError
    # so the function keeps its "None means not found" contract.
    handler_status = status.get(handler_name) or {}
    children = handler_status.get('children') or []

    for child in children:
        if child.get('type') == child_type and child.get('kind') == constants.DEPLOYMENT_KIND:  # NOQA
            return child.get('name')

    return None


def get_curr_deployment_name(status, child_type):
    """
    Resolve the current deployment name from the parent's status

    The 'update_fn' entry of ``status`` is used when it is present and
    truthy; otherwise the lookup falls back to the 'create_fn' entry.

    @param: status - mapping of handler name to that handler's stored result
    @param: child_type - worker or flower
    @returns: current deployment name
    """
    handler_name = 'update_fn' if status.get('update_fn') else 'create_fn'
    return get_curr_deployment_from_handler_status(
        handler_name, status, child_type
    )


def update_worker_deployment(apps_api_instance, spec, status, namespace):
worker_spec = spec['workerSpec']
worker_spec_dict = {
'args': args_list_from_spec_params(
Expand Down Expand Up @@ -46,12 +100,13 @@ def update_celery_deployment(apps_api_instance, spec, status, namespace):
}
}

deployment_name = status['create_fn']['children']['worker_deployment']
apps_api_instance.patch_namespaced_deployment(
deployment_name, namespace, patch_body
worker_deployment_name = get_curr_deployment_name(
status, constants.WORKER_TYPE
)

return deployment_name
return apps_api_instance.patch_namespaced_deployment(
worker_deployment_name, namespace, patch_body
)


def update_flower_deployment(apps_api_instance, spec, status, namespace):
Expand Down Expand Up @@ -82,13 +137,13 @@ def update_flower_deployment(apps_api_instance, spec, status, namespace):
}
}

deployment_name = status['create_fn']['children']['flower_deployment']
# TODO: Use a try catch here
apps_api_instance.patch_namespaced_deployment(
deployment_name, namespace, patch_body
flower_deployment_name = get_curr_deployment_name(
status, constants.FLOWER_TYPE
)

return deployment_name
return apps_api_instance.patch_namespaced_deployment(
flower_deployment_name, namespace, patch_body
)


def update_flower_service(api, spec, status, namespace):
Expand All @@ -101,9 +156,9 @@ def update_flower_service(api, spec, status, namespace):
}
}

svc_name = status['create_fn']['children']['flower_service']
api.patch_namespaced_service(
svc_name, namespace, patch_body
flower_svc_name = get_curr_deployment_name(
status, constants.FLOWER_TYPE
) # flower svc is named same as flower deployment
return api.patch_namespaced_service(
flower_svc_name, namespace, patch_body
)

return svc_name

0 comments on commit de584c9

Please sign in to comment.