optimize pyreader #13815

Merged 7 commits on Nov 8, 2018
Changes from 5 commits
1 change: 1 addition & 0 deletions paddle/fluid/API.spec
@@ -178,6 +178,7 @@ paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, k
paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,))
paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True))
paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True))
paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None)
paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
3 changes: 1 addition & 2 deletions paddle/fluid/CMakeLists.txt
@@ -12,6 +12,5 @@ endif(NOT WIN32)
if(WITH_INFERENCE)
# NOTE: please add subdirectory inference at last.
add_subdirectory(inference)
add_subdirectory(train)
endif()

add_subdirectory(train)
326 changes: 212 additions & 114 deletions python/paddle/fluid/layers/io.py
@@ -30,7 +30,8 @@

__all__ = [
'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
'random_data_generator', 'py_reader', 'Preprocessor', 'load'
'random_data_generator', 'py_reader', 'create_py_reader_by_data',
'Preprocessor', 'load'
]


@@ -474,6 +475,159 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
return monkey_patch_reader_methods(main_prog_var)


def _py_reader(capacity,
shapes,
dtypes,
lod_levels=None,
name=None,
use_double_buffer=True,
feed_list=None):

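# If feed_list is given, derive shapes, dtypes and lod_levels from the
# feed variables themselves; otherwise fall back to the explicit arguments.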
if feed_list is not None:
if not isinstance(feed_list, list):
raise TypeError("feed_list should be a list of Variable"
" instead of " + str(type(feed_list)))
lod_levels = []
dtypes = []
shape_concat = []
ranks = []
shapes = []

for feed_data in feed_list:
dtypes.append(feed_data.dtype)
shape_concat.extend(feed_data.shape)
ranks.append(len(feed_data.shape))
shapes.append(feed_data.shape)
lod_levels.append(feed_data.lod_level)
else:
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
shape_concat = []
ranks = []

for shape in shapes:
shape_concat.extend(shape)
ranks.append(len(shape))

if lod_levels is None:
lod_levels = [0] * len(shapes)

if name is None:
queue_name = unique_name('lod_tensor_blocking_queue')
reader_name = unique_name('create_py_reader')
double_buffer_name = unique_name('double_buffer')
else:
queue_name = "_".join([name, "queue"])
reader_name = "_".join([name, "reader"])
double_buffer_name = "_".join([name, "double_buffer"])

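# Create the blocking queue in the global scope; the Python provider
# thread pushes LoDTensorArrays into it and the C++ reader op pops them.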
var = global_scope().var(queue_name)
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)

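# Register the create_py_reader op in the startup program, then copy the
# reader variable into the main program so both refer to the same reader.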
startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(name=reader_name)
startup_blk.append_op(
type='create_py_reader',
inputs={'blocking_queue': [queue_name]},
outputs={'Out': [startup_var]},
attrs={
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks
})

startup_var.desc.set_dtypes(dtypes)
startup_var.persistable = True

main_prog_var = _copy_reader_var_(default_main_program().current_block(),
startup_var)

reader = monkey_patch_reader_methods(main_prog_var)
if use_double_buffer:
double_buffer_reader = double_buffer(reader, name=double_buffer_name)
# we return a double buffer reader. However, the reset method comes from
# py_reader.
double_buffer_reader.reset = reader.reset
reader = double_buffer_reader

# monkey patch py_reader special methods
reader.queue = feed_queue
current_reset_method = reader.reset
reader.thread = None
reader.tensor_provider = None
reader.exited = False

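# The provider thread converts every batch yielded by the user function
# into LoDTensors and pushes them into the blocking queue, closing the
# queue when the function is exhausted or the reader is being reset.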
def start_provide_thread(func):
def __provider_thread__():
for tensors in func():
array = core.LoDTensorArray()
for item in tensors:
if not isinstance(item, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(item, core.CPUPlace())
item = tmp

array.append(item)

if reader.exited:
break
feed_queue.push(array)
if reader.exited:
break
feed_queue.close()

reader.thread = threading.Thread(target=__provider_thread__)
reader.thread.daemon = True
reader.thread.start()

def __set_tensor_provider__(func):
reader.tensor_provider = func

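# Turn a paddle reader (an iterable of numpy-array batches) into a tensor
# provider. The DataFeeder is built inside a fresh program pair so that no
# feed variables leak into the caller's default programs.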
def __set_paddle_reader__(paddle_reader):
with program_guard(Program(), Program()):
actual_feed_list = feed_list
if actual_feed_list is None:
actual_feed_list = []
counter = 0
for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
name = str(counter)
actual_feed_list.append(
data(
name=name,
dtype=dtype,
shape=shape,
lod_level=lod_level))
counter += 1

data_names = [feed_data.name for feed_data in actual_feed_list]
feeder = DataFeeder(
feed_list=actual_feed_list, place=core.CPUPlace())
paddle_reader = feeder.decorate_reader(
paddle_reader, multi_devices=False)

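# Flatten each slot dict produced by the decorated reader back into a
# list ordered like the feed list.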
def __tensor_provider__():
for slots in paddle_reader():
yield [slots[data_name] for data_name in data_names]

__set_tensor_provider__(__tensor_provider__)

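# Reset invokes the original reset method, then asks a running provider
# thread to exit and joins it before clearing the exit flag.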
def __reset__():
current_reset_method()
if reader.thread is not None and reader.tensor_provider is not None:
reader.exited = True
reader.thread.join()
reader.exited = False

def __start__():
start_provide_thread(reader.tensor_provider)

reader.reset = __reset__
reader.decorate_tensor_provider = __set_tensor_provider__
reader.decorate_paddle_reader = __set_paddle_reader__
reader.start = __start__

return reader

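# A minimal, hypothetical usage sketch of the patched reader (illustrative
# only, not part of this file): decorate_tensor_provider accepts any
# generator that yields lists of numpy arrays or LoDTensors, e.g.
#
#     import numpy
#
#     def provider():
#         for _ in range(10):
#             yield [numpy.random.random([32, 784]).astype('float32')]
#
#     reader.decorate_tensor_provider(provider)
#     reader.start()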

def py_reader(capacity,
shapes,
dtypes,
@@ -598,128 +752,72 @@ def py_reader(capacity,
>>> except fluid.core.EOFException:
>>> test_reader.reset()
"""
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
shape_concat = []
ranks = []

for shape in shapes:
shape_concat.extend(shape)
ranks.append(len(shape))

if lod_levels is None:
lod_levels = [0] * len(shapes)

if name is None:
queue_name = unique_name('lod_tensor_blocking_queue')
reader_name = unique_name('create_py_reader')
double_buffer_name = unique_name('double_buffer')
else:
queue_name = "_".join([name, "queue"])
reader_name = "_".join([name, "reader"])
double_buffer_name = "_".join([name, "double_buffer"])

var = global_scope().var(queue_name)
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)

startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(name=reader_name)
startup_blk.append_op(
type='create_py_reader',
inputs={'blocking_queue': [queue_name]},
outputs={'Out': [startup_var]},
attrs={
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks
})

startup_var.desc.set_dtypes(dtypes)
startup_var.persistable = True

main_prog_var = _copy_reader_var_(default_main_program().current_block(),
startup_var)

reader = monkey_patch_reader_methods(main_prog_var)
if use_double_buffer:
double_buffer_reader = double_buffer(reader, name=double_buffer_name)
# we return a double buffer reader. However, the reset method comes from
# py_reader.
double_buffer_reader.reset = reader.reset
reader = double_buffer_reader

# monkey patch py_reader special methods
reader.queue = feed_queue
current_reset_method = reader.reset
reader.thread = None
reader.tensor_provider = None
reader.exited = False

def start_provide_thread(func):
def __provider_thread__():
for tensors in func():
array = core.LoDTensorArray()
for item in tensors:
if not isinstance(item, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(item, core.CPUPlace())
item = tmp

array.append(item)

if reader.exited:
break
feed_queue.push(array)
if reader.exited:
break
feed_queue.close()
return _py_reader(
capacity=capacity,
shapes=shapes,
dtypes=dtypes,
lod_levels=lod_levels,
name=name,
use_double_buffer=use_double_buffer)

reader.thread = threading.Thread(target=__provider_thread__)
reader.thread.daemon = True
reader.thread.start()

def __set_tensor_provider__(func):
reader.tensor_provider = func
def create_py_reader_by_data(capacity,
feed_list,
name=None,
use_double_buffer=True):
"""
Create a Python reader for data feeding in Python

def __set_paddle_reader__(paddle_reader):
with program_guard(Program(), Program()):
feed_list = []
counter = 0
for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
name = str(counter)
feed_list.append(
data(
name=name,
dtype=dtype,
shape=shape,
lod_level=lod_level))
counter += 1

feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
paddle_reader = feeder.decorate_reader(
paddle_reader, multi_devices=False)
This layer returns a Reader Variable.

def __tensor_provider__():
for slots in paddle_reader():
yield [slots[str(idx)] for idx in six.moves.xrange(counter)]
Works much like py_reader except that its input is feed_list
instead of shapes, dtypes and lod_levels.

__set_tensor_provider__(__tensor_provider__)
Args:
capacity(int): The buffer capacity maintained by :code:`py_reader`.
feed_list(list(Variable)): The data feed list.
name(basestring): The prefix of the Python queue name and Reader name.
If None, the names will be generated automatically.
use_double_buffer(bool): Whether to use double buffer or not.

def __reset__():
current_reset_method()
if reader.thread is not None and reader.tensor_provider is not None:
reader.exited = True
reader.thread.join()
reader.exited = False
Returns:
Variable: A Reader from which we can get feeding data.

def __start__():
start_provide_thread(reader.tensor_provider)
Examples:

reader.reset = __reset__
reader.decorate_tensor_provider = __set_tensor_provider__
reader.decorate_paddle_reader = __set_paddle_reader__
reader.start = __start__
1. The basic usage of :code:`create_py_reader_by_data` is as follows:

return reader
>>> import paddle
>>> import paddle.fluid as fluid
>>> import paddle.dataset.mnist as mnist
>>>
>>> image = fluid.layers.data(name='image', shape=[3,224,224], dtype='float32')
>>> label = fluid.layers.data(name='label', shape=[1], dtype='int64')
>>> reader = fluid.layers.create_py_reader_by_data(capacity=64, feed_list=[image, label])
>>> reader.decorate_paddle_reader(
>>>     paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=32),
>>>                           buf_size=500))
>>>
>>> img, label = fluid.layers.read_file(reader)
>>> loss = network(img, label) # some network definition
>>>
>>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
>>>
>>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
>>> for epoch_id in range(10):
>>> reader.start()
>>> try:
>>> while True:
>>> exe.run(fetch_list=[loss.name])
>>> except fluid.core.EOFException:
>>> reader.reset()
"""
return _py_reader(
capacity=capacity,
shapes=None,
dtypes=None,
lod_levels=None,
name=name,
use_double_buffer=use_double_buffer,
feed_list=feed_list)


def open_files(filenames,