Skip to content

Commit

Permalink
[hotfix][runtime] Extract class ClusterResourceOverview into interfac…
Browse files Browse the repository at this point in the history
…e ClusterResourceStatisticsProvider

The class references internal states of TaskManagerTracker, providing up-to-date statistics rather than snapshots.
  • Loading branch information
xintongsong committed Mar 15, 2021
1 parent 0b52cf3 commit 80ff16c
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 133 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.InstanceID;

/** Provides statistics of cluster resources. */
public interface ClusterResourceStatisticsProvider {

/** Get total number of registered slots. */
int getNumberRegisteredSlots();

/** Get number of registered slots from the TaskManager with the given instance id. */
int getNumberRegisteredSlotsOf(InstanceID instanceId);

/** Get total number of free slots. */
int getNumberFreeSlots();

/** Get number of free slots from the TaskManager with the given instance id. */
int getNumberFreeSlotsOf(InstanceID instanceId);

/** Get profile of total registered resources. */
ResourceProfile getRegisteredResource();

/** Get profile of registered resources from the TaskManager with the given instance id. */
ResourceProfile getRegisteredResourceOf(InstanceID instanceId);

/** Get profile of total free resources. */
ResourceProfile getFreeResource();

/** Get profile of free resources from the TaskManager with the given instance id. */
ResourceProfile getFreeResourceOf(InstanceID instanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -562,24 +562,22 @@ private Set<PendingTaskManagerId> allocateTaskManagersAccordingTo(

@Override
public int getNumberRegisteredSlots() {
return taskManagerTracker.getClusterResourceOverview().getNumberRegisteredSlots();
return taskManagerTracker.getNumberRegisteredSlots();
}

@Override
public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
return taskManagerTracker
.getClusterResourceOverview()
.getNumberRegisteredSlotsOf(instanceId);
return taskManagerTracker.getNumberRegisteredSlotsOf(instanceId);
}

@Override
public int getNumberFreeSlots() {
return taskManagerTracker.getClusterResourceOverview().getNumberFreeSlots();
return taskManagerTracker.getNumberFreeSlots();
}

@Override
public int getNumberFreeSlotsOf(InstanceID instanceId) {
return taskManagerTracker.getClusterResourceOverview().getNumberFreeSlotsOf(instanceId);
return taskManagerTracker.getNumberFreeSlotsOf(instanceId);
}

@Override
Expand All @@ -595,22 +593,22 @@ public Map<WorkerResourceSpec, Integer> getRequiredResources() {

@Override
public ResourceProfile getRegisteredResource() {
return taskManagerTracker.getClusterResourceOverview().getRegisteredResource();
return taskManagerTracker.getRegisteredResource();
}

@Override
public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
return taskManagerTracker.getClusterResourceOverview().getRegisteredResourceOf(instanceID);
return taskManagerTracker.getRegisteredResourceOf(instanceID);
}

@Override
public ResourceProfile getFreeResource() {
return taskManagerTracker.getClusterResourceOverview().getFreeResource();
return taskManagerTracker.getFreeResource();
}

@Override
public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
return taskManagerTracker.getClusterResourceOverview().getRegisteredResourceOf(instanceID);
return taskManagerTracker.getRegisteredResourceOf(instanceID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,6 @@ public Collection<PendingTaskManager> getPendingTaskManagers() {
return Collections.unmodifiableCollection(pendingTaskManagers.values());
}

@Override
public ClusterResourceOverview getClusterResourceOverview() {
return new ClusterResourceOverview(taskManagerRegistrations);
}

@Override
public Collection<PendingTaskManager>
getPendingTaskManagersByTotalAndDefaultSlotResourceProfile(
Expand All @@ -272,6 +267,67 @@ public ClusterResourceOverview getClusterResourceOverview() {
Collections.emptySet()));
}

@Override
public int getNumberRegisteredSlots() {
return taskManagerRegistrations.values().stream()
.mapToInt(TaskManagerInfo::getDefaultNumSlots)
.sum();
}

@Override
public int getNumberRegisteredSlotsOf(InstanceID instanceId) {
return Optional.ofNullable(taskManagerRegistrations.get(instanceId))
.map(TaskManagerInfo::getDefaultNumSlots)
.orElse(0);
}

@Override
public int getNumberFreeSlots() {
return taskManagerRegistrations.keySet().stream()
.mapToInt(this::getNumberFreeSlotsOf)
.sum();
}

@Override
public int getNumberFreeSlotsOf(InstanceID instanceId) {
return Optional.ofNullable(taskManagerRegistrations.get(instanceId))
.map(
taskManager ->
Math.max(
taskManager.getDefaultNumSlots()
- taskManager.getAllocatedSlots().size(),
0))
.orElse(0);
}

@Override
public ResourceProfile getRegisteredResource() {
return taskManagerRegistrations.values().stream()
.map(TaskManagerInfo::getTotalResource)
.reduce(ResourceProfile.ZERO, ResourceProfile::merge);
}

@Override
public ResourceProfile getRegisteredResourceOf(InstanceID instanceId) {
return Optional.ofNullable(taskManagerRegistrations.get(instanceId))
.map(TaskManagerInfo::getTotalResource)
.orElse(ResourceProfile.ZERO);
}

@Override
public ResourceProfile getFreeResource() {
return taskManagerRegistrations.values().stream()
.map(TaskManagerInfo::getAvailableResource)
.reduce(ResourceProfile.ZERO, ResourceProfile::merge);
}

@Override
public ResourceProfile getFreeResourceOf(InstanceID instanceId) {
return Optional.ofNullable(taskManagerRegistrations.get(instanceId))
.map(TaskManagerInfo::getAvailableResource)
.orElse(ResourceProfile.ZERO);
}

@Override
public void clear() {
slots.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,6 @@ Map<JobID, ResourceCounter> getPendingAllocationsOfPendingTaskManager(
*/
Optional<TaskManagerSlotInformation> getAllocatedOrPendingSlot(AllocationID allocationId);

/**
* Get the current {@link ClusterResourceOverview}.
*
* @return the current {@link ClusterResourceOverview}
*/
ClusterResourceOverview getClusterResourceOverview();

/**
* Get all pending task managers with given total and default slot profile.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import java.util.Map;

/** Tracks TaskManager's resource and slot status. */
interface TaskManagerTracker extends TaskManagerResourceInfoProvider {
interface TaskManagerTracker
extends TaskManagerResourceInfoProvider, ClusterResourceStatisticsProvider {

// ---------------------------------------------------------------------------------------------
// Add / Remove (pending) Resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void testRecordPendingAllocations() {
}

@Test
public void testGetStatusOverview() {
public void testGetStatistics() {
final FineGrainedTaskManagerTracker taskManagerTracker =
new FineGrainedTaskManagerTracker();
final ResourceProfile totalResource = ResourceProfile.fromResources(10, 1000);
Expand All @@ -319,14 +319,10 @@ public void testGetStatusOverview() {
TASK_EXECUTOR_CONNECTION.getInstanceID(),
defaultSlotResource,
SlotState.ALLOCATED);
final ClusterResourceOverview clusterResourceOverview =
taskManagerTracker.getClusterResourceOverview();

assertThat(
clusterResourceOverview.getFreeResource(),
is(ResourceProfile.fromResources(6, 700)));
assertThat(clusterResourceOverview.getRegisteredResource(), is(totalResource));
assertThat(clusterResourceOverview.getNumberRegisteredSlots(), is(10));
assertThat(clusterResourceOverview.getNumberFreeSlots(), is(8));
assertThat(taskManagerTracker.getFreeResource(), is(ResourceProfile.fromResources(6, 700)));
assertThat(taskManagerTracker.getRegisteredResource(), is(totalResource));
assertThat(taskManagerTracker.getNumberRegisteredSlots(), is(10));
assertThat(taskManagerTracker.getNumberFreeSlots(), is(8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class TestingTaskManagerResourceInfoProvider implements TaskManagerResour
private final Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier;
private final Function<AllocationID, Optional<TaskManagerSlotInformation>>
getAllocatedOrPendingSlotFunction;
private final Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier;
private final BiFunction<ResourceProfile, ResourceProfile, Collection<PendingTaskManager>>
getPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction;

Expand All @@ -54,7 +53,6 @@ private TestingTaskManagerResourceInfoProvider(
Supplier<Collection<PendingTaskManager>> pendingTaskManagersSupplier,
Function<AllocationID, Optional<TaskManagerSlotInformation>>
getAllocatedOrPendingSlotFunction,
Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier,
BiFunction<ResourceProfile, ResourceProfile, Collection<PendingTaskManager>>
getPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction) {
this.getPendingAllocationsOfPendingTaskManagerFunction =
Expand All @@ -66,8 +64,6 @@ private TestingTaskManagerResourceInfoProvider(
this.pendingTaskManagersSupplier = Preconditions.checkNotNull(pendingTaskManagersSupplier);
this.getAllocatedOrPendingSlotFunction =
Preconditions.checkNotNull(getAllocatedOrPendingSlotFunction);
this.clusterResourceOverviewSupplier =
Preconditions.checkNotNull(clusterResourceOverviewSupplier);
this.getPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction =
Preconditions.checkNotNull(
getPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction);
Expand Down Expand Up @@ -100,11 +96,6 @@ public Optional<TaskManagerSlotInformation> getAllocatedOrPendingSlot(
return getAllocatedOrPendingSlotFunction.apply(allocationId);
}

@Override
public ClusterResourceOverview getClusterResourceOverview() {
return clusterResourceOverviewSupplier.get();
}

@Override
public Collection<PendingTaskManager>
getPendingTaskManagersByTotalAndDefaultSlotResourceProfile(
Expand All @@ -130,18 +121,10 @@ public static class Builder {
Collections::emptyList;
private Function<AllocationID, Optional<TaskManagerSlotInformation>>
getAllocatedOrPendingSlotFunction = ignore -> Optional.empty();
private Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier =
() -> new ClusterResourceOverview(Collections.emptyMap());
private BiFunction<ResourceProfile, ResourceProfile, Collection<PendingTaskManager>>
getPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction =
(ignored1, ignored2) -> Collections.emptyList();

public Builder setClusterResourceOverviewSupplier(
Supplier<ClusterResourceOverview> clusterResourceOverviewSupplier) {
this.clusterResourceOverviewSupplier = clusterResourceOverviewSupplier;
return this;
}

public Builder setGetAllocatedOrPendingSlotFunction(
Function<AllocationID, Optional<TaskManagerSlotInformation>>
getAllocatedOrPendingSlotFunction) {
Expand Down Expand Up @@ -190,7 +173,6 @@ public TestingTaskManagerResourceInfoProvider build() {
getRegisteredTaskManagerFunction,
pendingTaskManagersSupplier,
getAllocatedOrPendingSlotFunction,
clusterResourceOverviewSupplier,
getPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction);
}
}
Expand Down

0 comments on commit 80ff16c

Please sign in to comment.