Mlflow log (openvinotoolkit#243)
* mlflow logging

* something

* some changes

* Some fixes and clear up

* Symbolic link update

* Final Updates

* Little fixes

* Little fixes (one more)

* Test mlflow off

* Deleted hardcoded log dir

* Generalization

* Clear up

* Fixes

* code fixes

* Common classification functions factored out

* Metrics logging changes
SKholkin committed Nov 12, 2020
1 parent 4d706a6 commit 792856e
Showing 7 changed files with 104 additions and 46 deletions.
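
Taken together, the changes wire MLflow into the example training pipelines alongside the existing TensorBoard logging: a run is started when logging is configured, common parameters are logged once, metrics are logged per epoch, and the run is closed after training. A minimal, hedged sketch of that lifecycle is below; the experiment name, epoch count, and run_training_epoch stub are placeholders for illustration, not code from this commit.

import mlflow


def run_training_epoch(epoch):
    # Hypothetical stand-in for the real training/validation loop.
    return 70.0 + 0.1 * epoch


mlflow.set_tracking_uri('results/mlruns')         # local file-based store, mirroring configure_logging below
mlflow.set_experiment('quantization_sample')      # experiment named after the sample config
with mlflow.start_run():
    mlflow.log_param('epochs', 5)                 # parameters are logged once per run
    for epoch in range(5):
        top1 = run_training_epoch(epoch)
        mlflow.log_metric('val/top1', top1, epoch)    # per-epoch metric, epoch index as the step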
32 changes: 32 additions & 0 deletions examples/classification/common.py
@@ -0,0 +1,32 @@
import torch.backends.cudnn as cudnn

from examples.common.sample_config import SampleConfig
from examples.common.execution import ExecutionMode, get_device
from examples.common.distributed import configure_distributed
from examples.common.model_loader import load_resuming_model_state_dict_and_checkpoint_from_path
from nncf.utils import manual_seed


def configure_device(current_gpu, config: SampleConfig):
    config.current_gpu = current_gpu
    config.distributed = config.execution_mode in (ExecutionMode.DISTRIBUTED, ExecutionMode.MULTIPROCESSING_DISTRIBUTED)
    if config.distributed:
        configure_distributed(config)

    config.device = get_device(config)


def set_seed(config):
    if config.seed is not None:
        manual_seed(config.seed)
        cudnn.deterministic = True
        cudnn.benchmark = False


def load_resuming_checkpoint(resuming_checkpoint_path):
    resuming_model_sd = None
    resuming_checkpoint = None
    if resuming_checkpoint_path is not None:
        resuming_model_sd, resuming_checkpoint = load_resuming_model_state_dict_and_checkpoint_from_path(
            resuming_checkpoint_path)
    return resuming_model_sd, resuming_checkpoint
41 changes: 20 additions & 21 deletions examples/classification/main.py
@@ -10,7 +10,6 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

import os.path as osp
import sys
import time
@@ -36,22 +35,23 @@
from torchvision.models import InceptionOutputs

from examples.common.argparser import get_common_argument_parser
from examples.common.distributed import configure_distributed
from examples.common.example_logger import logger
from examples.common.execution import ExecutionMode, get_device, get_execution_mode, \
from examples.common.execution import ExecutionMode, get_execution_mode, \
    prepare_model_for_execution, start_worker
from examples.common.model_loader import load_model, load_resuming_model_state_dict_and_checkpoint_from_path
from examples.common.model_loader import load_model
from examples.common.optimizer import get_parameter_groups, make_optimizer
from examples.common.sample_config import SampleConfig, create_sample_config
from examples.common.utils import configure_logging, configure_paths, create_code_snapshot, \
    print_args, make_additional_checkpoints, get_name, is_staged_quantization, print_statistics, \
    is_pretrained_model_requested
    is_pretrained_model_requested, finish_logging, log_common_mlflow_params
from examples.common.utils import write_metrics
from nncf import create_compressed_model
from nncf.compression_method_api import CompressionLevel
from nncf.dynamic_graph.graph_builder import create_input_infos
from nncf.initialization import register_default_init_args, default_criterion_fn
from nncf.utils import manual_seed, safe_thread_call, is_main_process
from nncf.utils import safe_thread_call, is_main_process
from examples.classification.common import configure_device, set_seed, load_resuming_checkpoint
import mlflow

model_names = sorted(name for name in models.__dict__
                     if name.islower() and not name.startswith("__")
@@ -113,21 +113,13 @@ def inception_criterion_fn(model_outputs: Any, target: Any, criterion: _Loss) ->

# pylint:disable=too-many-branches
def main_worker(current_gpu, config: SampleConfig):
    config.current_gpu = current_gpu
    config.distributed = config.execution_mode in (ExecutionMode.DISTRIBUTED, ExecutionMode.MULTIPROCESSING_DISTRIBUTED)
    if config.distributed:
        configure_distributed(config)

    config.device = get_device(config)
    configure_device(current_gpu, config)

    if is_main_process():
        configure_logging(logger, config)
        print_args(config)

    if config.seed is not None:
        manual_seed(config.seed)
        cudnn.deterministic = True
        cudnn.benchmark = False
    set_seed(config)

    # define loss function (criterion)
    criterion = nn.CrossEntropyLoss()
@@ -160,10 +152,7 @@ def main_worker(current_gpu, config: SampleConfig):

    model.to(config.device)

    resuming_model_sd = None
    if resuming_checkpoint_path is not None:
        resuming_model_sd, resuming_checkpoint = load_resuming_model_state_dict_and_checkpoint_from_path(
            resuming_checkpoint_path)
    resuming_model_sd, resuming_checkpoint = load_resuming_checkpoint(resuming_checkpoint_path)

    compression_ctrl, model = create_compressed_model(model, nncf_config, resuming_state_dict=resuming_model_sd)

@@ -193,6 +182,9 @@ def main_worker(current_gpu, config: SampleConfig):
    else:
        logger.info("=> loaded checkpoint '{}'".format(resuming_checkpoint_path))

    logger.info(config.nncf_config)
    log_common_mlflow_params(config)

    if config.execution_mode != ExecutionMode.CPU_ONLY:
        cudnn.benchmark = True

@@ -204,6 +196,8 @@
    train(config, compression_ctrl, model, criterion, train_criterion_fn, lr_scheduler, model_name, optimizer,
          train_loader, train_sampler, val_loader, best_acc1)

    finish_logging(config)


def train(config, compression_ctrl, model, criterion, criterion_fn, lr_scheduler, model_name, optimizer,
          train_loader, train_sampler, val_loader, best_acc1=0):
@@ -234,11 +228,12 @@ def train(config, compression_ctrl, model, criterion, criterion_fn, lr_scheduler
    # remember best acc@1, considering compression level. If the current acc@1 is less than the best acc@1,
    # the checkpoint can still be the best one if the current compression level is bigger than the best one.
    # Compression levels in ascending order: NONE, PARTIAL, FULL.

    is_best_by_accuracy = acc1 > best_acc1 and compression_level == best_compression_level
    is_best = is_best_by_accuracy or compression_level > best_compression_level
    if is_best:
        best_acc1 = acc1
    if is_main_process():
        mlflow.log_metric("best_acc1", best_acc1)
    best_compression_level = max(compression_level, best_compression_level)
    acc = best_acc1 / 100
    if config.metrics_dump is not None:
@@ -263,6 +258,7 @@ def train(config, compression_ctrl, model, criterion, criterion_fn, lr_scheduler

    for key, value in stats.items():
        if isinstance(value, (int, float)):
            mlflow.log_metric("compression/statistics/{0}".format(key), value, epoch)
            config.tb.add_scalar("compression/statistics/{0}".format(key), value, len(train_loader) * epoch)


@@ -482,6 +478,9 @@ def validate(val_loader, model, criterion, config):
config.tb.add_scalar("val/loss", losses.avg, len(val_loader) * config.get('cur_epoch', 0))
config.tb.add_scalar("val/top1", top1.avg, len(val_loader) * config.get('cur_epoch', 0))
config.tb.add_scalar("val/top5", top5.avg, len(val_loader) * config.get('cur_epoch', 0))
mlflow.log_metric("val/loss", float(losses.avg), config.get('cur_epoch', 0))
mlflow.log_metric("val/top1", float(top1.avg), config.get('cur_epoch', 0))
mlflow.log_metric("val/top5", float(top5.avg), config.get('cur_epoch', 0))

logger.info(' * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}\n'.format(top1=top1, top5=top5))

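
The validate() changes above mirror each TensorBoard scalar with an MLflow metric, using the current epoch as the MLflow step and casting the averages to plain floats (the accuracy averages may be tensors). A hedged helper capturing that pattern (log_val_scalar is a hypothetical name, not part of the commit) might look like:

import mlflow
from tensorboardX import SummaryWriter


def log_val_scalar(tb: SummaryWriter, name, value, epoch, steps_per_epoch):
    # TensorBoard gets a global-step x-axis; MLflow gets the epoch index as its step.
    tb.add_scalar(name, value, steps_per_epoch * epoch)
    mlflow.log_metric(name, float(value), epoch)

Usage would be along the lines of log_val_scalar(config.tb, 'val/top1', top1.avg, config.get('cur_epoch', 0), len(val_loader)).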
35 changes: 14 additions & 21 deletions examples/classification/staged_quantization_worker.py
@@ -23,22 +23,22 @@
import torch.utils.data
import torch.utils.data.distributed
from torchvision.models import InceptionOutputs
import mlflow

from examples.classification.main import create_data_loaders, validate, AverageMeter, accuracy, get_lr, \
    create_datasets, inception_criterion_fn
from examples.common.distributed import configure_distributed
from examples.common.example_logger import logger
from examples.common.execution import ExecutionMode, get_device, prepare_model_for_execution
from examples.common.model_loader import load_model, \
    load_resuming_model_state_dict_and_checkpoint_from_path
from examples.common.execution import ExecutionMode, prepare_model_for_execution
from examples.common.model_loader import load_model
from examples.common.utils import configure_logging, print_args, make_additional_checkpoints, get_name, \
    print_statistics, is_pretrained_model_requested
    print_statistics, is_pretrained_model_requested, log_common_mlflow_params, finish_logging
from nncf.binarization.algo import BinarizationController
from nncf.compression_method_api import CompressionLevel
from nncf.initialization import register_default_init_args, default_criterion_fn
from nncf.model_creation import create_compressed_model
from nncf.quantization.algo import QuantizationController
from nncf.utils import manual_seed, is_main_process
from nncf.utils import is_main_process
from examples.classification.common import configure_device, set_seed, load_resuming_checkpoint


class KDLossCalculator:
@@ -105,21 +105,13 @@ def state_dict(self):
# pylint:disable=too-many-branches
# pylint:disable=too-many-statements
def staged_quantization_main_worker(current_gpu, config):
    config.current_gpu = current_gpu
    config.distributed = config.execution_mode in (ExecutionMode.DISTRIBUTED, ExecutionMode.MULTIPROCESSING_DISTRIBUTED)
    if config.distributed:
        configure_distributed(config)

    config.device = get_device(config)
    configure_device(current_gpu, config)

    if is_main_process():
        configure_logging(logger, config)
        print_args(config)

    if config.seed is not None:
        manual_seed(config.seed)
        cudnn.deterministic = True
        cudnn.benchmark = False
    set_seed(config)

    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss()
@@ -155,11 +147,7 @@ def staged_quantization_main_worker(current_gpu, config):

    model.to(config.device)

    resuming_model_sd = None
    resuming_checkpoint = None
    if resuming_checkpoint_path is not None:
        resuming_model_sd, resuming_checkpoint = load_resuming_model_state_dict_and_checkpoint_from_path(
            resuming_checkpoint_path)
    resuming_model_sd, resuming_checkpoint = load_resuming_checkpoint(resuming_checkpoint_path)

    compression_ctrl, model = create_compressed_model(model, nncf_config, resuming_model_sd)
    if not isinstance(compression_ctrl, (BinarizationController, QuantizationController)):
@@ -197,6 +185,8 @@ def staged_quantization_main_worker(current_gpu, config):
    else:
        logger.info("=> loaded checkpoint '{}'".format(resuming_checkpoint_path))

    log_common_mlflow_params(config)

    if config.to_onnx:
        compression_ctrl.export_model(config.to_onnx)
        logger.info("Saved to {}".format(config.to_onnx))
@@ -215,6 +205,8 @@
                 optimizer,
                 train_loader, train_sampler, val_loader, kd_loss_calculator, batch_multiplier, best_acc1)

    finish_logging(config)


def train_staged(config, compression_ctrl, model, criterion, criterion_fn, optimizer_scheduler, model_name, optimizer,
                 train_loader, train_sampler, val_loader, kd_loss_calculator, batch_multiplier, best_acc1=0):
@@ -274,6 +266,7 @@ def train_staged(config, compression_ctrl, model, criterion, criterion_fn, optim

    for key, value in stats.items():
        if isinstance(value, (int, float)):
            mlflow.log_metric("compression/statistics/{0}".format(key), value, epoch)
            config.tb.add_scalar("compression/statistics/{0}".format(key), value, len(train_loader) * epoch)


27 changes: 27 additions & 0 deletions examples/common/utils.py
@@ -26,8 +26,10 @@
from examples.common.sample_config import SampleConfig
from tensorboardX import SummaryWriter
from texttable import Texttable
import mlflow

from examples.common.example_logger import logger as default_logger
from nncf.utils import is_main_process

GENERAL_LOG_FILE_NAME = "output.log"
NNCF_LOG_FILE_NAME = "nncf_output.log"
@@ -97,6 +99,15 @@ def configure_paths(config):
def configure_logging(sample_logger, config):
    config.tb = SummaryWriter(config.log_dir)

    if is_mlflow_logging_enabled(config):
        root_log_dir = osp.dirname(osp.dirname(config.log_dir))
        mlflow.set_tracking_uri(osp.join(root_log_dir, 'mlruns'))
        if mlflow.get_experiment_by_name(config.name) is None:
            mlflow.create_experiment(config.name)
        mlflow.set_experiment(config.name)
        mlflow.start_run()
        os.symlink(config.log_dir, osp.join(mlflow.active_run().info.artifact_uri, osp.basename(config.log_dir)))

    training_pipeline_log_file_handler = logging.FileHandler(osp.join(config.log_dir, GENERAL_LOG_FILE_NAME))
    training_pipeline_log_file_handler.setFormatter(logging.Formatter("%(message)s"))
    sample_logger.addHandler(training_pipeline_log_file_handler)
@@ -107,6 +118,22 @@ def configure_logging(sample_logger, config):
    nncf_logger.addHandler(nncf_log_file_handler)


def log_common_mlflow_params(config):
    if is_mlflow_logging_enabled(config):
        mlflow.log_param('epochs', config.get('epochs', 'None'))
        mlflow.log_param('schedule_type', config.nncf_config.get('optimizer', {}).get('schedule_type', 'None'))
        mlflow.log_param('lr', config.nncf_config.get('optimizer', {}).get('base_lr', 'None'))
        mlflow.set_tag('Log Dir Path', config.log_dir)


def finish_logging(config):
    if is_main_process() and is_mlflow_logging_enabled(config):
        mlflow.end_run()


def is_mlflow_logging_enabled(config):
    return config.mode.lower() == 'train' and config.to_onnx is None


def is_on_first_rank(config):
    return not config.multiprocessing_distributed or (config.multiprocessing_distributed
                                                       and config.rank % config.ngpus_per_node == 0)
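
A note on the guards above: configure_logging (and hence mlflow.start_run) is only reached from the main process, and is_mlflow_logging_enabled skips MLflow for test-only and ONNX-export invocations, so metric calls elsewhere need to tolerate the case where no run is active. A small hedged wrapper along these lines (safe_log_metric is a hypothetical name, not part of this commit) keeps that check in one place:

import mlflow
from nncf.utils import is_main_process


def safe_log_metric(key, value, step=None):
    # Only the main process starts an MLflow run, and test/ONNX-export modes start none,
    # so silently skip logging when there is no active run.
    if is_main_process() and mlflow.active_run() is not None:
        mlflow.log_metric(key, float(value), step)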
7 changes: 5 additions & 2 deletions examples/object_detection/main.py
@@ -31,7 +31,8 @@
from nncf.initialization import register_default_init_args
from examples.common.optimizer import get_parameter_groups, make_optimizer
from examples.common.utils import get_name, make_additional_checkpoints, print_statistics, configure_paths, \
    create_code_snapshot, is_on_first_rank, configure_logging, print_args, is_pretrained_model_requested
    create_code_snapshot, is_on_first_rank, configure_logging, print_args, is_pretrained_model_requested, \
    log_common_mlflow_params, finish_logging
from examples.common.utils import write_metrics
from examples.object_detection.dataset import detection_collate, get_testing_dataset, get_training_dataset
from examples.object_detection.eval import test_net
@@ -172,6 +173,8 @@ def criterion_fn(model_outputs, target, criterion):
    optimizer.load_state_dict(resuming_checkpoint.get('optimizer', optimizer.state_dict()))
    config.start_iter = resuming_checkpoint.get('iter', 0) + 1

    log_common_mlflow_params(config)

    if config.to_onnx:
        compression_ctrl.export_model(config.to_onnx)
        logger.info("Saved to {}".format(config.to_onnx))
@@ -187,7 +190,7 @@ def criterion_fn(model_outputs, target, criterion):
        return

    train(net, compression_ctrl, train_data_loader, test_data_loader, criterion, optimizer, config, lr_scheduler)

    finish_logging(config)

def create_dataloaders(config):
    logger.info('Loading Dataset...')
5 changes: 4 additions & 1 deletion examples/semantic_segmentation/main.py
@@ -39,7 +39,7 @@
from examples.common.model_loader import load_model, load_resuming_model_state_dict_and_checkpoint_from_path
from examples.common.optimizer import make_optimizer
from examples.common.utils import configure_logging, configure_paths, make_additional_checkpoints, print_args, \
    write_metrics, print_statistics, is_pretrained_model_requested
    write_metrics, print_statistics, is_pretrained_model_requested, log_common_mlflow_params, finish_logging
from examples.semantic_segmentation.metric import IoU
from examples.semantic_segmentation.test import Test
from examples.semantic_segmentation.train import Train
@@ -504,6 +504,8 @@ def criterion_fn(model_outputs, target, criterion_):
    if config.distributed:
        compression_ctrl.distributed()

    log_common_mlflow_params(config)

    if config.to_onnx:
        compression_ctrl.export_model(config.to_onnx)
        logger.info("Saved to {}".format(config.to_onnx))
@@ -525,6 +527,7 @@ def criterion_fn(model_outputs, target, criterion_):
        raise RuntimeError(
            "\"{0}\" is not a valid choice for execution mode.".format(
                config.mode))
    finish_logging(config)


def main(argv):
3 changes: 2 additions & 1 deletion setup.py
@@ -63,7 +63,8 @@ def find_version(*file_paths):
"yattag",
"jsonschema",
"wheel",
"defusedxml"]
"defusedxml",
"mlflow"]

DEPENDENCY_LINKS = []

