Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow arbitrary resource definitions #87

Merged
merged 25 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2643e5f
allow arbitrary resource definitions
pedohorse Jun 23, 2024
adc5a87
implement arbitrary resource display
pedohorse Jun 23, 2024
0935226
add __repr__ __eq__ to WorkerResources
pedohorse Jun 24, 2024
4fd2f56
adjust tests to new changes
pedohorse Jun 24, 2024
0ec26c0
NodeSerializerBase INTERFACE CHANGE, simplify deserialization, do not…
pedohorse Jun 25, 2024
aaa029e
DESIGN CHANGE: SchedulerConfigProviderBase to provide structured and …
pedohorse Jun 27, 2024
3dde637
move BaseNodeWithTaskRequirements to node_plugin_base module
pedohorse Jun 27, 2024
6f41ffe
add node parameter evaluation context test
pedohorse Jun 30, 2024
97eee93
adjust base node resource tests for arbitrary resource design
pedohorse Jun 30, 2024
593cb13
introduce arbitrary resources WIP
pedohorse Jun 30, 2024
cc73e4b
allow resource definition overrides for testing
pedohorse Jul 1, 2024
25a92f0
NotPerformed operations not added to undo stack
pedohorse Jul 1, 2024
534ce91
appropriate display for locked/readonly parameters
pedohorse Jul 1, 2024
1ad5969
refactor: plugin loader configuration is supplied from config provider
pedohorse Jul 4, 2024
ca8319d
switch to tree model/view for workers
pedohorse Jul 4, 2024
d867993
refactor pluginloader
pedohorse Jul 6, 2024
6aae08e
fix broken config test
pedohorse Jul 6, 2024
9035e16
add default to resource definition
pedohorse Jul 7, 2024
ea38dd3
check db schema upgrade before dealing with resources
pedohorse Jul 7, 2024
9774c14
default for SHARABLE_COMPUTATIONAL_UNIT - int()
pedohorse Jul 7, 2024
c017272
update requirement-related parameters, assume res0 is cpu, res1 is mem
pedohorse Jul 7, 2024
7ba353f
fix resource test to be async
pedohorse Jul 7, 2024
f89bf4c
add cpu/mem requirement migration logic
pedohorse Jul 7, 2024
d9e87ba
add HardwareResources unittests
pedohorse Jul 7, 2024
ef77d76
cleanup
pedohorse Jul 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 7 additions & 99 deletions src/lifeblood/basenode.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import asyncio
import re
from copy import deepcopy
from typing import Dict, Optional, Any
from .nodethings import ProcessingResult
from .uidata import NodeUi, ParameterNotFound, Parameter
from .processingcontext import ProcessingContext
from .logging import get_logger
from .enums import NodeParameterType, WorkerType
from .plugin_info import PluginInfo, empty_plugin_info
from .nodegraph_holder_base import NodeGraphHolderBase

# reexport
from .nodethings import ProcessingError

Check warning on line 12 in src/lifeblood/basenode.py

View workflow job for this annotation

GitHub Actions / flake8

F401 '.nodethings.ProcessingError' imported but unused

from typing import TYPE_CHECKING, Iterable

Expand Down Expand Up @@ -56,6 +54,9 @@
self.__parent = graph_holder
self.__parent_nid = node_id_in_graph

def parent(self) -> Optional[NodeGraphHolderBase]:
return self.__parent

def logger(self) -> "Logger":
return self.__logger

Expand Down Expand Up @@ -167,11 +168,11 @@
# # this may also apply to _ui_changed above, but nodes really SHOULD NOT change their own parameters during processing
# asyncio.get_event_loop().create_task(self.__parent.node_reports_changes_needs_saving(self.__parent_nid))

def _process_task_wrapper(self, task_dict) -> ProcessingResult:
def _process_task_wrapper(self, task_dict, node_config) -> ProcessingResult:
# with self.get_ui().lock_interface_readonly(): # TODO: this is bad, RETHINK!
# TODO: , in case threads do l1---r1 - release2 WILL leave lock in locked state forever, as it remembered it at l2
# TODO: l2---r2
return self.process_task(ProcessingContext(self, task_dict))
return self.process_task(ProcessingContext(self, task_dict, node_config))

def process_task(self, context: ProcessingContext) -> ProcessingResult:
"""
Expand All @@ -182,9 +183,9 @@
"""
raise NotImplementedError()

def _postprocess_task_wrapper(self, task_dict) -> ProcessingResult:
def _postprocess_task_wrapper(self, task_dict, node_config) -> ProcessingResult:
# with self.get_ui().lock_interface_readonly(): #TODO: read comment for _process_task_wrapper
return self.postprocess_task(ProcessingContext(self, task_dict))
return self.postprocess_task(ProcessingContext(self, task_dict, node_config))

def postprocess_task(self, context: ProcessingContext) -> ProcessingResult:
"""
Expand Down Expand Up @@ -252,96 +253,3 @@
restore state as given by get_state
"""
pass


class BaseNodeWithTaskRequirements(BaseNode):
    """Base node that adds standard 'worker requirements' parameters to the node UI
    and stamps those requirements onto every invocation job the node produces.

    Subclasses get, for free:
      - a 'worker requirements' group: priority adjustment, min/preferred cpu core
        count, min/preferred memory (GBs), worker groups and worker type;
      - a 'gpu requirements' group: min/preferred gpu count and gpu memory (GBs).

    The requirement values are read back in __apply_requirements() after
    process/postprocess and applied to the resulting invocation job.
    """

    def __init__(self, name: str):
        super(BaseNodeWithTaskRequirements, self).__init__(name)
        ui = self.get_ui()
        with ui.initializing_interface_lock():
            with ui.collapsable_group_block('main worker requirements', 'worker requirements'):
                # priority is an adjustment (can be negative), visualized as a slider
                ui.add_parameter('priority adjustment', 'priority adjustment', NodeParameterType.FLOAT, 0).set_slider_visualization(-100, 100)
                with ui.parameters_on_same_line_block():
                    # min cpu cores required, and a preferred amount (0.0 preferred == no preference, see __apply_requirements)
                    ui.add_parameter('worker cpu cost', 'min <cpu (cores)> preferred', NodeParameterType.FLOAT, 1.0).set_value_limits(value_min=0)
                    ui.add_parameter('worker cpu cost preferred', None, NodeParameterType.FLOAT, 0.0).set_value_limits(value_min=0)
                with ui.parameters_on_same_line_block():
                    # min memory in GBs (converted to bytes in __apply_requirements), plus preferred amount
                    ui.add_parameter('worker mem cost', 'min <memory (GBs)> preferred', NodeParameterType.FLOAT, 0.5).set_value_limits(value_min=0)
                    ui.add_parameter('worker mem cost preferred', None, NodeParameterType.FLOAT, 0.0).set_value_limits(value_min=0)
                # space- or comma-separated list of worker group names
                ui.add_parameter('worker groups', 'groups (space or comma separated)', NodeParameterType.STRING, '')
                ui.add_parameter('worker type', 'worker type', NodeParameterType.INT, WorkerType.STANDARD.value)\
                    .add_menu((('standard', WorkerType.STANDARD.value),
                               ('scheduler helper', WorkerType.SCHEDULER_HELPER.value)))
            with ui.collapsable_group_block('gpu main worker requirements', 'gpu requirements'):
                with ui.parameters_on_same_line_block():
                    ui.add_parameter('worker gpu cost', 'min <gpus> preferred', NodeParameterType.FLOAT, 0.0).set_value_limits(value_min=0)
                    ui.add_parameter('worker gpu cost preferred', None, NodeParameterType.FLOAT, 0.0).set_value_limits(value_min=0)
                with ui.parameters_on_same_line_block():
                    ui.add_parameter('worker gpu mem cost', 'min <memory (GBs)> preferred', NodeParameterType.FLOAT, 0.0).set_value_limits(value_min=0)
                    ui.add_parameter('worker gpu mem cost preferred', None, NodeParameterType.FLOAT, 0.0).set_value_limits(value_min=0)

    def __apply_requirements(self, task_dict: dict, result: ProcessingResult):
        """Read requirement parameters in the context of *task_dict* and apply them
        to result.invocation_job (if any). Returns *result* for chaining.
        """
        if result.invocation_job is not None:
            context = ProcessingContext(self, task_dict)
            raw_groups = context.param_value('worker groups').strip()
            reqs = result.invocation_job.requirements()
            if raw_groups != '':
                # groups may be separated by any mix of spaces and commas
                reqs.add_groups(re.split(r'[ ,]+', raw_groups))
            reqs.set_min_cpu_count(context.param_value('worker cpu cost'))
            # memory parameters are entered in GBs, requirements API takes bytes
            reqs.set_min_memory_bytes(context.param_value('worker mem cost') * 10**9)
            reqs.set_min_gpu_count(context.param_value('worker gpu cost'))
            reqs.set_min_gpu_memory_bytes(context.param_value('worker gpu mem cost') * 10**9)
            # preferred values: only applied when explicitly set above zero
            pref_cpu_count = context.param_value('worker cpu cost preferred')
            pref_mem_bytes = context.param_value('worker mem cost preferred') * 10**9
            pref_gpu_count = context.param_value('worker gpu cost preferred')
            pref_gpu_mem_bytes = context.param_value('worker gpu mem cost preferred') * 10**9
            if pref_cpu_count > 0:
                reqs.set_preferred_cpu_count(pref_cpu_count)
            if pref_mem_bytes > 0:
                reqs.set_preferred_memory_bytes(pref_mem_bytes)
            if pref_gpu_count > 0:
                reqs.set_preferred_gpu_count(pref_gpu_count)
            if pref_gpu_mem_bytes > 0:
                reqs.set_preferred_gpu_memory_bytes(pref_gpu_mem_bytes)

            reqs.set_worker_type(WorkerType(context.param_value('worker type')))
            result.invocation_job.set_requirements(reqs)
            result.invocation_job.set_priority(context.param_value('priority adjustment'))
        return result

    def _process_task_wrapper(self, task_dict) -> ProcessingResult:
        # run normal processing, then stamp requirements on the produced job
        result = super(BaseNodeWithTaskRequirements, self)._process_task_wrapper(task_dict)
        return self.__apply_requirements(task_dict, result)

    def _postprocess_task_wrapper(self, task_dict) -> ProcessingResult:
        # same requirement stamping for the postprocess stage
        result = super(BaseNodeWithTaskRequirements, self)._postprocess_task_wrapper(task_dict)
        return self.__apply_requirements(task_dict, result)


# class BaseNodeWithEnvironmentRequirements(BaseNode):
# def __init__(self, name: str):
# super(BaseNodeWithEnvironmentRequirements, self).__init__(name)
# ui = self.get_ui()
# with ui.initializing_interface_lock():
# with ui.collapsable_group_block('main environment resolver', 'task environment resolver additional requirements'):
# ui.add_parameter('main env resolver name', 'resolver name', NodeParameterType.STRING, 'StandardEnvironmentResolver')
# with ui.multigroup_parameter_block('main env resolver arguments'):
# with ui.parameters_on_same_line_block():
# type_param = ui.add_parameter('main env resolver arg type', '', NodeParameterType.INT, 0)
# type_param.add_menu((('int', NodeParameterType.INT.value),
# ('bool', NodeParameterType.BOOL.value),
# ('float', NodeParameterType.FLOAT.value),
# ('string', NodeParameterType.STRING.value),
# ('json', -1)
# ))
#
# ui.add_parameter('main env resolver arg svalue', 'val', NodeParameterType.STRING, '').append_visibility_condition(type_param, '==', NodeParameterType.STRING.value)
# ui.add_parameter('main env resolver arg ivalue', 'val', NodeParameterType.INT, 0).append_visibility_condition(type_param, '==', NodeParameterType.INT.value)
# ui.add_parameter('main env resolver arg fvalue', 'val', NodeParameterType.FLOAT, 0.0).append_visibility_condition(type_param, '==', NodeParameterType.FLOAT.value)
# ui.add_parameter('main env resolver arg bvalue', 'val', NodeParameterType.BOOL, False).append_visibility_condition(type_param, '==', NodeParameterType.BOOL.value)
# ui.add_parameter('main env resolver arg jvalue', 'val', NodeParameterType.STRING, '').append_visibility_condition(type_param, '==', -1)
#
# def _process_task_wrapper(self, task_dict) -> ProcessingResult:
# result = super(BaseNodeWithEnvironmentRequirements, self)._process_task_wrapper(task_dict)
# result.invocation_job.environment_resolver_arguments()
# return result
7 changes: 3 additions & 4 deletions src/lifeblood/basenode_serialization.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
from .basenode import BaseNode
from .node_dataprovider_base import NodeDataProvider
from .nodegraph_holder_base import NodeGraphHolderBase

from typing import Iterable, List, Optional, Tuple

Expand Down Expand Up @@ -31,11 +30,11 @@ def serialize(self, node: BaseNode) -> Tuple[bytes, Optional[bytes]]:
def serialize_state_only(self, node: BaseNode) -> Optional[bytes]:
raise NotImplementedError()

def deserialize(self, parent: NodeGraphHolderBase, node_id: int, node_data_provider: NodeDataProvider, data: bytes, state: Optional[bytes]) -> BaseNode:
def deserialize(self, node_data_provider: NodeDataProvider, data: bytes, state: Optional[bytes]) -> BaseNode:
    """Reconstruct a BaseNode from serialized bytes.

    :param node_data_provider: provider used to look up node type factories
    :param data: serialized node data, as produced by serialize()
    :param state: optional serialized node state, or None if there is no state
    :raises NotImplementedError: base class has no implementation; serializer subclasses must override
    """
    raise NotImplementedError()

async def deserialize_async(self, parent: NodeGraphHolderBase, node_id: int, node_data_provider: NodeDataProvider, data: bytes, state: Optional[bytes]) -> BaseNode:
return await asyncio.get_event_loop().run_in_executor(None, self.deserialize, parent, node_id, node_data_provider, data, state)
async def deserialize_async(self, node_data_provider: NodeDataProvider, data: bytes, state: Optional[bytes]) -> BaseNode:
    """Asynchronously deserialize a node by offloading the blocking deserialize()
    call to the event loop's default thread pool executor.

    :param node_data_provider: provider used to look up node type factories
    :param data: serialized node data, as produced by serialize()
    :param state: optional serialized node state, or None if there is no state
    :return: the reconstructed node
    """
    # get_running_loop() is the supported way to obtain the loop from inside a
    # coroutine; get_event_loop() here has been deprecated since Python 3.10.
    return await asyncio.get_running_loop().run_in_executor(None, self.deserialize, node_data_provider, data, state)

async def serialize_async(self, node: BaseNode) -> Tuple[bytes, Optional[bytes]]:
return await asyncio.get_event_loop().run_in_executor(None, self.serialize, node)
12 changes: 5 additions & 7 deletions src/lifeblood/basenode_serializer_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
from io import BytesIO
from dataclasses import dataclass
from .basenode_serialization import NodeSerializerBase, IncompatibleDeserializationMethod
from .basenode import BaseNode, NodeParameterType
from .basenode import BaseNode
from .enums import NodeParameterType

from typing import Callable, Optional, Tuple, Union

from .node_dataprovider_base import NodeDataProvider
from .nodegraph_holder_base import NodeGraphHolderBase


@dataclass
Expand All @@ -18,10 +18,9 @@ class ParameterData:
expression: Optional[str]


def create_node_maker(node_data_provider: NodeDataProvider) -> Callable[[str, str, NodeGraphHolderBase, int], BaseNode]:
def create_node(type_name: str, name: str, sched_parent, node_id) -> BaseNode:
def create_node_maker(node_data_provider: NodeDataProvider) -> Callable[[str, str], BaseNode]:
    """Return a factory callable that builds nodes via *node_data_provider*.

    The returned callable takes (type_name, name) and produces a fresh node of
    the given type with the given name. Extra positional/keyword arguments are
    accepted and ignored for backward compatibility with callers that used to
    pass parent/node_id (parenting is no longer done at deserialization time).

    :param node_data_provider: provider used to look up node type factories
    :return: callable (type_name, name, *ignored, **ignored) -> BaseNode
    """
    def create_node(type_name: str, name: str, *args, **kwargs) -> BaseNode:
        # note: the deleted diff line `node.set_parent(sched_parent, node_id)` is
        # intentionally NOT part of the final code - parenting moved out of here
        node = node_data_provider.node_factory(type_name)(name)
        return node
    return create_node

Expand All @@ -30,7 +29,7 @@ class NodeSerializerV1(NodeSerializerBase):
def serialize(self, node: BaseNode) -> Tuple[bytes, Optional[bytes]]:
raise DeprecationWarning('no use this!')

def deserialize(self, parent: NodeGraphHolderBase, node_id: int, node_data_provider: NodeDataProvider, data: bytes, state: Optional[bytes]) -> BaseNode:
def deserialize(self, node_data_provider: NodeDataProvider, data: bytes, state: Optional[bytes]) -> BaseNode:
# this be pickled
# we do hacky things here fo backward compatibility
class Unpickler(pickle.Unpickler):
Expand All @@ -47,5 +46,4 @@ def find_class(self, module, name):
except Exception as e:
raise IncompatibleDeserializationMethod(f'error loading pickle: {e}') from None

newobj.set_parent(parent, node_id)
return newobj
41 changes: 36 additions & 5 deletions src/lifeblood/basenode_serializer_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import json
from .common_serialization import AttribSerializer, AttribDeserializer
from .basenode_serialization import NodeSerializerBase, IncompatibleDeserializationMethod, FailedToApplyNodeState, FailedToApplyParameters
from .basenode import BaseNode, NodeParameterType
from .basenode import BaseNode
from .enums import NodeParameterType
from .uidata import ParameterFullValue

from typing import Optional, Tuple, Union

from .node_dataprovider_base import NodeDataProvider
from .nodegraph_holder_base import NodeGraphHolderBase


@dataclass
Expand Down Expand Up @@ -85,7 +85,7 @@
state = node.get_state()
return None if state is None else json.dumps(state, cls=NodeSerializerV2.Serializer).encode('latin1')

def deserialize(self, parent: NodeGraphHolderBase, node_id: int, node_data_provider: NodeDataProvider, data: bytes, state: Optional[bytes]) -> BaseNode:
def deserialize(self, node_data_provider: NodeDataProvider, data: bytes, state: Optional[bytes]) -> BaseNode:

Check warning on line 88 in src/lifeblood/basenode_serializer_v2.py

View workflow job for this annotation

GitHub Actions / flake8

C901 'NodeSerializerV2.deserialize' is too complex (11)
try:
data_dict = json.loads(data.decode('latin1'), cls=NodeSerializerV2.Deserializer)
except json.JSONDecodeError:
Expand All @@ -96,10 +96,11 @@
if (fv := data_dict['format_version']) != 2:
raise IncompatibleDeserializationMethod(f'format_version {fv} is not supported')
new_node = node_data_provider.node_factory(data_dict['type_name'])(data_dict['name'])
new_node.set_parent(parent, node_id)
try:
with new_node.get_ui().block_ui_callbacks():
new_node.get_ui().set_parameters_batch({name: ParameterFullValue(val.unexpanded_value, val.expression) for name, val in data_dict['parameters'].items()})
param_dict = {name: ParameterFullValue(val.unexpanded_value, val.expression) for name, val in data_dict['parameters'].items()}
self.__resource_compatibility_filter(param_dict) # TODO: remove this a couple of months in the future
new_node.get_ui().set_parameters_batch(param_dict)
except Exception:
# actually set_parameters_batch catches all reasonable exceptions and treats them as warnings,
# so this seems unreachable, but if something does happen - we treat it as fail to set all params
Expand All @@ -111,3 +112,33 @@
raise FailedToApplyNodeState(wrapped_expection=e)

return new_node

@staticmethod
def __resource_compatibility_filter(param_dict: dict):
    """Migrate legacy requirement parameter names in *param_dict*, in place,
    to the new '__requirements__.*' arbitrary-resource naming scheme.

    Per the migration convention, resource 0 is cpu and resource 1 is memory.
    GPU-related legacy parameters have no counterpart and are dropped.
    Existing new-style entries are never overwritten.

    :param param_dict: mapping of parameter name -> ParameterFullValue/ParameterData,
        mutated in place
    """
    # legacy name -> new resource-based name (res_0 == cpu, res_1 == memory)
    rename_params = {
        'priority adjustment': '__requirements__.priority_adjustment',
        'worker groups': '__requirements__.worker_groups',
        'worker type': '__requirements__.worker_type',
        'worker cpu cost': '__requirements__.f_min_res_0',
        'worker cpu cost preferred': '__requirements__.f_pref_res_0',
        'worker mem cost': '__requirements__.f_min_res_1',
        'worker mem cost preferred': '__requirements__.f_pref_res_1',
    }
    for param_name in (
            'worker gpu cost', 'worker gpu cost preferred', 'worker gpu mem cost', 'worker gpu mem cost preferred'
    ):
        # pop with a default ignores absent keys - no membership pre-check needed
        param_dict.pop(param_name, None)

    # a legacy node had exactly two resources (cpu + mem); record that count
    # unless a new-style resource count is already present
    if 'worker cpu cost' in param_dict and '__requirements__.res' not in param_dict:
        param_dict['__requirements__.res'] = ParameterData(
            '__requirements__.res',
            NodeParameterType.INT,
            2,
            None,
        )

    # rename legacy keys, skipping any whose new name already has a value
    for old_name, new_name in rename_params.items():
        if new_name in param_dict or old_name not in param_dict:
            continue
        param_dict[new_name] = param_dict.pop(old_name)
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/attribute_splitter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from copy import copy
from lifeblood.basenode import BaseNode
from lifeblood.node_plugin_base import BaseNode
from lifeblood.enums import NodeParameterType
from lifeblood.nodethings import ProcessingResult, ProcessingError
import math
Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/del_attrib.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from lifeblood.basenode import BaseNode
from lifeblood.node_plugin_base import BaseNode
from lifeblood.nodethings import ProcessingResult
from lifeblood.enums import NodeParameterType

Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/environment_resolver_setter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from lifeblood.basenode import BaseNode, ProcessingError
from lifeblood.node_plugin_base import BaseNode, ProcessingError
from lifeblood.nodethings import ProcessingResult
from lifeblood.processingcontext import ProcessingContext
from lifeblood.enums import NodeParameterType
Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/kill.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from lifeblood.basenode import BaseNode
from lifeblood.node_plugin_base import BaseNode
from lifeblood.nodethings import ProcessingResult

from typing import Iterable
Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/mod_attrib.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from lifeblood.basenode import BaseNode
from lifeblood.node_plugin_base import BaseNode
from lifeblood.nodethings import ProcessingResult
from lifeblood.taskspawn import TaskSpawn
from lifeblood.exceptions import NodeNotReadyToProcess
Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/null.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from lifeblood.basenode import BaseNode
from lifeblood.node_plugin_base import BaseNode
from lifeblood.nodethings import ProcessingResult

from typing import Iterable
Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/parent_children_waiter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import dataclasses
from dataclasses import dataclass
from lifeblood.attribute_serialization import deserialize_attributes_core
from lifeblood.basenode import BaseNode, ProcessingError
from lifeblood.node_plugin_base import BaseNode, ProcessingError
from lifeblood.nodethings import ProcessingResult
from lifeblood.taskspawn import TaskSpawn
from lifeblood.exceptions import NodeNotReadyToProcess
Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/python.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
import time

from lifeblood.basenode import BaseNodeWithTaskRequirements
from lifeblood.node_plugin_base import BaseNodeWithTaskRequirements
from lifeblood.invocationjob import InvocationJob, InvocationEnvironment
from lifeblood.processingcontext import ProcessingContext
from lifeblood.nodethings import ProcessingResult, ProcessingError
Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/rename_attrib.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from lifeblood.basenode import BaseNode
from lifeblood.node_plugin_base import BaseNode
from lifeblood.nodethings import ProcessingResult
from lifeblood.taskspawn import TaskSpawn
from lifeblood.exceptions import NodeNotReadyToProcess
Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/set_attrib.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from lifeblood.basenode import BaseNode
from lifeblood.node_plugin_base import BaseNode
from lifeblood.nodethings import ProcessingResult
from lifeblood.enums import NodeParameterType

Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/spawn_children.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from lifeblood.basenode import BaseNode
from lifeblood.node_plugin_base import BaseNode
from lifeblood.nodethings import ProcessingResult
from lifeblood.enums import NodeParameterType
from lifeblood.taskspawn import TaskSpawn
Expand Down
2 changes: 1 addition & 1 deletion src/lifeblood/core_nodes/split_waiter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from lifeblood.basenode import BaseNode
from lifeblood.node_plugin_base import BaseNode
from lifeblood.nodethings import ProcessingResult
from lifeblood.taskspawn import TaskSpawn
from lifeblood.exceptions import NodeNotReadyToProcess
Expand Down
Loading
Loading