Skip to content

Commit

Permalink
Introduce distinct controlplane instance group
Browse files (browse the repository at this point in the history)
  • Loading branch information
shanemcd committed Jun 7, 2021
1 parent 82c4f6b commit ec8ac6f
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ init:
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST); \
$(MANAGEMENT_COMMAND) register_queue --queuename=tower --instance_percent=100;
$(MANAGEMENT_COMMAND) register_queue --queuename=controlplane --instance_percent=100;

# Refresh development environment after pulling new code.
refresh: clean requirements_dev version_file develop migrate
Expand Down
2 changes: 1 addition & 1 deletion awx/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def has_permission(self, request, view):

class InstanceGroupTowerPermission(ModelAccessPermission):
def has_object_permission(self, request, view, obj):
if request.method == 'DELETE' and obj.name == settings.DEFAULT_QUEUE_NAME:
if request.method == 'DELETE' and obj.name in [settings.DEFAULT_EXECUTION_QUEUE_NAME, settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]:
return False
return super(InstanceGroupTowerPermission, self).has_object_permission(request, view, obj)

Expand Down
8 changes: 6 additions & 2 deletions awx/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4918,8 +4918,12 @@ def validate_policy_instance_minimum(self, value):
return value

def validate_name(self, value):
if self.instance and self.instance.name == settings.DEFAULT_QUEUE_NAME and value != settings.DEFAULT_QUEUE_NAME:
raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_QUEUE_NAME))
if self.instance and self.instance.name == settings.DEFAULT_EXECUTION_QUEUE_NAME and value != settings.DEFAULT_EXECUTION_QUEUE_NAME:
raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_EXECUTION_QUEUE_NAME))

if self.instance and self.instance.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME and value != settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME))

return value

def validate_credential(self, value):
Expand Down
10 changes: 3 additions & 7 deletions awx/main/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def active_count(self):
- Only consider results that are unique
- Return the count of this query
"""
return self.order_by().exclude(inventory_sources__source=settings.DEFAULT_QUEUE_NAME).values('name').distinct().count()
return self.order_by().exclude(inventory_sources__source='tower').values('name').distinct().count()

def org_active_count(self, org_id):
"""Return count of active, unique hosts used by an organization.
Expand Down Expand Up @@ -146,8 +146,8 @@ def get_or_register(self):

pod_ip = os.environ.get('MY_POD_IP')
registered = self.register(ip_address=pod_ip)
is_container_group = settings.IS_K8S
RegisterQueue('tower', 100, 0, [], is_container_group).register()
RegisterQueue(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, 100, 0, [], is_container_group=False).register()
RegisterQueue(settings.DEFAULT_EXECUTION_QUEUE_NAME, 100, 0, [], is_container_group=True).register()
return registered
else:
return (False, self.me())
Expand All @@ -156,10 +156,6 @@ def active_count(self):
"""Return count of active Tower nodes for licensing."""
return self.all().count()

def my_role(self):
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
return "tower"


class InstanceGroupManager(models.Manager):
"""A custom manager class for the Instance model.
Expand Down
6 changes: 0 additions & 6 deletions awx/main/models/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,6 @@ def is_lost(self, ref_time=None):
return self.modified < ref_time - timedelta(seconds=grace_period)

def refresh_capacity(self):
if settings.IS_K8S:
self.capacity = self.cpu = self.memory = self.cpu_capacity = self.mem_capacity = 0 # noqa
self.version = awx_application_version
self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
return

cpu = get_cpu_capacity()
mem = get_mem_capacity()
if self.enabled:
Expand Down
6 changes: 4 additions & 2 deletions awx/main/models/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,10 +1265,12 @@ def event_class(self):
return UnpartitionedSystemJobEvent
return SystemJobEvent

@property
def can_run_on_control_plane(self):
return True

@property
def task_impact(self):
if settings.IS_K8S:
return 0
return 5

@property
Expand Down
8 changes: 6 additions & 2 deletions awx/main/models/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,10 @@ def websocket_emit_data(self):
websocket_data.update(dict(project_id=self.project.id))
return websocket_data

@property
def can_run_on_control_plane(self):
return True

@property
def event_class(self):
if self.has_unpartitioned_events:
Expand All @@ -561,8 +565,6 @@ def event_class(self):

@property
def task_impact(self):
if settings.IS_K8S:
return 0
return 0 if self.job_type == 'run' else 1

@property
Expand Down Expand Up @@ -623,6 +625,8 @@ def preferred_instance_groups(self):
organization_groups = []
template_groups = [x for x in super(ProjectUpdate, self).preferred_instance_groups]
selected_groups = template_groups + organization_groups
if not any([not group.is_container_group for group in selected_groups]):
selected_groups = selected_groups + list(self.control_plane_instance_group)
if not selected_groups:
return self.global_instance_groups
return selected_groups
Expand Down
22 changes: 18 additions & 4 deletions awx/main/models/unified_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,13 @@ def get_ui_url(self):
def _get_task_class(cls):
raise NotImplementedError # Implement in subclasses.

@property
def can_run_on_control_plane(self):
if settings.IS_K8S:
return False

return True

@property
def can_run_containerized(self):
return False
Expand Down Expand Up @@ -1415,14 +1422,21 @@ def preferred_instance_groups(self):
return []
return list(self.unified_job_template.instance_groups.all())

@property
def control_plane_instance_group(self):
from awx.main.models.ha import InstanceGroup

control_plane_instance_group = InstanceGroup.objects.filter(name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME)

return list(control_plane_instance_group)

@property
def global_instance_groups(self):
from awx.main.models.ha import InstanceGroup

default_instance_group = InstanceGroup.objects.filter(name=settings.DEFAULT_QUEUE_NAME)
if default_instance_group.exists():
return [default_instance_group.first()]
return []
default_instance_groups = InstanceGroup.objects.filter(name__in=[settings.DEFAULT_EXECUTION_QUEUE_NAME, settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME])

return list(default_instance_groups)

def awx_meta_vars(self):
"""
Expand Down
12 changes: 7 additions & 5 deletions awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ def process_pending_tasks(self, pending_tasks):
tasks_to_update_job_explanation.append(task)
continue
preferred_instance_groups = task.preferred_instance_groups

found_acceptable_queue = False
if isinstance(task, WorkflowJob):
if task.unified_job_template_id in running_workflow_templates:
Expand All @@ -484,19 +485,20 @@ def process_pending_tasks(self, pending_tasks):
running_workflow_templates.add(task.unified_job_template_id)
self.start_task(task, None, task.get_jobs_fail_chain(), None)
continue

for rampart_group in preferred_instance_groups:
if task.can_run_containerized and rampart_group.is_container_group:
self.graph[rampart_group.name]['graph'].add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain(), None)
found_acceptable_queue = True
break

if not task.can_run_on_control_plane:
logger.debug("Skipping group {}, task cannot run on control plane".format(rampart_group.name))
continue

remaining_capacity = self.get_remaining_capacity(rampart_group.name)
if (
task.task_impact > 0
and not rampart_group.is_container_group # project updates have a cost of zero
and self.get_remaining_capacity(rampart_group.name) <= 0
):
if task.task_impact > 0 and self.get_remaining_capacity(rampart_group.name) <= 0:
logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(rampart_group.name, remaining_capacity))
continue

Expand Down
2 changes: 1 addition & 1 deletion awx/main/tests/functional/models/test_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def test_host_active_count(self, organization):

def test_active_count_minus_tower(self, inventory):
inventory.hosts.create(name='locally-managed-host')
source = inventory.inventory_sources.create(name='tower-source', source='default')
source = inventory.inventory_sources.create(name='tower-source', source='tower')
source.hosts.create(name='remotely-managed-host', inventory=inventory)
assert Host.objects.active_count() == 1

Expand Down
6 changes: 4 additions & 2 deletions awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,5 +946,7 @@ def IS_TESTING(argv=None):

DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'}

# Default name of the task queue
DEFAULT_QUEUE_NAME = 'default'
# Name of the default task queue
DEFAULT_EXECUTION_QUEUE_NAME = 'default'
# Name of the default controlplane queue
DEFAULT_CONTROL_PLANE_QUEUE_NAME = 'controlplane'

0 comments on commit ec8ac6f

Please sign in to comment.