Memory leak when using custom DataGenerator #19907

Omitg24 opened this issue Jun 23, 2024



Omitg24 commented Jun 23, 2024

For the past 3 weeks I've been searching nonstop for a solution to this problem: when training an LSTM model with a custom DataGenerator, Keras ends up using all of my RAM. The project predicts sleep stages. This script is expected to parallelize 15 different participants, each with its 10 folds (10 training and 10 validation partitions), and in a later phase test each participant on its respective test partition.
I'm using:

  • Python 3.11.5
  • TensorFlow 2.16.1
  • Keras 3.3.3

This is the LSTM network I'm currently using in this project:

from keras import Input, Sequential
from keras.layers import LSTM, Dense

from ..utils import ProjectConfig


def create_lstm1(number_inputs, window_size):
    model = Sequential()

    model.add(Input(shape=(window_size, number_inputs)))

    model.add(LSTM(units=50, return_sequences=True))

    model.add(LSTM(units=100, return_sequences=False))

    model.add(Dense(units=ProjectConfig().n_phases, activation='softmax'))

    model.compile(loss="categorical_crossentropy", optimizer="rmsprop", metrics=['accuracy'])
    return model

Then, I've implemented this custom DataGenerator, which suits my problem:

import math
from statistics import mode

import keras
import tracemalloc
import gc
import numpy as np
import pandas as pd
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler, LabelEncoder

from src.utils import utils

class DataGenerator(keras.utils.Sequence):

    def __init__(self,
                 x_data: pd.DataFrame,
                 y_data: pd.DataFrame,
                 name: str = "DataGenerator",
                 window_size: int = 30,
                 window_overlap: int = 15,
                 batch_size: int = 32,
                 lstm_mode: int = 1,
                 n_clases: int = 5,
                 sample_frequency: int = 10,
                 shuffle: bool = True,
                 is_training: bool = True,
                 **kwargs):
        # Keras 3 expects Sequence/PyDataset subclasses to call the parent constructor
        super().__init__(**kwargs)
        self.x_data = x_data
        self.y_data = y_data
        self.name = name
        self.window_size = window_size
        self.window_overlap = window_overlap
        self.batch_size = batch_size
        self.lstm_mode = lstm_mode
        self.sample_frequency = sample_frequency
        self.shuffle = shuffle
        self.is_training = is_training

        current, peak = tracemalloc.get_traced_memory()
        print(f"\t\t\t\t{name} Memory before Oversampling: Current = {current / 10 ** 6} MB; Peak = {peak / 10 ** 6} MB")
        self.x_data, self.y_data = self._oversample(self.x_data, self.y_data)

        current, peak = tracemalloc.get_traced_memory()
        print(f"\t\t\t\t{name} Memory after Oversampling: Current = {current / 10 ** 6} MB; Peak = {peak / 10 ** 6} MB")
        self.x_data, self.y_data = self._create_windows(self.x_data, self.y_data)

        current, peak = tracemalloc.get_traced_memory()
        print(f"\t\t\t\t{name} Memory after Windows Creation: Current = {current / 10 ** 6} MB; Peak = {peak / 10 ** 6} MB")
        encoder = LabelEncoder()
        self.y_data = encoder.fit_transform(self.y_data)
        self.y_data = keras.utils.to_categorical(self.y_data, num_classes=n_clases)
        self.indexes = self.on_epoch_end()

    def __len__(self):
        n = len(self.indexes)
        return math.ceil(n / self.batch_size) if n > 0 else 0

    def __getitem__(self, index):
        windows_x = []
        windows_y = []

        min_val = max(0, min(len(self.indexes), index * self.batch_size))
        max_val = min(len(self.indexes), (index + 1) * self.batch_size)
        for val in range(min_val, max_val):
            x_data = self.x_data[self.indexes[val]].copy()
            y_data = self.y_data[self.indexes[val]]
            x_data = self._scale_data(x_data)
            x_data = self._update_mode(x_data)
            # Collect each scaled window as a plain float32 array
            windows_x.append(x_data.to_numpy(dtype=np.float32))
            windows_y.append(y_data)

        batch_x = np.array(windows_x)
        batch_y = np.array(windows_y)

        del windows_x, windows_y, x_data, y_data

        return batch_x, batch_y

    def _create_windows(self, x_data, y_data):
        windows_x = []
        windows_y = []
        n_windows = math.ceil(len(x_data) / (self.window_overlap * self.sample_frequency))
        for w in range(n_windows):
            start_index = w * self.window_overlap * self.sample_frequency
            end_index = min(len(x_data), start_index + (self.window_size * self.sample_frequency))
            indexes = list(range(start_index, end_index))
            if len(indexes) == (self.window_size * self.sample_frequency):    # All windows must be equal
                X = x_data.iloc[indexes]
                Y = mode(y_data[indexes])
                windows_x.append(X)
                windows_y.append(Y)

        return windows_x, windows_y

    def _oversample(self, x_data, y_data):
        if self.is_training:
            sm = SMOTE(random_state=0, sampling_strategy='not majority')
            X_resampled, y_resampled = sm.fit_resample(x_data, y_data)
            return X_resampled, y_resampled
        return x_data, y_data

    def _update_mode(self, x_data: pd.DataFrame):
        if self.lstm_mode == 2:
            magacc = utils.calculate_magnitude(x_data['accx'], x_data['accy'], x_data['accz'])
            ret = x_data.copy()
            ret['magacc'] = magacc
        elif self.lstm_mode == 3:
            magacc = utils.calculate_magnitude(x_data['accx'], x_data['accy'], x_data['accz'])
            ret = x_data[['hr']].copy()
            ret['magacc'] = magacc
        else:
            ret = x_data.copy()
        return ret

    def on_epoch_end(self):
        # Keras calls this after every epoch, so store the new order for reshuffling to take effect
        indexes = np.arange(len(self.x_data))
        if self.shuffle:
            np.random.shuffle(indexes)
        self.indexes = indexes
        return indexes

    @staticmethod
    def _scale_data(x_data):
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(x_data)
        return pd.DataFrame(scaled_data, columns=x_data.columns)
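One possible way to shrink what each generator keeps alive, sketched under assumptions: instead of storing every window as its own DataFrame, the windows can be built once as a single float32 NumPy array (half the size of the default float64) with `numpy.lib.stride_tricks.sliding_window_view`, and the per-window standardization that `_scale_data` does with `StandardScaler` can be vectorized. `create_windows` and `scale_windows` are hypothetical helpers, not part of the code above. As in `_create_windows`, `window_overlap` is treated as the step between window starts and only full windows are kept. The labels are assumed to be integer-encoded already (for example with `LabelEncoder` before windowing), and ties in the majority vote can resolve differently: `statistics.mode` returns the first mode encountered, while `np.bincount(...).argmax()` returns the smallest label.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def create_windows(x, y, window_size=30, window_overlap=15, sample_frequency=10):
    """Return (n_windows, length, n_features) float32 windows and one label per window.

    Hypothetical helper: `y` must already hold non-negative integer labels.
    """
    length = window_size * sample_frequency
    step = window_overlap * sample_frequency
    x = np.asarray(x, dtype=np.float32)
    y = np.asarray(y)
    if len(x) < length:
        return np.empty((0, length, x.shape[1]), np.float32), np.empty((0,), y.dtype)
    # Views (no copy yet): x -> (n - length + 1, n_features, length), y -> (n - length + 1, length)
    xw = sliding_window_view(x, window_shape=length, axis=0)[::step]
    yw = sliding_window_view(y, window_shape=length)[::step]
    # Materialize once as one contiguous (n_windows, length, n_features) array
    windows = np.ascontiguousarray(xw.transpose(0, 2, 1))
    # Majority label per window
    labels = np.array([np.bincount(row).argmax() for row in yw])
    return windows, labels


def scale_windows(windows):
    """Standardize every feature inside every window, like StandardScaler per window."""
    mean = windows.mean(axis=1, keepdims=True)
    std = windows.std(axis=1, keepdims=True)
    std[std == 0] = 1.0  # leave constant columns unscaled, as StandardScaler does
    return (windows - mean) / std
```

Inside a `Sequence`, `__getitem__` could then slice `windows[batch_indexes]` and pass only that slice to `scale_windows`, so no DataFrame is created per sample.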

And finally, the training phase is the following:

import gc
import multiprocessing
import os
import shutil
import tracemalloc

import luigi
import numpy as np
import pandas as pd

from . import DeepPartitioning
from .data_generator import DataGenerator
from .lstm_creation import *
from ..utils import ProjectConfig, utils

class DeepTraining(luigi.Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.results_path = ProjectConfig().deep_training_path
        self.inputs_by_mode = {
            1: 4,  # accx, accy, accz, hr
            2: 5,  # accx, accy, accz, magnitude, hr
            3: 2  # magnitude, hr
        }
    def requires(self):
        return DeepPartitioning()

    def output(self):
        return luigi.LocalTarget(os.path.join(self.results_path, "output_paths.txt"))

    def run(self):
        os.makedirs(self.results_path, exist_ok=True)
        prev_files = utils.get_prev_files_path(self.input().path)

        patient_partitions = self.get_partitions(prev_files)

        path_list = self.run_experiments(patient_partitions)
        utils.create_output_paths_file(self.results_path, path_list)

    def run_experiments(self, partitions):
        path_list = []

        with multiprocessing.Pool(processes=multiprocessing.cpu_count(), maxtasksperchild=1) as pool:
            results = pool.map(self.process_participant_wrapper, partitions.items())

        for result in results:
            path_list.extend(result)

        return path_list

    def process_participant_wrapper(self, participant_partitions):
        participant, partitions = participant_partitions
        return self.process_participant(participant, partitions)

    def process_participant(self, participant, participant_partitions):
        path_list = []
        os.makedirs(os.path.join(self.results_path, participant), exist_ok=True)
        patient_idx = participant.split("_")[1]
        train_folds, validation_folds, test = self._get_folds(participant_partitions, patient_idx)
        model = self.create_neural_network()
        print(f"\tStarting participant {patient_idx}")

        for ep in range(ProjectConfig().n_epochs):
            epoch_path = os.path.join(self.results_path, participant, f"epoch_{ep}")
            os.makedirs(epoch_path, exist_ok=True)
            print(f"\t\tStarting epoch {ep}")
            history = []
            for k in range(ProjectConfig().n_splits):
                train_fold = pd.read_csv(train_folds[k], sep=";")
                validation_fold = pd.read_csv(validation_folds[k], sep=";")
                hist = self.train_fold(model, train_fold, validation_fold, participant, ep, k)
                history.append(hist)

                current, peak = tracemalloc.get_traced_memory()
                print(
                    f"\t\t\tMemory usage after fold {k}: Current = {current / 10 ** 6} MB; Peak = {peak / 10 ** 6} MB")

                del train_fold
                del validation_fold
            final_history = {}
            for key in history[0].history.keys():
                final_history.update({key: np.concatenate([hist.history[key] for hist in history])})

            current, peak = tracemalloc.get_traced_memory()
            print(f"\t\tMemory usage after epoch {ep}: Current = {current / 10 ** 6} MB; Peak = {peak / 10 ** 6} MB")
            history_path = os.path.join(epoch_path, f"history_{participant}_{ep}_all_folds.csv")
            pd.DataFrame.from_dict(final_history).to_csv(history_path, sep=";")
            path_list.append(history_path)
            print(f"\t\tFinished epoch {ep}")
            print(f"\t\tSaving lstm ({ProjectConfig().neural_network}) weights")
            weights_path = os.path.join(epoch_path, f"weights_{patient_idx}_{ep}.weights.h5")
            model.save_weights(weights_path)
            path_list.append(weights_path)
        print(f"\tFinished participant {patient_idx}")
        print(f"\tSaving lstm ({ProjectConfig().neural_network}) model")
        model_path = os.path.join(self.results_path, participant, f"model_{patient_idx}.keras")
        model.save(model_path)
        path_list.append(model_path)

        test_path = os.path.join(self.results_path, participant, f"test_{patient_idx}.csv")
        shutil.copyfile(test, test_path)
        path_list.append(test_path)

        current, peak = tracemalloc.get_traced_memory()
        print(f"\tMemory usage after all epochs: Current = {current / 10 ** 6} MB; Peak = {peak / 10 ** 6} MB")

        del model
        return path_list

    def create_neural_network(self):
        mode = ProjectConfig().lstm_mode
        neural_network = ProjectConfig().neural_network
        number_inputs = self.inputs_by_mode[mode]
        window_size = (ProjectConfig().w_size * ProjectConfig().sample_frequency)

        # All neural networks are already compiled
        if neural_network == 1:
            print("Creating LSTM1 model")
            return create_lstm1(number_inputs, window_size)
        elif neural_network == 2:
            print("Creating LSTM2 model")
            return create_lstm2(number_inputs, window_size)
        elif neural_network == 3:
            print("Creating LSTM3 model")
            return create_lstm3(number_inputs, window_size, ProjectConfig().n_phases)

    def train_fold(self, model, train_fold, validation_fold, participant, ep, k):
        print(f"\t\t\tTraining model for participant {participant} - epoch {ep} - fold {k}")
        X_train, y_train = (train_fold[['accx', 'accy', 'accz', 'hr']], train_fold['stage'])
        X_validation, y_validation = (validation_fold[['accx', 'accy', 'accz', 'hr']], validation_fold['stage'])
        hist = self.train_model(model, X_train, y_train, X_validation, y_validation)
        return hist

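    @staticmethod
    def train_model(model, x_train, y_train, x_validation, y_validation):
        # The generator arguments were cut off in the original post; the ones below are
        # inferred from ProjectConfig so the window shape matches create_neural_network()
        config = ProjectConfig()
        generator_args = dict(window_size=config.w_size,
                              sample_frequency=config.sample_frequency,
                              lstm_mode=config.lstm_mode,
                              n_clases=config.n_phases)
        train_generator = DataGenerator(x_data=x_train, y_data=y_train, name="Train",
                                        is_training=True, **generator_args)
        validation_generator = DataGenerator(x_data=x_validation, y_data=y_validation, name="Validation",
                                             shuffle=False, is_training=False, **generator_args)

        del x_train
        del y_train
        del x_validation
        del y_validation

        # One epoch per call: the epoch loop lives in process_participant
        hist = model.fit(train_generator, validation_data=validation_generator, epochs=1)

        del train_generator
        del validation_generator
        return hist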

    @staticmethod
    def get_partitions(prev_files):
        partitions = {}
        for prev_file in prev_files:
            patient = os.path.basename(os.path.dirname(prev_file))
            file_name = os.path.basename(prev_file).split(".")[0]
            partitions.setdefault(patient, {}).setdefault(file_name, prev_file)
        return partitions

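    @staticmethod
    def _get_folds(partitions, patient_idx):
        train_folds = []
        validation_folds = []
        for i in range(ProjectConfig().n_splits):
            # The loop body was lost in the original post; these key names are hypothetical
            train_folds.append(partitions[f"train_participant_{patient_idx}_fold_{i}"])
            validation_folds.append(partitions[f"validation_participant_{patient_idx}_fold_{i}"])
        test = partitions[f"test_participant_{patient_idx}"]
        return train_folds, validation_folds, test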

With that, I get the following output (I'm showing only the first and the last epoch), where you can see it ends up using about 80 GB of RAM for just one participant with 10 epochs and 10 folds.

Starting participant 0
		Starting epoch 0
			Training model for participant patient_0 - epoch 0 - fold 0
Memory usage after fold 0: Current = 1033.934569 MB; Peak = 1556.608261 MB
			Training model for participant patient_0 - epoch 0 - fold 1
Memory usage after fold 1: Current = 1854.543623 MB; Peak = 2381.11604 MB
			Training model for participant patient_0 - epoch 0 - fold 2
Memory usage after fold 2: Current = 2675.151555 MB; Peak = 3201.725061 MB
			Training model for participant patient_0 - epoch 0 - fold 3
Memory usage after fold 3: Current = 3495.760576 MB; Peak = 4022.326754 MB
			Training model for participant patient_0 - epoch 0 - fold 4
Memory usage after fold 4: Current = 4316.366685 MB; Peak = 4842.94543 MB
			Training model for participant patient_0 - epoch 0 - fold 5
Memory usage after fold 5: Current = 5136.99149 MB; Peak = 5663.567691 MB
			Training model for participant patient_0 - epoch 0 - fold 6
Memory usage after fold 6: Current = 5957.613007 MB; Peak = 6484.199312 MB
			Training model for participant patient_0 - epoch 0 - fold 7
Memory usage after fold 7: Current = 6778.235883 MB; Peak = 7304.812643 MB
			Training model for participant patient_0 - epoch 0 - fold 8
Memory usage after fold 8: Current = 7598.856964 MB; Peak = 8125.438533 MB
			Training model for participant patient_0 - epoch 0 - fold 9
Memory usage after fold 9: Current = 8419.47895 MB; Peak = 8946.065265 MB
Memory usage after fold 9: Current = 8210.1733 MB; Peak = 8946.065265 MB
		Finished epoch 0

		Starting epoch 9
			Training model for participant patient_0 - epoch 9 - fold 0
Memory usage after fold 0: Current = 74889.296057 MB; Peak = 75415.873067 MB
			Training model for participant patient_0 - epoch 9 - fold 1
Memory usage after fold 1: Current = 75709.918092 MB; Peak = 76236.499263 MB
			Training model for participant patient_0 - epoch 9 - fold 2
Memory usage after fold 2: Current = 76530.522405 MB; Peak = 77057.104749 MB
			Training model for participant patient_0 - epoch 9 - fold 3
Memory usage after fold 3: Current = 77351.125716 MB; Peak = 77877.709326 MB
			Training model for participant patient_0 - epoch 9 - fold 4
Memory usage after fold 4: Current = 78171.730176 MB; Peak = 78698.309015 MB
			Training model for participant patient_0 - epoch 9 - fold 5
Memory usage after fold 5: Current = 78992.352293 MB; Peak = 79518.938572 MB
			Training model for participant patient_0 - epoch 9 - fold 6
Memory usage after fold 6: Current = 79812.972282 MB; Peak = 80339.551835 MB
			Training model for participant patient_0 - epoch 9 - fold 7
Memory usage after fold 7: Current = 80633.590744 MB; Peak = 81160.172095 MB
			Training model for participant patient_0 - epoch 9 - fold 8
Memory usage after fold 8: Current = 81454.212892 MB; Peak = 81980.790902 MB
			Training model for participant patient_0 - epoch 9 - fold 9
Memory usage after fold 9: Current = 82274.83347 MB; Peak = 82801.413244 MB
Memory usage after fold 9: Current = 82065.526847 MB; Peak = 82801.413244 MB
		Finished epoch 9
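The growth in the log is almost perfectly linear. A quick calculation using only the "Current" values already shown above (no new measurements) gives the amount retained per fold:

```python
# "Current" values in MB, copied from the log above (fold 0 and fold 9 of epochs 0 and 9)
epoch0 = {0: 1033.934569, 9: 8419.47895}
epoch9 = {0: 74889.296057, 9: 82274.83347}

# Nine fold-to-fold steps inside each epoch
per_fold_epoch0 = (epoch0[9] - epoch0[0]) / 9
per_fold_epoch9 = (epoch9[9] - epoch9[0]) / 9
# 10 epochs x 10 folds = 100 fold trainings, i.e. 99 steps from the first to the last
per_fold_overall = (epoch9[9] - epoch0[0]) / 99

print(round(per_fold_epoch0, 1), round(per_fold_epoch9, 1), round(per_fold_overall, 1))
```

All three come out at about 820.6 MB, so each fold leaves roughly the same amount behind regardless of the epoch, and 100 fold trainings add up to the ~82 GB reached at the end. That pattern suggests something created once per fold (for example the two generators and the windowed data they hold) stays alive, although the figures alone cannot show which object it is.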

I've tried explicitly deleting variables, calling the garbage collector, and calling clear_session() after each model finishes training. Since the training is incremental (the same model keeps training across folds), I don't think I'm supposed to call it between folds.

Finally, in case it helps demonstrate the issue, I also ran memory_profiler to check whether memory was actually being freed (just not enough of it). This is the result for one epoch (10 folds) on one participant:

Line #    Mem usage    Increment  Occurrences   Line Contents
    62    484.1 MiB    484.1 MiB           1       @profile
    63                                             def process_participant(self, participant, participant_partitions):
    64    484.1 MiB      0.0 MiB           1           path_list = []
    65    484.1 MiB      0.0 MiB           1           os.makedirs(os.path.join(self.results_path, participant), exist_ok=True)
    66    484.1 MiB      0.0 MiB           1           patient_idx = participant.split("_")[1]
    67    484.2 MiB      0.1 MiB           1           train_folds, validation_folds, test = self._get_folds(participant_partitions, patient_idx)
    68    550.3 MiB     66.1 MiB           1           model = self.create_neural_network()
    69    550.3 MiB      0.0 MiB           1           print(f"\tStarting participant {patient_idx}")        
    70    550.3 MiB      0.0 MiB           1           tracemalloc.start()
    72   5377.8 MiB      0.0 MiB           2           for ep in range(ProjectConfig().n_epochs):
    73    550.3 MiB      0.0 MiB           1               epoch_path = os.path.join(self.results_path, participant, f"epoch_{ep}")
    74    550.3 MiB      0.0 MiB           1               os.makedirs(epoch_path, exist_ok=True)
    75    550.3 MiB      0.0 MiB           1               print(f"\t\t{participant}Starting epoch {ep}")
    76    550.3 MiB      0.0 MiB           1               history = []
    77   6433.6 MiB -13257.3 MiB          11               for k in range(ProjectConfig().n_splits):
    78   6523.9 MiB  -9093.3 MiB          10                   train_fold = pd.read_csv(train_folds[k], sep=";")
    79   6525.7 MiB -10294.1 MiB          10                   validation_fold = pd.read_csv(validation_folds[k], sep=";")
    80   6433.6 MiB  -8830.9 MiB          10                   hist = self.train_fold(model, train_fold, validation_fold, participant, ep, k)
    81   6433.6 MiB -13358.4 MiB          10                   history.append(hist)
    83   6433.6 MiB -13358.3 MiB          10                   current, peak = tracemalloc.get_traced_memory()
    84   6433.6 MiB -13358.2 MiB          10                   print(f"\t\t\t{participant}Memory usage after fold {k}: Current = {current / 10 ** 6} MB; Peak = {peak / 10 ** 6} MB")
    86   6433.6 MiB -13168.4 MiB          10                   del train_fold
    87   6433.6 MiB -13156.4 MiB          10                   del validation_fold
    88   6433.6 MiB -13257.3 MiB          10                   gc.collect()
    89   5373.1 MiB  -1060.5 MiB           1               final_history = {}
    90   5373.1 MiB      0.0 MiB           5               for key in history[0].history.keys():
    91   5373.1 MiB      0.1 MiB          52                   final_history.update({key: np.concatenate([hist.history[key] for hist in history])})
    93   5373.1 MiB      0.0 MiB           1               current, peak = tracemalloc.get_traced_memory()
    94   5373.1 MiB      0.0 MiB           1               print(f"\t\t{participant}Memory usage after epoch {ep}: Current = {current / 10 ** 6} MB; Peak = {peak / 10 ** 6} MB")
    95   5373.1 MiB      0.0 MiB           1               history_path = os.path.join(epoch_path, f"history_{participant}_{ep}_all_folds.csv")
    96   5375.3 MiB      2.2 MiB           1               pd.DataFrame.from_dict(final_history).to_csv(history_path, sep=";")
    97   5375.3 MiB      0.0 MiB           1               path_list.append(history_path)
    98   5375.3 MiB      0.0 MiB           1               print(f"\t\t{participant}Finished epoch {ep}")
    99   5375.3 MiB      0.0 MiB           1               print(f"\t\t{participant}Saving lstm ({ProjectConfig().neural_network}) weights")
   100   5375.3 MiB      0.0 MiB           1               weights_path = os.path.join(epoch_path, f"weights_{patient_idx}_{ep}.weights.h5")
   101   5377.8 MiB      2.4 MiB           1               model.save_weights(weights_path)
   102   5377.8 MiB      0.0 MiB           1               path_list.append(weights_path)
   103   5377.8 MiB      0.0 MiB           1           print(f"\t{participant}Finished participant {patient_idx}")
   104   5377.8 MiB      0.0 MiB           1           print(f"\t{participant}Saving lstm ({ProjectConfig().neural_network}) model")
   105   5377.8 MiB      0.0 MiB           1           model_path = os.path.join(self.results_path, participant, f"model_{patient_idx}.keras")
   106   5378.9 MiB      1.1 MiB           1 
   107   5378.9 MiB      0.0 MiB           1           path_list.append(model_path)
   109   5378.9 MiB      0.0 MiB           1           test_path = os.path.join(self.results_path, participant, f"test_{patient_idx}.csv")
   110   5378.9 MiB      0.0 MiB           1           shutil.copyfile(test, test_path)
   111   5379.0 MiB      0.1 MiB           1           path_list.append(test_path)
   113   5379.0 MiB      0.0 MiB           1           current, peak = tracemalloc.get_traced_memory()
   114   5379.0 MiB      0.0 MiB           1           print(f"\t{participant}Memory usage after all epochs: Current = {current / 10 ** 6} MB; Peak = {peak / 10 ** 6} MB")
   116   5379.0 MiB      0.0 MiB           1           del model
   117   5379.0 MiB      0.0 MiB           1           gc.collect()
   118   5473.8 MiB     94.9 MiB           1           clear_session()
   119   5473.8 MiB      0.0 MiB           1           current, peak = tracemalloc.get_traced_memory()
   120   5473.8 MiB      0.0 MiB           1           print(f"\tMemory usage after patient {participant}: Current = {current / 10 ** 6} MB; Peak = {peak / 10 ** 6} MB")
   122   5680.4 MiB    206.5 MiB           1           tracemalloc.stop()
   123   5680.4 MiB      0.0 MiB           1           return path_list
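`get_traced_memory()` only reports totals. `tracemalloc` can also compare two snapshots and list the source lines whose allocations grew, which could point at the object that survives each fold. A minimal diagnostic sketch using only the standard library; `find_growth` is a hypothetical helper, and because it runs the step twice under tracing it is meant for a short debugging run rather than the full experiment:

```python
import tracemalloc

# Ignore tracemalloc's own bookkeeping and import machinery when comparing snapshots
_IGNORE = (
    tracemalloc.Filter(False, tracemalloc.__file__),
    tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
    tracemalloc.Filter(False, "<unknown>"),
)


def find_growth(step, limit=10, nframes=1):
    """Run `step` twice and return the source lines whose allocations grew on the second run.

    The first call acts as a warm-up (lazy imports, caches, graph tracing), so one-off
    allocations are not mistaken for a per-call leak.
    """
    started_here = not tracemalloc.is_tracing()
    if started_here:
        tracemalloc.start(nframes)
    try:
        step()
        before = tracemalloc.take_snapshot().filter_traces(_IGNORE)
        step()
        after = tracemalloc.take_snapshot().filter_traces(_IGNORE)
    finally:
        if started_here:
            tracemalloc.stop()
    # compare_to sorts by the size of the difference, biggest first
    stats = after.compare_to(before, "lineno")
    return [s for s in stats if s.size_diff > 0][:limit]
```

In the fold loop this could be used as `for stat in find_growth(lambda: self.train_fold(model, train_fold, validation_fold, participant, ep, k)): print(stat)`, keeping in mind that it trains on that fold twice. NumPy arrays and pandas objects are visible to `tracemalloc`; memory allocated directly by TensorFlow's C++ runtime generally is not.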

Hope someone knows how to fix this issue. Thanks!

What I've tried

I've tried reading the folds only when needed, explicitly freeing memory by deleting variables and calling the garbage collector, and using different parallelization techniques, but I always hit the same issue: a single participant consumes more memory than the machine can handle.

dryglicki commented Jun 25, 2024

I have nothing to provide you but solidarity. I am running into this same problem with a TFRecords data pipeline:

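import tensorflow as tf


def _parse_function(example_proto):
    feature_description = {
            'ny'  : tf.io.FixedLenFeature([], tf.int64, default_value = 0),
            'nx'  : tf.io.FixedLenFeature([], tf.int64, default_value = 0),
            'ntp' : tf.io.FixedLenFeature([], tf.int64, default_value = 0),
            'ntf' : tf.io.FixedLenFeature([], tf.int64, default_value = 0),
            'ncp' : tf.io.FixedLenFeature([], tf.int64, default_value = 0),
            'ncf' : tf.io.FixedLenFeature([], tf.int64, default_value = 0),
            'priors' : tf.io.FixedLenFeature([], tf.string, default_value = ''),
            'forecasts' : tf.io.FixedLenFeature([], tf.string, default_value = ''),
    }
    features = tf.io.parse_single_example(example_proto, feature_description)
    # The decoding call was cut off in the post; decode_raw assumes raw float32 bytes
    # (tf.io.parse_tensor would apply if the arrays were written with tf.io.serialize_tensor)
    priors = tf.io.decode_raw(features['priors'], tf.float32)
    forecasts = tf.io.decode_raw(features['forecasts'], tf.float32)

    ny  = features['ny']
    nx  = features['nx']
    ntp = features['ntp']
    ntf = features['ntf']
    ncp = features['ncp']
    ncf = features['ncf']

    priors = tf.reshape(priors, shape = [ntp, ny, nx, ncp])
    forecasts = tf.reshape(forecasts, shape = [ntf, ny, nx, ncf])

    return priors, forecasts

def create_dataset_onr_tfrecords(path,
                                 batch_size = 32,
                                 compression = 'GZIP',
                                 shuffle = True,
                                 deterministic = False,
                                 glob = '*'):  # file pattern; hypothetical default, the original was not shown
    # Several argument values were lost in the post; tf.data.AUTOTUNE is assumed for them
    return tf.data.Dataset.list_files(str(path / glob), shuffle = shuffle).interleave(
            lambda x: tf.data.TFRecordDataset(x, compression_type = compression),
                    cycle_length = tf.data.AUTOTUNE,
                    num_parallel_calls = tf.data.AUTOTUNE,
                    deterministic = deterministic
            ).map(_parse_function,
                  num_parallel_calls = tf.data.AUTOTUNE
            ).batch(batch_size, drop_remainder = True
            ).prefetch(tf.data.AUTOTUNE)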

I'll spare you the plot, but I am having the same issue with a vanilla TF dataset. I've tried removing interleave, removing GZIP compression, calling TFRecordDataset directly, removed batching, removed prefetching... nothing.

I believe this is a TensorFlow problem, and in particular a TF Dataset problem: tensorflow/tensorflow#65675

This TF 2.16 + K3 era has been a disaster. Not the Keras part -- just some growing pains. But TF, man...

I am facing the same problem, using scripts from here:

which also rely on certain TensorFlow Dataset features. The scripts are intended to make some modifications to an existing TensorFlow dataset stored in TFRecord format.

Hi @Omitg24!

Are you facing this issue in the TensorFlow backend? Have you tested other backends (jax or torch)? Do you see this issue with other backends?

github-actions bot commented Aug 3, 2024

Omitg24 commented Aug 4, 2024


> Are you facing this issue in the TensorFlow backend? Have you tested other backends (jax or torch)? Do you see this issue with other backends?
I've tried with the TensorFlow backend and it gives me this issue, but I have yet to try TF 2.17.0.
I've also tried the Torch backend on a small trial version: the memory leak does not seem to happen, but training takes much longer than with the TensorFlow backend.
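Keras 3 picks its backend when it is first imported (the `KERAS_BACKEND` environment variable takes precedence over `~/.keras/keras.json`), so comparing backends cleanly means one fresh interpreter per backend. A small standard-library sketch; `run_with_backend` is a hypothetical helper, and `train.py` in the usage note stands for whatever script runs the experiment:

```python
import os
import subprocess
import sys


def run_with_backend(backend, args, timeout=None):
    """Run `python <args>` in a new interpreter with KERAS_BACKEND set to `backend`."""
    env = dict(os.environ, KERAS_BACKEND=backend)
    return subprocess.run([sys.executable, *args], env=env, capture_output=True,
                          text=True, timeout=timeout, check=False)
```

For example, `for backend in ("tensorflow", "jax", "torch"): print(backend, run_with_backend(backend, ["train.py"]).returncode)` runs the same script once per backend, and each run starts with an empty process, so memory growth can be compared without one backend's allocations leaking into the next.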

ghsanti commented Sep 15, 2024


Does this still happen if you use model.compile(..., run_eagerly=True)?

Also, call keras.backend.clear_session() before deleting the model, as other posts recommend.
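A minimal sketch combining both suggestions with the fold loop, assuming it is acceptable to rebuild the model between folds: the weights are copied out with `get_weights()`, `keras.backend.clear_session()` is called, and a fresh model compiled with `run_eagerly=True` receives them through `set_weights()`. The weights still train incrementally across folds, but RMSprop's internal state is reset on every rebuild, which is a change from the original loop. `build_model` and `train_folds_with_reset` are hypothetical names; `build_model` mirrors `create_lstm1` with the number of classes passed in instead of read from `ProjectConfig`.

```python
import gc

import keras
import numpy as np


def build_model(number_inputs, window_size, n_classes, run_eagerly=True):
    # Same layers as create_lstm1; n_classes replaces ProjectConfig().n_phases
    model = keras.Sequential([
        keras.Input(shape=(window_size, number_inputs)),
        keras.layers.LSTM(50, return_sequences=True),
        keras.layers.LSTM(100),
        keras.layers.Dense(n_classes, activation="softmax"),
    ])
    model.compile(loss="categorical_crossentropy", optimizer="rmsprop",
                  metrics=["accuracy"], run_eagerly=run_eagerly)
    return model


def train_folds_with_reset(folds, number_inputs, window_size, n_classes):
    """Train one epoch per fold, rebuilding the model and clearing the session in between.

    `folds` yields (x, y, validation_data) triples accepted by model.fit; for
    generators such as DataGenerator, pass y=None.
    """
    weights, histories = None, []
    for x, y, validation_data in folds:
        model = build_model(number_inputs, window_size, n_classes)
        if weights is not None:
            model.set_weights(weights)  # carry the learned weights over
        hist = model.fit(x, y, validation_data=validation_data, epochs=1, verbose=0)
        histories.append(hist.history)
        weights = model.get_weights()
        # Drop every reference to the old model before clearing Keras' global state
        del model, hist
        keras.backend.clear_session()
        gc.collect()
    return weights, histories
```

After the last fold, `build_model(...)` followed by `set_weights(weights)` gives the final model to save.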

