Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-117348: Refactored RawConfigParser._read for simplicity and comprehensibility #117372

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a25ac00
Extract method for _read_inner, reducing complexity and indentation b…
jaraco Mar 27, 2024
1e69aae
Extract method for _raise_all and yield ParseErrors from _read_inner.
jaraco Mar 27, 2024
8c47781
Prefer iterators to splat expansion and literal indexing.
jaraco Mar 27, 2024
7479814
Extract method for _strip_comments. Reduces complexity by 7.
jaraco Mar 27, 2024
a2fffee
Model the file lines in a class to encapsulate the comment status and…
jaraco Mar 28, 2024
23468cb
Encapsulate the read state as a dataclass
jaraco Mar 28, 2024
3d1ef0a
Extract _handle_continuation_line and _handle_rest methods. Reduces c…
jaraco Mar 28, 2024
071baeb
Reindent
jaraco Mar 28, 2024
81f4ce2
At least for now, collect errors in the ReadState
jaraco Mar 28, 2024
8942cc1
Check for missing section header separately.
jaraco Mar 28, 2024
1e72168
Extract methods for _handle_header and _handle_option. Reduces comple…
jaraco Mar 28, 2024
0dfd797
Remove unreachable code. Reduces complexity by 4.
jaraco Mar 28, 2024
77ed897
Remove unreachable branch
jaraco Mar 28, 2024
76f42d3
Handle error condition early. Reduces complexity by 1.
jaraco Mar 28, 2024
c18a2bb
Add blurb
jaraco Mar 29, 2024
97aa785
Move _raise_all to ParsingError, as its behavior is most closely rela…
jaraco Mar 29, 2024
d310cb4
Split _strip* into separate methods.
jaraco Mar 29, 2024
f2a355c
Refactor _strip_full to compute the strip just once and use 'not any'…
jaraco Mar 29, 2024
2492614
Replace use of 'sys.maxsize' with direct computation of the stripped …
jaraco Mar 29, 2024
4968591
Extract has_comments as a dynamic property.
jaraco Mar 29, 2024
7d807bb
Implement clean as a cached property.
jaraco Mar 29, 2024
29cb20f
Model comment prefixes in the RawConfigParser within a prefixes names…
jaraco Mar 29, 2024
c834c35
Use a regular expression to search for the first match.
jaraco Mar 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 183 additions & 147 deletions Lib/configparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,15 @@

from collections.abc import MutableMapping
from collections import ChainMap as _ChainMap
import contextlib
from dataclasses import dataclass, field
import functools
import io
import itertools
import os
import re
import sys
from typing import Iterable

__all__ = ("NoSectionError", "DuplicateOptionError", "DuplicateSectionError",
"NoOptionError", "InterpolationError", "InterpolationDepthError",
Expand Down Expand Up @@ -302,15 +305,33 @@ def __init__(self, option, section, rawval):
class ParsingError(Error):
"""Raised when a configuration file does not follow legal syntax."""

def __init__(self, source):
def __init__(self, source, *args):
super().__init__(f'Source contains parsing errors: {source!r}')
self.source = source
self.errors = []
self.args = (source, )
if args:
self.append(*args)

def append(self, lineno, line):
self.errors.append((lineno, line))
self.message += '\n\t[line %2d]: %s' % (lineno, line)
self.message += '\n\t[line %2d]: %s' % (lineno, repr(line))

def combine(self, others):
    """Fold the recorded errors of *others* into this exception.

    Every ``(lineno, line)`` entry found in the ``errors`` list of each
    item in *others* is re-recorded on ``self`` through ``append``.

    Returns ``self`` so the call can be used directly in a ``raise``.
    """
    for entry in (err for other in others for err in other.errors):
        self.append(*entry)
    return self

@staticmethod
def _raise_all(exceptions: Iterable['ParsingError']):
"""
Combine any number of ParsingErrors into one and raise it.
"""
exceptions = iter(exceptions)
with contextlib.suppress(StopIteration):
raise next(exceptions).combine(exceptions)



class MissingSectionHeaderError(ParsingError):
Expand Down Expand Up @@ -517,6 +538,55 @@ def _interpolate_some(self, parser, option, accum, rest, section, map,
"found: %r" % (rest,))


@dataclass
class _ReadState:
elements_added : set[str] = field(default_factory=set)
cursect : dict[str, str] | None = None
sectname : str | None = None
optname : str | None = None
lineno : int = 0
indent_level : int = 0
errors : list[ParsingError] = field(default_factory=list)


@dataclass
class _Prefixes:
full : Iterable[str]
inline : Iterable[str]


class _Line(str):

def __new__(cls, val, *args, **kwargs):
return super().__new__(cls, val)

def __init__(self, val, prefixes: _Prefixes):
self.prefixes = prefixes

@functools.cached_property
def clean(self):
return self._strip_full() and self._strip_inline()

@property
def has_comments(self):
return self.strip() != self.clean

def _strip_inline(self):
"""
Search for the earliest prefix at the beginning of the line or following a space.
"""
matcher = re.compile(
'|'.join(fr'(^|\s)({re.escape(prefix)})' for prefix in self.prefixes.inline)
# match nothing if no prefixes
or '(?!)'
)
match = matcher.search(self)
return self[:match.start() if match else None].strip()

def _strip_full(self):
return '' if any(map(self.strip().startswith, self.prefixes.full)) else True


class RawConfigParser(MutableMapping):
"""ConfigParser that does not do interpolation."""

Expand Down Expand Up @@ -583,8 +653,10 @@ def __init__(self, defaults=None, dict_type=_default_dict,
else:
self._optcre = re.compile(self._OPT_TMPL.format(delim=d),
re.VERBOSE)
self._comment_prefixes = tuple(comment_prefixes or ())
self._inline_comment_prefixes = tuple(inline_comment_prefixes or ())
self._prefixes = _Prefixes(
full=tuple(comment_prefixes or ()),
inline=tuple(inline_comment_prefixes or ()),
)
self._strict = strict
self._allow_no_value = allow_no_value
self._empty_lines_in_values = empty_lines_in_values
Expand Down Expand Up @@ -975,147 +1047,117 @@ def _read(self, fp, fpname):
in an otherwise empty line or may be entered in lines holding values or
section names. Please note that comments get stripped off when reading configuration files.
"""
elements_added = set()
cursect = None # None, or a dictionary
sectname = None
optname = None
lineno = 0
indent_level = 0
e = None # None, or an exception

try:
for lineno, line in enumerate(fp, start=1):
comment_start = sys.maxsize
# strip inline comments
inline_prefixes = {p: -1 for p in self._inline_comment_prefixes}
while comment_start == sys.maxsize and inline_prefixes:
next_prefixes = {}
for prefix, index in inline_prefixes.items():
index = line.find(prefix, index+1)
if index == -1:
continue
next_prefixes[prefix] = index
if index == 0 or (index > 0 and line[index-1].isspace()):
comment_start = min(comment_start, index)
inline_prefixes = next_prefixes
# strip full line comments
for prefix in self._comment_prefixes:
if line.strip().startswith(prefix):
comment_start = 0
break
if comment_start == sys.maxsize:
comment_start = None
value = line[:comment_start].strip()
if not value:
if self._empty_lines_in_values:
# add empty line to the value, but only if there was no
# comment on the line
if (comment_start is None and
cursect is not None and
optname and
cursect[optname] is not None):
cursect[optname].append('') # newlines added at join
else:
# empty line marks end of value
indent_level = sys.maxsize
continue
# continuation line?
first_nonspace = self.NONSPACECRE.search(line)
cur_indent_level = first_nonspace.start() if first_nonspace else 0
if (cursect is not None and optname and
cur_indent_level > indent_level):
if cursect[optname] is None:
raise MultilineContinuationError(fpname, lineno, line)
cursect[optname].append(value)
# a section header or option header?
else:
if self._allow_unnamed_section and cursect is None:
sectname = UNNAMED_SECTION
cursect = self._dict()
self._sections[sectname] = cursect
self._proxies[sectname] = SectionProxy(self, sectname)
elements_added.add(sectname)

indent_level = cur_indent_level
# is it a section header?
mo = self.SECTCRE.match(value)
if mo:
sectname = mo.group('header')
if sectname in self._sections:
if self._strict and sectname in elements_added:
raise DuplicateSectionError(sectname, fpname,
lineno)
cursect = self._sections[sectname]
elements_added.add(sectname)
elif sectname == self.default_section:
cursect = self._defaults
else:
cursect = self._dict()
self._sections[sectname] = cursect
self._proxies[sectname] = SectionProxy(self, sectname)
elements_added.add(sectname)
# So sections can't start with a continuation line
optname = None
# no section header?
elif cursect is None:
raise MissingSectionHeaderError(fpname, lineno, line)
# an option line?
else:
indent_level = cur_indent_level
# is it a section header?
mo = self.SECTCRE.match(value)
if mo:
sectname = mo.group('header')
if sectname in self._sections:
if self._strict and sectname in elements_added:
raise DuplicateSectionError(sectname, fpname,
lineno)
cursect = self._sections[sectname]
elements_added.add(sectname)
elif sectname == self.default_section:
cursect = self._defaults
else:
cursect = self._dict()
self._sections[sectname] = cursect
self._proxies[sectname] = SectionProxy(self, sectname)
elements_added.add(sectname)
# So sections can't start with a continuation line
optname = None
# no section header in the file?
elif cursect is None:
raise MissingSectionHeaderError(fpname, lineno, line)
# an option line?
else:
mo = self._optcre.match(value)
if mo:
optname, vi, optval = mo.group('option', 'vi', 'value')
if not optname:
e = self._handle_error(e, fpname, lineno, line)
optname = self.optionxform(optname.rstrip())
if (self._strict and
(sectname, optname) in elements_added):
raise DuplicateOptionError(sectname, optname,
fpname, lineno)
elements_added.add((sectname, optname))
# This check is fine because the OPTCRE cannot
# match if it would set optval to None
if optval is not None:
optval = optval.strip()
cursect[optname] = [optval]
else:
# valueless option handling
cursect[optname] = None
else:
# a non-fatal parsing error occurred. set up the
# exception but keep going. the exception will be
# raised at the end of the file and will contain a
# list of all bogus lines
e = self._handle_error(e, fpname, lineno, line)
ParsingError._raise_all(self._read_inner(fp, fpname))
finally:
self._join_multiline_values()
# if any parsing errors occurred, raise an exception
if e:
raise e

def _read_inner(self, fp, fpname):
    """Parse every line of *fp*, updating the parser's sections.

    *fpname* is only used when constructing error objects. Returns the
    list of non-fatal errors accumulated in the read state; fatal
    problems are raised by the helpers that detect them.
    """
    state = _ReadState()
    make_line = functools.partial(_Line, prefixes=self._prefixes)

    for lineno, raw in enumerate(fp, start=1):
        state.lineno = lineno
        line = make_line(raw)

        if line.clean:
            # Measure indentation on the raw line, then dispatch: either
            # the line continues the current option or it starts
            # something new (section header or option).
            first_nonspace = self.NONSPACECRE.search(line)
            state.cur_indent_level = (
                first_nonspace.start() if first_nonspace else 0
            )
            if not self._handle_continuation_line(state, line, fpname):
                self._handle_rest(state, line, fpname)
            continue

        # Nothing left after stripping comments and whitespace.
        if not self._empty_lines_in_values:
            # empty line marks end of value
            state.indent_level = sys.maxsize
            continue

        # add empty line to the value, but only if there was no
        # comment on the line
        if (line.has_comments
                or state.cursect is None
                or not state.optname):
            continue
        if state.cursect[state.optname] is not None:
            state.cursect[state.optname].append('')  # newlines added at join

    return state.errors

def _handle_continuation_line(self, st, line, fpname):
# continuation line?
is_continue = (st.cursect is not None and st.optname and
st.cur_indent_level > st.indent_level)
if is_continue:
if st.cursect[st.optname] is None:
raise MultilineContinuationError(fpname, st.lineno, line)
st.cursect[st.optname].append(line.clean)
return is_continue

def _handle_rest(self, st, line, fpname):
# a section header or option header?
if self._allow_unnamed_section and st.cursect is None:
st.sectname = UNNAMED_SECTION
st.cursect = self._dict()
self._sections[st.sectname] = st.cursect
self._proxies[st.sectname] = SectionProxy(self, st.sectname)
st.elements_added.add(st.sectname)

st.indent_level = st.cur_indent_level
# is it a section header?
mo = self.SECTCRE.match(line.clean)

if not mo and st.cursect is None:
raise MissingSectionHeaderError(fpname, st.lineno, line)

self._handle_header(st, mo, fpname) if mo else self._handle_option(st, line, fpname)

def _handle_header(self, st, mo, fpname):
st.sectname = mo.group('header')
if st.sectname in self._sections:
if self._strict and st.sectname in st.elements_added:
raise DuplicateSectionError(st.sectname, fpname,
st.lineno)
st.cursect = self._sections[st.sectname]
st.elements_added.add(st.sectname)
elif st.sectname == self.default_section:
st.cursect = self._defaults
else:
st.cursect = self._dict()
self._sections[st.sectname] = st.cursect
self._proxies[st.sectname] = SectionProxy(self, st.sectname)
st.elements_added.add(st.sectname)
# So sections can't start with a continuation line
st.optname = None

def _handle_option(self, st, line, fpname):
# an option line?
st.indent_level = st.cur_indent_level

mo = self._optcre.match(line.clean)
if not mo:
# a non-fatal parsing error occurred. set up the
# exception but keep going. the exception will be
# raised at the end of the file and will contain a
# list of all bogus lines
st.errors.append(ParsingError(fpname, st.lineno, line))
return

st.optname, vi, optval = mo.group('option', 'vi', 'value')
if not st.optname:
st.errors.append(ParsingError(fpname, st.lineno, line))
st.optname = self.optionxform(st.optname.rstrip())
if (self._strict and
(st.sectname, st.optname) in st.elements_added):
raise DuplicateOptionError(st.sectname, st.optname,
fpname, st.lineno)
st.elements_added.add((st.sectname, st.optname))
# This check is fine because the OPTCRE cannot
# match if it would set optval to None
if optval is not None:
optval = optval.strip()
st.cursect[st.optname] = [optval]
else:
# valueless option handling
st.cursect[st.optname] = None

def _join_multiline_values(self):
defaults = self.default_section, self._defaults
Expand All @@ -1135,12 +1177,6 @@ def _read_defaults(self, defaults):
for key, value in defaults.items():
self._defaults[self.optionxform(key)] = value

def _handle_error(self, exc, fpname, lineno, line):
if not exc:
exc = ParsingError(fpname)
exc.append(lineno, repr(line))
return exc

def _unify_values(self, section, vars):
"""Create a sequence of lookups with 'vars' taking priority over
the 'section' which takes priority over the DEFAULTSECT.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Refactored :meth:`configparser.RawConfigParser._read` to reduce cyclomatic
complexity and improve comprehensibility.
Loading