Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create FR and FP controllers inheriting from OdinController #362

Open
wants to merge 3 commits into
base: fastcs-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/frameReceiver/src/DummyTCPFrameDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void DummyTCPFrameDecoder::monitor_buffers(void) {}

void DummyTCPFrameDecoder::get_status(const std::string param_prefix,
OdinData::IpcMessage &status_msg) {
status_msg.set_param(param_prefix + "name",
status_msg.set_param(param_prefix + "class",
std::string("DummyTCPFrameDecoder"));
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/frameReceiver/src/DummyUDPFrameDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ void DummyUDPFrameDecoder::get_status(const std::string param_prefix,
OdinData::IpcMessage& status_msg)
{
status_get_count_++;
status_msg.set_param(param_prefix + "name", std::string("DummyUDPFrameDecoder"));
status_msg.set_param(param_prefix + "class", std::string("DummyUDPFrameDecoder"));
status_msg.set_param(param_prefix + "status_get_count", status_get_count_);
status_msg.set_param(param_prefix + "packets_received", packets_received_);
status_msg.set_param(param_prefix + "packets_lost", packets_lost_);
Expand Down
152 changes: 152 additions & 0 deletions python/src/odin_data/control/frame_processor_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from odin_data.control.odin_data_adapter import OdinDataAdapter
from odin_data.control.odin_data_controller import OdinDataController
import logging

class FrameProcessorController(OdinDataController):
def __init__(self, name, endpoints, update_interval=0.5):
super().__init__(name, endpoints, update_interval)
# If we are a FP controller then we need to track the writing state
self._first_update = False
self._writing = [False]*len(self._clients)

@property
def first_update(self):
return self._first_update

def setup_parameter_tree(self):
super(FrameProcessorController, self).setup_parameter_tree()
self._acquisition_id = ""
self._write = False
self._frames = 0
self._file_path = ""
self._file_prefix = ""
self._file_extension = "h5"
self._tree["config"] = {
"hdf": {
"acquisition_id": (
self._get("_acquisition_id"),
lambda v: self._set("_acquisition_id", v),
{},
),
"frames": (
self._get("_frames"),
lambda v: self._set("_frames", v),
{},
),
"file": {
"path": (
self._get("_file_path"),
lambda v: self._set("_file_path", v),
{},
),
"prefix": (
self._get("_file_prefix"),
lambda v: self._set("_file_prefix", v),
{},
),
"extension": (
self._get("_file_extension"),
lambda v: self._set("_file_extension", v),
{},
),
},
"write": (
self._get("_write"),
self.execute_write,
{}
)
},
}

def execute_write(self, value):
# Queue the write command
logging.debug("Executing write command with value: {}".format(value))
processes = len(self._clients)

if value:
# Before attempting to write files, make some simple error checks

# Check if we have a valid buffer status from the FR adapter

# valid, reason = self.check_controller_status()
# if not valid:
# raise RuntimeError(reason)

# Check the file prefix is not empty
if str(self._file_prefix) == '':
raise RuntimeError("File prefix must not be empty")

# First setup the rank
self.setup_rank()

try:
for rank in range(processes):
# Setup the number of processes and the rank for each client
config = {
'hdf': {
'frames': self._frames
}
}
logging.info("Sending config to FP odin adapter %i: %s", rank, config)
self._clients[rank].send_configuration(config)
config = {
'hdf': {
'acquisition_id': self._acquisition_id,
'file': {
'path': str(self._file_path),
'prefix': str(self._file_prefix),
'extension': str(self._file_extension)
}
}
}
logging.info("Sending config to FP odin adapter %i: %s", rank, config)
self._clients[rank].send_configuration(config)
except Exception as err:
logging.error("Failed to send information to FP applications")
logging.error("Error: %s", err)
try:
config = {'hdf': {'write': value}}
for rank in range(processes):
logging.info("Sending config to FP odin adapter %i: %s", rank, config)
#self._odin_adapter_fps._controller.put(f"{rank}/config", config)
self._clients[rank].send_configuration(config)
except Exception as err:
logging.error("Failed to send write command to FP applications")
logging.error("Error: %s", err)

def handle_client(self, client, index):
if "hdf" in client.parameters["status"]:
self._writing[index] = client.parameters["status"]["hdf"]["writing"]
# self._params.set("{}/config/hdf/write".format(index), writing[index])

def setup_rank(self):
# Attempt initialisation of the connected clients
processes = len(self._clients)
logging.info(
"Setting up rank information for {} FP processes".format(processes)
)
rank = 0
try:
for rank in range(processes):
# Setup the number of processes and the rank for each client
config = {"hdf": {"process": {"number": processes, "rank": rank}}}
logging.debug("Sending config to FP odin adapter %i: %s", rank, config)
self._clients[rank].send_configuration(config)

except Exception as err:
logging.debug("Failed to send rank information to FP applications")
logging.error("Error: %s", err)

def process_updates(self):
if not self._first_update:
self.setup_rank()
self._first_update = True
self._write = all(self._writing)

# def check_controller_status(self):
# TODO: Need to check FR buffer status
# return True, ""


class FrameProcessorAdapter(OdinDataAdapter):
_controller_cls = FrameProcessorController
100 changes: 4 additions & 96 deletions python/src/odin_data/control/frame_receiver_adapter.py
Original file line number Diff line number Diff line change
@@ -1,101 +1,9 @@
"""
Created on 2nd August 2018

:author: Alan Greer
"""
import copy
import logging

from odin_data.control.odin_data_adapter import OdinDataAdapter
from odin_data.control.odin_data_controller import OdinDataController


class FrameReceiverAdapter(OdinDataAdapter):
"""
OdinDataAdapter class

This class provides the adapter interface between the ODIN server and the ODIN-DATA detector system,
transforming the REST-like API HTTP verbs into the appropriate frameProcessor ZeroMQ control messages
"""
VERSION_CHECK_CONFIG_ITEMS = ['decoder_path', 'decoder_type']

def __init__(self, **kwargs):
"""
Initialise the OdinDataAdapter object

:param kwargs:
"""
logging.debug("FrameReceiverAdapter init called")

super(FrameReceiverAdapter, self).__init__(**kwargs)

self._decoder_config = []
for ep in self._endpoints:
self._decoder_config.append(None)
FrameReceiverController = OdinDataController

def require_version_check(self, param):
# If the parameter is in the version check list then request a version update
if param in self.VERSION_CHECK_CONFIG_ITEMS:
return True
return False

def send_to_clients(self, request_command, parameters, client_index=-1):
"""
Intercept the base class send_to_clients method.
Keep a record of any decoder specific configuration items and then if a single decoder config
item is later changed send the full decoder configuration to the Frame Receiver application.
This is necessary as often a decoder config change will result in the complete tear down and re-init
of the Decoder class and so a full and consistent set of decoder config parameters are required.

:param request_command:
:param parameters:
:param client_index:
"""
logging.debug("Original index: {} request_command: {} and parameters: {}".format(client_index, request_command, parameters))
command, parameters = self.uri_params_to_dictionary(request_command, parameters)

decoder_config = None
if command is None:
if 'decoder_config' in parameters:
logging.debug("Found decoder config: {}".format(parameters['decoder_config']))
decoder_config = parameters['decoder_config']
elif command == 'decoder_config':
logging.debug("Found decoder config: {}".format(parameters))
decoder_config = parameters

if decoder_config is not None:
if client_index == -1:
for index in range(len(self._decoder_config)):
if self._decoder_config[index] is None:
self._decoder_config[index] = decoder_config
else:
for item in decoder_config:
self._decoder_config[index][item] = decoder_config[item]
else:
if self._decoder_config[client_index] is None:
self._decoder_config[client_index] = decoder_config
else:
for item in decoder_config:
self._decoder_config[client_index][item] = decoder_config[item]

# Now construct the new full message, inserting the full decoder config back into the parameters
if client_index == -1:
new_param_set = []
# Loop over each decoder config item and send a list of commands
for dc in self._decoder_config:
if command is None:
if 'decoder_config' in parameters and dc is not None:
parameters['decoder_config'] = dc
elif command == 'decoder_config':
parameters = dc
new_param_set.append(copy.deepcopy(parameters))
else:
if command is None:
if 'decoder_config' in parameters and self._decoder_config[client_index] is not None:
parameters['decoder_config'] = self._decoder_config[client_index]
elif command == 'decoder_config':
parameters = self._decoder_config[client_index]
new_param_set = parameters

logging.debug("Updated full command: {} and parameter set: {}".format(command, new_param_set))

return super(FrameReceiverAdapter, self).send_to_clients(command, new_param_set, client_index)
class FrameReceiverAdapter(OdinDataAdapter):
_controller_cls = FrameReceiverController
12 changes: 5 additions & 7 deletions python/src/odin_data/control/odin_data_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class OdinDataAdapter(ApiAdapter):
This class provides the adapter interface between the ODIN server and the ODIN-DATA detector system,
transforming the REST-like API HTTP verbs into the appropriate frameProcessor ZeroMQ control messages
"""
_controller_cls = OdinDataController

def __init__(self, **kwargs):
"""
Expand Down Expand Up @@ -57,12 +58,9 @@ def __init__(self, **kwargs):
# Setup the time between client update requests
self._update_interval = float(self.options.get("update_interval", 0.5))

# Check for a Frame Processor flag
self._frame_processor_flag = bool(self.options.get("frame_processor", False))

# Create the Frame Processor Controller object
self._controller = OdinDataController(
self.name, self._endpoint_arg, self._update_interval, self._frame_processor_flag
self._controller = self._controller_cls(
self.name, self._endpoint_arg, self._update_interval
)

@request_types("application/json", "application/vnd.odin-native")
Expand All @@ -85,7 +83,7 @@ def get(self, path, request):
# logging.error("{}".format(response))
except ParameterTreeError as param_error:
response = {
"response": "OdinDatatAdapter GET error: {}".format(param_error)
"response": "OdinDataAdapter GET error: {}".format(param_error)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realising this should probably get the class name instead if we're subclassing

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Let's see what we decide first.

}
status_code = 400

Expand All @@ -112,7 +110,7 @@ def put(self, path, request): # pylint: disable=W0613
self._controller.put(path, json_decode(request.body))
except ParameterTreeError as param_error:
response = {
"response": "OdinDatatAdapter GET error: {}".format(param_error)
"response": "OdinDataAdapter PUT error: {}".format(param_error)
}
status_code = 400

Expand Down
Loading
Loading