Merge pull request #13815 from jacquesqiao/optimize-pyreader
optimize pyreader
jacquesqiao committed Nov 8, 2018
2 parents a270fdf + ce99419 commit 5b7a9dd
Showing 3 changed files with 270 additions and 143 deletions.
1 change: 1 addition & 0 deletions paddle/fluid/API.spec
@@ -189,6 +189,7 @@ paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, k
paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,))
paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True))
paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True))
paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None)
paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
326 changes: 212 additions & 114 deletions python/paddle/fluid/layers/io.py
@@ -30,7 +30,8 @@

__all__ = [
    'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
    'random_data_generator', 'py_reader', 'Preprocessor', 'load'
    'random_data_generator', 'py_reader', 'create_py_reader_by_data',
    'Preprocessor', 'load'
]


@@ -475,6 +476,159 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
    return monkey_patch_reader_methods(main_prog_var)


def _py_reader(capacity,
               shapes,
               dtypes,
               lod_levels=None,
               name=None,
               use_double_buffer=True,
               feed_list=None):

    if feed_list is not None:
        if not isinstance(feed_list, list):
            raise TypeError("feed_list should be a list of Variable"
                            " instead of " + str(type(feed_list)))
        lod_levels = []
        dtypes = []
        shape_concat = []
        ranks = []
        shapes = []

        for feed_data in feed_list:
            dtypes.append(feed_data.dtype)
            shape_concat.extend(feed_data.shape)
            ranks.append(len(feed_data.shape))
            shapes.append(feed_data.shape)
            lod_levels.append(feed_data.lod_level)
    else:
        dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
        shape_concat = []
        ranks = []

        for shape in shapes:
            shape_concat.extend(shape)
            ranks.append(len(shape))

        if lod_levels is None:
            lod_levels = [0] * len(shapes)

    if name is None:
        queue_name = unique_name('lod_tensor_blocking_queue')
        reader_name = unique_name('create_py_reader')
        double_buffer_name = unique_name('double_buffer')
    else:
        queue_name = "_".join([name, "queue"])
        reader_name = "_".join([name, "reader"])
        double_buffer_name = "_".join([name, "double_buffer"])

    var = global_scope().var(queue_name)
    feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)

    startup_blk = default_startup_program().current_block()
    startup_var = startup_blk.create_var(name=reader_name)
    startup_blk.append_op(
        type='create_py_reader',
        inputs={'blocking_queue': [queue_name]},
        outputs={'Out': [startup_var]},
        attrs={
            'shape_concat': shape_concat,
            'lod_levels': lod_levels,
            'ranks': ranks
        })

    startup_var.desc.set_dtypes(dtypes)
    startup_var.persistable = True

    main_prog_var = _copy_reader_var_(default_main_program().current_block(),
                                      startup_var)

    reader = monkey_patch_reader_methods(main_prog_var)
    if use_double_buffer:
        double_buffer_reader = double_buffer(reader, name=double_buffer_name)
        # we return a double buffer reader. However, the reset method comes from
        # py_reader.
        double_buffer_reader.reset = reader.reset
        reader = double_buffer_reader

    # monkey patch py_reader special methods
    reader.queue = feed_queue
    current_reset_method = reader.reset
    reader.thread = None
    reader.tensor_provider = None
    reader.exited = False

    def start_provide_thread(func):
        def __provider_thread__():
            for tensors in func():
                array = core.LoDTensorArray()
                for item in tensors:
                    if not isinstance(item, core.LoDTensor):
                        tmp = core.LoDTensor()
                        tmp.set(item, core.CPUPlace())
                        item = tmp

                    array.append(item)

                if reader.exited:
                    break
                feed_queue.push(array)
                if reader.exited:
                    break
            feed_queue.close()

        reader.thread = threading.Thread(target=__provider_thread__)
        reader.thread.daemon = True
        reader.thread.start()

    def __set_tensor_provider__(func):
        reader.tensor_provider = func

    def __set_paddle_reader__(paddle_reader):
        with program_guard(Program(), Program()):
            actual_feed_list = feed_list
            if actual_feed_list is None:
                actual_feed_list = []
                counter = 0
                for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
                    name = str(counter)
                    actual_feed_list.append(
                        data(
                            name=name,
                            dtype=dtype,
                            shape=shape,
                            lod_level=lod_level))
                    counter += 1

            data_names = [feed_data.name for feed_data in actual_feed_list]
            feeder = DataFeeder(
                feed_list=actual_feed_list, place=core.CPUPlace())
            paddle_reader = feeder.decorate_reader(
                paddle_reader, multi_devices=False)

        def __tensor_provider__():
            for slots in paddle_reader():
                yield [slots[data_name] for data_name in data_names]

        __set_tensor_provider__(__tensor_provider__)

    def __reset__():
        current_reset_method()
        if reader.thread is not None and reader.tensor_provider is not None:
            reader.exited = True
            reader.thread.join()
            reader.exited = False

    def __start__():
        start_provide_thread(reader.tensor_provider)

    reader.reset = __reset__
    reader.decorate_tensor_provider = __set_tensor_provider__
    reader.decorate_paddle_reader = __set_paddle_reader__
    reader.start = __start__

    return reader
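Aside (illustrative, not part of this diff): a minimal sketch of how the provider thread above is driven through the monkey-patched methods. The shapes, dtypes, capacity, and batch size are example values chosen for this sketch only.

import numpy as np
import paddle.fluid as fluid

# Declare the reader; shapes/dtypes here are illustrative.
reader = fluid.layers.py_reader(
    capacity=64,
    shapes=[(-1, 32), (-1, 1)],
    dtypes=['float32', 'int64'])

def tensor_provider():
    # Each yielded list matches the declared shapes/dtypes. Plain numpy
    # arrays are fine: __provider_thread__ wraps any non-LoDTensor item
    # into a LoDTensor on CPUPlace before pushing it into feed_queue.
    for _ in range(100):
        yield [np.random.random((8, 32)).astype('float32'),
               np.random.randint(0, 10, (8, 1)).astype('int64')]

reader.decorate_tensor_provider(tensor_provider)
# reader.start() spawns the daemon provider thread via
# start_provide_thread(); reader.reset() joins it in __reset__ once an
# EOFException ends the epoch.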


def py_reader(capacity,
              shapes,
              dtypes,
@@ -599,128 +753,72 @@ def py_reader(capacity,
    >>> except fluid.core.EOFException:
    >>>     test_reader.reset()
    """
    dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
    shape_concat = []
    ranks = []

    for shape in shapes:
        shape_concat.extend(shape)
        ranks.append(len(shape))

    if lod_levels is None:
        lod_levels = [0] * len(shapes)

    if name is None:
        queue_name = unique_name('lod_tensor_blocking_queue')
        reader_name = unique_name('create_py_reader')
        double_buffer_name = unique_name('double_buffer')
    else:
        queue_name = "_".join([name, "queue"])
        reader_name = "_".join([name, "reader"])
        double_buffer_name = "_".join([name, "double_buffer"])

    var = global_scope().var(queue_name)
    feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)

    startup_blk = default_startup_program().current_block()
    startup_var = startup_blk.create_var(name=reader_name)
    startup_blk.append_op(
        type='create_py_reader',
        inputs={'blocking_queue': [queue_name]},
        outputs={'Out': [startup_var]},
        attrs={
            'shape_concat': shape_concat,
            'lod_levels': lod_levels,
            'ranks': ranks
        })

    startup_var.desc.set_dtypes(dtypes)
    startup_var.persistable = True

    main_prog_var = _copy_reader_var_(default_main_program().current_block(),
                                      startup_var)

    reader = monkey_patch_reader_methods(main_prog_var)
    if use_double_buffer:
        double_buffer_reader = double_buffer(reader, name=double_buffer_name)
        # we return a double buffer reader. However, the reset method comes from
        # py_reader.
        double_buffer_reader.reset = reader.reset
        reader = double_buffer_reader

    # monkey patch py_reader special methods
    reader.queue = feed_queue
    current_reset_method = reader.reset
    reader.thread = None
    reader.tensor_provider = None
    reader.exited = False

    def start_provide_thread(func):
        def __provider_thread__():
            for tensors in func():
                array = core.LoDTensorArray()
                for item in tensors:
                    if not isinstance(item, core.LoDTensor):
                        tmp = core.LoDTensor()
                        tmp.set(item, core.CPUPlace())
                        item = tmp

                    array.append(item)

                if reader.exited:
                    break
                feed_queue.push(array)
                if reader.exited:
                    break
            feed_queue.close()

        reader.thread = threading.Thread(target=__provider_thread__)
        reader.thread.daemon = True
        reader.thread.start()

    def __set_tensor_provider__(func):
        reader.tensor_provider = func

    def __set_paddle_reader__(paddle_reader):
        with program_guard(Program(), Program()):
            feed_list = []
            counter = 0
            for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
                name = str(counter)
                feed_list.append(
                    data(
                        name=name,
                        dtype=dtype,
                        shape=shape,
                        lod_level=lod_level))
                counter += 1

            feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
            paddle_reader = feeder.decorate_reader(
                paddle_reader, multi_devices=False)

        def __tensor_provider__():
            for slots in paddle_reader():
                yield [slots[str(idx)] for idx in six.moves.xrange(counter)]

        __set_tensor_provider__(__tensor_provider__)

    def __reset__():
        current_reset_method()
        if reader.thread is not None and reader.tensor_provider is not None:
            reader.exited = True
            reader.thread.join()
            reader.exited = False

    def __start__():
        start_provide_thread(reader.tensor_provider)

    reader.reset = __reset__
    reader.decorate_tensor_provider = __set_tensor_provider__
    reader.decorate_paddle_reader = __set_paddle_reader__
    reader.start = __start__

    return reader

    return _py_reader(
        capacity=capacity,
        shapes=shapes,
        dtypes=dtypes,
        lod_levels=lod_levels,
        name=name,
        use_double_buffer=use_double_buffer)


def create_py_reader_by_data(capacity,
                             feed_list,
                             name=None,
                             use_double_buffer=True):
    """
    Create a Python reader for data feeding in Python.

    This layer returns a Reader Variable.
    Works much like py_reader except that its input is a feed_list
    instead of shapes, dtypes and lod_levels.

    Args:
        capacity(int): The buffer capacity maintained by :code:`py_reader`.
        feed_list(list(Variable)): The data feed list.
        name(basestring): The prefix of the Python queue name and Reader
            name. None will be generated automatically.
        use_double_buffer(bool): Whether to use double buffer or not.

    Returns:
        Variable: A Reader from which we can get feeding data.

    Examples:

        1. The basic usage of :code:`create_py_reader_by_data` is as follows:

        >>> import paddle
        >>> import paddle.fluid as fluid
        >>> import paddle.dataset.mnist as mnist
        >>>
        >>> image = fluid.layers.data(name='image', shape=[3, 224, 224], dtype='float32')
        >>> label = fluid.layers.data(name='label', shape=[1], dtype='int64')
        >>> reader = fluid.layers.create_py_reader_by_data(capacity=64,
        >>>                                                feed_list=[image, label])
        >>> reader.decorate_paddle_reader(
        >>>     paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=64),
        >>>                           buf_size=500))
        >>>
        >>> img, label = fluid.layers.read_file(reader)
        >>> loss = network(img, label)  # some network definition
        >>>
        >>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
        >>>
        >>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
        >>> for epoch_id in range(10):
        >>>     reader.start()
        >>>     try:
        >>>         while True:
        >>>             exe.run(fetch_list=[loss.name])
        >>>     except fluid.core.EOFException:
        >>>         reader.reset()
    """
    return _py_reader(
        capacity=capacity,
        shapes=None,
        dtypes=None,
        lod_levels=None,
        name=name,
        use_double_buffer=use_double_buffer,
        feed_list=feed_list)
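Aside (illustrative, not part of this diff): because both public entry points now delegate to _py_reader, the two calls below construct equivalent readers; shapes and dtypes are example values only.

import paddle.fluid as fluid

image = fluid.layers.data(name='image', shape=[3, 224, 224], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

# py_reader: shapes/dtypes spelled out by hand (fluid.layers.data
# prepends the -1 batch dimension to its shape argument).
reader_a = fluid.layers.py_reader(
    capacity=64,
    shapes=[(-1, 3, 224, 224), (-1, 1)],
    dtypes=['float32', 'int64'])

# create_py_reader_by_data: the same metadata is inferred from the
# feed list by the feed_list branch at the top of _py_reader.
reader_b = fluid.layers.create_py_reader_by_data(
    capacity=64, feed_list=[image, label])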


def open_files(filenames,
