[#149] Create module-level loggers instead of logging to the root logger
This follows the setup recommended in the Logging How-To
(https://docs.python.org/3/howto/logging.html#library-config). This should give
users much more flexibility in how they filter and handle log messages from
hlink, and it makes the logging %(name)s attribute meaningful for hlink as
well.
riley-harper committed Oct 7, 2024
1 parent 914d58a commit 10931d1
Showing 11 changed files with 51 additions and 29 deletions.
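As a quick illustration of the pattern this commit adopts, here is a minimal sketch of the before/after along with a hypothetical downstream configuration that module-level loggers make possible. The handler, level, and filter settings below are illustrative assumptions, not part of this commit.

import logging

# Hypothetical downstream configuration (not part of this commit): a root handler,
# plus hlink-specific control that named module loggers enable.
logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(name)s %(message)s")
logging.getLogger("hlink").setLevel(logging.DEBUG)  # keep hlink verbose
logging.getLogger("hlink.linking.matching.link_step_match").addFilter(
    lambda record: "Spark partitions" not in record.getMessage()
)  # drop one noisy message from that module; filters apply to the logger a record is logged on

# Before this commit, hlink logged to the root logger; with the root level at
# WARNING above, this info record is dropped and %(name)s would just be "root":
logging.info("Running step 0")

# After this commit, each module creates a named logger; because "hlink" is set to
# DEBUG above, this record passes and %(name)s identifies the module:
logger = logging.getLogger("hlink.linking.core.pipeline")  # what __name__ resolves to there
logger.info("Running step 0")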
4 changes: 3 additions & 1 deletion hlink/linking/core/comparison_feature.py
@@ -6,6 +6,8 @@
import itertools
import logging

logger = logging.getLogger(__name__)


def create_feature_tables(
link_task, t_ctx_def, advanced_comp_features, hh_comp_features, id_col, table_name
@@ -518,7 +520,7 @@ def generate_comparison_feature(feature, id_col, include_as=False):
col_range = list(range(1, num_cols + 1))
tuples = list(itertools.product(col_range, col_range))

logging.debug(
logger.debug(
f"multi_jaro_winkler_search with alias {feature.get('alias')}: there are {len(tuples)} subcomparisons"
)
sub_exprs = []
6 changes: 4 additions & 2 deletions hlink/linking/core/pipeline.py
@@ -14,6 +14,8 @@
import hlink.linking.transformers.float_cast_transformer
import logging

logger = logging.getLogger(__name__)


def generate_pipeline_stages(conf, ind_vars, tf, tconf):
"""Creates a Spark ML pipeline from the pipeline features.
@@ -202,8 +204,8 @@ def _calc_categorical_features(
"categorical", False
):
categorical_pipeline_features.append(pipeline_feature["output_column"])
logging.info(f"Categorical Comparison features: {categorical_comparison_features}")
logging.info(f"Categorical Pipeline features: {categorical_pipeline_features}")
logger.info(f"Categorical Comparison features: {categorical_comparison_features}")
logger.info(f"Categorical Pipeline features: {categorical_pipeline_features}")

return categorical_comparison_features, categorical_pipeline_features

16 changes: 9 additions & 7 deletions hlink/linking/link_task.py
@@ -12,6 +12,8 @@
from hlink.errors import SparkError
from hlink.linking.link_step import LinkStep

logger = logging.getLogger(__name__)


class LinkTask:
"""Base class for link tasks.
@@ -61,16 +63,16 @@ def get_steps(self) -> list[LinkStep]:

def run_all_steps(self) -> None:
"""Run all steps in order."""
logging.info(f"Running all steps for task {self.display_name}")
logger.info(f"Running all steps for task {self.display_name}")
start_all = timer()
for i, step in enumerate(self.get_steps()):
print(f"Running step {i}: {step}")
logging.info(f"Running step {i}: {step}")
logger.info(f"Running step {i}: {step}")
step.run()
end_all = timer()
elapsed_time_all = round(end_all - start_all, 2)
print(f"Finished all in {elapsed_time_all}s")
logging.info(f"Finished all steps in {elapsed_time_all}s")
logger.info(f"Finished all steps in {elapsed_time_all}s")

def run_step(self, step_num: int) -> None:
"""Run a particular step.
@@ -95,15 +97,15 @@ def run_step(self, step_num: int) -> None:
step = steps[step_num]
step_string = f"step {step_num}: {step}"
print(f"Running {step_string}")
logging.info(f"Starting {step.task.display_name} - {step_string}")
logger.info(f"Starting {step.task.display_name} - {step_string}")

start = timer()
step.run()
end = timer()

elapsed_time = round(end - start, 2)
print(f"Finished {step_string} in {elapsed_time}s")
logging.info(
logger.info(
f"Finished {step.task.display_name} - {step_string} in {elapsed_time}s"
)

@@ -135,7 +137,7 @@ def run_register_python(
try:
return func(*args)
except Exception as err:
logging.error(err)
logger.error(err)
raise SparkError(str(err))

def run_register_sql(
@@ -165,7 +167,7 @@ def run_sql() -> pyspark.sql.dataframe.DataFrame:
except Exception as err:
print(f"Exception in Spark SQL: {sql_to_run}")

logging.error(str(err))
logger.error(str(err))
raise SparkError(str(err))

return self.run_register_python(
4 changes: 3 additions & 1 deletion hlink/linking/matching/link_step_match.py
@@ -15,6 +15,8 @@

from hlink.linking.link_step import LinkStep

logger = logging.getLogger(__name__)


def extract_or_groups_from_blocking(blocking: list[dict[str, Any]]) -> list[list[str]]:
"""
@@ -77,7 +79,7 @@ def _run(self):
dataset_size_max = max(dataset_size_a, dataset_size_b)
num_partitions = spark_shuffle_partitions_heuristic(dataset_size_max)
self.task.spark.sql(f"set spark.sql.shuffle.partitions={num_partitions}")
logging.info(
logger.info(
f"Dataset sizes are A={dataset_size_a}, B={dataset_size_b}, so set Spark partitions to {num_partitions} for this step"
)

4 changes: 3 additions & 1 deletion hlink/linking/matching/link_step_score.py
@@ -14,6 +14,8 @@

from hlink.linking.link_step import LinkStep

logger = logging.getLogger(__name__)


class LinkStepScore(LinkStep):
def __init__(self, task):
@@ -43,7 +45,7 @@ def _run(self):
dataset_size = self.task.spark.table(f"{table_prefix}potential_matches").count()
num_partitions = spark_shuffle_partitions_heuristic(dataset_size)
self.task.spark.sql(f"set spark.sql.shuffle.partitions={num_partitions}")
logging.info(
logger.info(
f"Dataset size is {dataset_size}, so set Spark partitions to {num_partitions} for this step"
)

4 changes: 3 additions & 1 deletion hlink/linking/preprocessing/link_step_prep_dataframes.py
@@ -13,6 +13,8 @@

from hlink.linking.link_step import LinkStep

logger = logging.getLogger(__name__)


class LinkStepPrepDataframes(LinkStep):
def __init__(self, task):
@@ -31,7 +33,7 @@ def _run(self):
dataset_size_max = max(dataset_size_a, dataset_size_b)
num_partitions = spark_shuffle_partitions_heuristic(dataset_size_max)
self.task.spark.sql(f"set spark.sql.shuffle.partitions={num_partitions}")
logging.info(
logger.info(
f"Dataset sizes are A={dataset_size_a}, B={dataset_size_b}, so set Spark partitions to {num_partitions} for this step"
)

6 changes: 4 additions & 2 deletions hlink/linking/preprocessing/link_step_register_raw_dfs.py
@@ -12,6 +12,8 @@
from hlink.errors import DataError
from hlink.linking.link_step import LinkStep

logger = logging.getLogger(__name__)


def handle_paths(datasource, a_or_b):
if "parquet_file" in datasource:
@@ -56,15 +58,15 @@ def _run(self):
df_b_filtered = self._filter_dataframe(config, "b")

if config["datasource_a"].get("convert_ints_to_longs", False):
logging.debug(
logger.debug(
"Converting all columns in datasource_a with type 'int' to type 'long'"
)
df_a = self._convert_ints_to_longs(df_a_filtered)
else:
df_a = df_a_filtered

if config["datasource_b"].get("convert_ints_to_longs", False):
logging.debug(
logger.debug(
"Converting all columns in datasource_b with type 'int' to type 'long'"
)
df_b = self._convert_ints_to_longs(df_b_filtered)
@@ -11,6 +11,8 @@

from hlink.linking.link_step import LinkStep

logger = logging.getLogger(__name__)


class LinkStepReportR2PercentLinked(LinkStep):
def __init__(self, task):
@@ -34,7 +36,7 @@ def _run(self):
dataset_size_max = max(dataset_size1, dataset_size2, dataset_size3)
num_partitions = spark_shuffle_partitions_heuristic(dataset_size_max)
self.task.spark.sql(f"set spark.sql.shuffle.partitions={num_partitions}")
logging.info(
logger.info(
f"Dataset sizes are {dataset_size1}, {dataset_size2}, {dataset_size3}, so set Spark partitions to {num_partitions} for this step"
)

4 changes: 3 additions & 1 deletion hlink/scripts/lib/util.py
@@ -7,6 +7,8 @@
import logging
import traceback

logger = logging.getLogger(__name__)


def report_and_log_error(message: str, err: Exception):
print(f"An error occured: {message}")
@@ -19,7 +21,7 @@ def report_and_log_error(message: str, err: Exception):
# traceback.print_exception("",err,i[2])
multi_line = "\n==========\n"

logging.error(
logger.error(
str(i[0])
+ " : "
+ str(i[1])
20 changes: 11 additions & 9 deletions hlink/scripts/main.py
@@ -25,6 +25,8 @@
from hlink.scripts.lib.conf_validations import analyze_conf
from hlink.scripts.lib.table_ops import drop_all_tables

logger = logging.getLogger(__name__)


def load_conf(conf_name, user):
"""Load and return the hlink config dictionary.
@@ -103,12 +105,12 @@ def cli():

_setup_logging(run_conf)

logging.info("Initializing Spark")
logger.info("Initializing Spark")
spark_init_start = timer()
spark = _get_spark(run_conf, args)
spark_init_end = timer()
spark_init_time = round(spark_init_end - spark_init_start, 2)
logging.info(f"Initialized Spark in {spark_init_time}s")
logger.info(f"Initialized Spark in {spark_init_time}s")

history_file = os.path.expanduser("~/.history_hlink")
_read_history_file(history_file)
@@ -220,9 +222,9 @@ def _cli_loop(spark, args, run_conf, run_name):
try:
print("Analyzing config file")
analyze_conf(LinkRun(spark, run_conf))
logging.info("Analyzed config file, no errors found")
logger.info("Analyzed config file, no errors found")
except ValueError as err:
logging.error(
logger.error(
"Analysis found an error in the config file. See below for details."
)
report_and_log_error("", err)
@@ -232,7 +234,7 @@ def _cli_loop(spark, args, run_conf, run_name):
try:
main.cmdloop()
if main.lastcmd == "reload":
logging.info("Reloading config file")
logger.info("Reloading config file")
run_conf = load_conf(args.conf, args.user)
else:
break
@@ -257,9 +259,9 @@ def _setup_logging(conf):

logging.basicConfig(filename=log_file, level=logging.INFO, format=format_string)

logging.info(f"New session {session_id} by user {user}")
logging.info(f"Configured with {conf['conf_path']}")
logging.info(f"Using hlink version {hlink_version}")
logging.info(
logger.info(f"New session {session_id} by user {user}")
logger.info(f"Configured with {conf['conf_path']}")
logger.info(f"Using hlink version {hlink_version}")
logger.info(
"-------------------------------------------------------------------------------------"
)
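For context, a minimal sketch of why the root-level logging.basicConfig call in _setup_logging above still captures these records: messages logged to the hlink.* module loggers propagate up to the root logger's handlers by default. The file name and format string here are placeholders, not hlink's actual values.

import logging

# Root handler, analogous to the basicConfig call in _setup_logging
logging.basicConfig(
    filename="example_hlink_session.log",  # placeholder path
    level=logging.INFO,
    format="%(levelname)s %(name)s -- %(message)s",  # placeholder format
)

# A module-level logger like the ones introduced in this commit
logger = logging.getLogger("hlink.scripts.main")
logger.info("New session")  # propagates to the root handler; %(name)s names the module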
8 changes: 5 additions & 3 deletions hlink/scripts/main_loop.py
@@ -19,6 +19,8 @@
from hlink.scripts.lib import linking_ops
from hlink.scripts.lib import conf_validations

logger = logging.getLogger(__name__)


def split_and_check_args(expected_count):
"""A parametrized decorator to make handling arguments easier for `Main` methods.
@@ -75,7 +77,7 @@ def reload_auto_complete_cache(self):

def precmd(self, line: str) -> str:
if line.strip() != "":
logging.info(f"[User Input] {line}")
logger.info(f"[User Input] {line}")
return line

def postcmd(self, stop, line):
@@ -516,7 +518,7 @@ def do_x_tfam_raw(self, split_args):
elapsed_time = round(end - start, 2)

print(f"Time: {elapsed_time}s")
logging.info(f"Finished: hh_tfam display - {elapsed_time}")
logger.info(f"Finished: hh_tfam display - {elapsed_time}")

@split_and_check_args(2)
def do_x_hh_tfam(self, split_args):
@@ -535,7 +537,7 @@ def do_x_hh_tfam(self, split_args):
elapsed_time = round(end - start, 2)

print(f"Time: {elapsed_time}s")
logging.info(f"Finished: hh_tfam display - {elapsed_time}")
logger.info(f"Finished: hh_tfam display - {elapsed_time}")

@split_and_check_args(3)
def do_x_hh_tfam_2a(self, split_args):