diff --git a/controllers/__init__.py b/controllers/__init__.py index e0de76a..14e69c2 100644 --- a/controllers/__init__.py +++ b/controllers/__init__.py @@ -1,11 +1,11 @@ class BaseController: - def update(self, target_lataccel, current_lataccel, state, target_future): + def update(self, target_lataccel, current_lataccel, state, future_plan): """ Args: target_lataccel: The target lateral acceleration. current_lataccel: The current lateral acceleration. state: The current state of the vehicle. - target_future: The future target lateral acceleration plan for the next N frames. + future_plan: The future plan for the next N frames. Returns: The control signal to be applied to the vehicle. """ diff --git a/controllers/open.py b/controllers/open.py index 7bed29e..014c631 100644 --- a/controllers/open.py +++ b/controllers/open.py @@ -5,5 +5,5 @@ class Controller(BaseController): """ An open-loop controller """ - def update(self, target_lataccel, current_lataccel, state, target_future): + def update(self, target_lataccel, current_lataccel, state, future_plan): return target_lataccel diff --git a/controllers/simple.py b/controllers/simple.py index 45f0d6f..d009535 100644 --- a/controllers/simple.py +++ b/controllers/simple.py @@ -5,5 +5,5 @@ class Controller(BaseController): """ A simple controller that is the error between the target and current lateral acceleration times some factor """ - def update(self, target_lataccel, current_lataccel, state, target_future): + def update(self, target_lataccel, current_lataccel, state, future_plan): return (target_lataccel - current_lataccel) * 0.3 diff --git a/tinyphysics.py b/tinyphysics.py index 91af41a..fcc5b32 100644 --- a/tinyphysics.py +++ b/tinyphysics.py @@ -34,6 +34,7 @@ FUTURE_PLAN_STEPS = FPS * 5 # 5 secs State = namedtuple('State', ['roll_lataccel', 'v_ego', 'a_ego']) +FuturePlan = namedtuple('FuturePlan', ['lataccel', 'roll_lataccel', 'v_ego', 'a_ego']) class LataccelTokenizer: @@ -99,11 +100,11 @@ def __init__(self, model: TinyPhysicsModel, data_path: str, controller: BaseCont def reset(self) -> None: self.step_idx = CONTEXT_LENGTH - state_targetfutures = [self.get_state_targetfuture(i) for i in range(self.step_idx)] - self.state_history = [x['state'] for x in state_targetfutures] + state_target_futureplans = [self.get_state_target_futureplan(i) for i in range(self.step_idx)] + self.state_history = [x[0] for x in state_target_futureplans] self.action_history = self.data['steer_command'].values[:self.step_idx].tolist() - self.current_lataccel_history = [x['targetfuture'][0] for x in state_targetfutures] - self.target_lataccel_history = [x['targetfuture'][0] for x in state_targetfutures] + self.current_lataccel_history = [x[1] for x in state_target_futureplans] + self.target_lataccel_history = [x[1] for x in state_target_futureplans] self.target_future = None self.current_lataccel = self.current_lataccel_history[-1] seed = int(md5(self.data_path.encode()).hexdigest(), 16) % 10**4 @@ -130,29 +131,35 @@ def sim_step(self, step_idx: int) -> None: if step_idx >= CONTROL_START_IDX: self.current_lataccel = pred else: - self.current_lataccel = self.get_state_targetfuture(step_idx)['targetfuture'][0] + self.current_lataccel = self.get_state_target_futureplan(step_idx)[1] self.current_lataccel_history.append(self.current_lataccel) def control_step(self, step_idx: int) -> None: - action = self.controller.update(self.target_lataccel_history[step_idx], self.current_lataccel, self.state_history[step_idx], target_future=self.target_future) + action = self.controller.update(self.target_lataccel_history[step_idx], self.current_lataccel, self.state_history[step_idx], future_plan=self.futureplan) if step_idx < CONTROL_START_IDX: action = self.data['steer_command'].values[step_idx] action = np.clip(action, STEER_RANGE[0], STEER_RANGE[1]) self.action_history.append(action) - def get_state_targetfuture(self, step_idx: int) -> Tuple[State, float]: + def get_state_target_futureplan(self, step_idx: int) -> Tuple[State, float]: state = self.data.iloc[step_idx] - return { - 'state': State(roll_lataccel=state['roll_lataccel'], v_ego=state['v_ego'], a_ego=state['a_ego']), - 'targetfuture': self.data['target_lataccel'].values[step_idx:step_idx + FUTURE_PLAN_STEPS].tolist() - } + return ( + State(roll_lataccel=state['roll_lataccel'], v_ego=state['v_ego'], a_ego=state['a_ego']), + state['target_lataccel'], + FuturePlan( + lataccel=self.data['target_lataccel'].values[step_idx + 1 :step_idx + FUTURE_PLAN_STEPS].tolist(), + roll_lataccel=self.data['roll_lataccel'].values[step_idx + 1 :step_idx + FUTURE_PLAN_STEPS].tolist(), + v_ego=self.data['v_ego'].values[step_idx + 1 :step_idx + FUTURE_PLAN_STEPS].tolist(), + a_ego=self.data['a_ego'].values[step_idx + 1 :step_idx + FUTURE_PLAN_STEPS].tolist() + ) + ) def step(self) -> None: - state_targetfuture = self.get_state_targetfuture(self.step_idx) - self.state_history.append(state_targetfuture['state']) - self.target_lataccel_history.append(state_targetfuture['targetfuture'][0]) - self.target_future = state_targetfuture['targetfuture'][1:] + state, target, futureplan = self.get_state_target_futureplan(self.step_idx) + self.state_history.append(state) + self.target_lataccel_history.append(target) + self.futureplan = futureplan self.control_step(self.step_idx) self.sim_step(self.step_idx) self.step_idx += 1