From 23d7d0c19e0b43d26d077dcafcd2190c9a9d6910 Mon Sep 17 00:00:00 2001 From: pseudo-rnd-thoughts Date: Fri, 9 Aug 2024 17:38:54 +0100 Subject: [PATCH] Update pre-commit configuration --- .github/workflows/build-docs-version.yml | 2 +- .../workflows/manual-build-docs-version.yml | 2 +- .github/workflows/release.yml | 2 +- .gitignore | 1 - .pre-commit-config.yaml | 37 +- README.md | 2 - docs/404.md | 2 +- docs/bibliography/biblio.bib | 2 +- docs/conf.py | 1 + docs/index.md | 2 +- docs/observations/index.md | 22 +- docs/scripts/move_404.py | 1 + docs/user_guide.md | 2 +- highway_env/__init__.py | 40 +- highway_env/envs/__init__.py | 35 +- highway_env/envs/common/abstract.py | 42 +- highway_env/envs/common/action.py | 54 +- highway_env/envs/common/finite_mdp.py | 13 +- highway_env/envs/common/graphics.py | 18 +- highway_env/envs/common/observation.py | 77 +- highway_env/envs/exit_env.py | 10 +- highway_env/envs/highway_env.py | 5 +- highway_env/envs/intersection_env.py | 12 +- highway_env/envs/lane_keeping_env.py | 6 +- highway_env/envs/merge_env.py | 5 +- highway_env/envs/parking_env.py | 7 +- highway_env/envs/racetrack_env.py | 4 +- highway_env/envs/roundabout_env.py | 4 +- highway_env/envs/two_way_env.py | 5 +- highway_env/envs/u_turn_env.py | 5 +- highway_env/interval.py | 16 +- highway_env/road/graphics.py | 31 +- highway_env/road/lane.py | 44 +- highway_env/road/regulation.py | 8 +- highway_env/road/road.py | 54 +- highway_env/road/spline.py | 18 +- highway_env/utils.py | 23 +- highway_env/vehicle/behavior.py | 14 +- highway_env/vehicle/controller.py | 1 - highway_env/vehicle/dynamics.py | 16 +- highway_env/vehicle/graphics.py | 17 +- highway_env/vehicle/kinematics.py | 24 +- highway_env/vehicle/objects.py | 24 +- highway_env/vehicle/uncertainty/estimation.py | 9 +- highway_env/vehicle/uncertainty/prediction.py | 18 +- pyproject.toml | 10 +- scripts/parking_model_based.ipynb | 1232 ++++++++--------- scripts/sb3_highway_dqn.ipynb | 344 ++--- scripts/sb3_highway_dqn.py | 1 + scripts/sb3_highway_ppo.py | 1 + scripts/sb3_highway_ppo_transformer.py | 7 +- scripts/sb3_racetracks_ppo.py | 1 + scripts/utils.py | 1 + setup.py | 1 + tests/envs/test_actions.py | 1 + tests/envs/test_env_preprocessors.py | 1 + tests/envs/test_gym.py | 1 + tests/envs/test_time.py | 3 +- tests/graphics/test_render.py | 1 + tests/vehicle/test_behavior.py | 1 + tests/vehicle/test_control.py | 1 + tests/vehicle/test_dynamics.py | 1 + tests/vehicle/test_uncertainty.py | 1 + 63 files changed, 1207 insertions(+), 1139 deletions(-) diff --git a/.github/workflows/build-docs-version.yml b/.github/workflows/build-docs-version.yml index 42f048605..4e8cf14f8 100644 --- a/.github/workflows/build-docs-version.yml +++ b/.github/workflows/build-docs-version.yml @@ -55,4 +55,4 @@ jobs: folder: _build clean-exclude: | v*.*/ - main \ No newline at end of file + main diff --git a/.github/workflows/manual-build-docs-version.yml b/.github/workflows/manual-build-docs-version.yml index 7584ffe8d..f9730ba69 100644 --- a/.github/workflows/manual-build-docs-version.yml +++ b/.github/workflows/manual-build-docs-version.yml @@ -67,4 +67,4 @@ jobs: folder: _build clean-exclude: | v*.*/ - main \ No newline at end of file + main diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7a3123289..7f3721ccf 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -23,4 +23,4 @@ jobs: uses: pypa/gh-action-pypi-publish@master with: user: __token__ - password: ${{ secrets.pypi_password }} \ No newline at 
end of file + password: ${{ secrets.pypi_password }} diff --git a/.gitignore b/.gitignore index b44828aa0..2683462d1 100644 --- a/.gitignore +++ b/.gitignore @@ -136,4 +136,3 @@ dmypy.json # Cython debug symbols cython_debug/ - diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cc95bb980..f4672b7ab 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,24 +1,49 @@ # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.6.0 + hooks: + - id: check-symlinks + - id: destroyed-symlinks + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-toml + - id: check-ast + - id: check-added-large-files + - id: check-merge-conflict + - id: check-executables-have-shebangs + - id: check-shebang-scripts-are-executable + - id: detect-private-key + - id: debug-statements + - repo: https://github.com/codespell-project/codespell + rev: v2.3.0 + hooks: + - id: codespell + args: + - --ignore-words-list=ser,theses,bu,ois - repo: https://github.com/PyCQA/flake8 - rev: 6.0.0 + rev: 7.0.0 hooks: - id: flake8 args: - - '--per-file-ignores=**/__init__.py:F401,F403,E402' +# - '--per-file-ignores=*/__init__.py:F401' - --ignore=E203,W503,E741,E731 - --max-complexity=30 - --max-line-length=456 - --show-source - --statistics + - repo: https://github.com/asottile/pyupgrade + rev: v3.16.0 + hooks: + - id: pyupgrade + args: ["--py38-plus"] - repo: https://github.com/PyCQA/isort - rev: 5.12.0 + rev: 5.13.2 hooks: - id: isort - args: ["--profile", "black"] - exclude: "__init__.py" - repo: https://github.com/python/black - rev: 23.3.0 + rev: 24.4.2 hooks: - id: black diff --git a/README.md b/README.md index 0d6cb63a4..b5b5ef2c8 100644 --- a/README.md +++ b/README.md @@ -235,5 +235,3 @@ Master theses * [Multi-Agent Reinforcement Learning with Application on Traffic Flow Control](https://www.diva-portal.org/smash/get/diva2:1573441/FULLTEXT01.pdf) (Jun 2021) * [Deep Reinforcement Learning for Automated Parking](https://repositorio-aberto.up.pt/bitstream/10216/136074/2/494682.pdf) (Aug 2021) * [Deep Reinforcement Learning and Imitation Learning for Autonomous Driving in a Minimalist Environment](https://www.academia.edu/107587654/Deep_Reinforcement_Learning_and_Imitation_Learning_for_Autonomous_Driving_in_a_Minimalist_Environment) (Jun 2021) - - diff --git a/docs/404.md b/docs/404.md index 1c5fd74e7..ff59d69dc 100644 --- a/docs/404.md +++ b/docs/404.md @@ -5,4 +5,4 @@ hide-toc: true # 404 -## Page Not Found \ No newline at end of file +## Page Not Found diff --git a/docs/bibliography/biblio.bib b/docs/bibliography/biblio.bib index 55e506c8d..18318cbf9 100644 --- a/docs/bibliography/biblio.bib +++ b/docs/bibliography/biblio.bib @@ -77,4 +77,4 @@ @misc{Qi2017pointnet eprint={1612.00593}, archivePrefix={arXiv}, primaryClass={cs.CV} -} \ No newline at end of file +} diff --git a/docs/conf.py b/docs/conf.py index efc6e3a74..610cb933d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -20,6 +20,7 @@ import highway_env # noqa: F401 + project = "highway-env" copyright = "2023 Farama Foundation" author = "Farama Foundation" diff --git a/docs/index.md b/docs/index.md index 94ea89166..02ce7eec1 100644 --- a/docs/index.md +++ b/docs/index.md @@ -42,4 +42,4 @@ bibliography/index Github Contribute to the Docs -``` \ No newline at end of file +``` diff --git a/docs/observations/index.md b/docs/observations/index.md index 693190349..6ec4b7e9e 100644 --- 
a/docs/observations/index.md +++ b/docs/observations/index.md @@ -245,10 +245,10 @@ For instance, consider a vehicle at 25m on the right-lane of the ego-vehicle and ```{eval-rst} .. table:: $15$ m/s -== == == == == == == == == == -0 0 0 0 0 0 0 0 0 0 -0 0 0 0 0 0 0 0 0 0 -0 0 0 0 0 0 0 0 0 0 +== == == == == == == == == == +0 0 0 0 0 0 0 0 0 0 +0 0 0 0 0 0 0 0 0 0 +0 0 0 0 0 0 0 0 0 0 == == == == == == == == == == ``` @@ -287,18 +287,18 @@ The top row corresponds to the left-lane, the middle row corresponds to the lane The {py:class}`~highway_env.envs.common.observation.LidarObservation` divides the space around the vehicle into angular sectors, and returns an array with one row per angular sector and two columns: - distance to the nearest collidable object (vehicles or obstacles) - - component of the objects's relative velocity along that direction + - component of the objects's relative velocity along that direction -The angular sector of index 0 corresponds to an angle 0 (east), and then each index/sector increases the angle (south, west, north). +The angular sector of index 0 corresponds to an angle 0 (east), and then each index/sector increases the angle (south, west, north). For example, for a grid of 8 cells, an obstacle 10 meters away in the south and moving towards the north at 1m/s would lead to the following observation: - + ```{eval-rst} -.. table:: the Lidar observation +.. table:: the Lidar observation === === - 0 0 - 0 0 + 0 0 + 0 0 10 -1 0 0 0 0 @@ -324,7 +324,7 @@ Here is an example of what the distance grid may look like in the parking env: "vehicles_count": 3, }) env.reset() - + plt.imshow(env.render()) plt.show() ``` diff --git a/docs/scripts/move_404.py b/docs/scripts/move_404.py index ef6a18037..e46e1682c 100644 --- a/docs/scripts/move_404.py +++ b/docs/scripts/move_404.py @@ -2,6 +2,7 @@ import sys + if __name__ == "__main__": if len(sys.argv) < 2: print("Provide a path") diff --git a/docs/user_guide.md b/docs/user_guide.md index 712498a07..d1d7b2517 100644 --- a/docs/user_guide.md +++ b/docs/user_guide.md @@ -11,4 +11,4 @@ rewards/index graphics/index multi_agent make_your_own -``` \ No newline at end of file +``` diff --git a/highway_env/__init__.py b/highway_env/__init__.py index 28fc83265..1b4659fda 100644 --- a/highway_env/__init__.py +++ b/highway_env/__init__.py @@ -1,6 +1,9 @@ import os import sys +import gymnasium as gym + + __version__ = "1.9.0" try: @@ -15,99 +18,98 @@ # Hide pygame support prompt os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1" -from gymnasium.envs.registration import register -from highway_env.envs.common.abstract import MultiAgentWrapper - def _register_highway_envs(): """Import the envs module so that envs register themselves.""" + from highway_env.envs.common.abstract import MultiAgentWrapper + # exit_env.py - register( + gym.register( id="exit-v0", entry_point="highway_env.envs:ExitEnv", ) # highway_env.py - register( + gym.register( id="highway-v0", entry_point="highway_env.envs:HighwayEnv", ) - register( + gym.register( id="highway-fast-v0", entry_point="highway_env.envs:HighwayEnvFast", ) # intersection_env.py - register( + gym.register( id="intersection-v0", entry_point="highway_env.envs:IntersectionEnv", ) - register( + gym.register( id="intersection-v1", entry_point="highway_env.envs:ContinuousIntersectionEnv", ) - register( + gym.register( id="intersection-multi-agent-v0", entry_point="highway_env.envs:MultiAgentIntersectionEnv", ) - register( + gym.register( id="intersection-multi-agent-v1", 
entry_point="highway_env.envs:MultiAgentIntersectionEnv", additional_wrappers=(MultiAgentWrapper.wrapper_spec(),), ) # lane_keeping_env.py - register( + gym.register( id="lane-keeping-v0", entry_point="highway_env.envs:LaneKeepingEnv", max_episode_steps=200, ) # merge_env.py - register( + gym.register( id="merge-v0", entry_point="highway_env.envs:MergeEnv", ) # parking_env.py - register( + gym.register( id="parking-v0", entry_point="highway_env.envs:ParkingEnv", ) - register( + gym.register( id="parking-ActionRepeat-v0", entry_point="highway_env.envs:ParkingEnvActionRepeat", ) - register( + gym.register( id="parking-parked-v0", entry_point="highway_env.envs:ParkingEnvParkedVehicles" ) # racetrack_env.py - register( + gym.register( id="racetrack-v0", entry_point="highway_env.envs:RacetrackEnv", ) # roundabout_env.py - register( + gym.register( id="roundabout-v0", entry_point="highway_env.envs:RoundaboutEnv", ) # two_way_env.py - register( + gym.register( id="two-way-v0", entry_point="highway_env.envs:TwoWayEnv", max_episode_steps=15 ) # u_turn_env.py - register(id="u-turn-v0", entry_point="highway_env.envs:UTurnEnv") + gym.register(id="u-turn-v0", entry_point="highway_env.envs:UTurnEnv") _register_highway_envs() diff --git a/highway_env/envs/__init__.py b/highway_env/envs/__init__.py index 6d6f5c9a7..a9515963e 100644 --- a/highway_env/envs/__init__.py +++ b/highway_env/envs/__init__.py @@ -1,10 +1,25 @@ -from highway_env.envs.highway_env import * -from highway_env.envs.merge_env import * -from highway_env.envs.parking_env import * -from highway_env.envs.roundabout_env import * -from highway_env.envs.two_way_env import * -from highway_env.envs.intersection_env import * -from highway_env.envs.lane_keeping_env import * -from highway_env.envs.u_turn_env import * -from highway_env.envs.exit_env import * -from highway_env.envs.racetrack_env import * +from highway_env.envs.exit_env import ExitEnv +from highway_env.envs.highway_env import HighwayEnv, HighwayEnvFast +from highway_env.envs.intersection_env import IntersectionEnv +from highway_env.envs.lane_keeping_env import LaneKeepingEnv +from highway_env.envs.merge_env import MergeEnv +from highway_env.envs.parking_env import ParkingEnv +from highway_env.envs.racetrack_env import RacetrackEnv +from highway_env.envs.roundabout_env import RoundaboutEnv +from highway_env.envs.two_way_env import TwoWayEnv +from highway_env.envs.u_turn_env import UTurnEnv + + +__all__ = [ + "ExitEnv", + "HighwayEnv", + "HighwayEnvFast", + "IntersectionEnv", + "LaneKeepingEnv", + "MergeEnv", + "ParkingEnv", + "RacetrackEnv", + "RoundaboutEnv", + "TwoWayEnv", + "UTurnEnv", +] diff --git a/highway_env/envs/common/abstract.py b/highway_env/envs/common/abstract.py index 1c8b62e7a..c3d2a069a 100644 --- a/highway_env/envs/common/abstract.py +++ b/highway_env/envs/common/abstract.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import copy import os -from typing import Dict, List, Optional, Text, Tuple, TypeVar +from typing import TypeVar import gymnasium as gym import numpy as np @@ -15,11 +17,11 @@ from highway_env.vehicle.behavior import IDMVehicle from highway_env.vehicle.kinematics import Vehicle + Observation = TypeVar("Observation") class AbstractEnv(gym.Env): - """ A generic environment for various tasks involving a vehicle driving on a road. 
@@ -30,7 +32,7 @@ class AbstractEnv(gym.Env): observation_type: ObservationType action_type: ActionType - _record_video_wrapper: Optional[RecordVideo] + _record_video_wrapper: RecordVideo | None metadata = { "render_modes": ["human", "rgb_array"], } @@ -38,7 +40,7 @@ class AbstractEnv(gym.Env): PERCEPTION_DISTANCE = 5.0 * Vehicle.MAX_SPEED """The maximum distance of any vehicle present in the observation [m]""" - def __init__(self, config: dict = None, render_mode: Optional[str] = None) -> None: + def __init__(self, config: dict = None, render_mode: str | None = None) -> None: super().__init__() # Configuration @@ -135,7 +137,7 @@ def _reward(self, action: Action) -> float: """ raise NotImplementedError - def _rewards(self, action: Action) -> Dict[Text, float]: + def _rewards(self, action: Action) -> dict[str, float]: """ Returns a multi-objective vector of rewards. @@ -163,7 +165,7 @@ def _is_truncated(self) -> bool: """ raise NotImplementedError - def _info(self, obs: Observation, action: Optional[Action] = None) -> dict: + def _info(self, obs: Observation, action: Action | None = None) -> dict: """ Return a dictionary of additional information @@ -185,9 +187,9 @@ def _info(self, obs: Observation, action: Optional[Action] = None) -> dict: def reset( self, *, - seed: Optional[int] = None, - options: Optional[dict] = None, - ) -> Tuple[Observation, dict]: + seed: int | None = None, + options: dict | None = None, + ) -> tuple[Observation, dict]: """ Reset the environment to it's initial configuration @@ -218,7 +220,7 @@ def _reset(self) -> None: """ raise NotImplementedError() - def step(self, action: Action) -> Tuple[Observation, float, bool, bool, dict]: + def step(self, action: Action) -> tuple[Observation, float, bool, bool, dict]: """ Perform an action and step the environment dynamics. @@ -246,7 +248,7 @@ def step(self, action: Action) -> Tuple[Observation, float, bool, bool, dict]: return obs, reward, terminated, truncated, info - def _simulate(self, action: Optional[Action] = None) -> None: + def _simulate(self, action: Action | None = None) -> None: """Perform several steps of simulation with constant action.""" frames = int( self.config["simulation_frequency"] // self.config["policy_frequency"] @@ -278,7 +280,7 @@ def _simulate(self, action: Optional[Action] = None) -> None: self.enable_auto_render = False - def render(self) -> Optional[np.ndarray]: + def render(self) -> np.ndarray | None: """ Render the environment. @@ -316,7 +318,7 @@ def close(self) -> None: self.viewer.close() self.viewer = None - def get_available_actions(self) -> List[int]: + def get_available_actions(self) -> list[int]: return self.action_type.get_available_actions() def set_record_video_wrapper(self, wrapper: RecordVideo): @@ -336,7 +338,7 @@ def _automatic_rendering(self) -> None: else: self.render() - def simplify(self) -> "AbstractEnv": + def simplify(self) -> AbstractEnv: """ Return a simplified copy of the environment where distant vehicles have been removed from the road. 
@@ -353,7 +355,7 @@ def simplify(self) -> "AbstractEnv": return state_copy - def change_vehicles(self, vehicle_class_path: str) -> "AbstractEnv": + def change_vehicles(self, vehicle_class_path: str) -> AbstractEnv: """ Change the type of all vehicles on the road @@ -370,7 +372,7 @@ def change_vehicles(self, vehicle_class_path: str) -> "AbstractEnv": vehicles[i] = vehicle_class.create_from(v) return env_copy - def set_preferred_lane(self, preferred_lane: int = None) -> "AbstractEnv": + def set_preferred_lane(self, preferred_lane: int = None) -> AbstractEnv: env_copy = copy.deepcopy(self) if preferred_lane: for v in env_copy.road.vehicles: @@ -380,14 +382,14 @@ def set_preferred_lane(self, preferred_lane: int = None) -> "AbstractEnv": v.LANE_CHANGE_MAX_BRAKING_IMPOSED = 1000 return env_copy - def set_route_at_intersection(self, _to: str) -> "AbstractEnv": + def set_route_at_intersection(self, _to: str) -> AbstractEnv: env_copy = copy.deepcopy(self) for v in env_copy.road.vehicles: if isinstance(v, IDMVehicle): v.set_route_at_intersection(_to) return env_copy - def set_vehicle_field(self, args: Tuple[str, object]) -> "AbstractEnv": + def set_vehicle_field(self, args: tuple[str, object]) -> AbstractEnv: field, value = args env_copy = copy.deepcopy(self) for v in env_copy.road.vehicles: @@ -395,7 +397,7 @@ def set_vehicle_field(self, args: Tuple[str, object]) -> "AbstractEnv": setattr(v, field, value) return env_copy - def call_vehicle_method(self, args: Tuple[str, Tuple[object]]) -> "AbstractEnv": + def call_vehicle_method(self, args: tuple[str, tuple[object]]) -> AbstractEnv: method, method_args = args env_copy = copy.deepcopy(self) for i, v in enumerate(env_copy.road.vehicles): @@ -403,7 +405,7 @@ def call_vehicle_method(self, args: Tuple[str, Tuple[object]]) -> "AbstractEnv": env_copy.road.vehicles[i] = getattr(v, method)(*method_args) return env_copy - def randomize_behavior(self) -> "AbstractEnv": + def randomize_behavior(self) -> AbstractEnv: env_copy = copy.deepcopy(self) for v in env_copy.road.vehicles: if isinstance(v, IDMVehicle): diff --git a/highway_env/envs/common/action.py b/highway_env/envs/common/action.py index e52d445bb..e63fdb2cb 100644 --- a/highway_env/envs/common/action.py +++ b/highway_env/envs/common/action.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import functools import itertools -from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Callable, Union import numpy as np from gymnasium import spaces @@ -11,17 +13,17 @@ from highway_env.vehicle.dynamics import BicycleVehicle from highway_env.vehicle.kinematics import Vehicle + if TYPE_CHECKING: from highway_env.envs.common.abstract import AbstractEnv Action = Union[int, np.ndarray] -class ActionType(object): - +class ActionType: """A type of action specifies its definition space, and how actions are executed in the environment""" - def __init__(self, env: "AbstractEnv", **kwargs) -> None: + def __init__(self, env: AbstractEnv, **kwargs) -> None: self.env = env self.__controlled_vehicle = None @@ -69,7 +71,6 @@ def controlled_vehicle(self, vehicle): class ContinuousAction(ActionType): - """ An continuous action space for throttle and/or steering angle. 
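Note: for context on the `ContinuousAction` hunks that follow, a minimal usage sketch assuming the library's usual configure-then-reset pattern; the action values are purely illustrative:

```python
import gymnasium as gym
import numpy as np

import highway_env  # noqa: F401  (importing runs _register_highway_envs())

env = gym.make("highway-v0")
env.unwrapped.configure({"action": {"type": "ContinuousAction"}})
obs, info = env.reset(seed=0)

# With both longitudinal and lateral control enabled, the action is
# [acceleration, steering] in [-1, 1], rescaled to the configured ranges.
action = np.array([0.3, 0.0], dtype=np.float32)
obs, reward, terminated, truncated, info = env.step(action)
env.close()
```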
@@ -86,15 +87,15 @@ class ContinuousAction(ActionType): def __init__( self, - env: "AbstractEnv", - acceleration_range: Optional[Tuple[float, float]] = None, - steering_range: Optional[Tuple[float, float]] = None, - speed_range: Optional[Tuple[float, float]] = None, + env: AbstractEnv, + acceleration_range: tuple[float, float] | None = None, + steering_range: tuple[float, float] | None = None, + speed_range: tuple[float, float] | None = None, longitudinal: bool = True, lateral: bool = True, dynamical: bool = False, clip: bool = True, - **kwargs + **kwargs, ) -> None: """ Create a continuous action space. @@ -164,15 +165,15 @@ def act(self, action: np.ndarray) -> None: class DiscreteAction(ContinuousAction): def __init__( self, - env: "AbstractEnv", - acceleration_range: Optional[Tuple[float, float]] = None, - steering_range: Optional[Tuple[float, float]] = None, + env: AbstractEnv, + acceleration_range: tuple[float, float] | None = None, + steering_range: tuple[float, float] | None = None, longitudinal: bool = True, lateral: bool = True, dynamical: bool = False, clip: bool = True, actions_per_axis: int = 3, - **kwargs + **kwargs, ) -> None: super().__init__( env, @@ -196,7 +197,6 @@ def act(self, action: int) -> None: class DiscreteMetaAction(ActionType): - """ An discrete action space of meta-actions: lane changes, and cruise control set-point. """ @@ -212,11 +212,11 @@ class DiscreteMetaAction(ActionType): def __init__( self, - env: "AbstractEnv", + env: AbstractEnv, longitudinal: bool = True, lateral: bool = True, - target_speeds: Optional[Vector] = None, - **kwargs + target_speeds: Vector | None = None, + **kwargs, ) -> None: """ Create a discrete action space of meta-actions. @@ -237,11 +237,11 @@ def __init__( self.actions = ( self.ACTIONS_ALL if longitudinal and lateral - else self.ACTIONS_LONGI - if longitudinal - else self.ACTIONS_LAT - if lateral - else None + else ( + self.ACTIONS_LONGI + if longitudinal + else self.ACTIONS_LAT if lateral else None + ) ) if self.actions is None: raise ValueError( @@ -256,10 +256,10 @@ def space(self) -> spaces.Space: def vehicle_class(self) -> Callable: return functools.partial(MDPVehicle, target_speeds=self.target_speeds) - def act(self, action: Union[int, np.ndarray]) -> None: + def act(self, action: int | np.ndarray) -> None: self.controlled_vehicle.act(self.actions[int(action)]) - def get_available_actions(self) -> List[int]: + def get_available_actions(self) -> list[int]: """ Get the list of currently available actions. 
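Note: similarly for `DiscreteMetaAction` (the default action type of the highway scenarios), a hedged sketch of `get_available_actions`, whose signature is modernised in the next hunk:

```python
import gymnasium as gym

import highway_env  # noqa: F401

env = gym.make("highway-fast-v0")  # uses DiscreteMetaAction by default
obs, info = env.reset(seed=0)

# Only the currently feasible meta-actions are returned, e.g. a lane change to
# the left is dropped when the ego-vehicle is already on the leftmost lane.
available = env.unwrapped.get_available_actions()
obs, reward, terminated, truncated, info = env.step(available[0])
env.close()
```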
@@ -299,7 +299,7 @@ def get_available_actions(self) -> List[int]: class MultiAgentAction(ActionType): - def __init__(self, env: "AbstractEnv", action_config: dict, **kwargs) -> None: + def __init__(self, env: AbstractEnv, action_config: dict, **kwargs) -> None: super().__init__(env) self.action_config = action_config self.agents_action_types = [] @@ -331,7 +331,7 @@ def get_available_actions(self): ) -def action_factory(env: "AbstractEnv", config: dict) -> ActionType: +def action_factory(env: AbstractEnv, config: dict) -> ActionType: if config["type"] == "ContinuousAction": return ContinuousAction(env, **config) if config["type"] == "DiscreteAction": diff --git a/highway_env/envs/common/finite_mdp.py b/highway_env/envs/common/finite_mdp.py index 96ebda00d..e64104719 100644 --- a/highway_env/envs/common/finite_mdp.py +++ b/highway_env/envs/common/finite_mdp.py @@ -1,18 +1,21 @@ +from __future__ import annotations + import importlib from functools import partial -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING import numpy as np from highway_env import utils from highway_env.vehicle.kinematics import Vehicle + if TYPE_CHECKING: from highway_env.envs import AbstractEnv def finite_mdp( - env: "AbstractEnv", time_quantization: float = 1.0, horizon: float = 10.0 + env: AbstractEnv, time_quantization: float = 1.0, horizon: float = 10.0 ) -> object: """ Time-To-Collision (TTC) representation of the state. @@ -94,15 +97,15 @@ def finite_mdp( return mdp except ModuleNotFoundError as e: raise ModuleNotFoundError( - "The finite_mdp module is required for conversion. {}".format(e) + f"The finite_mdp module is required for conversion. {e}" ) def compute_ttc_grid( - env: "AbstractEnv", + env: AbstractEnv, time_quantization: float, horizon: float, - vehicle: Optional[Vehicle] = None, + vehicle: Vehicle | None = None, ) -> np.ndarray: """ Compute the grid of predicted time-to-collision to each vehicle within the lane diff --git a/highway_env/envs/common/graphics.py b/highway_env/envs/common/graphics.py index bcda69599..d3113991f 100644 --- a/highway_env/envs/common/graphics.py +++ b/highway_env/envs/common/graphics.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import os -from typing import TYPE_CHECKING, Callable, List, Optional +from typing import TYPE_CHECKING, Callable import numpy as np import pygame @@ -12,19 +14,19 @@ from highway_env.road.graphics import RoadGraphics, WorldSurface from highway_env.vehicle.graphics import VehicleGraphics + if TYPE_CHECKING: from highway_env.envs import AbstractEnv from highway_env.envs.common.abstract import Action -class EnvViewer(object): - +class EnvViewer: """A viewer to render a highway driving environment.""" SAVE_IMAGES = False agent_display = None - def __init__(self, env: "AbstractEnv", config: Optional[dict] = None) -> None: + def __init__(self, env: AbstractEnv, config: dict | None = None) -> None: self.env = env self.config = config or env.config self.offscreen = self.config["offscreen_rendering"] @@ -86,7 +88,7 @@ def extend_display(self) -> None: (self.config["screen_width"], self.config["screen_height"]) ) - def set_agent_action_sequence(self, actions: List["Action"]) -> None: + def set_agent_action_sequence(self, actions: list[Action]) -> None: """ Set the sequence of actions chosen by the agent, so that it can be displayed @@ -160,7 +162,7 @@ def display(self) -> None: if self.SAVE_IMAGES and self.directory: pygame.image.save( self.sim_surface, - str(self.directory / "highway-env_{}.png".format(self.frame)), + 
str(self.directory / f"highway-env_{self.frame}.png"), ) self.frame += 1 @@ -192,7 +194,7 @@ def close(self) -> None: pygame.quit() -class EventHandler(object): +class EventHandler: @classmethod def handle_event( cls, action_type: ActionType, event: pygame.event.EventType @@ -249,7 +251,7 @@ def handle_continuous_action_event( action_type.act(action) -class ObservationGraphics(object): +class ObservationGraphics: COLOR = (0, 0, 0) @classmethod diff --git a/highway_env/envs/common/observation.py b/highway_env/envs/common/observation.py index 02f9de5a3..e94fd8ed2 100644 --- a/highway_env/envs/common/observation.py +++ b/highway_env/envs/common/observation.py @@ -1,6 +1,8 @@ +from __future__ import annotations + from collections import OrderedDict from itertools import product -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING import numpy as np import pandas as pd @@ -13,12 +15,13 @@ from highway_env.utils import Vector from highway_env.vehicle.kinematics import Vehicle + if TYPE_CHECKING: from highway_env.envs.common.abstract import AbstractEnv -class ObservationType(object): - def __init__(self, env: "AbstractEnv", **kwargs) -> None: +class ObservationType: + def __init__(self, env: AbstractEnv, **kwargs) -> None: self.env = env self.__observer_vehicle = None @@ -45,7 +48,6 @@ def observer_vehicle(self, vehicle): class GrayscaleObservation(ObservationType): - """ An observation class that collects directly what the simulator renders. @@ -64,13 +66,13 @@ class GrayscaleObservation(ObservationType): def __init__( self, - env: "AbstractEnv", - observation_shape: Tuple[int, int], + env: AbstractEnv, + observation_shape: tuple[int, int], stack_size: int, - weights: List[float], - scaling: Optional[float] = None, - centering_position: Optional[List[float]] = None, - **kwargs + weights: list[float], + scaling: float | None = None, + centering_position: list[float] | None = None, + **kwargs, ) -> None: super().__init__(env) self.observation_shape = observation_shape @@ -110,7 +112,7 @@ def _render_to_grayscale(self) -> np.ndarray: class TimeToCollisionObservation(ObservationType): - def __init__(self, env: "AbstractEnv", horizon: int = 10, **kwargs: dict) -> None: + def __init__(self, env: AbstractEnv, horizon: int = 10, **kwargs: dict) -> None: super().__init__(env) self.horizon = horizon @@ -150,17 +152,16 @@ def observe(self) -> np.ndarray: class KinematicObservation(ObservationType): - """Observe the kinematics of nearby vehicles.""" - FEATURES: List[str] = ["presence", "x", "y", "vx", "vy"] + FEATURES: list[str] = ["presence", "x", "y", "vx", "vy"] def __init__( self, - env: "AbstractEnv", - features: List[str] = None, + env: AbstractEnv, + features: list[str] = None, vehicles_count: int = 5, - features_range: Dict[str, List[float]] = None, + features_range: dict[str, list[float]] = None, absolute: bool = False, order: str = "sorted", normalize: bool = True, @@ -168,7 +169,7 @@ def __init__( see_behind: bool = False, observe_intentions: bool = False, include_obstacles: bool = True, - **kwargs: dict + **kwargs: dict, ) -> None: """ :param env: The environment to observe @@ -275,25 +276,24 @@ def observe(self) -> np.ndarray: class OccupancyGridObservation(ObservationType): - """Observe an occupancy grid of nearby vehicles.""" - FEATURES: List[str] = ["presence", "vx", "vy", "on_road"] - GRID_SIZE: List[List[float]] = [[-5.5 * 5, 5.5 * 5], [-5.5 * 5, 5.5 * 5]] - GRID_STEP: List[int] = [5, 5] + FEATURES: list[str] = ["presence", "vx", "vy", "on_road"] 
+ GRID_SIZE: list[list[float]] = [[-5.5 * 5, 5.5 * 5], [-5.5 * 5, 5.5 * 5]] + GRID_STEP: list[int] = [5, 5] def __init__( self, - env: "AbstractEnv", - features: Optional[List[str]] = None, - grid_size: Optional[Tuple[Tuple[float, float], Tuple[float, float]]] = None, - grid_step: Optional[Tuple[float, float]] = None, - features_range: Dict[str, List[float]] = None, + env: AbstractEnv, + features: list[str] | None = None, + grid_size: tuple[tuple[float, float], tuple[float, float]] | None = None, + grid_step: tuple[float, float] | None = None, + features_range: dict[str, list[float]] = None, absolute: bool = False, align_to_vehicle_axes: bool = False, clip: bool = True, as_image: bool = False, - **kwargs: dict + **kwargs: dict, ) -> None: """ :param env: The environment to observe @@ -411,7 +411,7 @@ def observe(self) -> np.ndarray: return obs - def pos_to_index(self, position: Vector, relative: bool = False) -> Tuple[int, int]: + def pos_to_index(self, position: Vector, relative: bool = False) -> tuple[int, int]: """ Convert a world position to a grid cell index @@ -433,7 +433,7 @@ def pos_to_index(self, position: Vector, relative: bool = False) -> Tuple[int, i int(np.floor((position[1] - self.grid_size[1, 0]) / self.grid_step[1])), ) - def index_to_pos(self, index: Tuple[int, int]) -> np.ndarray: + def index_to_pos(self, index: tuple[int, int]) -> np.ndarray: position = np.array( [ (index[0] + 0.5) * self.grid_step[0] + self.grid_size[0, 0], @@ -499,7 +499,7 @@ def fill_road_layer_by_cell(self, layer_index) -> None: class KinematicsGoalObservation(KinematicObservation): - def __init__(self, env: "AbstractEnv", scales: List[float], **kwargs: dict) -> None: + def __init__(self, env: AbstractEnv, scales: list[float], **kwargs: dict) -> None: self.scales = np.array(scales) super().__init__(env, **kwargs) @@ -531,7 +531,7 @@ def space(self) -> spaces.Space: except AttributeError: return spaces.Space() - def observe(self) -> Dict[str, np.ndarray]: + def observe(self) -> dict[str, np.ndarray]: if not self.observer_vehicle: return OrderedDict( [ @@ -560,9 +560,7 @@ def observe(self) -> Dict[str, np.ndarray]: class AttributesObservation(ObservationType): - def __init__( - self, env: "AbstractEnv", attributes: List[str], **kwargs: dict - ) -> None: + def __init__(self, env: AbstractEnv, attributes: list[str], **kwargs: dict) -> None: self.env = env self.attributes = attributes @@ -580,14 +578,14 @@ def space(self) -> spaces.Space: except AttributeError: return spaces.Space() - def observe(self) -> Dict[str, np.ndarray]: + def observe(self) -> dict[str, np.ndarray]: return OrderedDict( [(attribute, getattr(self.env, attribute)) for attribute in self.attributes] ) class MultiAgentObservation(ObservationType): - def __init__(self, env: "AbstractEnv", observation_config: dict, **kwargs) -> None: + def __init__(self, env: AbstractEnv, observation_config: dict, **kwargs) -> None: super().__init__(env) self.observation_config = observation_config self.agents_observation_types = [] @@ -607,7 +605,7 @@ def observe(self) -> tuple: class TupleObservation(ObservationType): def __init__( - self, env: "AbstractEnv", observation_configs: List[dict], **kwargs + self, env: AbstractEnv, observation_configs: list[dict], **kwargs ) -> None: super().__init__(env) self.observation_types = [ @@ -623,7 +621,6 @@ def observe(self) -> tuple: class ExitObservation(KinematicObservation): - """Specific to exit_env, observe the distance to the next exit lane as part of a KinematicObservation.""" def observe(self) -> 
np.ndarray: @@ -687,7 +684,7 @@ def __init__( cells: int = 16, maximum_range: float = 60, normalize: bool = True, - **kwargs + **kwargs, ): super().__init__(env, **kwargs) self.cells = cells @@ -771,7 +768,7 @@ def index_to_direction(self, index: int) -> np.ndarray: return np.array([np.cos(index * self.angle), np.sin(index * self.angle)]) -def observation_factory(env: "AbstractEnv", config: dict) -> ObservationType: +def observation_factory(env: AbstractEnv, config: dict) -> ObservationType: if config["type"] == "TimeToCollision": return TimeToCollisionObservation(env, **config) elif config["type"] == "Kinematics": diff --git a/highway_env/envs/exit_env.py b/highway_env/envs/exit_env.py index f42067849..c7f3da861 100644 --- a/highway_env/envs/exit_env.py +++ b/highway_env/envs/exit_env.py @@ -1,12 +1,14 @@ -from typing import Dict, Text, Tuple +from __future__ import annotations import numpy as np from highway_env import utils -from highway_env.envs import CircularLane, HighwayEnv, Vehicle +from highway_env.envs import HighwayEnv from highway_env.envs.common.action import Action +from highway_env.road.lane import CircularLane from highway_env.road.road import Road, RoadNetwork from highway_env.vehicle.controller import ControlledVehicle +from highway_env.vehicle.kinematics import Vehicle class ExitEnv(HighwayEnv): @@ -44,7 +46,7 @@ def _reset(self) -> None: self._create_road() self._create_vehicles() - def step(self, action) -> Tuple[np.ndarray, float, bool, dict]: + def step(self, action) -> tuple[np.ndarray, float, bool, dict]: obs, reward, terminal, info = super().step(action) info.update({"is_success": self._is_success()}) return obs, reward, terminal, info @@ -154,7 +156,7 @@ def _reward(self, action: Action) -> float: reward = np.clip(reward, 0, 1) return reward - def _rewards(self, action: Action) -> Dict[Text, float]: + def _rewards(self, action: Action) -> dict[str, float]: lane_index = ( self.vehicle.target_lane_index if isinstance(self.vehicle, ControlledVehicle) diff --git a/highway_env/envs/highway_env.py b/highway_env/envs/highway_env.py index c5e05a27b..a2985c6e2 100644 --- a/highway_env/envs/highway_env.py +++ b/highway_env/envs/highway_env.py @@ -1,4 +1,4 @@ -from typing import Dict, Text +from __future__ import annotations import numpy as np @@ -10,6 +10,7 @@ from highway_env.vehicle.controller import ControlledVehicle from highway_env.vehicle.kinematics import Vehicle + Observation = np.ndarray @@ -114,7 +115,7 @@ def _reward(self, action: Action) -> float: reward *= rewards["on_road_reward"] return reward - def _rewards(self, action: Action) -> Dict[Text, float]: + def _rewards(self, action: Action) -> dict[str, float]: neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index) lane = ( self.vehicle.target_lane_index[2] diff --git a/highway_env/envs/intersection_env.py b/highway_env/envs/intersection_env.py index d55e32035..80c9df4c0 100644 --- a/highway_env/envs/intersection_env.py +++ b/highway_env/envs/intersection_env.py @@ -1,4 +1,4 @@ -from typing import Dict, Text, Tuple +from __future__ import annotations import numpy as np @@ -11,7 +11,7 @@ class IntersectionEnv(AbstractEnv): - ACTIONS: Dict[int, str] = {0: "SLOWER", 1: "IDLE", 2: "FASTER"} + ACTIONS: dict[int, str] = {0: "SLOWER", 1: "IDLE", 2: "FASTER"} ACTIONS_INDEXES = {v: k for k, v in ACTIONS.items()} @classmethod @@ -64,7 +64,7 @@ def _reward(self, action: int) -> float: self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles ) / len(self.controlled_vehicles) - def 
_rewards(self, action: int) -> Dict[Text, float]: + def _rewards(self, action: int) -> dict[str, float]: """Multi-objective rewards, for cooperative agents.""" agents_rewards = [ self._agent_rewards(action, vehicle) for vehicle in self.controlled_vehicles @@ -91,7 +91,7 @@ def _agent_reward(self, action: int, vehicle: Vehicle) -> float: ) return reward - def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]: + def _agent_rewards(self, action: int, vehicle: Vehicle) -> dict[str, float]: """Per-agent per-objective reward signal.""" scaled_speed = utils.lmap( vehicle.speed, self.config["reward_speed_range"], [0, 1] @@ -132,7 +132,7 @@ def _reset(self) -> None: self._make_road() self._make_vehicles(self.config["initial_vehicle_count"]) - def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, dict]: + def step(self, action: int) -> tuple[np.ndarray, float, bool, bool, dict]: obs, reward, terminated, truncated, info = super().step(action) self._clear_vehicles() self._spawn_vehicle(spawn_probability=self.config["spawn_probability"]) @@ -286,7 +286,7 @@ def _make_vehicles(self, n_vehicles: int = 10) -> None: self.controlled_vehicles = [] for ego_id in range(0, self.config["controlled_vehicles"]): ego_lane = self.road.network.get_lane( - ("o{}".format(ego_id % 4), "ir{}".format(ego_id % 4), 0) + (f"o{ego_id % 4}", f"ir{ego_id % 4}", 0) ) destination = self.config["destination"] or "o" + str( self.np_random.integers(1, 4) diff --git a/highway_env/envs/lane_keeping_env.py b/highway_env/envs/lane_keeping_env.py index 2d760cb78..e1555f6a7 100644 --- a/highway_env/envs/lane_keeping_env.py +++ b/highway_env/envs/lane_keeping_env.py @@ -1,7 +1,6 @@ -from __future__ import absolute_import, division, print_function +from __future__ import annotations import copy -from typing import Tuple import numpy as np @@ -12,7 +11,6 @@ class LaneKeepingEnv(AbstractEnv): - """A lane keeping control task.""" def __init__(self, config: dict = None) -> None: @@ -51,7 +49,7 @@ def default_config(cls) -> dict: ) return config - def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, bool, dict]: + def step(self, action: np.ndarray) -> tuple[np.ndarray, float, bool, bool, dict]: if self.lanes and not self.lane.on_lane(self.vehicle.position): self.lane = self.lanes.pop(0) self.store_data() diff --git a/highway_env/envs/merge_env.py b/highway_env/envs/merge_env.py index 6a1df275b..77e7c24c0 100644 --- a/highway_env/envs/merge_env.py +++ b/highway_env/envs/merge_env.py @@ -1,4 +1,4 @@ -from typing import Dict, Text +from __future__ import annotations import numpy as np @@ -11,7 +11,6 @@ class MergeEnv(AbstractEnv): - """ A highway merge negotiation environment. 
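Note: the reward hunks around here rescale the ego-vehicle speed with `utils.lmap`; for reference, a self-contained sketch of that linear mapping (the speed and range values are illustrative, and this re-implementation only mirrors `highway_env.utils.lmap`):

```python
def lmap(v: float, x: list, y: list) -> float:
    """Linear map of value v from range x to range y (mirrors highway_env.utils.lmap)."""
    return y[0] + (v - x[0]) * (y[1] - y[0]) / (x[1] - x[0])


# A speed of 25 m/s with reward_speed_range = [20, 30] gives a scaled speed of 0.5.
print(lmap(25.0, [20, 30], [0, 1]))  # 0.5
```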
@@ -57,7 +56,7 @@ def _reward(self, action: int) -> float: [0, 1], ) - def _rewards(self, action: int) -> Dict[Text, float]: + def _rewards(self, action: int) -> dict[str, float]: scaled_speed = utils.lmap( self.vehicle.speed, self.config["reward_speed_range"], [0, 1] ) diff --git a/highway_env/envs/parking_env.py b/highway_env/envs/parking_env.py index 68a02eb3b..8f702e977 100644 --- a/highway_env/envs/parking_env.py +++ b/highway_env/envs/parking_env.py @@ -1,5 +1,6 @@ +from __future__ import annotations + from abc import abstractmethod -from typing import Optional import numpy as np from gymnasium import Env @@ -77,7 +78,7 @@ class ParkingEnv(AbstractEnv, GoalEnv): } } - def __init__(self, config: dict = None, render_mode: Optional[str] = None) -> None: + def __init__(self, config: dict = None, render_mode: str | None = None) -> None: super().__init__(config, render_mode) self.observation_type_parking = None @@ -121,7 +122,7 @@ def define_spaces(self) -> None: ) def _info(self, obs, action) -> dict: - info = super(ParkingEnv, self)._info(obs, action) + info = super()._info(obs, action) if isinstance(self.observation_type, MultiAgentObservation): success = tuple( self._is_success(agent_obs["achieved_goal"], agent_obs["desired_goal"]) diff --git a/highway_env/envs/racetrack_env.py b/highway_env/envs/racetrack_env.py index b2d68dfdc..538d0c1d3 100644 --- a/highway_env/envs/racetrack_env.py +++ b/highway_env/envs/racetrack_env.py @@ -1,4 +1,4 @@ -from typing import Dict, Text +from __future__ import annotations import numpy as np @@ -65,7 +65,7 @@ def _reward(self, action: np.ndarray) -> float: reward *= rewards["on_road_reward"] return reward - def _rewards(self, action: np.ndarray) -> Dict[Text, float]: + def _rewards(self, action: np.ndarray) -> dict[str, float]: _, lateral = self.vehicle.lane.local_coordinates(self.vehicle.position) return { "lane_centering_reward": 1 diff --git a/highway_env/envs/roundabout_env.py b/highway_env/envs/roundabout_env.py index cc91a570d..5d1f4c330 100644 --- a/highway_env/envs/roundabout_env.py +++ b/highway_env/envs/roundabout_env.py @@ -1,4 +1,4 @@ -from typing import Dict, Text +from __future__ import annotations import numpy as np @@ -54,7 +54,7 @@ def _reward(self, action: int) -> float: reward *= rewards["on_road_reward"] return reward - def _rewards(self, action: int) -> Dict[Text, float]: + def _rewards(self, action: int) -> dict[str, float]: return { "collision_reward": self.vehicle.crashed, "high_speed_reward": MDPVehicle.get_speed_index(self.vehicle) diff --git a/highway_env/envs/two_way_env.py b/highway_env/envs/two_way_env.py index 3fc042fc3..4d4280b49 100644 --- a/highway_env/envs/two_way_env.py +++ b/highway_env/envs/two_way_env.py @@ -1,4 +1,4 @@ -from typing import Dict, Text +from __future__ import annotations import numpy as np @@ -9,7 +9,6 @@ class TwoWayEnv(AbstractEnv): - """ A risk management task: the agent is driving on a two-way lane with icoming traffic. 
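Note: these hunks only change the `_rewards` return annotation to `dict[str, float]`; the underlying pattern, visible in the `_reward` context lines of the next hunk, is a weighted sum of named reward terms. A standalone sketch; `combine_rewards` is a hypothetical helper and the weights are illustrative rather than the environments' defaults:

```python
def combine_rewards(rewards: dict, config: dict) -> float:
    """Weighted sum of named reward terms, as done by the _reward methods in these envs."""
    return sum(config.get(name, 0) * value for name, value in rewards.items())


config = {"collision_reward": -1.0, "high_speed_reward": 0.5}  # illustrative weights
print(combine_rewards({"collision_reward": 0.0, "high_speed_reward": 0.5}, config))  # 0.25
```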
@@ -47,7 +46,7 @@ def _reward(self, action: int) -> float: for name, reward in self._rewards(action).items() ) - def _rewards(self, action: int) -> Dict[Text, float]: + def _rewards(self, action: int) -> dict[str, float]: neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index) return { "high_speed_reward": self.vehicle.speed_index diff --git a/highway_env/envs/u_turn_env.py b/highway_env/envs/u_turn_env.py index ba1dcb52b..c6fd25b5b 100644 --- a/highway_env/envs/u_turn_env.py +++ b/highway_env/envs/u_turn_env.py @@ -1,4 +1,4 @@ -from typing import Dict, Text +from __future__ import annotations import numpy as np @@ -10,7 +10,6 @@ class UTurnEnv(AbstractEnv): - """ U-Turn risk analysis task: the agent overtakes vehicles that are blocking the traffic. High speed overtaking must be balanced with ensuring safety. @@ -58,7 +57,7 @@ def _reward(self, action: int) -> float: reward *= rewards["on_road_reward"] return reward - def _rewards(self, action: int) -> Dict[Text, float]: + def _rewards(self, action: int) -> dict[str, float]: neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index) lane = self.vehicle.lane_index[2] scaled_speed = utils.lmap( diff --git a/highway_env/interval.py b/highway_env/interval.py index be1352675..2a796d7b9 100644 --- a/highway_env/interval.py +++ b/highway_env/interval.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import itertools -from typing import Callable, List, Tuple, Union +from typing import Callable import numpy as np from numpy.linalg import LinAlgError @@ -104,7 +106,7 @@ def vector_interval_section(v_i: Interval, direction: Vector) -> np.ndarray: def interval_absolute_to_local( position_i: Interval, lane: AbstractLane -) -> Tuple[np.ndarray, np.ndarray]: +) -> tuple[np.ndarray, np.ndarray]: """ Converts an interval in absolute x,y coordinates to an interval in local (longiturinal, lateral) coordinates @@ -152,7 +154,7 @@ def interval_local_to_absolute( def polytope( parametrized_f: Callable[[np.ndarray], np.ndarray], params_intervals: np.ndarray -) -> Tuple[np.ndarray, List[np.ndarray]]: +) -> tuple[np.ndarray, list[np.ndarray]]: """ Get a matrix polytope from a parametrized matrix function and parameter box @@ -175,12 +177,12 @@ def is_metzler(matrix: np.ndarray, eps: float = 1e-9) -> bool: return (matrix - np.diag(np.diag(matrix)) >= -eps).all() -class LPV(object): +class LPV: def __init__( self, x0: Vector, a0: Matrix, - da: List[Vector], + da: list[Vector], b: Matrix = None, d: Matrix = None, omega_i: Matrix = None, @@ -268,12 +270,12 @@ def set_control(self, control: np.ndarray, state: np.ndarray = None) -> None: def change_coordinates( self, - value: Union[np.ndarray, List[np.ndarray]], + value: np.ndarray | list[np.ndarray], matrix: bool = False, back: bool = False, interval: bool = False, offset: bool = True, - ) -> Union[np.ndarray, List[np.ndarray]]: + ) -> np.ndarray | list[np.ndarray]: """ Perform a change of coordinate: rotation and centering. 
diff --git a/highway_env/road/graphics.py b/highway_env/road/graphics.py index 06f7ada7e..043c8e6fe 100644 --- a/highway_env/road/graphics.py +++ b/highway_env/road/graphics.py @@ -1,4 +1,6 @@ -from typing import TYPE_CHECKING, List, Tuple, Union +from __future__ import annotations + +from typing import TYPE_CHECKING, Tuple, Union import numpy as np import pygame @@ -9,6 +11,7 @@ from highway_env.vehicle.graphics import VehicleGraphics from highway_env.vehicle.objects import Landmark, Obstacle + if TYPE_CHECKING: from highway_env.vehicle.objects import RoadObject @@ -16,7 +19,6 @@ class WorldSurface(pygame.Surface): - """A pygame Surface implementing a local coordinate system so that we can move and zoom in the displayed area.""" BLACK = (0, 0, 0) @@ -30,7 +32,7 @@ class WorldSurface(pygame.Surface): MOVING_FACTOR = 0.1 def __init__( - self, size: Tuple[int, int], flags: object, surf: pygame.SurfaceType + self, size: tuple[int, int], flags: object, surf: pygame.SurfaceType ) -> None: super().__init__(size, flags, surf) self.origin = np.array([0, 0]) @@ -46,7 +48,7 @@ def pix(self, length: float) -> int: """ return int(length * self.scaling) - def pos2pix(self, x: float, y: float) -> Tuple[int, int]: + def pos2pix(self, x: float, y: float) -> tuple[int, int]: """ Convert two world coordinates [m] into a position in the surface [px] @@ -56,7 +58,7 @@ def pos2pix(self, x: float, y: float) -> Tuple[int, int]: """ return self.pix(x - self.origin[0]), self.pix(y - self.origin[1]) - def vec2pix(self, vec: PositionType) -> Tuple[int, int]: + def vec2pix(self, vec: PositionType) -> tuple[int, int]: """ Convert a world position [m] into a position in the surface [px]. @@ -108,8 +110,7 @@ def handle_event(self, event: pygame.event.EventType) -> None: self.centering_position[0] += self.MOVING_FACTOR -class LaneGraphics(object): - +class LaneGraphics: """A visualization of a lane.""" # See https://www.researchgate.net/figure/French-road-traffic-lane-description-and-specification_fig4_261170641 @@ -229,9 +230,9 @@ def draw_stripes( cls, lane: AbstractLane, surface: WorldSurface, - starts: List[float], - ends: List[float], - lats: List[float], + starts: list[float], + ends: list[float], + lats: list[float], ) -> None: """ Draw a set of stripes along a lane. 
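Note: the `WorldSurface` hunks above (`pix`, `pos2pix`, `vec2pix`) implement the metres-to-pixels conversion used by all the drawing code in this file; a minimal standalone sketch of that conversion, with illustrative scaling and origin values (the class keeps its own defaults):

```python
scaling = 5.5          # pixels per metre (illustrative value)
origin = (100.0, 0.0)  # world coordinates [m] mapped to the view's top-left corner


def pos2pix(x: float, y: float) -> tuple:
    """World position [m] -> surface position [px], in the spirit of WorldSurface.pos2pix."""
    return int((x - origin[0]) * scaling), int((y - origin[1]) * scaling)


print(pos2pix(120.0, 4.0))  # (110, 22)
```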
@@ -259,7 +260,7 @@ def draw_ground( cls, lane: AbstractLane, surface: WorldSurface, - color: Tuple[float], + color: tuple[float], width: float, draw_surface: pygame.Surface = None, ) -> None: @@ -288,8 +289,7 @@ def draw_ground( pygame.draw.polygon(draw_surface, color, dots, 0) -class RoadGraphics(object): - +class RoadGraphics: """A visualization of a road lanes and vehicles.""" @staticmethod @@ -345,7 +345,6 @@ def display_road_objects( class RoadObjectGraphics: - """A visualization of objects on the road.""" YELLOW = (200, 200, 0) @@ -358,7 +357,7 @@ class RoadObjectGraphics: @classmethod def display( cls, - object_: "RoadObject", + object_: RoadObject, surface: WorldSurface, transparent: bool = False, offscreen: bool = False, @@ -438,7 +437,7 @@ def blit_rotate( pygame.draw.rect(surf, (255, 0, 0), (*origin, *rotated_image.get_size()), 2) @classmethod - def get_color(cls, object_: "RoadObject", transparent: bool = False): + def get_color(cls, object_: RoadObject, transparent: bool = False): color = cls.DEFAULT_COLOR if isinstance(object_, Obstacle): diff --git a/highway_env/road/lane.py b/highway_env/road/lane.py index 1878231ed..f15e9eb0e 100644 --- a/highway_env/road/lane.py +++ b/highway_env/road/lane.py @@ -1,5 +1,6 @@ +from __future__ import annotations + from abc import ABCMeta, abstractmethod -from typing import List, Optional, Tuple, Union import numpy as np @@ -8,15 +9,14 @@ from highway_env.utils import Vector, class_from_path, get_class_path, wrap_to_pi -class AbstractLane(object): - +class AbstractLane: """A lane on the road, described by its central curve.""" metaclass__ = ABCMeta DEFAULT_WIDTH: float = 4 VEHICLE_LENGTH: float = 5 length: float = 0 - line_types: List["LineType"] + line_types: list[LineType] @abstractmethod def position(self, longitudinal: float, lateral: float) -> np.ndarray: @@ -30,7 +30,7 @@ def position(self, longitudinal: float, lateral: float) -> np.ndarray: raise NotImplementedError() @abstractmethod - def local_coordinates(self, position: np.ndarray) -> Tuple[float, float]: + def local_coordinates(self, position: np.ndarray) -> tuple[float, float]: """ Convert a world position to local lane coordinates. 
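Note: to make the `local_coordinates` contract concrete, a small sketch using `StraightLane` (whose behaviour this patch does not change); the lane geometry and query point are illustrative:

```python
import numpy as np

from highway_env.road.lane import StraightLane

lane = StraightLane(start=[0, 0], end=[100, 0])  # 100 m lane along the x-axis

longitudinal, lateral = lane.local_coordinates(np.array([25.0, 1.5]))
print(longitudinal, lateral)                 # -> 25.0 1.5
print(lane.position(longitudinal, lateral))  # -> back to the original [25, 1.5] position
```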
@@ -132,7 +132,7 @@ def distance(self, position: np.ndarray): def distance_with_heading( self, position: np.ndarray, - heading: Optional[float], + heading: float | None, heading_weight: float = 1.0, ): """Compute a weighted distance in position and heading to the lane.""" @@ -148,7 +148,6 @@ def local_angle(self, heading: float, long_offset: float): class LineType: - """A lane side line type.""" NONE = 0 @@ -158,7 +157,6 @@ class LineType: class StraightLane(AbstractLane): - """A lane going in straight line.""" def __init__( @@ -166,7 +164,7 @@ def __init__( start: Vector, end: Vector, width: float = AbstractLane.DEFAULT_WIDTH, - line_types: Tuple[LineType, LineType] = None, + line_types: tuple[LineType, LineType] = None, forbidden: bool = False, speed_limit: float = 20, priority: int = 0, @@ -208,7 +206,7 @@ def heading_at(self, longitudinal: float) -> float: def width_at(self, longitudinal: float) -> float: return self.width - def local_coordinates(self, position: np.ndarray) -> Tuple[float, float]: + def local_coordinates(self, position: np.ndarray) -> tuple[float, float]: delta = position - self.start longitudinal = np.dot(delta, self.direction) lateral = np.dot(delta, self.direction_lateral) @@ -236,7 +234,6 @@ def to_config(self) -> dict: class SineLane(StraightLane): - """A sinusoidal lane.""" def __init__( @@ -247,7 +244,7 @@ def __init__( pulsation: float, phase: float, width: float = StraightLane.DEFAULT_WIDTH, - line_types: List[LineType] = None, + line_types: list[LineType] = None, forbidden: bool = False, speed_limit: float = 20, priority: int = 0, @@ -282,7 +279,7 @@ def heading_at(self, longitudinal: float) -> float: * np.cos(self.pulsation * longitudinal + self.phase) ) - def local_coordinates(self, position: np.ndarray) -> Tuple[float, float]: + def local_coordinates(self, position: np.ndarray) -> tuple[float, float]: longitudinal, lateral = super().local_coordinates(position) return longitudinal, lateral - self.amplitude * np.sin( self.pulsation * longitudinal + self.phase @@ -312,7 +309,6 @@ def to_config(self) -> dict: class CircularLane(AbstractLane): - """A lane going in circle arc.""" def __init__( @@ -323,7 +319,7 @@ def __init__( end_phase: float, clockwise: bool = True, width: float = AbstractLane.DEFAULT_WIDTH, - line_types: List[LineType] = None, + line_types: list[LineType] = None, forbidden: bool = False, speed_limit: float = 20, priority: int = 0, @@ -356,7 +352,7 @@ def heading_at(self, longitudinal: float) -> float: def width_at(self, longitudinal: float) -> float: return self.width - def local_coordinates(self, position: np.ndarray) -> Tuple[float, float]: + def local_coordinates(self, position: np.ndarray) -> tuple[float, float]: delta = position - self.center phi = np.arctan2(delta[1], delta[0]) phi = self.start_phase + utils.wrap_to_pi(phi - self.start_phase) @@ -395,9 +391,9 @@ class PolyLaneFixedWidth(AbstractLane): def __init__( self, - lane_points: List[Tuple[float, float]], + lane_points: list[tuple[float, float]], width: float = AbstractLane.DEFAULT_WIDTH, - line_types: Tuple[LineType, LineType] = None, + line_types: tuple[LineType, LineType] = None, forbidden: bool = False, speed_limit: float = 20, priority: int = 0, @@ -415,7 +411,7 @@ def position(self, longitudinal: float, lateral: float) -> np.ndarray: yaw = self.heading_at(longitudinal) return np.array([x - np.sin(yaw) * lateral, y + np.cos(yaw) * lateral]) - def local_coordinates(self, position: np.ndarray) -> Tuple[float, float]: + def local_coordinates(self, position: np.ndarray) -> 
tuple[float, float]: lon, lat = self.curve.cartesian_to_frenet(position) return lon, lat @@ -453,10 +449,10 @@ class PolyLane(PolyLaneFixedWidth): def __init__( self, - lane_points: List[Tuple[float, float]], - left_boundary_points: List[Tuple[float, float]], - right_boundary_points: List[Tuple[float, float]], - line_types: Tuple[LineType, LineType] = None, + lane_points: list[tuple[float, float]], + left_boundary_points: list[tuple[float, float]], + right_boundary_points: list[tuple[float, float]], + line_types: tuple[LineType, LineType] = None, forbidden: bool = False, speed_limit: float = 20, priority: int = 0, @@ -533,7 +529,7 @@ def to_config(self) -> dict: return config -def _to_serializable(arg: Union[np.ndarray, List]) -> List: +def _to_serializable(arg: np.ndarray | list) -> list: if isinstance(arg, np.ndarray): return arg.tolist() return arg diff --git a/highway_env/road/regulation.py b/highway_env/road/regulation.py index 1a335a38d..5b1b5a445 100644 --- a/highway_env/road/regulation.py +++ b/highway_env/road/regulation.py @@ -1,4 +1,4 @@ -from typing import List, Tuple +from __future__ import annotations import numpy as np @@ -10,15 +10,15 @@ class RegulatedRoad(Road): - YIELDING_COLOR: Tuple[float, float, float] = None + YIELDING_COLOR: tuple[float, float, float] = None REGULATION_FREQUENCY: int = 2 YIELD_DURATION: float = 0.0 def __init__( self, network: RoadNetwork = None, - vehicles: List[Vehicle] = None, - obstacles: List[Obstacle] = None, + vehicles: list[Vehicle] = None, + obstacles: list[Obstacle] = None, np_random: np.random.RandomState = None, record_history: bool = False, ) -> None: diff --git a/highway_env/road/road.py b/highway_env/road/road.py index 998eb017e..c5ade7191 100644 --- a/highway_env/road/road.py +++ b/highway_env/road/road.py @@ -1,11 +1,14 @@ +from __future__ import annotations + import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, List, Tuple import numpy as np from highway_env.road.lane import AbstractLane, LineType, StraightLane, lane_from_config from highway_env.vehicle.objects import Landmark + if TYPE_CHECKING: from highway_env.vehicle import kinematics, objects @@ -15,8 +18,8 @@ Route = List[LaneIndex] -class RoadNetwork(object): - graph: Dict[str, Dict[str, List[AbstractLane]]] +class RoadNetwork: + graph: dict[str, dict[str, list[AbstractLane]]] def __init__(self): self.graph = {} @@ -50,7 +53,7 @@ def get_lane(self, index: LaneIndex) -> AbstractLane: return self.graph[_from][_to][_id] def get_closest_lane_index( - self, position: np.ndarray, heading: Optional[float] = None + self, position: np.ndarray, heading: float | None = None ) -> LaneIndex: """ Get the index of the lane closest to a world position. @@ -140,7 +143,7 @@ def next_lane_given_next_road( next_to: str, next_id: int, position: np.ndarray, - ) -> Tuple[int, float]: + ) -> tuple[int, float]: # If next road has same number of lane, stay on the same lane if len(self.graph[_from][_to]) == len(self.graph[_to][next_to]): if next_id is None: @@ -153,7 +156,7 @@ def next_lane_given_next_road( ) return next_id, self.get_lane((_to, next_to, next_id)).distance(position) - def bfs_paths(self, start: str, goal: str) -> List[List[str]]: + def bfs_paths(self, start: str, goal: str) -> list[list[str]]: """ Breadth-first search of all routes from start to goal. 
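Note: for the `RoadNetwork` hunks in this file, a short usage sketch of the graph helpers whose signatures are being modernised (`straight_road_network`, `shortest_path`, `get_lane`); the node names and parameters are illustrative:

```python
from highway_env.road.road import RoadNetwork

# Two parallel lanes between nodes "0" and "1", as built by the helper in this diff.
net = RoadNetwork.straight_road_network(lanes=2, length=500)

print(net.shortest_path("0", "1"))  # ['0', '1']
lane = net.get_lane(("0", "1", 0))  # (origin node, destination node, lane id)
print(lane.length)                  # 500.0
```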
@@ -174,7 +177,7 @@ def bfs_paths(self, start: str, goal: str) -> List[List[str]]: elif _next in self.graph: queue.append((_next, path + [_next])) - def shortest_path(self, start: str, goal: str) -> List[str]: + def shortest_path(self, start: str, goal: str) -> list[str]: """ Breadth-first search of shortest path from start to goal. @@ -184,7 +187,7 @@ def shortest_path(self, start: str, goal: str) -> List[str]: """ return next(self.bfs_paths(start, goal), []) - def all_side_lanes(self, lane_index: LaneIndex) -> List[LaneIndex]: + def all_side_lanes(self, lane_index: LaneIndex) -> list[LaneIndex]: """ :param lane_index: the index of a lane. :return: all lanes belonging to the same road. @@ -194,7 +197,7 @@ def all_side_lanes(self, lane_index: LaneIndex) -> List[LaneIndex]: for i in range(len(self.graph[lane_index[0]][lane_index[1]])) ] - def side_lanes(self, lane_index: LaneIndex) -> List[LaneIndex]: + def side_lanes(self, lane_index: LaneIndex) -> list[LaneIndex]: """ :param lane_index: the index of a lane. :return: indexes of lanes next to a an input lane, to its right or left. @@ -272,12 +275,12 @@ def is_connected_road( ) return False - def lanes_list(self) -> List[AbstractLane]: + def lanes_list(self) -> list[AbstractLane]: return [ lane for to in self.graph.values() for ids in to.values() for lane in ids ] - def lanes_dict(self) -> Dict[str, AbstractLane]: + def lanes_dict(self) -> dict[str, AbstractLane]: return { (from_, to_, i): lane for from_, tos in self.graph.items() @@ -292,9 +295,9 @@ def straight_road_network( length: float = 10000, angle: float = 0, speed_limit: float = 30, - nodes_str: Optional[Tuple[str, str]] = None, - net: Optional["RoadNetwork"] = None, - ) -> "RoadNetwork": + nodes_str: tuple[str, str] | None = None, + net: RoadNetwork | None = None, + ) -> RoadNetwork: net = net or RoadNetwork() nodes_str = nodes_str or ("0", "1") for lane in range(lanes): @@ -313,7 +316,7 @@ def straight_road_network( *nodes_str, StraightLane( origin, end, line_types=line_types, speed_limit=speed_limit - ) + ), ) return net @@ -323,7 +326,7 @@ def position_heading_along_route( longitudinal: float, lateral: float, current_lane_index: LaneIndex, - ) -> Tuple[np.ndarray, float]: + ) -> tuple[np.ndarray, float]: """ Get the absolute position and heading along a route composed of several lanes at some local coordinates. 
@@ -386,15 +389,14 @@ def to_config(self) -> dict: return graph_dict -class Road(object): - +class Road: """A road is a set of lanes, and a set of vehicles driving on these lanes.""" def __init__( self, network: RoadNetwork = None, - vehicles: List["kinematics.Vehicle"] = None, - road_objects: List["objects.RoadObject"] = None, + vehicles: list[kinematics.Vehicle] = None, + road_objects: list[objects.RoadObject] = None, np_random: np.random.RandomState = None, record_history: bool = False, ) -> None: @@ -415,9 +417,9 @@ def __init__( def close_objects_to( self, - vehicle: "kinematics.Vehicle", + vehicle: kinematics.Vehicle, distance: float, - count: Optional[int] = None, + count: int | None = None, see_behind: bool = True, sort: bool = True, vehicles_only: bool = False, @@ -446,9 +448,9 @@ def close_objects_to( def close_vehicles_to( self, - vehicle: "kinematics.Vehicle", + vehicle: kinematics.Vehicle, distance: float, - count: Optional[int] = None, + count: int | None = None, see_behind: bool = True, sort: bool = True, ) -> object: @@ -476,8 +478,8 @@ def step(self, dt: float) -> None: vehicle.handle_collisions(other, dt) def neighbour_vehicles( - self, vehicle: "kinematics.Vehicle", lane_index: LaneIndex = None - ) -> Tuple[Optional["kinematics.Vehicle"], Optional["kinematics.Vehicle"]]: + self, vehicle: kinematics.Vehicle, lane_index: LaneIndex = None + ) -> tuple[kinematics.Vehicle | None, kinematics.Vehicle | None]: """ Find the preceding and following vehicles of a given vehicle. diff --git a/highway_env/road/spline.py b/highway_env/road/spline.py index ef6ae3d84..4d9a99558 100644 --- a/highway_env/road/spline.py +++ b/highway_env/road/spline.py @@ -1,4 +1,4 @@ -from typing import List, Tuple +from __future__ import annotations import numpy as np from scipy import interpolate @@ -11,7 +11,7 @@ class LinearSpline2D: PARAM_CURVE_SAMPLE_DISTANCE: int = 1 # curve samples are placed 1m apart - def __init__(self, points: List[Tuple[float, float]]): + def __init__(self, points: list[tuple[float, float]]): x_values = np.array([pt[0] for pt in points]) y_values = np.array([pt[1] for pt in points]) x_values_diff = np.diff(x_values) @@ -39,15 +39,15 @@ def __init__(self, points: List[Tuple[float, float]]): self.x_curve, self.y_curve, self.length, self.PARAM_CURVE_SAMPLE_DISTANCE ) - def __call__(self, lon: float) -> Tuple[float, float]: + def __call__(self, lon: float) -> tuple[float, float]: return self.x_curve(lon), self.y_curve(lon) - def get_dx_dy(self, lon: float) -> Tuple[float, float]: + def get_dx_dy(self, lon: float) -> tuple[float, float]: idx_pose = self._get_idx_segment_for_lon(lon) pose = self.poses[idx_pose] return pose.normal - def cartesian_to_frenet(self, position: Tuple[float, float]) -> Tuple[float, float]: + def cartesian_to_frenet(self, position: tuple[float, float]) -> tuple[float, float]: """ Transform the point in Cartesian coordinates into Frenet coordinates of the curve """ @@ -74,7 +74,7 @@ def cartesian_to_frenet(self, position: Tuple[float, float]) -> Tuple[float, flo lat = pose.project_onto_orthonormal(position) return lon, lat - def frenet_to_cartesian(self, lon: float, lat: float) -> Tuple[float, float]: + def frenet_to_cartesian(self, lon: float, lat: float) -> tuple[float, float]: """ Convert the point from Frenet coordinates of the curve into Cartesian coordinates """ @@ -132,19 +132,19 @@ def __init__(self, x: float, y: float, dx: float, dy: float): self.normal = np.array([dx, dy]).flatten() / self.length self.orthonormal = np.array([-self.normal[1], 
self.normal[0]]).flatten() - def distance_to_origin(self, point: Tuple[float, float]) -> float: + def distance_to_origin(self, point: tuple[float, float]) -> float: """ Compute the distance between the point [x, y] and the pose origin """ return np.sqrt(np.sum((self.position - point) ** 2)) - def project_onto_normal(self, point: Tuple[float, float]) -> float: + def project_onto_normal(self, point: tuple[float, float]) -> float: """ Compute the longitudinal distance from pose origin to point by projecting the point onto the normal vector of the pose """ return self.normal.dot(point - self.position) - def project_onto_orthonormal(self, point: Tuple[float, float]) -> float: + def project_onto_orthonormal(self, point: tuple[float, float]) -> float: """ Compute the lateral distance from pose origin to point by projecting the point onto the orthonormal vector of the pose """ diff --git a/highway_env/utils.py b/highway_env/utils.py index 15ff45cbc..073ff9c49 100644 --- a/highway_env/utils.py +++ b/highway_env/utils.py @@ -1,10 +1,13 @@ +from __future__ import annotations + import copy import importlib import itertools -from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Callable, List, Sequence, Tuple, Union import numpy as np + # Useful types Vector = Union[np.ndarray, Sequence[float]] Matrix = Union[np.ndarray, Sequence[Sequence[float]]] @@ -108,7 +111,7 @@ def point_in_ellipse( def rotated_rectangles_intersect( - rect1: Tuple[Vector, float, float, float], rect2: Tuple[Vector, float, float, float] + rect1: tuple[Vector, float, float, float], rect2: tuple[Vector, float, float, float] ) -> bool: """ Do two rotated rectangles intersect? @@ -127,7 +130,7 @@ def rect_corners( angle: float, include_midpoints: bool = False, include_center: bool = False, -) -> List[np.ndarray]: +) -> list[np.ndarray]: """ Returns the positions of the corners of a rectangle. :param center: the rectangle center @@ -153,7 +156,7 @@ def rect_corners( def has_corner_inside( - rect1: Tuple[Vector, float, float, float], rect2: Tuple[Vector, float, float, float] + rect1: tuple[Vector, float, float, float], rect2: tuple[Vector, float, float, float] ) -> bool: """ Check if rect1 has a corner inside rect2 @@ -169,7 +172,7 @@ def has_corner_inside( ) -def project_polygon(polygon: Vector, axis: Vector) -> Tuple[float, float]: +def project_polygon(polygon: Vector, axis: Vector) -> tuple[float, float]: min_p, max_p = None, None for p in polygon: projected = p.dot(axis) @@ -190,7 +193,7 @@ def interval_distance(min_a: float, max_a: float, min_b: float, max_b: float): def are_polygons_intersecting( a: Vector, b: Vector, displacement_a: Vector, displacement_b: Vector -) -> Tuple[bool, bool, Optional[np.ndarray]]: +) -> tuple[bool, bool, np.ndarray | None]: """ Checks if the two polygons are intersecting. 
@@ -237,12 +240,12 @@ def are_polygons_intersecting( def confidence_ellipsoid( - data: Dict[str, np.ndarray], + data: dict[str, np.ndarray], lambda_: float = 1e-5, delta: float = 0.1, sigma: float = 0.1, param_bound: float = 1.0, -) -> Tuple[np.ndarray, np.ndarray, float]: +) -> tuple[np.ndarray, np.ndarray, float]: """ Compute a confidence ellipsoid over the parameter theta, where y = theta^T phi @@ -269,7 +272,7 @@ def confidence_ellipsoid( def confidence_polytope( data: dict, parameter_box: np.ndarray -) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]: +) -> tuple[np.ndarray, np.ndarray, np.ndarray, float]: """ Compute a confidence polytope over the parameter theta, where y = theta^T phi @@ -380,7 +383,7 @@ def distance_to_circle(center, radius, direction): return distance -def distance_to_rect(line: Tuple[np.ndarray, np.ndarray], rect: List[np.ndarray]): +def distance_to_rect(line: tuple[np.ndarray, np.ndarray], rect: list[np.ndarray]): """ Compute the intersection between a line segment and a rectangle. diff --git a/highway_env/vehicle/behavior.py b/highway_env/vehicle/behavior.py index 7378f63b8..6fc6b165c 100644 --- a/highway_env/vehicle/behavior.py +++ b/highway_env/vehicle/behavior.py @@ -1,4 +1,4 @@ -from typing import Union +from __future__ import annotations import numpy as np @@ -69,7 +69,7 @@ def randomize_behavior(self): ) @classmethod - def create_from(cls, vehicle: ControlledVehicle) -> "IDMVehicle": + def create_from(cls, vehicle: ControlledVehicle) -> IDMVehicle: """ Create a new vehicle from an existing one. @@ -90,7 +90,7 @@ def create_from(cls, vehicle: ControlledVehicle) -> "IDMVehicle": ) return v - def act(self, action: Union[dict, str] = None): + def act(self, action: dict | str = None): """ Execute an action. @@ -133,9 +133,8 @@ def act(self, action: Union[dict, str] = None): action["acceleration"] = np.clip( action["acceleration"], -self.ACC_MAX, self.ACC_MAX ) - Vehicle.act( - self, action - ) # Skip ControlledVehicle.act(), or the command will be overriden. + # Skip ControlledVehicle.act(), or the command will be overridden. 
+ Vehicle.act(self, action) def step(self, dt: float): """ @@ -349,7 +348,6 @@ def recover_from_stop(self, acceleration: float) -> float: class LinearVehicle(IDMVehicle): - """A Vehicle whose longitudinal and lateral controllers are linear with respect to parameters.""" ACCELERATION_PARAMETERS = [0.3, 0.3, 2.0] @@ -400,7 +398,7 @@ def __init__( self.data = data if data is not None else {} self.collecting_data = True - def act(self, action: Union[dict, str] = None): + def act(self, action: dict | str = None): if self.collecting_data: self.collect_data() super().act(action) diff --git a/highway_env/vehicle/controller.py b/highway_env/vehicle/controller.py index 114a643f0..3731e9741 100644 --- a/highway_env/vehicle/controller.py +++ b/highway_env/vehicle/controller.py @@ -254,7 +254,6 @@ def predict_trajectory_constant_speed( class MDPVehicle(ControlledVehicle): - """A controlled vehicle with a specified discrete range of allowed target speeds.""" DEFAULT_TARGET_SPEEDS = np.linspace(20, 30, 3) diff --git a/highway_env/vehicle/dynamics.py b/highway_env/vehicle/dynamics.py index 16a2e2a3d..c432ba50d 100644 --- a/highway_env/vehicle/dynamics.py +++ b/highway_env/vehicle/dynamics.py @@ -1,4 +1,6 @@ -from typing import Callable, Tuple +from __future__ import annotations + +from typing import Callable import matplotlib.pyplot as plt import numpy as np @@ -37,9 +39,7 @@ class BicycleVehicle(Vehicle): MASS: float = 1 # [kg] LENGTH_A: float = Vehicle.LENGTH / 2 # [m] LENGTH_B: float = Vehicle.LENGTH / 2 # [m] - INERTIA_Z: float = ( - 1 / 12 * MASS * (Vehicle.LENGTH**2 + Vehicle.WIDTH**2) - ) # [kg.m2] + INERTIA_Z: float = 1 / 12 * MASS * (Vehicle.LENGTH**2 + Vehicle.WIDTH**2) # [kg.m2] FRICTION_FRONT: float = 15.0 * MASS # [N] FRICTION_REAR: float = 15.0 * MASS # [N] @@ -161,7 +161,7 @@ def clip_actions(self) -> None: self.yaw_rate, -self.MAX_ANGULAR_SPEED, self.MAX_ANGULAR_SPEED ) - def lateral_lpv_structure(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + def lateral_lpv_structure(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ State: [lateral speed v, yaw rate r] @@ -206,7 +206,7 @@ def lateral_lpv_structure(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: ) return A0, phi, B - def lateral_lpv_dynamics(self) -> Tuple[np.ndarray, np.ndarray]: + def lateral_lpv_dynamics(self) -> tuple[np.ndarray, np.ndarray]: """ State: [lateral speed v, yaw rate r] @@ -217,7 +217,7 @@ def lateral_lpv_dynamics(self) -> Tuple[np.ndarray, np.ndarray]: A = A0 + np.tensordot(self.theta, phi, axes=[0, 0]) return A, B - def full_lateral_lpv_structure(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + def full_lateral_lpv_structure(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ State: [position y, yaw psi, lateral speed v, yaw rate r] @@ -244,7 +244,7 @@ def full_lateral_lpv_structure(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray B = np.concatenate((np.zeros((2, 1)), B_lat)) return A0, phi, B - def full_lateral_lpv_dynamics(self) -> Tuple[np.ndarray, np.ndarray]: + def full_lateral_lpv_dynamics(self) -> tuple[np.ndarray, np.ndarray]: """ State: [position y, yaw psi, lateral speed v, yaw rate r] diff --git a/highway_env/vehicle/graphics.py b/highway_env/vehicle/graphics.py index c6e3fa0cc..85d6dffb6 100644 --- a/highway_env/vehicle/graphics.py +++ b/highway_env/vehicle/graphics.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import itertools -from typing import TYPE_CHECKING, List, Tuple +from typing import TYPE_CHECKING import numpy as np import pygame @@ -10,11 +12,12 @@ from 
highway_env.vehicle.dynamics import BicycleVehicle from highway_env.vehicle.kinematics import Vehicle + if TYPE_CHECKING: from highway_env.road.graphics import WorldSurface -class VehicleGraphics(object): +class VehicleGraphics: RED = (255, 100, 100) GREEN = (50, 200, 0) BLUE = (100, 200, 255) @@ -28,7 +31,7 @@ class VehicleGraphics(object): def display( cls, vehicle: Vehicle, - surface: "WorldSurface", + surface: WorldSurface, transparent: bool = False, offscreen: bool = False, label: bool = False, @@ -136,7 +139,7 @@ def display( # Label if label: font = pygame.font.Font(None, 15) - text = "#{}".format(id(v) % 1000) + text = f"#{id(v) % 1000}" text = font.render(text, 1, (10, 10, 10), (255, 255, 255)) surface.blit(text, position) @@ -185,7 +188,7 @@ def blit_rotate( @classmethod def display_trajectory( - cls, states: List[Vehicle], surface: "WorldSurface", offscreen: bool = False + cls, states: list[Vehicle], surface: WorldSurface, offscreen: bool = False ) -> None: """ Display the whole trajectory of a vehicle on a pygame surface. @@ -201,7 +204,7 @@ def display_trajectory( def display_history( cls, vehicle: Vehicle, - surface: "WorldSurface", + surface: WorldSurface, frequency: float = 3, duration: float = 2, simulation: int = 15, @@ -226,7 +229,7 @@ def display_history( cls.display(v, surface, transparent=True, offscreen=offscreen) @classmethod - def get_color(cls, vehicle: Vehicle, transparent: bool = False) -> Tuple[int]: + def get_color(cls, vehicle: Vehicle, transparent: bool = False) -> tuple[int]: color = cls.DEFAULT_COLOR if getattr(vehicle, "color", None): color = vehicle.color diff --git a/highway_env/vehicle/kinematics.py b/highway_env/vehicle/kinematics.py index 7a24f0b13..5331dc779 100644 --- a/highway_env/vehicle/kinematics.py +++ b/highway_env/vehicle/kinematics.py @@ -1,6 +1,7 @@ +from __future__ import annotations + import copy from collections import deque -from typing import List, Optional, Tuple, Union import numpy as np @@ -10,7 +11,6 @@ class Vehicle(RoadObject): - """ A moving vehicle on a road, and its kinematics. @@ -52,11 +52,11 @@ def create_random( cls, road: Road, speed: float = None, - lane_from: Optional[str] = None, - lane_to: Optional[str] = None, - lane_id: Optional[int] = None, + lane_from: str | None = None, + lane_to: str | None = None, + lane_id: int | None = None, spacing: float = 1, - ) -> "Vehicle": + ) -> Vehicle: """ Create a random vehicle on the road. @@ -104,7 +104,7 @@ def create_random( return v @classmethod - def create_from(cls, vehicle: "Vehicle") -> "Vehicle": + def create_from(cls, vehicle: Vehicle) -> Vehicle: """ Create a new vehicle from an existing one. @@ -118,7 +118,7 @@ def create_from(cls, vehicle: "Vehicle") -> "Vehicle": v.color = vehicle.color return v - def act(self, action: Union[dict, str] = None) -> None: + def act(self, action: dict | str = None) -> None: """ Store an action to be repeated. 
@@ -178,7 +178,7 @@ def on_state_update(self) -> None: def predict_trajectory_constant_speed( self, times: np.ndarray - ) -> Tuple[List[np.ndarray], List[float]]: + ) -> tuple[list[np.ndarray], list[float]]: if self.prediction_type == "zero_steering": action = {"acceleration": 0.0, "steering": 0.0} elif self.prediction_type == "constant_steering": @@ -235,7 +235,7 @@ def lane_offset(self) -> np.ndarray: return np.zeros((3,)) def to_dict( - self, origin_vehicle: "Vehicle" = None, observe_intentions: bool = True + self, origin_vehicle: Vehicle = None, observe_intentions: bool = True ) -> dict: d = { "presence": 1, @@ -270,11 +270,11 @@ def __repr__(self): def predict_trajectory( self, - actions: List, + actions: list, action_duration: float, trajectory_timestep: float, dt: float, - ) -> List["Vehicle"]: + ) -> list[Vehicle]: """ Predict the future trajectory of the vehicle given a sequence of actions. diff --git a/highway_env/vehicle/objects.py b/highway_env/vehicle/objects.py index 16655a85e..ece4b6d59 100644 --- a/highway_env/vehicle/objects.py +++ b/highway_env/vehicle/objects.py @@ -1,10 +1,13 @@ +from __future__ import annotations + from abc import ABC -from typing import TYPE_CHECKING, Optional, Sequence, Tuple +from typing import TYPE_CHECKING, Sequence, Tuple import numpy as np from highway_env import utils + if TYPE_CHECKING: from highway_env.road.lane import AbstractLane from highway_env.road.road import Road @@ -13,7 +16,6 @@ class RoadObject(ABC): - """ Common interface for objects that appear on the road. @@ -25,7 +27,7 @@ class RoadObject(ABC): def __init__( self, - road: "Road", + road: Road, position: Sequence[float], heading: float = 0, speed: float = 0, @@ -65,11 +67,11 @@ def __init__( @classmethod def make_on_lane( cls, - road: "Road", + road: Road, lane_index: LaneIndex, longitudinal: float, - speed: Optional[float] = None, - ) -> "RoadObject": + speed: float | None = None, + ) -> RoadObject: """ Create a vehicle on a given lane at a longitudinal position. @@ -86,7 +88,7 @@ def make_on_lane( road, lane.position(longitudinal, 0), lane.heading_at(longitudinal), speed ) - def handle_collisions(self, other: "RoadObject", dt: float = 0) -> None: + def handle_collisions(self, other: RoadObject, dt: float = 0) -> None: """ Check for collision with another vehicle. @@ -177,9 +179,7 @@ def polygon(self) -> np.ndarray: points = (rotation @ points).T + np.tile(self.position, (4, 1)) return np.vstack([points, points[0:1]]) - def lane_distance_to( - self, other: "RoadObject", lane: "AbstractLane" = None - ) -> float: + def lane_distance_to(self, other: RoadObject, lane: AbstractLane = None) -> float: """ Compute the signed distance to another object along a lane. 
@@ -201,7 +201,7 @@ def on_road(self) -> bool: """Is the object on its current lane, or off-road?""" return self.lane.on_lane(self.position) - def front_distance_to(self, other: "RoadObject") -> float: + def front_distance_to(self, other: RoadObject) -> float: return self.direction.dot(other.position - self.position) def __str__(self): @@ -212,7 +212,6 @@ def __repr__(self): class Obstacle(RoadObject): - """Obstacles on the road.""" def __init__( @@ -223,7 +222,6 @@ def __init__( class Landmark(RoadObject): - """Landmarks of certain areas on the road that must be reached.""" def __init__( diff --git a/highway_env/vehicle/uncertainty/estimation.py b/highway_env/vehicle/uncertainty/estimation.py index 865271528..cf90461a5 100644 --- a/highway_env/vehicle/uncertainty/estimation.py +++ b/highway_env/vehicle/uncertainty/estimation.py @@ -1,4 +1,6 @@ -from typing import Callable, Union +from __future__ import annotations + +from typing import Callable import numpy as np @@ -9,7 +11,6 @@ class RegressionVehicle(IntervalVehicle): - """Estimator for the parameter of a LinearVehicle.""" def longitudinal_matrix_polytope(self) -> Polytope: @@ -65,7 +66,7 @@ def __init__( if not self.data: self.data = [] - def act(self, action: Union[dict, str] = None) -> None: + def act(self, action: dict | str = None) -> None: if self.collecting_data: self.update_possible_routes() super().act(action) @@ -120,7 +121,7 @@ def update_possible_routes(self) -> None: ): self.data.remove((route, data)) - def assume_model_is_valid(self, index: int) -> "LinearVehicle": + def assume_model_is_valid(self, index: int) -> LinearVehicle: """ Get a copy of this vehicle behaving according to one of its possible routes. diff --git a/highway_env/vehicle/uncertainty/prediction.py b/highway_env/vehicle/uncertainty/prediction.py index 2f158f17b..b6862625d 100644 --- a/highway_env/vehicle/uncertainty/prediction.py +++ b/highway_env/vehicle/uncertainty/prediction.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import copy from typing import TYPE_CHECKING, Callable, List, Tuple @@ -21,6 +23,7 @@ from highway_env.vehicle.controller import MDPVehicle from highway_env.vehicle.kinematics import Vehicle + if TYPE_CHECKING: from highway_env.vehicle.objects import RoadObject @@ -28,7 +31,6 @@ class IntervalVehicle(LinearVehicle): - """ Estimator for the interval-membership of a LinearVehicle under parameter uncertainty. 
@@ -48,8 +50,8 @@ def __init__( route: Route = None, enable_lane_change: bool = True, timer: float = None, - theta_a_i: List[List[float]] = None, - theta_b_i: List[List[float]] = None, + theta_a_i: list[list[float]] = None, + theta_b_i: list[list[float]] = None, data: dict = None, ) -> None: """ @@ -81,7 +83,7 @@ def __init__( self.previous_target_lane_index = self.target_lane_index @classmethod - def create_from(cls, vehicle: LinearVehicle) -> "IntervalVehicle": + def create_from(cls, vehicle: LinearVehicle) -> IntervalVehicle: v = cls( vehicle.road, vehicle.position, @@ -339,7 +341,7 @@ def parameter_box_to_polytope( a_theta = lambda params: a + np.tensordot(phi, params, axes=[0, 0]) return polytope(a_theta, parameter_box) - def get_front_interval(self) -> "VehicleInterval": + def get_front_interval(self) -> VehicleInterval: # TODO: For now, we assume the front vehicle follows the models' front vehicle front_vehicle, _ = self.road.neighbour_vehicles(self) if front_vehicle: @@ -356,7 +358,7 @@ def get_front_interval(self) -> "VehicleInterval": def get_followed_lanes( self, lane_change_model: str = "model", squeeze: bool = True - ) -> List[LaneIndex]: + ) -> list[LaneIndex]: """ Get the list of lanes that could be followed by this vehicle. @@ -445,7 +447,7 @@ def store_trajectories(self) -> None: self.trajectory.append(LinearVehicle.create_from(self)) self.interval_trajectory.append(copy.deepcopy(self.interval)) - def handle_collisions(self, other: "RoadObject", dt: float) -> None: + def handle_collisions(self, other: RoadObject, dt: float) -> None: """ Worst-case collision check. @@ -485,7 +487,7 @@ def handle_collisions(self, other: "RoadObject", dt: float) -> None: self.crashed = other.crashed = True -class VehicleInterval(object): +class VehicleInterval: def __init__(self, vehicle: Vehicle) -> None: self.position = np.array([vehicle.position, vehicle.position], dtype=float) self.speed = np.array([vehicle.speed, vehicle.speed], dtype=float) diff --git a/pyproject.toml b/pyproject.toml index b5a3c468d..b8e65330e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,4 +3,12 @@ requires = [ "setuptools>=42", "wheel" ] -build-backend = "setuptools.build_meta" \ No newline at end of file +build-backend = "setuptools.build_meta" + +[tool.isort] +atomic = true +profile = "black" +extra_standard_library = ["typing_extensions"] +indent = 4 +lines_after_imports = 2 +multi_line_output = 3 diff --git a/scripts/parking_model_based.ipynb b/scripts/parking_model_based.ipynb index 5b62afb38..3b0cdf5b6 100644 --- a/scripts/parking_model_based.ipynb +++ b/scripts/parking_model_based.ipynb @@ -1,624 +1,620 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": { - "id": "5eeje4O8fviH", - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "# Model-Based Reinforcement Learning\n", - "\n", - "## Principle\n", - "We consider the optimal control problem of an MDP with a **known** reward function $R$ and subject to **unknown deterministic** dynamics $s_{t+1} = f(s_t, a_t)$:\n", - "\n", - "$$\\max_{(a_0,a_1,\\dotsc)} \\sum_{t=0}^\\infty \\gamma^t R(s_t,a_t)$$\n", - "\n", - "In **model-based reinforcement learning**, this problem is solved in **two steps**:\n", - "1. **Model learning**:\n", - "We learn a model of the dynamics $f_\\theta \\simeq f$ through regression on interaction data.\n", - "2. 
**Planning**:\n", - "We leverage the dynamics model $f_\\theta$ to compute the optimal trajectory $$\\max_{(a_0,a_1,\\dotsc)} \\sum_{t=0}^\\infty \\gamma^t R(\\hat{s}_t,a_t)$$ following the learnt dynamics $\\hat{s}_{t+1} = f_\\theta(\\hat{s}_t, a_t)$.\n", - "\n", - "(We can easily extend to unknown rewards and stochastic dynamics, but we consider the simpler case in this notebook for ease of presentation)\n", - "\n", - "\n", - "## Motivation\n", - "\n", - "### Sparse rewards\n", - "* In model-free reinforcement learning, we only obtain a reinforcement signal when encountering rewards. In environment with **sparse rewards**, the chance of obtaining a reward randomly is **negligible**, which prevents any learning.\n", - "* However, even in the **absence of rewards** we still receive a **stream of state transition data**. We can exploit this data to learn about the task at hand.\n", - "\n", - "### Complexity of the policy/value vs dynamics:\n", - "Is it easier to decide which action is best, or to predict what is going to happen?\n", - "* Some problems can have **complex dynamics** but a **simple optimal policy or value function**. For instance, consider the problem of learning to swim. Predicting the movement requires understanding fluid dynamics and vortices while the optimal policy simply consists in moving the limbs in sync.\n", - "* Conversely, other problems can have **simple dynamics** but **complex policies/value functions**. Think of the game of Go, its rules are simplistic (placing a stone merely changes the board state at this location) but the corresponding optimal policy is very complicated.\n", - "\n", - "Intuitively, model-free RL should be applied to the first category of problems and model-based RL to the second category.\n", - "\n", - "### Inductive bias\n", - "Oftentimes, real-world problems exhibit a particular **structure**: for instance, any problem involving motion of physical objects will be **continuous**. It can also be **smooth**, **invariant** to translations, etc. This knowledge can then be incorporated in machine learning models to foster efficient learning. In contrast, there can often be **discontinuities** in the policy decisions or value function: e.g. think of a collision vs near-collision state.\n", - "\n", - "### Sample efficiency\n", - "Overall, it is generally recognized that model-based approaches tend to **learn faster** than model-free techniques (see e.g. [[Sutton, 1990]](http://papersdb.cs.ualberta.ca/~papersdb/uploaded_files/paper_p160-sutton.pdf.stjohn)).\n", - "\n", - "### Interpretability\n", - "In real-world applications, we may want to know **how a policy will behave before actually executing it**, for instance for **safety-check** purposes. However, model-free reinforcement learning only recommends which action to take at current time without being able to predict its consequences. In order to obtain the trajectory, we have no choice but executing the policy. In stark contrast, model-based methods a more interpretable in the sense that we can probe the policy for its intended (and predicted) trajectory." - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "2-oVNY_KTw6R" - }, - "source": [ - "## Our challenge: Automated Parking System\n", - "\n", - "We consider the **parking-v0** task of the [highway-env](https://github.com/eleurent/highway-env) environment. 
It is a **goal-conditioned continuous control** task where an agent **drives a car** by controlling the gaz pedal and steering angle and must **park in a given location** with the appropriate heading.\n", - "\n", - "This MDP has several properties wich justifies using model-based methods:\n", - "* The policy/value is highly dependent on the goal which adds a significant level of complexity to a model-free learning process, whereas the dynamics are completely independent of the goal and hence can be simpler to learn.\n", - "* In the context of an industrial application, we can reasonably expect for safety concerns that the planned trajectory is required to be known in advance, before execution.\n", - "\n", - "### Warming up\n", - "We start with a few useful installs and imports:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "bzMSuJEOfviP", - "pycharm": { - "is_executing": false, - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "# Install environment and visualization dependencies \n", - "!pip install highway-env\n", - "\n", - "# Environment\n", - "import gymnasium as gym\n", - "import highway_env\n", - "\n", - "# Models and computation\n", - "import torch\n", - "import torch.nn as nn\n", - "import torch.nn.functional as F\n", - "import numpy as np\n", - "from collections import namedtuple\n", - "\n", - "# Visualization\n", - "import matplotlib\n", - "import matplotlib.pyplot as plt\n", - "%matplotlib inline\n", - "from tqdm.notebook import trange\n", - "\n", - "# IO\n", - "from pathlib import Path" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "n2Bu_Pqop0E7" - }, - "source": [ - "We also define a simple helper function for visualization of episodes:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "so7yH4ucyB-3" - }, - "outputs": [], - "source": [ - "import sys\n", - "from tqdm.notebook import trange\n", - "!pip install tensorboardx gym pyvirtualdisplay\n", - "!apt-get install -y xvfb ffmpeg\n", - "!git clone https://github.com/Farama-Foundation/HighwayEnv.git 2> /dev/null\n", - "sys.path.insert(0, '/content/HighwayEnv/scripts/')\n", - "from utils import record_videos, show_videos" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "nFtBY6JSqPFa" - }, - "source": [ - "### Let's try it!\n", - "\n", - "Make the environment, and run an episode with random actions:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "jKZt9Cb1rJ6n" - }, - "outputs": [], - "source": [ - "env = gym.make(\"parking-v0\", render_mode=\"rgb_array\")\n", - "env = record_videos(env)\n", - "env.reset()\n", - "done = False\n", - "while not done:\n", - " action = env.action_space.sample()\n", - " obs, reward, done, truncated, info = env.step(action)\n", - "env.close()\n", - "show_videos()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "ewG5f_essAS0" - }, - "source": [ - "The environment is a `GoalEnv`, which means the agent receives a dictionary containing both the current `observation` and the `desired_goal` that conditions its policy." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "XIC98mGhr7v6" - }, - "outputs": [], - "source": [ - "print(\"Observation format:\", obs)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "voagCILztJ3J" - }, - "source": [ - "There is also an `achieved_goal` that won't be useful here (it only serves when the state and goal spaces are different, as a projection from the observation to the goal space).\n", - "\n", - "Alright! We are now ready to apply the model-based reinforcement learning paradigm." - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "I2PuVAvyfvib", - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "## Experience collection\n", - "First, we randomly interact with the environment to produce a batch of experiences \n", - "\n", - "$$D = \\{s_t, a_t, s_{t+1}\\}_{t\\in[1,N]}$$" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "tvUYSL7sfvie", - "pycharm": { - "is_executing": false, - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "Transition = namedtuple('Transition', ['state', 'action', 'next_state'])\n", - "\n", - "def collect_interaction_data(env, size=1000, action_repeat=2):\n", - " data, done = [], True\n", - " for _ in trange(size, desc=\"Collecting interaction data\"):\n", - " action = env.action_space.sample()\n", - " for _ in range(action_repeat):\n", - " if done:\n", - " previous_obs, info = env.reset()\n", - " obs, reward, done, truncated, info = env.step(action)\n", - " data.append(Transition(torch.Tensor(previous_obs[\"observation\"]),\n", - " torch.Tensor(action),\n", - " torch.Tensor(obs[\"observation\"])))\n", - " previous_obs = obs\n", - " return data\n", - "\n", - "env = gym.make(\"parking-v0\")\n", - "data = collect_interaction_data(env)\n", - "print(\"Sample transition:\", data[0])" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "4Th1JezEfvir" - }, - "source": [ - "## Build a dynamics model\n", - "\n", - "We now design a model to represent the system dynamics. We choose a **structured model** inspired from *Linear Time-Invariant (LTI) systems* \n", - "\n", - "$$\\dot{x} = f_\\theta(x, u) = A_\\theta(x, u)x + B_\\theta(x, u)u$$\n", - "\n", - "where the $(x, u)$ notation comes from the Control Theory community and stands for the state and action $(s,a)$. 
Intuitively, we learn at each point $(x_t, u_t)$ the **linearization** of the true dynamics $f$ with respect to $(x, u)$.\n", - "\n", - "We parametrize $A_\\theta$ and $B_\\theta$ as two fully-connected networks with one hidden layer.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "F7Gl2kKJfviu", - "pycharm": { - "is_executing": false, - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "class DynamicsModel(nn.Module):\n", - " STATE_X = 0\n", - " STATE_Y = 1\n", - "\n", - " def __init__(self, state_size, action_size, hidden_size, dt):\n", - " super().__init__()\n", - " self.state_size, self.action_size, self.dt = state_size, action_size, dt\n", - " A_size, B_size = state_size * state_size, state_size * action_size\n", - " self.A1 = nn.Linear(state_size + action_size, hidden_size)\n", - " self.A2 = nn.Linear(hidden_size, A_size)\n", - " self.B1 = nn.Linear(state_size + action_size, hidden_size)\n", - " self.B2 = nn.Linear(hidden_size, B_size)\n", - "\n", - " def forward(self, x, u):\n", - " \"\"\"\n", - " Predict x_{t+1} = f(x_t, u_t)\n", - " :param x: a batch of states\n", - " :param u: a batch of actions\n", - " \"\"\"\n", - " xu = torch.cat((x, u), -1)\n", - " xu[:, self.STATE_X:self.STATE_Y+1] = 0 # Remove dependency in (x,y)\n", - " A = self.A2(F.relu(self.A1(xu)))\n", - " A = torch.reshape(A, (x.shape[0], self.state_size, self.state_size))\n", - " B = self.B2(F.relu(self.B1(xu)))\n", - " B = torch.reshape(B, (x.shape[0], self.state_size, self.action_size))\n", - " dx = A @ x.unsqueeze(-1) + B @ u.unsqueeze(-1)\n", - " return x + dx.squeeze()*self.dt\n", - "\n", - "\n", - "dynamics = DynamicsModel(state_size=env.observation_space.spaces[\"observation\"].shape[0],\n", - " action_size=env.action_space.shape[0],\n", - " hidden_size=64,\n", - " dt=1/env.unwrapped.config[\"policy_frequency\"])\n", - "print(\"Forward initial model on a sample transition:\", dynamics(data[0].state.unsqueeze(0),\n", - " data[0].action.unsqueeze(0)).detach())" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "FFsgc7gffvi0" - }, - "source": [ - "## Fit the model on data\n", - "We can now train our model $f_\\theta$ in a supervised fashion to minimize an MSE loss $L^2(f_\\theta; D)$ over our experience batch $D$ by stochastic gradient descent:\n", - "\n", - "$$L^2(f_\\theta; D) = \\frac{1}{|D|}\\sum_{s_t,a_t,s_{t+1}\\in D}||s_{t+1}- f_\\theta(s_t, a_t)||^2$$" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "NwCDLD1wfvi2", - "pycharm": { - "is_executing": false, - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "optimizer = torch.optim.Adam(dynamics.parameters(), lr=0.01)\n", - "\n", - "# Split dataset into training and validation\n", - "train_ratio = 0.7\n", - "train_data, validation_data = data[:int(train_ratio * len(data))], \\\n", - " data[int(train_ratio * len(data)):]\n", - "\n", - "def compute_loss(model, data_t, loss_func = torch.nn.MSELoss()):\n", - " states, actions, next_states = data_t\n", - " predictions = model(states, actions)\n", - " return loss_func(predictions, next_states)\n", - "\n", - "def transpose_batch(batch):\n", - " return Transition(*map(torch.stack, zip(*batch)))\n", - "\n", - "def train(model, train_data, validation_data, epochs=1500):\n", - " train_data_t = transpose_batch(train_data)\n", - " validation_data_t = transpose_batch(validation_data)\n", - " losses = np.full((epochs, 2), np.nan)\n", - " for epoch in trange(epochs, desc=\"Train dynamics\"):\n", - " # Compute 
loss gradient and step optimizer\n", - " loss = compute_loss(model, train_data_t)\n", - " validation_loss = compute_loss(model, validation_data_t)\n", - " losses[epoch] = [loss.detach().numpy(), validation_loss.detach().numpy()]\n", - " optimizer.zero_grad()\n", - " loss.backward()\n", - " optimizer.step()\n", - " # Plot losses\n", - " plt.plot(losses)\n", - " plt.yscale(\"log\")\n", - " plt.xlabel(\"epochs\")\n", - " plt.ylabel(\"loss\")\n", - " plt.legend([\"train\", \"validation\"])\n", - " plt.show()\n", - "\n", - "train(dynamics, data, validation_data)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "NXBODCuYfvi_", - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "## Visualize trained dynamics\n", - "\n", - "In order to qualitatively evaluate our model, we can choose some values of steering angle *(right, center, left)* and acceleration *(slow, fast)* in order to predict and visualize the corresponding trajectories from an initial state. \n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "SMPA55bCfvjB", - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "def predict_trajectory(state, actions, model, action_repeat=1):\n", - " states = []\n", - " for action in actions:\n", - " for _ in range(action_repeat):\n", - " state = model(state, action)\n", - " states.append(state)\n", - " return torch.stack(states, dim=0)\n", - "\n", - "def plot_trajectory(states, color):\n", - " scales = np.array(env.unwrapped.config[\"observation\"][\"scales\"])\n", - " states = np.clip(states.squeeze(1).detach().numpy() * scales, -100, 100)\n", - " plt.plot(states[:, 0], states[:, 1], color=color, marker='.')\n", - " plt.arrow(states[-1,0], states[-1,1], states[-1,4]*1, states[-1,5]*1, color=color)\n", - "\n", - "def visualize_trajectories(model, state, horizon=15):\n", - " plt.cla()\n", - " # Draw a car\n", - " plt.plot(state.numpy()[0]+2.5*np.array([-1, -1, 1, 1, -1]),\n", - " state.numpy()[1]+1.0*np.array([-1, 1, 1, -1, -1]), 'k')\n", - " # Draw trajectories\n", - " state = state.unsqueeze(0)\n", - " colors = iter(plt.get_cmap(\"tab20\").colors)\n", - " # Generate commands\n", - " for steering in np.linspace(-0.5, 0.5, 3):\n", - " for acceleration in np.linspace(0.8, 0.4, 2):\n", - " actions = torch.Tensor([acceleration, steering]).view(1,1,-1)\n", - " # Predict trajectories\n", - " states = predict_trajectory(state, actions, model, action_repeat=horizon)\n", - " plot_trajectory(states, color=next(colors))\n", - " plt.axis(\"equal\")\n", - " plt.show()\n", - " \n", - "visualize_trajectories(dynamics, state=torch.Tensor([0, 0, 0, 0, 1, 0]))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "DOa0j1_muNXi" - }, - "source": [ - "## Reward model\n", - "We assume that the reward $R(s,a)$ is known (chosen by the system designer), and takes the form of a **weighted L1-norm** between the state and the goal." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "cRubRv9buNXj" - }, - "outputs": [], - "source": [ - "def reward_model(states, goal, gamma=None):\n", - " \"\"\"\n", - " The reward is a weighted L1-norm between the state and a goal\n", - " :param Tensor states: a batch of states. shape: [batch_size, state_size].\n", - " :param Tensor goal: a goal state. 
shape: [state_size].\n", - " :param float gamma: a discount factor\n", - " \"\"\"\n", - " goal = goal.expand(states.shape)\n", - " reward_weigths = torch.Tensor(env.unwrapped.config[\"reward_weights\"])\n", - " rewards = -torch.pow(torch.norm((states-goal)*reward_weigths, p=1, dim=-1), 0.5)\n", - " if gamma:\n", - " time = torch.arange(rewards.shape[0], dtype=torch.float).unsqueeze(-1).expand(rewards.shape)\n", - " rewards *= torch.pow(gamma, time)\n", - " return rewards\n", - "\n", - "obs, info = env.reset()\n", - "print(\"Reward of a sample transition:\", reward_model(torch.Tensor(obs[\"observation\"]).unsqueeze(0),\n", - " torch.Tensor(obs[\"desired_goal\"])))" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "Q5D6W4p7fvjI" - }, - "source": [ - "## Leverage dynamics model for planning\n", - "\n", - "We now use the learnt dynamics model $f_\\theta$ for planning.\n", - "In order to solve the optimal control problem, we use a sampling-based optimization algorithm: the **Cross-Entropy Method** (`CEM`). It is an optimization algorithm applicable to problems that are both **combinatorial** and **continuous**, which is our case: find the best performing sequence of actions.\n", - "\n", - "This method approximates the optimal importance sampling estimator by repeating two phases:\n", - "1. **Draw samples** from a probability distribution. We use Gaussian distributions over sequences of actions.\n", - "2. Minimize the **cross-entropy** between this distribution and a **target distribution** to produce a better sample in the next iteration. We define this target distribution by selecting the top-k performing sampled sequences.\n", - "\n", - "![Credits to Olivier Sigaud](https://github.com/yfletberliac/rlss2019-hands-on/blob/master/imgs/cem.png?raw=1)\n", - "\n", - "Note that as we have a local linear dynamics model, we could instead choose an `Iterative LQR` planner which would be more efficient. We prefer `CEM` in this educational setting for its simplicity and generality." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "bzPKYg23fvjL", - "pycharm": { - "is_executing": false, - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "def cem_planner(state, goal, action_size, horizon=5, population=100, selection=10, iterations=5):\n", - " state = state.expand(population, -1)\n", - " action_mean = torch.zeros(horizon, 1, action_size)\n", - " action_std = torch.ones(horizon, 1, action_size)\n", - " for _ in range(iterations):\n", - " # 1. Draw sample sequences of actions from a normal distribution\n", - " actions = torch.normal(mean=action_mean.repeat(1, population, 1), std=action_std.repeat(1, population, 1))\n", - " actions = torch.clamp(actions, min=env.action_space.low.min(), max=env.action_space.high.max())\n", - " states = predict_trajectory(state, actions, dynamics, action_repeat=5)\n", - " # 2. 
Fit the distribution to the top-k performing sequences\n", - " returns = reward_model(states, goal).sum(dim=0)\n", - " _, best = returns.topk(selection, largest=True, sorted=False)\n", - " best_actions = actions[:, best, :]\n", - " action_mean = best_actions.mean(dim=1, keepdim=True)\n", - " action_std = best_actions.std(dim=1, unbiased=False, keepdim=True)\n", - " return action_mean[0].squeeze(dim=0)\n", - " \n", - " \n", - "# Run the planner on a sample transition\n", - "action = cem_planner(torch.Tensor(obs[\"observation\"]),\n", - " torch.Tensor(obs[\"desired_goal\"]),\n", - " env.action_space.shape[0])\n", - "print(\"Planned action:\", action)" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "_8L6vEPWyea7" - }, - "source": [ - "## Visualize a few episodes\n", - "\n", - "En voiture, Simone !" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "xOcOP7Of18T2" - }, - "outputs": [], - "source": [ - "env = gym.make(\"parking-v0\", render_mode='rgb_array')\n", - "env = record_videos(env)\n", - "obs, info = env.reset()\n", - "\n", - "for step in trange(3 * env.config[\"duration\"], desc=\"Testing 3 episodes...\"):\n", - " action = cem_planner(torch.Tensor(obs[\"observation\"]),\n", - " torch.Tensor(obs[\"desired_goal\"]),\n", - " env.action_space.shape[0])\n", - " obs, reward, done, truncated, info = env.step(action.numpy())\n", - " if done or truncated:\n", - " obs, info = env.reset()\n", - "env.close()\n", - "show_videos()" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "psBBQIv4fvjT", - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "## Limitations\n", - "\n", - "### Model bias\n", - "\n", - "In model-based reinforcement learning, we replace our original optimal control problem by another problem: optimizing our learnt approximate MDP. When settling for this approximate MDP to plan with, we introduce a **bias** that can only **decrease the true performance** of the corresponding planned policy. This is called the problem of model bias.\n", - "\n", - "In some MDPs, even slight model errors lead to a dramatic drop in performance, as illustrated in the beginning of the following video:\n", - "\n", - "[![Approximate Robust Control of Uncertain Dynamical Systems](https://img.youtube.com/vi/8khqd3BJo0A/0.jpg)](https://www.youtube.com/watch?v=8khqd3BJo0A)\n", - "\n", - "The question of how to address model bias belongs to the field of **Safe Reinforcement Learning**. \n", - "\n", - "### [The call of the void](https://www.urbandictionary.com/define.php?term=the%20call%20of%20the%20void)\n", - "\n", - "The model will be accurate only on some region of the state space that was explored and covered in $D$.\n", - "Outside of $D$, the model may diverge and **hallucinate** important rewards.\n", - "This effect is problematic when the model is used by a planning algorithm, as the latter will try to **exploit** these hallucinated high rewards and will steer the agent towards **unknown** (and thus dangerous) **regions** where the model is erroneously optimistic.\n", - "\n", - "### Computational cost of planning\n", - "\n", - "At test time, the planning step typically requires **sampling a lot of trajectories** to find a near-optimal candidate, wich may turn out to be very costly. This may be prohibitive in a high-frequency real-time setting. The **model-free** methods which directly recommend the best action are **much more efficient** in that regard." 
- ] + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "5eeje4O8fviH", + "pycharm": { + "name": "#%% md\n" } - ], - "metadata": { - "colab": { - "name": "parking_model_based.ipynb", - "provenance": [] - }, - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.5" - }, + }, + "source": [ + "# Model-Based Reinforcement Learning\n", + "\n", + "## Principle\n", + "We consider the optimal control problem of an MDP with a **known** reward function $R$ and subject to **unknown deterministic** dynamics $s_{t+1} = f(s_t, a_t)$:\n", + "\n", + "$$\\max_{(a_0,a_1,\\dotsc)} \\sum_{t=0}^\\infty \\gamma^t R(s_t,a_t)$$\n", + "\n", + "In **model-based reinforcement learning**, this problem is solved in **two steps**:\n", + "1. **Model learning**:\n", + "We learn a model of the dynamics $f_\\theta \\simeq f$ through regression on interaction data.\n", + "2. **Planning**:\n", + "We leverage the dynamics model $f_\\theta$ to compute the optimal trajectory $$\\max_{(a_0,a_1,\\dotsc)} \\sum_{t=0}^\\infty \\gamma^t R(\\hat{s}_t,a_t)$$ following the learnt dynamics $\\hat{s}_{t+1} = f_\\theta(\\hat{s}_t, a_t)$.\n", + "\n", + "(We can easily extend to unknown rewards and stochastic dynamics, but we consider the simpler case in this notebook for ease of presentation)\n", + "\n", + "\n", + "## Motivation\n", + "\n", + "### Sparse rewards\n", + "* In model-free reinforcement learning, we only obtain a reinforcement signal when encountering rewards. In environment with **sparse rewards**, the chance of obtaining a reward randomly is **negligible**, which prevents any learning.\n", + "* However, even in the **absence of rewards** we still receive a **stream of state transition data**. We can exploit this data to learn about the task at hand.\n", + "\n", + "### Complexity of the policy/value vs dynamics:\n", + "Is it easier to decide which action is best, or to predict what is going to happen?\n", + "* Some problems can have **complex dynamics** but a **simple optimal policy or value function**. For instance, consider the problem of learning to swim. Predicting the movement requires understanding fluid dynamics and vortices while the optimal policy simply consists in moving the limbs in sync.\n", + "* Conversely, other problems can have **simple dynamics** but **complex policies/value functions**. Think of the game of Go, its rules are simplistic (placing a stone merely changes the board state at this location) but the corresponding optimal policy is very complicated.\n", + "\n", + "Intuitively, model-free RL should be applied to the first category of problems and model-based RL to the second category.\n", + "\n", + "### Inductive bias\n", + "Oftentimes, real-world problems exhibit a particular **structure**: for instance, any problem involving motion of physical objects will be **continuous**. It can also be **smooth**, **invariant** to translations, etc. This knowledge can then be incorporated in machine learning models to foster efficient learning. In contrast, there can often be **discontinuities** in the policy decisions or value function: e.g. 
think of a collision vs near-collision state.\n", + "\n", + "### Sample efficiency\n", + "Overall, it is generally recognized that model-based approaches tend to **learn faster** than model-free techniques (see e.g. [[Sutton, 1990]](http://papersdb.cs.ualberta.ca/~papersdb/uploaded_files/paper_p160-sutton.pdf.stjohn)).\n", + "\n", + "### Interpretability\n", + "In real-world applications, we may want to know **how a policy will behave before actually executing it**, for instance for **safety-check** purposes. However, model-free reinforcement learning only recommends which action to take at current time without being able to predict its consequences. In order to obtain the trajectory, we have no choice but executing the policy. In stark contrast, model-based methods a more interpretable in the sense that we can probe the policy for its intended (and predicted) trajectory." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "2-oVNY_KTw6R" + }, + "source": [ + "## Our challenge: Automated Parking System\n", + "\n", + "We consider the **parking-v0** task of the [highway-env](https://github.com/eleurent/highway-env) environment. It is a **goal-conditioned continuous control** task where an agent **drives a car** by controlling the gaz pedal and steering angle and must **park in a given location** with the appropriate heading.\n", + "\n", + "This MDP has several properties which justifies using model-based methods:\n", + "* The policy/value is highly dependent on the goal which adds a significant level of complexity to a model-free learning process, whereas the dynamics are completely independent of the goal and hence can be simpler to learn.\n", + "* In the context of an industrial application, we can reasonably expect for safety concerns that the planned trajectory is required to be known in advance, before execution.\n", + "\n", + "### Warming up\n", + "We start with a few useful installs and imports:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "bzMSuJEOfviP", "pycharm": { - "stem_cell": { - "cell_type": "raw", - "metadata": { - "collapsed": false - }, - "source": [] - } + "is_executing": false, + "name": "#%%\n" } + }, + "source": [ + "# Install environment and visualization dependencies \n", + "!pip install highway-env\n", + "\n", + "# Environment\n", + "import gymnasium as gym\n", + "import highway_env\n", + "\n", + "gym.register_envs(highway_env)\n", + "\n", + "# Models and computation\n", + "import torch\n", + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "import numpy as np\n", + "from collections import namedtuple\n", + "\n", + "# Visualization\n", + "import matplotlib.pyplot as plt\n", + "%matplotlib inline" + ], + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "n2Bu_Pqop0E7" + }, + "source": [ + "We also define a simple helper function for visualization of episodes:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "so7yH4ucyB-3" + }, + "source": [ + "import sys\n", + "from tqdm.notebook import trange\n", + "!pip install tensorboardx gym pyvirtualdisplay\n", + "!apt-get install -y xvfb ffmpeg\n", + "!git clone https://github.com/Farama-Foundation/HighwayEnv.git 2> /dev/null\n", + "sys.path.insert(0, '/content/HighwayEnv/scripts/')\n", + "from utils import record_videos, show_videos" + ], + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "nFtBY6JSqPFa" + }, + "source": [ + "### Let's try it!\n", + "\n", + "Make the 
environment, and run an episode with random actions:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "jKZt9Cb1rJ6n" + }, + "source": [ + "env = gym.make(\"parking-v0\", render_mode=\"rgb_array\")\n", + "env = record_videos(env)\n", + "env.reset()\n", + "done = False\n", + "while not done:\n", + " action = env.action_space.sample()\n", + " obs, reward, done, truncated, info = env.step(action)\n", + "env.close()\n", + "show_videos()" + ], + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "ewG5f_essAS0" + }, + "source": [ + "The environment is a `GoalEnv`, which means the agent receives a dictionary containing both the current `observation` and the `desired_goal` that conditions its policy." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "XIC98mGhr7v6" + }, + "source": [ + "print(\"Observation format:\", obs)" + ], + "outputs": [] }, - "nbformat": 4, - "nbformat_minor": 0 + { + "cell_type": "markdown", + "metadata": { + "id": "voagCILztJ3J" + }, + "source": [ + "There is also an `achieved_goal` that won't be useful here (it only serves when the state and goal spaces are different, as a projection from the observation to the goal space).\n", + "\n", + "Alright! We are now ready to apply the model-based reinforcement learning paradigm." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "I2PuVAvyfvib", + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Experience collection\n", + "First, we randomly interact with the environment to produce a batch of experiences \n", + "\n", + "$$D = \\{s_t, a_t, s_{t+1}\\}_{t\\in[1,N]}$$" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "tvUYSL7sfvie", + "pycharm": { + "is_executing": false, + "name": "#%%\n" + } + }, + "source": [ + "Transition = namedtuple('Transition', ['state', 'action', 'next_state'])\n", + "\n", + "def collect_interaction_data(env, size=1000, action_repeat=2):\n", + " data, done = [], True\n", + " for _ in trange(size, desc=\"Collecting interaction data\"):\n", + " action = env.action_space.sample()\n", + " for _ in range(action_repeat):\n", + " if done:\n", + " previous_obs, info = env.reset()\n", + " obs, reward, done, truncated, info = env.step(action)\n", + " data.append(Transition(torch.Tensor(previous_obs[\"observation\"]),\n", + " torch.Tensor(action),\n", + " torch.Tensor(obs[\"observation\"])))\n", + " previous_obs = obs\n", + " return data\n", + "\n", + "env = gym.make(\"parking-v0\")\n", + "data = collect_interaction_data(env)\n", + "print(\"Sample transition:\", data[0])" + ], + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "4Th1JezEfvir" + }, + "source": [ + "## Build a dynamics model\n", + "\n", + "We now design a model to represent the system dynamics. We choose a **structured model** inspired from *Linear Time-Invariant (LTI) systems* \n", + "\n", + "$$\\dot{x} = f_\\theta(x, u) = A_\\theta(x, u)x + B_\\theta(x, u)u$$\n", + "\n", + "where the $(x, u)$ notation comes from the Control Theory community and stands for the state and action $(s,a)$. 
Intuitively, we learn at each point $(x_t, u_t)$ the **linearization** of the true dynamics $f$ with respect to $(x, u)$.\n", + "\n", + "We parametrize $A_\\theta$ and $B_\\theta$ as two fully-connected networks with one hidden layer.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "F7Gl2kKJfviu", + "pycharm": { + "is_executing": false, + "name": "#%%\n" + } + }, + "source": [ + "class DynamicsModel(nn.Module):\n", + " STATE_X = 0\n", + " STATE_Y = 1\n", + "\n", + " def __init__(self, state_size, action_size, hidden_size, dt):\n", + " super().__init__()\n", + " self.state_size, self.action_size, self.dt = state_size, action_size, dt\n", + " A_size, B_size = state_size * state_size, state_size * action_size\n", + " self.A1 = nn.Linear(state_size + action_size, hidden_size)\n", + " self.A2 = nn.Linear(hidden_size, A_size)\n", + " self.B1 = nn.Linear(state_size + action_size, hidden_size)\n", + " self.B2 = nn.Linear(hidden_size, B_size)\n", + "\n", + " def forward(self, x, u):\n", + " \"\"\"\n", + " Predict x_{t+1} = f(x_t, u_t)\n", + " :param x: a batch of states\n", + " :param u: a batch of actions\n", + " \"\"\"\n", + " xu = torch.cat((x, u), -1)\n", + " xu[:, self.STATE_X:self.STATE_Y+1] = 0 # Remove dependency in (x,y)\n", + " A = self.A2(F.relu(self.A1(xu)))\n", + " A = torch.reshape(A, (x.shape[0], self.state_size, self.state_size))\n", + " B = self.B2(F.relu(self.B1(xu)))\n", + " B = torch.reshape(B, (x.shape[0], self.state_size, self.action_size))\n", + " dx = A @ x.unsqueeze(-1) + B @ u.unsqueeze(-1)\n", + " return x + dx.squeeze()*self.dt\n", + "\n", + "\n", + "dynamics = DynamicsModel(state_size=env.observation_space.spaces[\"observation\"].shape[0],\n", + " action_size=env.action_space.shape[0],\n", + " hidden_size=64,\n", + " dt=1/env.unwrapped.config[\"policy_frequency\"])\n", + "print(\"Forward initial model on a sample transition:\",\n", + " dynamics(data[0].state.unsqueeze(0), data[0].action.unsqueeze(0)).detach())" + ], + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "FFsgc7gffvi0" + }, + "source": [ + "## Fit the model on data\n", + "We can now train our model $f_\\theta$ in a supervised fashion to minimize an MSE loss $L^2(f_\\theta; D)$ over our experience batch $D$ by stochastic gradient descent:\n", + "\n", + "$$L^2(f_\\theta; D) = \\frac{1}{|D|}\\sum_{s_t,a_t,s_{t+1}\\in D}||s_{t+1}- f_\\theta(s_t, a_t)||^2$$" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "NwCDLD1wfvi2", + "pycharm": { + "is_executing": false, + "name": "#%%\n" + } + }, + "source": [ + "optimizer = torch.optim.Adam(dynamics.parameters(), lr=0.01)\n", + "\n", + "# Split dataset into training and validation\n", + "train_ratio = 0.7\n", + "train_data, validation_data = data[:int(train_ratio * len(data))], data[int(train_ratio * len(data)):]\n", + "\n", + "def compute_loss(model, data_t, loss_func = torch.nn.MSELoss()):\n", + " states, actions, next_states = data_t\n", + " predictions = model(states, actions)\n", + " return loss_func(predictions, next_states)\n", + "\n", + "def transpose_batch(batch):\n", + " return Transition(*map(torch.stack, zip(*batch)))\n", + "\n", + "def train(model, train_data, validation_data, epochs=1500):\n", + " train_data_t = transpose_batch(train_data)\n", + " validation_data_t = transpose_batch(validation_data)\n", + " losses = np.full((epochs, 2), np.nan)\n", + " for epoch in trange(epochs, desc=\"Train dynamics\"):\n", + " # Compute loss gradient and step 
optimizer\n", + "        loss = compute_loss(model, train_data_t)\n", + "        validation_loss = compute_loss(model, validation_data_t)\n", + "        losses[epoch] = [loss.detach().numpy(), validation_loss.detach().numpy()]\n", + "        optimizer.zero_grad()\n", + "        loss.backward()\n", + "        optimizer.step()\n", + "    # Plot losses\n", + "    plt.plot(losses)\n", + "    plt.yscale(\"log\")\n", + "    plt.xlabel(\"epochs\")\n", + "    plt.ylabel(\"loss\")\n", + "    plt.legend([\"train\", \"validation\"])\n", + "    plt.show()\n", + "\n", + "train(dynamics, train_data, validation_data)" + ], + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "NXBODCuYfvi_", + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Visualize trained dynamics\n", + "\n", + "To qualitatively evaluate our model, we can choose a few values of steering angle *(right, center, left)* and acceleration *(slow, fast)*, and predict and visualize the corresponding trajectories from an initial state.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "SMPA55bCfvjB", + "pycharm": { + "name": "#%%\n" + } + }, + "source": [ + "def predict_trajectory(state, actions, model, action_repeat=1):\n", + "    states = []\n", + "    for action in actions:\n", + "        for _ in range(action_repeat):\n", + "            state = model(state, action)\n", + "            states.append(state)\n", + "    return torch.stack(states, dim=0)\n", + "\n", + "def plot_trajectory(states, color):\n", + "    scales = np.array(env.unwrapped.config[\"observation\"][\"scales\"])\n", + "    states = np.clip(states.squeeze(1).detach().numpy() * scales, -100, 100)\n", + "    plt.plot(states[:, 0], states[:, 1], color=color, marker='.')\n", + "    plt.arrow(states[-1,0], states[-1,1], states[-1,4]*1, states[-1,5]*1, color=color)\n", + "\n", + "def visualize_trajectories(model, state, horizon=15):\n", + "    plt.cla()\n", + "    # Draw a car\n", + "    plt.plot(state.numpy()[0]+2.5*np.array([-1, -1, 1, 1, -1]),\n", + "             state.numpy()[1]+1.0*np.array([-1, 1, 1, -1, -1]), 'k')\n", + "    # Draw trajectories\n", + "    state = state.unsqueeze(0)\n", + "    colors = iter(plt.get_cmap(\"tab20\").colors)\n", + "    # Generate commands\n", + "    for steering in np.linspace(-0.5, 0.5, 3):\n", + "        for acceleration in np.linspace(0.8, 0.4, 2):\n", + "            actions = torch.Tensor([acceleration, steering]).view(1,1,-1)\n", + "            # Predict trajectories\n", + "            states = predict_trajectory(state, actions, model, action_repeat=horizon)\n", + "            plot_trajectory(states, color=next(colors))\n", + "    plt.axis(\"equal\")\n", + "    plt.show()\n", + "\n", + "visualize_trajectories(dynamics, state=torch.Tensor([0, 0, 0, 0, 1, 0]))" + ], + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "DOa0j1_muNXi" + }, + "source": [ + "## Reward model\n", + "We assume that the reward $R(s,a)$ is known (chosen by the system designer), and takes the form of a **weighted L1-norm** between the state and the goal (raised to the power 0.5, see the code below)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "cRubRv9buNXj" + }, + "source": [ + "def reward_model(states, goal, gamma=None):\n", + "    \"\"\"\n", + "    The reward is a weighted L1-norm between the state and a goal\n", + "    :param Tensor states: a batch of states. shape: [batch_size, state_size].\n", + "    :param Tensor goal: a goal state.
shape: [state_size].\n", + "    :param float gamma: a discount factor\n", + "    \"\"\"\n", + "    goal = goal.expand(states.shape)\n", + "    reward_weights = torch.Tensor(env.unwrapped.config[\"reward_weights\"])\n", + "    rewards = -torch.pow(torch.norm((states-goal)*reward_weights, p=1, dim=-1), 0.5)\n", + "    if gamma:\n", + "        time = torch.arange(rewards.shape[0], dtype=torch.float).unsqueeze(-1).expand(rewards.shape)\n", + "        rewards *= torch.pow(gamma, time)\n", + "    return rewards\n", + "\n", + "obs, info = env.reset()\n", + "print(\"Reward of a sample transition:\", reward_model(torch.Tensor(obs[\"observation\"]).unsqueeze(0),\n", + "                                                     torch.Tensor(obs[\"desired_goal\"])))" + ], + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Q5D6W4p7fvjI" + }, + "source": [ + "## Leverage dynamics model for planning\n", + "\n", + "We now use the learnt dynamics model $f_\\theta$ for planning.\n", + "In order to solve the optimal control problem, we use a sampling-based optimization algorithm: the **Cross-Entropy Method** (`CEM`). It is an optimization algorithm applicable to problems that are both **combinatorial** and **continuous**, which is our case here: finding the best-performing sequence of actions.\n", + "\n", + "This method approximates the optimal importance sampling estimator by repeating two phases:\n", + "1. **Draw samples** from a probability distribution. We use Gaussian distributions over sequences of actions.\n", + "2. Minimize the **cross-entropy** between this distribution and a **target distribution** to produce a better sample in the next iteration. We define this target distribution by selecting the top-k performing sampled sequences.\n", + "\n", + "![Credits to Olivier Sigaud](https://github.com/yfletberliac/rlss2019-hands-on/blob/master/imgs/cem.png?raw=1)\n", + "\n", + "Note that since we have a local linear dynamics model, we could instead choose an `Iterative LQR` planner, which would be more efficient. We prefer `CEM` in this educational setting for its simplicity and generality." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "bzPKYg23fvjL", + "pycharm": { + "is_executing": false, + "name": "#%%\n" + } + }, + "source": [ + "def cem_planner(state, goal, action_size, horizon=5, population=100, selection=10, iterations=5):\n", + "    state = state.expand(population, -1)\n", + "    action_mean = torch.zeros(horizon, 1, action_size)\n", + "    action_std = torch.ones(horizon, 1, action_size)\n", + "    for _ in range(iterations):\n", + "        # 1. Draw sample sequences of actions from a normal distribution\n", + "        actions = torch.normal(mean=action_mean.repeat(1, population, 1), std=action_std.repeat(1, population, 1))\n", + "        actions = torch.clamp(actions, min=env.action_space.low.min(), max=env.action_space.high.max())\n", + "        states = predict_trajectory(state, actions, dynamics, action_repeat=5)\n", + "        # 2.
Fit the distribution to the top-k performing sequences\n", + "        returns = reward_model(states, goal).sum(dim=0)\n", + "        _, best = returns.topk(selection, largest=True, sorted=False)\n", + "        best_actions = actions[:, best, :]\n", + "        action_mean = best_actions.mean(dim=1, keepdim=True)\n", + "        action_std = best_actions.std(dim=1, unbiased=False, keepdim=True)\n", + "    return action_mean[0].squeeze(dim=0)\n", + "\n", + "\n", + "# Run the planner on a sample transition\n", + "action = cem_planner(torch.Tensor(obs[\"observation\"]),\n", + "                     torch.Tensor(obs[\"desired_goal\"]),\n", + "                     env.action_space.shape[0])\n", + "print(\"Planned action:\", action)" + ], + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "_8L6vEPWyea7" + }, + "source": [ + "## Visualize a few episodes\n", + "\n", + "Let's hit the road!" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xOcOP7Of18T2" + }, + "source": [ + "env = gym.make(\"parking-v0\", render_mode='rgb_array')\n", + "env = record_videos(env)\n", + "obs, info = env.reset()\n", + "\n", + "for step in trange(3 * env.unwrapped.config[\"duration\"], desc=\"Testing 3 episodes...\"):\n", + "    action = cem_planner(torch.Tensor(obs[\"observation\"]),\n", + "                         torch.Tensor(obs[\"desired_goal\"]),\n", + "                         env.action_space.shape[0])\n", + "    obs, reward, done, truncated, info = env.step(action.numpy())\n", + "    if done or truncated:\n", + "        obs, info = env.reset()\n", + "env.close()\n", + "show_videos()" + ], + "outputs": [] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "psBBQIv4fvjT", + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "## Limitations\n", + "\n", + "### Model bias\n", + "\n", + "In model-based reinforcement learning, we replace our original optimal control problem with another problem: optimizing our learnt approximate MDP. By settling for this approximate MDP to plan with, we introduce a **bias** that can only **decrease the true performance** of the corresponding planned policy. This is called the problem of model bias.\n", + "\n", + "In some MDPs, even slight model errors lead to a dramatic drop in performance, as illustrated at the beginning of the following video:\n", + "\n", + "[![Approximate Robust Control of Uncertain Dynamical Systems](https://img.youtube.com/vi/8khqd3BJo0A/0.jpg)](https://www.youtube.com/watch?v=8khqd3BJo0A)\n", + "\n", + "The question of how to address model bias belongs to the field of **Safe Reinforcement Learning**.\n", + "\n", + "### [The call of the void](https://www.urbandictionary.com/define.php?term=the%20call%20of%20the%20void)\n", + "\n", + "The model will only be accurate in the regions of the state space that were explored and covered by $D$.\n", + "Outside of $D$, the model may diverge and **hallucinate** important rewards.\n", + "This effect is problematic when the model is used by a planning algorithm, as the latter will try to **exploit** these hallucinated high rewards and will steer the agent towards **unknown** (and thus dangerous) **regions** where the model is erroneously optimistic.\n", + "\n", + "### Computational cost of planning\n", + "\n", + "At test time, the planning step typically requires **sampling a lot of trajectories** to find a near-optimal candidate, which may turn out to be very costly. This may be prohibitive in a high-frequency real-time setting. **Model-free** methods, which directly recommend the best action, are **much more efficient** in that regard."
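To make the model-bias and hallucination concerns above more tangible, here is a small, hedged sketch (not part of the original notebook) that rolls the learnt dynamics model forward under a fixed command and compares the return it predicts with the return actually collected from a fresh parking-v0 environment over the same horizon. It assumes the notebook cells above have been run, so that dynamics, predict_trajectory and reward_model exist; the constant action, the 30-step horizon, and the use of reward_model as a stand-in for the environment's internal reward are illustrative choices, so treat the printed gap as indicative only.

# Probe compounding model error: compare the return predicted by the learnt model
# with the return obtained from the real environment under the same constant command.
# Assumes gym (gymnasium), torch, dynamics, predict_trajectory and reward_model from above.
probe_env = gym.make("parking-v0")
obs, info = probe_env.reset()
state = torch.Tensor(obs["observation"]).unsqueeze(0)
goal = torch.Tensor(obs["desired_goal"])
action = torch.Tensor([[0.4, 0.2]])  # arbitrary constant (acceleration, steering) command
horizon = 30

# Return predicted by the model along its own imagined rollout
predicted_states = predict_trajectory(state, action.unsqueeze(0), dynamics, action_repeat=horizon)
predicted_return = reward_model(predicted_states.squeeze(1), goal).sum().item()

# Return actually obtained by applying the same command in the environment
real_return = 0.0
for _ in range(horizon):
    obs, reward, done, truncated, info = probe_env.step(action.squeeze(0).numpy())
    real_return += reward
    if done or truncated:
        break
probe_env.close()

print("Model-predicted return:", predicted_return)
print("Environment return:    ", real_return)

A large gap between the two numbers, especially at longer horizons, is the practical signature of model bias: the planner optimizes the first quantity, while the agent is ultimately evaluated on the second.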
+ ] + } + ], + "metadata": { + "colab": { + "name": "parking_model_based.ipynb", + "provenance": [] + }, + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.5" + }, + "pycharm": { + "stem_cell": { + "cell_type": "raw", + "metadata": { + "collapsed": false + }, + "source": [] + } + } + }, + "nbformat": 4, + "nbformat_minor": 0 } diff --git a/scripts/sb3_highway_dqn.ipynb b/scripts/sb3_highway_dqn.ipynb index bf46961cb..6bd865c06 100644 --- a/scripts/sb3_highway_dqn.ipynb +++ b/scripts/sb3_highway_dqn.ipynb @@ -1,177 +1,179 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": { - "id": "5eeje4O8fviH", - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "# Highway with SB3's DQN\n", - "\n", - "## Warming up\n", - "We start with a few useful installs and imports:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "bzMSuJEOfviP", - "pycharm": { - "is_executing": false, - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "# Install environment and agent\n", - "!pip install highway-env\n", - "# TODO: we use the bleeding edge version because the current stable version does not support the latest gym>=0.21 versions. Revert back to stable at the next SB3 release.\n", - "!pip install git+https://github.com/DLR-RM/stable-baselines3\n", - "\n", - "# Environment\n", - "import gymnasium as gym\n", - "import highway_env\n", - "\n", - "# Agent\n", - "from stable_baselines3 import DQN\n", - "\n", - "# Visualization utils\n", - "%load_ext tensorboard\n", - "import sys\n", - "from tqdm.notebook import trange\n", - "!pip install tensorboardx gym pyvirtualdisplay\n", - "!apt-get install -y xvfb ffmpeg\n", - "!git clone https://github.com/Farama-Foundation/HighwayEnv.git 2> /dev/null\n", - "sys.path.insert(0, '/content/HighwayEnv/scripts/')\n", - "from utils import record_videos, show_videos" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "collapsed": false, - "id": "_wACJRDjqP-f", - "pycharm": { - "name": "#%% md\n" - } - }, - "source": [ - "## Training\n", - "Run tensorboard locally to visualize training." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "ZSRTtNNzE5nL", - "pycharm": { - "name": "#%% \n" - } - }, - "outputs": [], - "source": [ - "%tensorboard --logdir \"highway_dqn\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "Y5TOvonYqP-g", - "pycharm": { - "name": "#%% \n" - } - }, - "outputs": [], - "source": [ - "model = DQN('MlpPolicy', 'highway-fast-v0',\n", - " policy_kwargs=dict(net_arch=[256, 256]),\n", - " learning_rate=5e-4,\n", - " buffer_size=15000,\n", - " learning_starts=200,\n", - " batch_size=32,\n", - " gamma=0.8,\n", - " train_freq=1,\n", - " gradient_steps=1,\n", - " target_update_interval=50,\n", - " exploration_fraction=0.7,\n", - " verbose=1,\n", - " tensorboard_log='highway_dqn/')\n", - "model.learn(int(2e4))\n" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "id": "n2Bu_Pqop0E7" - }, - "source": [ - "## Testing\n", - "\n", - "Visualize a few episodes" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "id": "xOcOP7Of18T2", - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "env = gym.make('highway-fast-v0', render_mode='rgb_array')\n", - "env = record_videos(env)\n", - "for episode in trange(3, desc='Test episodes'):\n", - " (obs, info), done = env.reset(), False\n", - " while not done:\n", - " action, _ = model.predict(obs, deterministic=True)\n", - " obs, reward, done, truncated, info = env.step(int(action))\n", - "env.close()\n", - "show_videos()" - ] + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "5eeje4O8fviH", + "pycharm": { + "name": "#%% md\n" } - ], - "metadata": { - "accelerator": "GPU", - "colab": { - "name": "sb3_highway_dqn.ipynb", - "provenance": [] - }, - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.5" - }, + }, + "source": [ + "# Highway with SB3's DQN\n", + "\n", + "## Warming up\n", + "We start with a few useful installs and imports:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "bzMSuJEOfviP", + "pycharm": { + "is_executing": false, + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "# Install environment and agent\n", + "!pip install highway-env\n", + "# TODO: we use the bleeding edge version because the current stable version does not support the latest gym>=0.21 versions. 
Revert back to stable at the next SB3 release.\n", + "!pip install git+https://github.com/DLR-RM/stable-baselines3\n", + "\n", + "# Environment\n", + "import gymnasium as gym\n", + "import highway_env\n", + "\n", + "gym.register_envs(highway_env)\n", + "\n", + "# Agent\n", + "from stable_baselines3 import DQN\n", + "\n", + "# Visualization utils\n", + "%load_ext tensorboard\n", + "import sys\n", + "from tqdm.notebook import trange\n", + "!pip install tensorboardx gym pyvirtualdisplay\n", + "!apt-get install -y xvfb ffmpeg\n", + "!git clone https://github.com/Farama-Foundation/HighwayEnv.git 2> /dev/null\n", + "sys.path.insert(0, '/content/HighwayEnv/scripts/')\n", + "from utils import record_videos, show_videos" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "collapsed": false, + "id": "_wACJRDjqP-f", "pycharm": { - "stem_cell": { - "cell_type": "raw", - "metadata": { - "collapsed": false - }, - "source": [] - } + "name": "#%% md\n" } + }, + "source": [ + "## Training\n", + "Run tensorboard locally to visualize training." + ] }, - "nbformat": 4, - "nbformat_minor": 0 + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "ZSRTtNNzE5nL", + "pycharm": { + "name": "#%% \n" + } + }, + "outputs": [], + "source": [ + "%tensorboard --logdir \"highway_dqn\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "Y5TOvonYqP-g", + "pycharm": { + "name": "#%% \n" + } + }, + "outputs": [], + "source": [ + "model = DQN('MlpPolicy', 'highway-fast-v0',\n", + " policy_kwargs=dict(net_arch=[256, 256]),\n", + " learning_rate=5e-4,\n", + " buffer_size=15000,\n", + " learning_starts=200,\n", + " batch_size=32,\n", + " gamma=0.8,\n", + " train_freq=1,\n", + " gradient_steps=1,\n", + " target_update_interval=50,\n", + " exploration_fraction=0.7,\n", + " verbose=1,\n", + " tensorboard_log='highway_dqn/')\n", + "model.learn(int(2e4))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "n2Bu_Pqop0E7" + }, + "source": [ + "## Testing\n", + "\n", + "Visualize a few episodes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "xOcOP7Of18T2", + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "env = gym.make('highway-fast-v0', render_mode='rgb_array')\n", + "env = record_videos(env)\n", + "for episode in trange(3, desc='Test episodes'):\n", + " (obs, info), done = env.reset(), False\n", + " while not done:\n", + " action, _ = model.predict(obs, deterministic=True)\n", + " obs, reward, done, truncated, info = env.step(int(action))\n", + "env.close()\n", + "show_videos()" + ] + } + ], + "metadata": { + "accelerator": "GPU", + "colab": { + "name": "sb3_highway_dqn.ipynb", + "provenance": [] + }, + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.5" + }, + "pycharm": { + "stem_cell": { + "cell_type": "raw", + "metadata": { + "collapsed": false + }, + "source": [] + } + } + }, + "nbformat": 4, + "nbformat_minor": 0 } diff --git a/scripts/sb3_highway_dqn.py b/scripts/sb3_highway_dqn.py index 8aa964320..a9769934b 100644 --- a/scripts/sb3_highway_dqn.py +++ b/scripts/sb3_highway_dqn.py @@ -4,6 +4,7 @@ import highway_env # noqa: F401 + TRAIN = True if __name__ == "__main__": diff --git 
a/scripts/sb3_highway_ppo.py b/scripts/sb3_highway_ppo.py index 372c271c8..769784df8 100644 --- a/scripts/sb3_highway_ppo.py +++ b/scripts/sb3_highway_ppo.py @@ -5,6 +5,7 @@ import highway_env # noqa: F401 + # ================================== # Main script # ================================== diff --git a/scripts/sb3_highway_ppo_transformer.py b/scripts/sb3_highway_ppo_transformer.py index aa234de3b..abb7f4ef8 100644 --- a/scripts/sb3_highway_ppo_transformer.py +++ b/scripts/sb3_highway_ppo_transformer.py @@ -17,6 +17,7 @@ import highway_env # noqa: F401 from highway_env.utils import lmap + # ================================== # Policy Architecture # ================================== @@ -30,7 +31,7 @@ def activation_factory(activation_type): elif activation_type == "ELU": return nn.ELU() else: - raise ValueError("Unknown activation_type: {}".format(activation_type)) + raise ValueError(f"Unknown activation_type: {activation_type}") class BaseModule(torch.nn.Module): @@ -69,7 +70,7 @@ def __init__( out_size=None, activation="RELU", is_policy=False, - **kwargs + **kwargs, ): super().__init__(**kwargs) self.reshape = reshape @@ -170,7 +171,7 @@ def __init__( presence_feature_idx=0, embedding_layer_kwargs=None, attention_layer_kwargs=None, - **kwargs + **kwargs, ): super().__init__(**kwargs) self.out_size = out_size diff --git a/scripts/sb3_racetracks_ppo.py b/scripts/sb3_racetracks_ppo.py index 029528fe9..acc4e9cab 100644 --- a/scripts/sb3_racetracks_ppo.py +++ b/scripts/sb3_racetracks_ppo.py @@ -6,6 +6,7 @@ import highway_env # noqa: F401 + TRAIN = True if __name__ == "__main__": diff --git a/scripts/utils.py b/scripts/utils.py index 8b5af6f3f..941c01106 100644 --- a/scripts/utils.py +++ b/scripts/utils.py @@ -5,6 +5,7 @@ from IPython import display as ipythondisplay from pyvirtualdisplay import Display + display = Display(visible=0, size=(1400, 900)) display.start() diff --git a/setup.py b/setup.py index e5e5d6041..995e47204 100644 --- a/setup.py +++ b/setup.py @@ -6,6 +6,7 @@ from setuptools import setup + CWD = pathlib.Path(__file__).absolute().parent diff --git a/tests/envs/test_actions.py b/tests/envs/test_actions.py index 451b84550..6eded99c2 100644 --- a/tests/envs/test_actions.py +++ b/tests/envs/test_actions.py @@ -3,6 +3,7 @@ import highway_env + gym.register_envs(highway_env) diff --git a/tests/envs/test_env_preprocessors.py b/tests/envs/test_env_preprocessors.py index 4f78643fd..84ad3849c 100644 --- a/tests/envs/test_env_preprocessors.py +++ b/tests/envs/test_env_preprocessors.py @@ -2,6 +2,7 @@ import highway_env + gym.register_envs(highway_env) diff --git a/tests/envs/test_gym.py b/tests/envs/test_gym.py index a478d8ed4..e59b90532 100644 --- a/tests/envs/test_gym.py +++ b/tests/envs/test_gym.py @@ -4,6 +4,7 @@ import highway_env from highway_env.envs.highway_env import HighwayEnv + gym.register_envs(highway_env) diff --git a/tests/envs/test_time.py b/tests/envs/test_time.py index 4de14aa75..3e94c400c 100644 --- a/tests/envs/test_time.py +++ b/tests/envs/test_time.py @@ -4,6 +4,7 @@ import highway_env + gym.register_envs(highway_env) @@ -34,7 +35,7 @@ def test_running_time(repeat=1): env = gym.make(env_name) time_simulated = steps / env.unwrapped.config["policy_frequency"] real_time_ratio = time_simulated / time_spent - print("Real time ratio for {}: {}".format(env_name, real_time_ratio)) + print(f"Real time ratio for {env_name}: {real_time_ratio}") assert real_time_ratio > 0.5 # let's not be too ambitious for now diff --git a/tests/graphics/test_render.py 
b/tests/graphics/test_render.py index f11062493..ba4c5deed 100644 --- a/tests/graphics/test_render.py +++ b/tests/graphics/test_render.py @@ -4,6 +4,7 @@ import highway_env + gym.register_envs(highway_env) diff --git a/tests/vehicle/test_behavior.py b/tests/vehicle/test_behavior.py index e34e64b15..94b2eac8a 100644 --- a/tests/vehicle/test_behavior.py +++ b/tests/vehicle/test_behavior.py @@ -4,6 +4,7 @@ from highway_env.vehicle.behavior import IDMVehicle, LinearVehicle from highway_env.vehicle.objects import Obstacle + FPS = 15 diff --git a/tests/vehicle/test_control.py b/tests/vehicle/test_control.py index 68856a040..ef3ab6563 100644 --- a/tests/vehicle/test_control.py +++ b/tests/vehicle/test_control.py @@ -4,6 +4,7 @@ from highway_env.road.road import Road, RoadNetwork from highway_env.vehicle.controller import ControlledVehicle + FPS = 15 diff --git a/tests/vehicle/test_dynamics.py b/tests/vehicle/test_dynamics.py index 18682c5ec..a5f93009a 100644 --- a/tests/vehicle/test_dynamics.py +++ b/tests/vehicle/test_dynamics.py @@ -4,6 +4,7 @@ from highway_env.vehicle.kinematics import Vehicle from highway_env.vehicle.objects import Landmark, Obstacle + FPS = 15 diff --git a/tests/vehicle/test_uncertainty.py b/tests/vehicle/test_uncertainty.py index cd1848d4a..ad4f45d7a 100644 --- a/tests/vehicle/test_uncertainty.py +++ b/tests/vehicle/test_uncertainty.py @@ -1,6 +1,7 @@ from highway_env.road.road import Road, RoadNetwork from highway_env.vehicle.uncertainty.prediction import IntervalVehicle + FPS = 15