Skip to content

Commit

Permalink
pythongh-117348: Refactored RawConfigParser._read for similicity and …
Browse files Browse the repository at this point in the history
…comprehensibility (python#117372)

* Extract method for _read_inner, reducing complexity and indentation by 1.

* Extract method for _raise_all and yield ParseErrors from _read_inner.

Reduces complexity by 1 and reduces touch points for handling errors in _read_inner.

* Prefer iterators to splat expansion and literal indexing.

* Extract method for _strip_comments. Reduces complexity by 7.

* Model the file lines in a class to encapsulate the comment status and cleaned value.

* Encapsulate the read state as a dataclass

* Extract _handle_continuation_line and _handle_rest methods. Reduces complexity by 8.

* Reindent

* At least for now, collect errors in the ReadState

* Check for missing section header separately.

* Extract methods for _handle_header and _handle_option. Reduces complexity by 6.

* Remove unreachable code. Reduces complexity by 4.

* Remove unreachable branch

* Handle error condition early. Reduces complexity by 1.

* Add blurb

* Move _raise_all to ParsingError, as its behavior is most closely related to the exception class and not the reader.

* Split _strip* into separate methods.

* Refactor _strip_full to compute the strip just once and use 'not any' to determine the factor.

* Replace use of 'sys.maxsize' with direct computation of the stripped value.

* Extract has_comments as a dynamic property.

* Implement clean as a cached property.

* Model comment prefixes in the RawConfigParser within a prefixes namespace.

* Use a regular expression to search for the first match.

Avoids mutating variables and tricky logic and over-computing all of the starts when only the first is relevant.
  • Loading branch information
jaraco authored and diegorusso committed Apr 17, 2024
1 parent 1a45d0b commit 90d0eeb
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 147 deletions.
330 changes: 183 additions & 147 deletions Lib/configparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,15 @@

from collections.abc import MutableMapping
from collections import ChainMap as _ChainMap
import contextlib
from dataclasses import dataclass, field
import functools
import io
import itertools
import os
import re
import sys
from typing import Iterable

__all__ = ("NoSectionError", "DuplicateOptionError", "DuplicateSectionError",
"NoOptionError", "InterpolationError", "InterpolationDepthError",
Expand Down Expand Up @@ -302,15 +305,33 @@ def __init__(self, option, section, rawval):
class ParsingError(Error):
"""Raised when a configuration file does not follow legal syntax."""

def __init__(self, source):
def __init__(self, source, *args):
super().__init__(f'Source contains parsing errors: {source!r}')
self.source = source
self.errors = []
self.args = (source, )
if args:
self.append(*args)

def append(self, lineno, line):
self.errors.append((lineno, line))
self.message += '\n\t[line %2d]: %s' % (lineno, line)
self.message += '\n\t[line %2d]: %s' % (lineno, repr(line))

def combine(self, others):
for other in others:
for error in other.errors:
self.append(*error)
return self

@staticmethod
def _raise_all(exceptions: Iterable['ParsingError']):
"""
Combine any number of ParsingErrors into one and raise it.
"""
exceptions = iter(exceptions)
with contextlib.suppress(StopIteration):
raise next(exceptions).combine(exceptions)



class MissingSectionHeaderError(ParsingError):
Expand Down Expand Up @@ -517,6 +538,55 @@ def _interpolate_some(self, parser, option, accum, rest, section, map,
"found: %r" % (rest,))


@dataclass
class _ReadState:
elements_added : set[str] = field(default_factory=set)
cursect : dict[str, str] | None = None
sectname : str | None = None
optname : str | None = None
lineno : int = 0
indent_level : int = 0
errors : list[ParsingError] = field(default_factory=list)


@dataclass
class _Prefixes:
full : Iterable[str]
inline : Iterable[str]


class _Line(str):

def __new__(cls, val, *args, **kwargs):
return super().__new__(cls, val)

def __init__(self, val, prefixes: _Prefixes):
self.prefixes = prefixes

@functools.cached_property
def clean(self):
return self._strip_full() and self._strip_inline()

@property
def has_comments(self):
return self.strip() != self.clean

def _strip_inline(self):
"""
Search for the earliest prefix at the beginning of the line or following a space.
"""
matcher = re.compile(
'|'.join(fr'(^|\s)({re.escape(prefix)})' for prefix in self.prefixes.inline)
# match nothing if no prefixes
or '(?!)'
)
match = matcher.search(self)
return self[:match.start() if match else None].strip()

def _strip_full(self):
return '' if any(map(self.strip().startswith, self.prefixes.full)) else True


class RawConfigParser(MutableMapping):
"""ConfigParser that does not do interpolation."""

Expand Down Expand Up @@ -583,8 +653,10 @@ def __init__(self, defaults=None, dict_type=_default_dict,
else:
self._optcre = re.compile(self._OPT_TMPL.format(delim=d),
re.VERBOSE)
self._comment_prefixes = tuple(comment_prefixes or ())
self._inline_comment_prefixes = tuple(inline_comment_prefixes or ())
self._prefixes = _Prefixes(
full=tuple(comment_prefixes or ()),
inline=tuple(inline_comment_prefixes or ()),
)
self._strict = strict
self._allow_no_value = allow_no_value
self._empty_lines_in_values = empty_lines_in_values
Expand Down Expand Up @@ -975,147 +1047,117 @@ def _read(self, fp, fpname):
in an otherwise empty line or may be entered in lines holding values or
section names. Please note that comments get stripped off when reading configuration files.
"""
elements_added = set()
cursect = None # None, or a dictionary
sectname = None
optname = None
lineno = 0
indent_level = 0
e = None # None, or an exception

try:
for lineno, line in enumerate(fp, start=1):
comment_start = sys.maxsize
# strip inline comments
inline_prefixes = {p: -1 for p in self._inline_comment_prefixes}
while comment_start == sys.maxsize and inline_prefixes:
next_prefixes = {}
for prefix, index in inline_prefixes.items():
index = line.find(prefix, index+1)
if index == -1:
continue
next_prefixes[prefix] = index
if index == 0 or (index > 0 and line[index-1].isspace()):
comment_start = min(comment_start, index)
inline_prefixes = next_prefixes
# strip full line comments
for prefix in self._comment_prefixes:
if line.strip().startswith(prefix):
comment_start = 0
break
if comment_start == sys.maxsize:
comment_start = None
value = line[:comment_start].strip()
if not value:
if self._empty_lines_in_values:
# add empty line to the value, but only if there was no
# comment on the line
if (comment_start is None and
cursect is not None and
optname and
cursect[optname] is not None):
cursect[optname].append('') # newlines added at join
else:
# empty line marks end of value
indent_level = sys.maxsize
continue
# continuation line?
first_nonspace = self.NONSPACECRE.search(line)
cur_indent_level = first_nonspace.start() if first_nonspace else 0
if (cursect is not None and optname and
cur_indent_level > indent_level):
if cursect[optname] is None:
raise MultilineContinuationError(fpname, lineno, line)
cursect[optname].append(value)
# a section header or option header?
else:
if self._allow_unnamed_section and cursect is None:
sectname = UNNAMED_SECTION
cursect = self._dict()
self._sections[sectname] = cursect
self._proxies[sectname] = SectionProxy(self, sectname)
elements_added.add(sectname)

indent_level = cur_indent_level
# is it a section header?
mo = self.SECTCRE.match(value)
if mo:
sectname = mo.group('header')
if sectname in self._sections:
if self._strict and sectname in elements_added:
raise DuplicateSectionError(sectname, fpname,
lineno)
cursect = self._sections[sectname]
elements_added.add(sectname)
elif sectname == self.default_section:
cursect = self._defaults
else:
cursect = self._dict()
self._sections[sectname] = cursect
self._proxies[sectname] = SectionProxy(self, sectname)
elements_added.add(sectname)
# So sections can't start with a continuation line
optname = None
# no section header?
elif cursect is None:
raise MissingSectionHeaderError(fpname, lineno, line)
# an option line?
else:
indent_level = cur_indent_level
# is it a section header?
mo = self.SECTCRE.match(value)
if mo:
sectname = mo.group('header')
if sectname in self._sections:
if self._strict and sectname in elements_added:
raise DuplicateSectionError(sectname, fpname,
lineno)
cursect = self._sections[sectname]
elements_added.add(sectname)
elif sectname == self.default_section:
cursect = self._defaults
else:
cursect = self._dict()
self._sections[sectname] = cursect
self._proxies[sectname] = SectionProxy(self, sectname)
elements_added.add(sectname)
# So sections can't start with a continuation line
optname = None
# no section header in the file?
elif cursect is None:
raise MissingSectionHeaderError(fpname, lineno, line)
# an option line?
else:
mo = self._optcre.match(value)
if mo:
optname, vi, optval = mo.group('option', 'vi', 'value')
if not optname:
e = self._handle_error(e, fpname, lineno, line)
optname = self.optionxform(optname.rstrip())
if (self._strict and
(sectname, optname) in elements_added):
raise DuplicateOptionError(sectname, optname,
fpname, lineno)
elements_added.add((sectname, optname))
# This check is fine because the OPTCRE cannot
# match if it would set optval to None
if optval is not None:
optval = optval.strip()
cursect[optname] = [optval]
else:
# valueless option handling
cursect[optname] = None
else:
# a non-fatal parsing error occurred. set up the
# exception but keep going. the exception will be
# raised at the end of the file and will contain a
# list of all bogus lines
e = self._handle_error(e, fpname, lineno, line)
ParsingError._raise_all(self._read_inner(fp, fpname))
finally:
self._join_multiline_values()
# if any parsing errors occurred, raise an exception
if e:
raise e

def _read_inner(self, fp, fpname):
st = _ReadState()

Line = functools.partial(_Line, prefixes=self._prefixes)
for st.lineno, line in enumerate(map(Line, fp), start=1):
if not line.clean:
if self._empty_lines_in_values:
# add empty line to the value, but only if there was no
# comment on the line
if (not line.has_comments and
st.cursect is not None and
st.optname and
st.cursect[st.optname] is not None):
st.cursect[st.optname].append('') # newlines added at join
else:
# empty line marks end of value
st.indent_level = sys.maxsize
continue

first_nonspace = self.NONSPACECRE.search(line)
st.cur_indent_level = first_nonspace.start() if first_nonspace else 0

if self._handle_continuation_line(st, line, fpname):
continue

self._handle_rest(st, line, fpname)

return st.errors

def _handle_continuation_line(self, st, line, fpname):
# continuation line?
is_continue = (st.cursect is not None and st.optname and
st.cur_indent_level > st.indent_level)
if is_continue:
if st.cursect[st.optname] is None:
raise MultilineContinuationError(fpname, st.lineno, line)
st.cursect[st.optname].append(line.clean)
return is_continue

def _handle_rest(self, st, line, fpname):
# a section header or option header?
if self._allow_unnamed_section and st.cursect is None:
st.sectname = UNNAMED_SECTION
st.cursect = self._dict()
self._sections[st.sectname] = st.cursect
self._proxies[st.sectname] = SectionProxy(self, st.sectname)
st.elements_added.add(st.sectname)

st.indent_level = st.cur_indent_level
# is it a section header?
mo = self.SECTCRE.match(line.clean)

if not mo and st.cursect is None:
raise MissingSectionHeaderError(fpname, st.lineno, line)

self._handle_header(st, mo, fpname) if mo else self._handle_option(st, line, fpname)

def _handle_header(self, st, mo, fpname):
st.sectname = mo.group('header')
if st.sectname in self._sections:
if self._strict and st.sectname in st.elements_added:
raise DuplicateSectionError(st.sectname, fpname,
st.lineno)
st.cursect = self._sections[st.sectname]
st.elements_added.add(st.sectname)
elif st.sectname == self.default_section:
st.cursect = self._defaults
else:
st.cursect = self._dict()
self._sections[st.sectname] = st.cursect
self._proxies[st.sectname] = SectionProxy(self, st.sectname)
st.elements_added.add(st.sectname)
# So sections can't start with a continuation line
st.optname = None

def _handle_option(self, st, line, fpname):
# an option line?
st.indent_level = st.cur_indent_level

mo = self._optcre.match(line.clean)
if not mo:
# a non-fatal parsing error occurred. set up the
# exception but keep going. the exception will be
# raised at the end of the file and will contain a
# list of all bogus lines
st.errors.append(ParsingError(fpname, st.lineno, line))
return

st.optname, vi, optval = mo.group('option', 'vi', 'value')
if not st.optname:
st.errors.append(ParsingError(fpname, st.lineno, line))
st.optname = self.optionxform(st.optname.rstrip())
if (self._strict and
(st.sectname, st.optname) in st.elements_added):
raise DuplicateOptionError(st.sectname, st.optname,
fpname, st.lineno)
st.elements_added.add((st.sectname, st.optname))
# This check is fine because the OPTCRE cannot
# match if it would set optval to None
if optval is not None:
optval = optval.strip()
st.cursect[st.optname] = [optval]
else:
# valueless option handling
st.cursect[st.optname] = None

def _join_multiline_values(self):
defaults = self.default_section, self._defaults
Expand All @@ -1135,12 +1177,6 @@ def _read_defaults(self, defaults):
for key, value in defaults.items():
self._defaults[self.optionxform(key)] = value

def _handle_error(self, exc, fpname, lineno, line):
if not exc:
exc = ParsingError(fpname)
exc.append(lineno, repr(line))
return exc

def _unify_values(self, section, vars):
"""Create a sequence of lookups with 'vars' taking priority over
the 'section' which takes priority over the DEFAULTSECT.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Refactored :meth:`configparser.RawConfigParser._read` to reduce cyclometric
complexity and improve comprehensibility.

0 comments on commit 90d0eeb

Please sign in to comment.