Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel process features #2216

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion catboost/python-package/catboost/_catboost.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ from util.generic.array_ref cimport TArrayRef, TConstArrayRef
from util.generic.hash cimport THashMap
from util.generic.hash_set cimport THashSet
from util.generic.maybe cimport TMaybe
from util.generic.ptr cimport THolder, TIntrusivePtr, MakeHolder
from util.generic.ptr cimport TAtomicSharedPtr, THolder, TIntrusivePtr, MakeHolder
from util.generic.string cimport TString, TStringBuf
from util.generic.vector cimport TVector
from util.system.types cimport ui8, ui16, ui32, ui64, i32, i64
Expand Down Expand Up @@ -954,6 +954,8 @@ cdef inline float _FloatOrNanFromString(const TString& s) except *:
cdef extern from "catboost/libs/gpu_config/interface/get_gpu_device_count.h" namespace "NCB":
cdef int GetGpuDeviceCount() except +ProcessException

ctypedef void (*task_callback_ptr) (int) nogil
ctypedef void (*callback_ptr) (int, int) nogil

cdef extern from "catboost/python-package/catboost/helpers.h":
cdef void SetPythonInterruptHandler() nogil
Expand All @@ -979,6 +981,11 @@ cdef extern from "catboost/python-package/catboost/helpers.h":
int threadCount,
ui64 cpuUsedRamLimit
) except +ProcessException
cdef void CallInParallel(ILocalExecutor* executor, task_callback_ptr, int block_count) nogil
cdef size_t column_block_size
cdef size_t objects_in_column
cdef callback_ptr CallbackForColumnProcessing
cdef TVector[TString]* processing_result


cdef extern from "catboost/python-package/catboost/helpers.h":
Expand Down Expand Up @@ -2691,6 +2698,46 @@ cdef _set_features_order_data_pd_data_frame_categorical_column(
)


class ColumnProcessor:
def __init__(self, object callback, column_array):
self.callback = callback
self.column_array = column_array

def __call__(self, start, end):
for idx in range(start, end):
processing_result[idx] = self.callback(self.column_array[idx])


cdef void call_python_code(int block_id) nogil :
CallbackForColumnProcessing(
block_id * column_block_size,
min(
(block_id + 1) * column_block_size,
objects_in_column
)
)


cdef parallel_process_features_column_to_vector(
TAtomicSharedPtr[TTbbLocalExecutor] executor,
column_array,
TVector[TString]* result,
object object_process_callback,
size_t block_size = 1024 * 16
) :
processing_result = result;
column_block_size = block_size
CallbackForColumnProcessing = ColumnProcessor(object_process_callback, column_array)
objects_in_column = len(column_array)
cdef size_t block_count = (objects_in_column + block_size - 1) // block_size
with nogil:
CallInParallel(
<ILocalExecutor*>executor.Get(),
call_python_code,
block_count
)


# returns new data holders array
cdef object _set_features_order_data_pd_data_frame(
data_frame,
Expand Down
12 changes: 12 additions & 0 deletions catboost/python-package/catboost/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,15 @@ void TrainEvalSplit(
*evalDataProvider = getSubset(postShuffleTestIndices);
}
}

thread_local size_t objects_in_column;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Я бы занес эти переменные в структурку или класс и использовал специальные макросы/функции для доступа к нему.
Можно использовать Y_STATIC_THREAD & Y_THREAD из файла util/system/tls.h
или использовать хелпер-функцию FastTlsSingleton<T>() из util/thread/singleton.h

thread_local size_t column_block_size;
thread_local callback_ptr CallbackForColumnProcessing;
thread_local TVector<TString>* processing_result;

void CallInParallel(NPar::ILocalExecutor* executor, void(*callback) (int block_id), size_t block_count) {
auto task = [&](int id) {
callback(id);
};
executor->ExecRange(task, 0, block_count, NPar::TLocalExecutor::WAIT_COMPLETE);
}
8 changes: 8 additions & 0 deletions catboost/python-package/catboost/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,11 @@ void TrainEvalSplit(
);


extern thread_local size_t objects_in_column;
extern thread_local size_t column_block_size;
extern thread_local TVector<TString>* processing_result;

typedef void (*callback_ptr) (int, int);
extern thread_local callback_ptr CallbackForColumnProcessing;

void CallInParallel(NPar::ILocalExecutor* executor, void(*callback) (int block_id), size_t block_count);