Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce distinct controlplane instance group #10324

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ init:
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST); \
$(MANAGEMENT_COMMAND) register_queue --queuename=tower --instance_percent=100;
$(MANAGEMENT_COMMAND) register_queue --queuename=controlplane --instance_percent=100;

# Refresh development environment after pulling new code.
refresh: clean requirements_dev version_file develop migrate
Expand Down
4 changes: 3 additions & 1 deletion awx/api/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# Python
import logging

from django.conf import settings

# Django REST Framework
from rest_framework.exceptions import MethodNotAllowed, PermissionDenied
from rest_framework import permissions
Expand Down Expand Up @@ -245,7 +247,7 @@ def has_permission(self, request, view):

class InstanceGroupTowerPermission(ModelAccessPermission):
def has_object_permission(self, request, view, obj):
if request.method == 'DELETE' and obj.name == "tower":
if request.method == 'DELETE' and obj.name in [settings.DEFAULT_EXECUTION_QUEUE_NAME, settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]:
return False
return super(InstanceGroupTowerPermission, self).has_object_permission(request, view, obj)

Expand Down
8 changes: 6 additions & 2 deletions awx/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4918,8 +4918,12 @@ def validate_policy_instance_minimum(self, value):
return value

def validate_name(self, value):
if self.instance and self.instance.name == 'tower' and value != 'tower':
raise serializers.ValidationError(_('tower instance group name may not be changed.'))
if self.instance and self.instance.name == settings.DEFAULT_EXECUTION_QUEUE_NAME and value != settings.DEFAULT_EXECUTION_QUEUE_NAME:
raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_EXECUTION_QUEUE_NAME))

if self.instance and self.instance.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME and value != settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
raise serializers.ValidationError(_('%s instance group name may not be changed.' % settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME))

return value

def validate_credential(self, value):
Expand Down
18 changes: 18 additions & 0 deletions awx/main/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,24 @@
read_only=True,
)

register(
'DEFAULT_CONTROL_PLANE_QUEUE_NAME',
field_class=fields.CharField,
label=_('The instance group where control plane tasks run'),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this label read "The name of the instance group where control plane tasks run"?

The default EE setting uses a ForeignKey instead of a name, so this isn't consistent. We've never had settings reference objects before, which is why we're facing this issue in this release cycle.

If the user changes the value, what is the expected behavior? It won't create a new instance group with that name, right? So would they have to make a corresponding change to their installer (or installer variables)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you miss the read_only=True part?

category=_('System'),
category_slug='system',
read_only=True,
)

register(
'DEFAULT_EXECUTION_QUEUE_NAME',
field_class=fields.CharField,
label=_('The instance group where user jobs run (currently only on non-VM installs)'),
category=_('System'),
category_slug='system',
read_only=True,
)

register(
'DEFAULT_EXECUTION_ENVIRONMENT',
field_class=fields.PrimaryKeyRelatedField,
Expand Down
8 changes: 2 additions & 6 deletions awx/main/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ def get_or_register(self):

pod_ip = os.environ.get('MY_POD_IP')
registered = self.register(ip_address=pod_ip)
is_container_group = settings.IS_K8S
RegisterQueue('tower', 100, 0, [], is_container_group).register()
RegisterQueue(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, 100, 0, [], is_container_group=False).register()
RegisterQueue(settings.DEFAULT_EXECUTION_QUEUE_NAME, 100, 0, [], is_container_group=True).register()
return registered
else:
return (False, self.me())
Expand All @@ -156,10 +156,6 @@ def active_count(self):
"""Return count of active Tower nodes for licensing."""
return self.all().count()

def my_role(self):
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
return "tower"


class InstanceGroupManager(models.Manager):
"""A custom manager class for the Instance model.
Expand Down
6 changes: 0 additions & 6 deletions awx/main/models/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,6 @@ def is_lost(self, ref_time=None):
return self.modified < ref_time - timedelta(seconds=grace_period)

def refresh_capacity(self):
if settings.IS_K8S:
self.capacity = self.cpu = self.memory = self.cpu_capacity = self.mem_capacity = 0 # noqa
self.version = awx_application_version
self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
return

cpu = get_cpu_capacity()
mem = get_mem_capacity()
if self.enabled:
Expand Down
6 changes: 4 additions & 2 deletions awx/main/models/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,10 +1265,12 @@ def event_class(self):
return UnpartitionedSystemJobEvent
return SystemJobEvent

@property
def can_run_on_control_plane(self):
return True

@property
def task_impact(self):
if settings.IS_K8S:
return 0
return 5

@property
Expand Down
8 changes: 6 additions & 2 deletions awx/main/models/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,10 @@ def websocket_emit_data(self):
websocket_data.update(dict(project_id=self.project.id))
return websocket_data

@property
def can_run_on_control_plane(self):
return True

@property
def event_class(self):
if self.has_unpartitioned_events:
Expand All @@ -561,8 +565,6 @@ def event_class(self):

@property
def task_impact(self):
if settings.IS_K8S:
return 0
return 0 if self.job_type == 'run' else 1

@property
Expand Down Expand Up @@ -623,6 +625,8 @@ def preferred_instance_groups(self):
organization_groups = []
template_groups = [x for x in super(ProjectUpdate, self).preferred_instance_groups]
selected_groups = template_groups + organization_groups
if not any([not group.is_container_group for group in selected_groups]):
selected_groups = selected_groups + list(self.control_plane_instance_group)
if not selected_groups:
return self.global_instance_groups
return selected_groups
Expand Down
22 changes: 18 additions & 4 deletions awx/main/models/unified_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,13 @@ def get_ui_url(self):
def _get_task_class(cls):
raise NotImplementedError # Implement in subclasses.

@property
def can_run_on_control_plane(self):
if settings.IS_K8S:
return False

return True

@property
def can_run_containerized(self):
return False
Expand Down Expand Up @@ -1415,14 +1422,21 @@ def preferred_instance_groups(self):
return []
return list(self.unified_job_template.instance_groups.all())

@property
def control_plane_instance_group(self):
from awx.main.models.ha import InstanceGroup

control_plane_instance_group = InstanceGroup.objects.filter(name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME)

return list(control_plane_instance_group)

@property
def global_instance_groups(self):
from awx.main.models.ha import InstanceGroup

default_instance_group = InstanceGroup.objects.filter(name='tower')
if default_instance_group.exists():
return [default_instance_group.first()]
return []
default_instance_groups = InstanceGroup.objects.filter(name__in=[settings.DEFAULT_EXECUTION_QUEUE_NAME, settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME])

return list(default_instance_groups)

def awx_meta_vars(self):
"""
Expand Down
12 changes: 7 additions & 5 deletions awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ def process_pending_tasks(self, pending_tasks):
tasks_to_update_job_explanation.append(task)
continue
preferred_instance_groups = task.preferred_instance_groups

found_acceptable_queue = False
if isinstance(task, WorkflowJob):
if task.unified_job_template_id in running_workflow_templates:
Expand All @@ -484,19 +485,20 @@ def process_pending_tasks(self, pending_tasks):
running_workflow_templates.add(task.unified_job_template_id)
self.start_task(task, None, task.get_jobs_fail_chain(), None)
continue

for rampart_group in preferred_instance_groups:
if task.can_run_containerized and rampart_group.is_container_group:
self.graph[rampart_group.name]['graph'].add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain(), None)
found_acceptable_queue = True
break

if not task.can_run_on_control_plane:
logger.debug("Skipping group {}, task cannot run on control plane".format(rampart_group.name))
continue

remaining_capacity = self.get_remaining_capacity(rampart_group.name)
if (
task.task_impact > 0
and not rampart_group.is_container_group # project updates have a cost of zero
and self.get_remaining_capacity(rampart_group.name) <= 0
):
if task.task_impact > 0 and self.get_remaining_capacity(rampart_group.name) <= 0:
logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(rampart_group.name, remaining_capacity))
continue

Expand Down
2 changes: 1 addition & 1 deletion awx/main/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def instance_group_factory():

@pytest.fixture
def default_instance_group(instance_factory, instance_group_factory):
return create_instance_group("tower", instances=[create_instance("hostA")])
return create_instance_group("default", instances=[create_instance("hostA")])


@pytest.fixture
Expand Down
6 changes: 3 additions & 3 deletions awx/main/tests/functional/api/test_instance_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

@pytest.fixture
def tower_instance_group():
ig = InstanceGroup(name='tower')
ig = InstanceGroup(name='default')
ig.save()
return ig

Expand Down Expand Up @@ -117,8 +117,8 @@ def test_delete_rename_tower_instance_group_prevented(delete, options, tower_ins
assert 'GET' in resp.data['actions']
assert 'PUT' in resp.data['actions']

# Rename 'tower' instance group denied
patch(url, {'name': 'tower_prime'}, super_user, expect=400)
# Rename 'default' instance group denied
patch(url, {'name': 'default_prime'}, super_user, expect=400)

# Rename, other instance group OK
url = reverse("api:instance_group_detail", kwargs={'pk': instance_group.pk})
Expand Down
12 changes: 6 additions & 6 deletions awx/main/tests/functional/task_management/test_capacity.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ class TestCapacityMapping(TransactionTestCase):
def sample_cluster(self):
ig_small = InstanceGroup.objects.create(name='ig_small')
ig_large = InstanceGroup.objects.create(name='ig_large')
tower = InstanceGroup.objects.create(name='tower')
default = InstanceGroup.objects.create(name='default')
i1 = Instance.objects.create(hostname='i1', capacity=200)
i2 = Instance.objects.create(hostname='i2', capacity=200)
i3 = Instance.objects.create(hostname='i3', capacity=200)
ig_small.instances.add(i1)
ig_large.instances.add(i2, i3)
tower.instances.add(i2)
return [tower, ig_large, ig_small]
default.instances.add(i2)
return [default, ig_large, ig_small]

def test_mapping(self):
self.sample_cluster()
with self.assertNumQueries(2):
inst_map, ig_map = InstanceGroup.objects.capacity_mapping()
assert inst_map['i1'] == set(['ig_small'])
assert inst_map['i2'] == set(['ig_large', 'tower'])
assert inst_map['i2'] == set(['ig_large', 'default'])
assert ig_map['ig_small'] == set(['ig_small'])
assert ig_map['ig_large'] == set(['ig_large', 'tower'])
assert ig_map['tower'] == set(['ig_large', 'tower'])
assert ig_map['ig_large'] == set(['ig_large', 'default'])
assert ig_map['default'] == set(['ig_large', 'default'])
36 changes: 18 additions & 18 deletions awx/main/tests/unit/test_capacity.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,34 @@ class Instance(FakeObject):

ig_small = InstanceGroup(name='ig_small')
ig_large = InstanceGroup(name='ig_large')
tower = InstanceGroup(name='tower')
default = InstanceGroup(name='default')
i1 = Instance(hostname='i1', capacity=200)
i2 = Instance(hostname='i2', capacity=200)
i3 = Instance(hostname='i3', capacity=200)
ig_small.instances.add(i1)
ig_large.instances.add(i2, i3)
tower.instances.add(i2)
return [tower, ig_large, ig_small]
default.instances.add(i2)
return [default, ig_large, ig_small]

return stand_up_cluster


def test_committed_capacity(sample_cluster):
tower, ig_large, ig_small = sample_cluster()
tasks = [Job(status='waiting', instance_group=tower), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True)
default, ig_large, ig_small = sample_cluster()
tasks = [Job(status='waiting', instance_group=default), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks, breakdown=True)
# Jobs submitted to either default or ig_large must count toward both
assert capacities['tower']['committed_capacity'] == 43 * 2
assert capacities['default']['committed_capacity'] == 43 * 2
assert capacities['ig_large']['committed_capacity'] == 43 * 2
assert capacities['ig_small']['committed_capacity'] == 43


def test_running_capacity(sample_cluster):
tower, ig_large, ig_small = sample_cluster()
default, ig_large, ig_small = sample_cluster()
tasks = [Job(status='running', execution_node='i1'), Job(status='running', execution_node='i2'), Job(status='running', execution_node='i3')]
capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks, breakdown=True)
capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks, breakdown=True)
# The default group is only given 1 instance
assert capacities['tower']['running_capacity'] == 43
assert capacities['default']['running_capacity'] == 43
# Large IG has 2 instances
assert capacities['ig_large']['running_capacity'] == 43 * 2
assert capacities['ig_small']['running_capacity'] == 43
Expand All @@ -81,21 +81,21 @@ def test_offline_node_running(sample_cluster):
Assure that algorithm doesn't explode if a job is marked running
in an offline node
"""
tower, ig_large, ig_small = sample_cluster()
default, ig_large, ig_small = sample_cluster()
ig_small.instance_list[0].capacity = 0
tasks = [Job(status='running', execution_node='i1', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks)
capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks)
assert capacities['ig_small']['consumed_capacity'] == 43


def test_offline_node_waiting(sample_cluster):
"""
Same but for a waiting job
"""
tower, ig_large, ig_small = sample_cluster()
default, ig_large, ig_small = sample_cluster()
ig_small.instance_list[0].capacity = 0
tasks = [Job(status='waiting', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(qs=[tower, ig_large, ig_small], tasks=tasks)
capacities = InstanceGroup.objects.capacity_values(qs=[default, ig_large, ig_small], tasks=tasks)
assert capacities['ig_small']['consumed_capacity'] == 43


Expand All @@ -105,9 +105,9 @@ def test_RBAC_reduced_filter(sample_cluster):
but user does not have permission to see those actual instance groups.
Verify that this does not blow everything up.
"""
tower, ig_large, ig_small = sample_cluster()
tasks = [Job(status='waiting', instance_group=tower), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(qs=[tower], tasks=tasks, breakdown=True)
default, ig_large, ig_small = sample_cluster()
tasks = [Job(status='waiting', instance_group=default), Job(status='waiting', instance_group=ig_large), Job(status='waiting', instance_group=ig_small)]
capacities = InstanceGroup.objects.capacity_values(qs=[default], tasks=tasks, breakdown=True)
# Cross-links between groups not visible to current user,
# so a naive accounting of capacities is returned instead
assert capacities['tower']['committed_capacity'] == 43
assert capacities['default']['committed_capacity'] == 43
5 changes: 5 additions & 0 deletions awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,3 +945,8 @@ def IS_TESTING(argv=None):
BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5

DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'}

# Name of the default execution queue (the instance group where user jobs run)
DEFAULT_EXECUTION_QUEUE_NAME = 'default'
# Name of the default control plane queue (the instance group where control plane tasks run)
DEFAULT_CONTROL_PLANE_QUEUE_NAME = 'controlplane'