Support multi-threaded reading for avro #5421
Conversation
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
build
@tgravescs Could you help review this ?
build
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
build
1 similar comment
build
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
fix conflicts and rebase to the top commit
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
build
2 similar comments
build
build
I haven't looked at the code yet, so maybe it will be answered there, but why is PERFILE so much worse here? Can't you use the same read technique there?
sorry haven't made it all through yet, just posting what I have so far
It is due to the same reason. I tried to apply this new Avro reader to PERFILE locally, but it got slightly worse perf for reading local files. Since we already have the multi-threaded reader for cloud cases, I am not sure it would be good to do this.
We can discuss more in this tracking issue #5458
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
build
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Resolve the conflicts.
build
@tgravescs Could you review this again ?
My main concern here is compression on the avro files and our read-size estimation, but maybe I'm missing something there.
Have we tested with different avro files with different compression codecs?
@@ -282,6 +287,10 @@ trait GpuAvroReaderBase extends Arm with Logging { self: FilePartitionReaderBase

  def readDataSchema: StructType

  def conf: Configuration

  val cacheBufferSize = conf.getInt("avro.read.allocation.size", 8 * 1024 * 1024)
comment where this default came from and why 8MB makes sense
Just copied it from the Parquet reader code.
val copyBufferSize = conf.getInt("parquet.read.allocation.size", 8 * 1024 * 1024)
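The Hadoop `Configuration.getInt(name, default)` pattern quoted above can be sketched with stdlib `Properties` (key names here mirror the ones in the diff; this is an illustration, not the plugin's code):

```java
import java.util.Properties;

public class ConfDefaults {
    // Mimics conf.getInt(key, default): fall back to the default
    // when the key is absent from the configuration.
    static int getInt(Properties props, String key, int dflt) {
        String v = props.getProperty(key);
        return (v == null) ? dflt : Integer.parseInt(v.trim());
    }
}
```

With no override set, a lookup of "avro.read.allocation.size" would return the 8MB (8 * 1024 * 1024 = 8388608 bytes) default.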
@@ -221,10 +216,20 @@ case class GpuAvroMultiFilePartitionReaderFactory(
  /**
   * Build the PartitionReader for cloud reading
   */
it would be nice to have a high level description of the approach it took, similar to what we describe in #5458. Really we should have done this for all the readers: how it goes about filtering and copying. Either here or in the GpuMultiFileCloudAvroPartitionReader class. The reason I put it here is that the coalescing one filters the blocks in buildBaseColumnarReaderForCoalescing.
Added
  AvroBlockMeta(reader.header, reader.headerSize, filteredBlocks)
}
val reader = closeOnExcept(in) { _ => AvroFileReader.openMetaReader(in) }
withResource(reader) { _ =>
this is odd, just do withResource(AvroFileReader.openMetaReader(in))
This is to ensure the opened 'in' will be closed if an exception is thrown when calling AvroFileReader.openMetaReader.
}

/**
 * AvroDataFileReader reads the Avro file data in the iterator pattern.
this is only used by the multifile cloud reader, correct? It might be good to expand the comment on what we mean by iterator pattern.
Yes, added
 */
def readNextRawBlock(out: OutputStream): Unit = {
  // This is designed to reduce the data copy as much as possible.
  // Currently it leverages the BinaryDecoder, and data will be copied twice.
specify where it's copied twice, i.e. once to x, again to y
Done
val startingBytesRead = fileSystemBytesRead()
val in = new FsInput(new Path(new URI(partFile.filePath)), config)
val reader = closeOnExcept(in) { _ => AvroFileReader.openDataReader(in) }
withResource(reader) { _ =>
just use withResource(AvroFileReader.openDataReader(in)).. or am I missing something you were trying to catch with the closeOnExcept here?
This is to ensure the opened 'in' will be closed if an exception is thrown when calling AvroFileReader.openMetaReader.
I don't understand the comment; I don't see you using reader outside of withResource, and withResource has a finally block that will close it as well.
First, it opens a FsInput as 'in', which should be closed. Next, it tries to open a reader with this 'in'. If it succeeds, a reader is now created, and this 'in' will be closed when closing the reader by withResource(reader). If it fails, closeOnExcept(in) will make sure to close this 'in'.
withResource(AvroFileReader.openDataReader(in)) would ensure both the reader and 'in' are closed only when everything succeeds, but the 'in' will leak if an exception is thrown when opening the reader.
Ah, I see, thanks, I missed that. This is still a weird way to do it, because if something fails between the closeOnExcept and the withResource then the reader is leaked. To me it makes more sense to have openDataReader handle closing 'in' if an exception is thrown. What if you changed openDataReader to take the file path, and have it create the FsInput and do the closeOnExcept?
actually I think you can pass the file path all the way into AvroFileReader and have it deal with it. That way its close method can skip closing it if it was never opened.
Thanks a lot, I will take it as a follow up.
Here it is #5554
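The thread above hinges on the ownership handoff between closeOnExcept and withResource. A minimal sketch of that pattern in plain Java, using AutoCloseable instead of the plugin's Arm helpers (FakeInput and openReader are hypothetical stand-ins for FsInput and AvroFileReader.openDataReader):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ResourceSafety {
    // Minimal stand-in for FsInput: just tracks whether it was closed.
    static class FakeInput implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        @Override public void close() { closed.set(true); }
    }

    // Mimics opening a reader over the input: may throw during construction.
    static AutoCloseable openReader(FakeInput in, boolean fail) {
        if (fail) throw new IllegalStateException("bad header");
        return in::close; // closing the reader also closes the input
    }

    // The pattern: if opening the reader throws, close the input ourselves
    // (closeOnExcept); once the reader exists, its close() owns the input
    // (withResource). Returns whether the input ended up closed.
    static boolean readSafely(boolean failOnOpen) {
        FakeInput in = new FakeInput();
        AutoCloseable reader;
        try {
            reader = openReader(in, failOnOpen);   // closeOnExcept(in) { ... }
        } catch (RuntimeException e) {
            try { in.close(); } catch (Exception ignored) { }
            return in.closed.get();
        }
        try (AutoCloseable r = reader) {           // withResource(reader) { ... }
            // ... read blocks here ...
        } catch (Exception ignored) { }
        return in.closed.get();
    }
}
```

On both the success and the construction-failure paths the input ends up closed, which is exactly the guarantee the plain withResource(openDataReader(in)) form would not give.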
 * Better to check its existence by calling 'hasNextBlock' first.
 * This will not move the reader position forward.
 */
def peekBlock(reuse: MutableBlockInfo): MutableBlockInfo = {
why is this mutable? that is definitely not a normal scala thing to do. did you specifically see issues with it creating new ones? If not please make it immutable
No issue is met yet, but this is to greatly reduce the number of temporary objects in the JVM, which can reduce the GC times, just like the handling below from the Avro DataFileStream:
public D next(D reuse) throws IOException {
  if (!hasNext())
    throw new NoSuchElementException();
  D result = reader.read(reuse, datumIn);
  if (0 == --blockRemaining) {
    blockFinished();
  }
  return result;
}
Without this 'reuse', each block will create a temporary object in the JVM. In our tests, that is about 1000 objects for a file, but the number can be reduced to 2 with this 'reuse'. Besides, we can reuse an instance because we do not need all the information objects at the same time, and one object is no longer needed just after a new call to 'peekBlock'.
Anyway, I am fine to make it immutable if you prefer.
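The reuse pattern defended above can be sketched as follows (names like BlockInfo and readBlock are illustrative, not the plugin's API): the caller passes a mutable holder back in, so iterating N blocks allocates one object instead of N.

```java
public class ReuseDemo {
    // Mutable holder, in the spirit of MutableBlockInfo.
    static class BlockInfo {
        long count;
        long size;
    }

    // Fake per-block metadata: {record count, byte size} for three blocks.
    static final long[][] BLOCKS = { {10, 100}, {20, 200}, {30, 300} };

    // Like peekBlock(reuse): fill the caller's holder if given, else allocate.
    static BlockInfo readBlock(int idx, BlockInfo reuse) {
        BlockInfo out = (reuse != null) ? reuse : new BlockInfo();
        out.count = BLOCKS[idx][0];
        out.size = BLOCKS[idx][1];
        return out;
    }

    // Iterate all blocks with a single holder allocated on the first call.
    static long totalSize() {
        BlockInfo info = null;
        long total = 0;
        for (int i = 0; i < BLOCKS.length; i++) {
            info = readBlock(i, info);  // same object reused after the first call
            total += info.size;
        }
        return total;
    }
}
```

This only works because, as the reply notes, each block's info is no longer needed once the next call overwrites the holder.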
// block meta through the file is quite expensive for files in cloud. So we do
// not know the target buffer size ahead. Then we have to do an estimation.
// "the estimated total block size = partFile.length + additional space"
// Letting "additional space = one block length * 1.2" is we may move the
not exactly sure what is meant here. perhaps "is we may" should be "allows us to" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for confusion, updated. It should be "is because we may".
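The estimation being discussed reduces to one line of arithmetic (the 1.2 factor and the names come from the comment under review; this sketch is not the plugin's code):

```java
public class BufferEstimate {
    // Estimated total block size = partition length + one block length * 1.2.
    // The extra space covers the case where syncing to the first block moves
    // the reader past the partition start, so the blocks can overrun it.
    static long estimateBufferSize(long partFileLength, long firstBlockLength) {
        return partFileLength + (long) (firstBlockLength * 1.2);
    }
}
```

For example, a 1000-byte partition whose first block is 100 bytes would get a 1120-byte buffer.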
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Not yet, but this will not be a problem. The
build
ok, can we make sure we add tests for all the compression types? We should have them in parquet and orc. I especially want to make sure that if it's a type we don't support, we fail and don't return wrong data or something like that. If that is a followup that's fine, since avro is off by default.
Sure, added it as a follow-up in #4831
Overall, LGTM
 *
 * When reading a file, it
 * - seeks to the start position of the first block located in this partition.
 * - next, parses the meta and sync, rewirtes the meta and sync, and copies the data to a
typo: rewirtes -> rewrites
nice catch
updated
// Search for the sequence of bytes in the stream using Knuth-Morris-Pratt
var i = 0L
var j = 0
var b = in.read()
maybe we can create a followup for improving this. in.read() reads only 1 byte each time; maybe reading into some buffer could be better.
IIUC, the underlying StreamSource in BinaryDecoder does this.
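The technique in the diff above is standard Knuth-Morris-Pratt over a byte stream: one read per input byte, never rewinding. A self-contained sketch (independent of the plugin; Avro's 16-byte sync marker would be the pattern):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class KmpSearch {
    // Returns the stream offset just past the first match of pattern, or -1.
    static long indexAfter(InputStream in, byte[] pattern) throws IOException {
        // Failure function: longest proper prefix that is also a suffix.
        int[] fail = new int[pattern.length];
        for (int i = 1, k = 0; i < pattern.length; i++) {
            while (k > 0 && pattern[k] != pattern[i]) k = fail[k - 1];
            if (pattern[k] == pattern[i]) k++;
            fail[i] = k;
        }
        long pos = 0;
        int j = 0;
        int b;
        while ((b = in.read()) != -1) {
            pos++;
            while (j > 0 && pattern[j] != (byte) b) j = fail[j - 1];
            if (pattern[j] == (byte) b) j++;
            if (j == pattern.length) return pos;  // now just past the marker
        }
        return -1;
    }

    // Convenience overload for in-memory data (cannot actually throw).
    static long indexAfter(byte[] data, byte[] pattern) {
        try {
            return indexAfter(new ByteArrayInputStream(data), pattern);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

As the reply notes, buffering would happen below this loop; the algorithm itself only needs the one-byte read interface.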
withResource(reader) { _ =>
  // Go to the start of the first block after the start position
  reader.sync(partFile.start)
  if (!reader.hasNextBlock || isDone) {
Looks like this piece of code is redundant, since the "while" below can cover this?
No, this is for the following estimation of the buffer size. Besides, it checks the 'isDone' flag at the same time.
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
build
1 similar comment
build
LGTM
This PR is to enable the multi-threaded reading for avro.
It mainly:
- Renames AvroDataFileReader to AvroMetaFileReader to indicate its real behavior, that is, collecting blocks' metadata.
- Adds a new AvroDataFileReader to read the data block by block in the iterator pattern.
- Adds AvroFileReader as the parent of AvroDataFileReader and AvroMetaFileReader.
- Adds GpuMultiFileCloudAvroPartitionReader, which leverages the newly added AvroDataFileReader to read the data from cloud files directly instead of collecting blocks' metadata first. We do this because the Avro file has no special section for blocks' metadata, and collecting the metadata through a file takes a very long time if the file is on the cloud and has many blocks, leading to quite bad performance.

Performance for s3 compatible storage (seconds)
Data files are on the cloud
The test ran on the local machine (CPU 12 cores, and one GPU (Titan V, with 12GB memory))
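The "no special section for blocks' metadata" point follows from the Avro container format: each data block is a zig-zag varint record count, a varint byte length, the serialized data, and the file's 16-byte sync marker, so block boundaries can only be found by walking the file. A sketch of decoding Avro's varint long encoding (this mirrors the Avro specification, not the plugin's code):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class VarLong {
    // Avro encodes longs as zig-zag values in little-endian base-128 varints:
    // 7 payload bits per byte, high bit set on all but the last byte.
    static long readVarLong(InputStream in) throws IOException {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) throw new IOException("truncated varint");
            n |= (long) (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1);  // zig-zag decode
    }

    // Convenience overload for in-memory data (cannot actually throw).
    static long readVarLong(byte[] bytes) {
        try {
            return readVarLong(new ByteArrayInputStream(bytes));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Reading a block header is then two such longs (count, then size), after which size bytes of data and the sync marker follow.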
closes #5148
closes #5304
Signed-off-by: Firestarman <firestarmanllc@gmail.com>