Skip to content

Commit

Permalink
locker: refactor to reduce code complexity
Browse files Browse the repository at this point in the history
  • Loading branch information
abn committed Oct 14, 2020
1 parent 733736c commit ed43d94
Showing 1 changed file with 97 additions and 77 deletions.
174 changes: 97 additions & 77 deletions poetry/packages/locker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

from copy import deepcopy
from hashlib import sha256
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union

from tomlkit import array
Expand Down Expand Up @@ -185,36 +188,107 @@ def locked_repository(

return packages

@staticmethod
def __get_locked_package(
_dependency, packages_by_name
): # type: (Dependency, Dict[str, List[Package]]) -> Optional[Package]
"""
Internal helper to identify corresponding locked package using dependency
version constraints.
"""
for _package in packages_by_name.get(_dependency.name, []):
if _dependency.constraint.allows(_package.version):
return _package
return None

@classmethod
def __walk_dependency_level(
cls,
dependencies,
level,
pinned_versions,
packages_by_name,
project_level_dependencies,
nested_dependencies,
): # type: (List[Dependency], int, bool, Dict[str, List[Package]], Set[str], Dict[Tuple[str, str], Dependency]) -> Dict[Tuple[str, str], Dependency]
if not dependencies:
return nested_dependencies

next_level_dependencies = []

for requirement in dependencies:
locked_package = cls.__get_locked_package(requirement, packages_by_name)

if locked_package:
for require in locked_package.requires:
if require.marker.is_empty():
require.marker = requirement.marker
else:
require.marker = require.marker.intersect(requirement.marker)

require.marker = require.marker.intersect(locked_package.marker)
next_level_dependencies.append(require)

if requirement.name in project_level_dependencies and level == 0:
# project level dependencies take precedence
continue

if locked_package:
# create dependency from locked package to retain dependency metadata
# if this is not done, we can end-up with incorrect nested dependencies
marker = requirement.marker
requirement = locked_package.to_dependency()
requirement.marker = requirement.marker.intersect(marker)
else:
# we make a copy to avoid any side-effects
requirement = deepcopy(requirement)

if pinned_versions:
requirement.set_constraint(
cls.__get_locked_package(requirement, packages_by_name)
.to_dependency()
.constraint
)

# dependencies use extra to indicate that it was activated via parent
# package's extras, this is not required for nested exports as we assume
# the resolver already selected this dependency
requirement.marker = requirement.marker.without_extras()

key = (requirement.name, requirement.pretty_constraint)
if key not in nested_dependencies:
nested_dependencies[key] = requirement
else:
nested_dependencies[key].marker = nested_dependencies[
key
].marker.intersect(requirement.marker)

return cls.__walk_dependency_level(
dependencies=next_level_dependencies,
level=level + 1,
pinned_versions=pinned_versions,
packages_by_name=packages_by_name,
project_level_dependencies=project_level_dependencies,
nested_dependencies=nested_dependencies,
)

@classmethod
def get_project_dependencies(
cls, project_requires, locked_packages, pinned_versions=False, with_nested=False
): # type: (List[Dependency], List[Package], bool, bool) -> Iterable[Dependency]
# group packages entries by name, this is required because requirement might use
# different constraints
# group packages entries by name, this is required because requirement might use different constraints
packages_by_name = {}
for pkg in locked_packages:
if pkg.name not in packages_by_name:
packages_by_name[pkg.name] = []
packages_by_name[pkg.name].append(pkg)

def __get_locked_package(
_dependency,
): # type: (Dependency) -> Optional[Package]
"""
Internal helper to identify corresponding locked package using dependency
version constraints.
"""
for _package in packages_by_name.get(_dependency.name, []):
if _dependency.constraint.allows(_package.version):
return _package
return None

project_level_dependencies = set()
dependencies = []

for dependency in project_requires:
dependency = deepcopy(dependency)
locked_package = __get_locked_package(dependency)
locked_package = cls.__get_locked_package(dependency, packages_by_name)
if locked_package:
locked_dependency = locked_package.to_dependency()
locked_dependency.marker = dependency.marker.intersect(
Expand All @@ -233,68 +307,14 @@ def __get_locked_package(
# return only with project level dependencies
return dependencies

nested_dependencies = dict()

def __walk_level(
__dependencies, __level
): # type: (List[Dependency], int) -> None
if not __dependencies:
return

__next_level = []

for requirement in __dependencies:
__locked_package = __get_locked_package(requirement)

if __locked_package:
for require in __locked_package.requires:
if require.marker.is_empty():
require.marker = requirement.marker
else:
require.marker = require.marker.intersect(
requirement.marker
)

require.marker = require.marker.intersect(
__locked_package.marker
)
__next_level.append(require)

if requirement.name in project_level_dependencies and __level == 0:
# project level dependencies take precedence
continue

if __locked_package:
# create dependency from locked package to retain dependency metadata
# if this is not done, we can end-up with incorrect nested dependencies
marker = requirement.marker
requirement = __locked_package.to_dependency()
requirement.marker = requirement.marker.intersect(marker)
else:
# we make a copy to avoid any side-effects
requirement = deepcopy(requirement)

if pinned_versions:
requirement.set_constraint(
__get_locked_package(requirement).to_dependency().constraint
)

# dependencies use extra to indicate that it was activated via parent
# package's extras, this is not required for nested exports as we assume
# the resolver already selected this dependency
requirement.marker = requirement.marker.without_extras()

key = (requirement.name, requirement.pretty_constraint)
if key not in nested_dependencies:
nested_dependencies[key] = requirement
else:
nested_dependencies[key].marker = nested_dependencies[
key
].marker.intersect(requirement.marker)

return __walk_level(__next_level, __level + 1)

__walk_level(dependencies, 0)
nested_dependencies = cls.__walk_dependency_level(
dependencies=dependencies,
level=0,
pinned_versions=pinned_versions,
packages_by_name=packages_by_name,
project_level_dependencies=project_level_dependencies,
nested_dependencies=dict(),
)

# Merge same dependencies using marker union
for requirement in dependencies:
Expand Down

0 comments on commit ed43d94

Please sign in to comment.