
Support multi-threaded reading for avro #5421

Merged
merged 9 commits into NVIDIA:branch-22.06 on May 20, 2022

Conversation

firestarman
Collaborator

@firestarman firestarman commented May 4, 2022

This PR enables multi-threaded reading for Avro.

It mainly

  • renamed the original AvroDataFileReader to AvroMetaFileReader to reflect its actual behavior, namely collecting blocks' metadata.
  • created a new reader named AvroDataFileReader to read the data block by block in the iterator pattern.
  • created a new AvroFileReader as the parent of AvroDataFileReader and AvroMetaFileReader.
  • implemented the core GpuMultiFileCloudAvroPartitionReader, which leverages the newly added AvroDataFileReader to read the data from cloud files directly instead of collecting blocks' metadata first. We do this because an Avro file has no dedicated section for blocks' metadata, so collecting the metadata across a whole file can take a very long time when the file is on the cloud and has many blocks, leading to quite bad performance.
  • added tests for it.
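The block-by-block iterator reading described above can be sketched, in greatly simplified form, as follows (Java, with illustrative names only, not the plugin's actual API; the real reader decodes each block's row count, size, and sync marker from the stream):

```java
import java.util.Iterator;
import java.util.List;

// Simplified sketch of the iterator-pattern reading: the reader yields one
// raw block at a time while streaming through the file, instead of scanning
// all blocks' metadata up front.
class BlockIteratorSketch {
    // Stand-in for a raw Avro block: a row count plus the raw payload size.
    static final class RawBlock {
        final long rowCount;
        final long payloadSize;
        RawBlock(long rowCount, long payloadSize) {
            this.rowCount = rowCount;
            this.payloadSize = payloadSize;
        }
    }

    // Consume blocks one by one (hasNextBlock/readNextRawBlock in spirit)
    // and return the total row count read.
    static long totalRows(List<RawBlock> file) {
        long rows = 0;
        Iterator<RawBlock> it = file.iterator();
        while (it.hasNext()) {
            rows += it.next().rowCount;
        }
        return rows;
    }
}
```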

Performance for s3 compatible storage (seconds)

  • Data files are on the cloud

  • The test ran on the local machine (CPU 12 cores, and one GPU (Titan V, with 12GB memory))

    Data Size CPU PERFILE MULTI-THREADED
    147MB 103.027 830.735 81.51
    1GB 679.514 3632.736 229.859

closes #5148
closes #5304

Signed-off-by: Firestarman firestarmanllc@gmail.com

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

build

@sameerz sameerz added the performance A performance related task/issue label May 4, 2022
@sameerz sameerz added this to the May 2 - May 20 milestone May 4, 2022
@firestarman
Collaborator Author

@tgravescs Could you help review this?

@firestarman firestarman requested a review from jlowe May 5, 2022 01:59
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

build

1 similar comment
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

fix conflicts and rebase to the top commit

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

build

2 similar comments
@firestarman
Collaborator Author

build

@pxLi
Collaborator

pxLi commented May 9, 2022

build

@tgravescs
Collaborator

I haven't looked at code yet so maybe it will be answered there, but why is the PERFILE so much worse here, can't you use the same read technique there?

Collaborator

@tgravescs tgravescs left a comment

sorry haven't made it all through yet, just posting what I have so far

@firestarman
Collaborator Author

firestarman commented May 11, 2022

I haven't looked at code yet so maybe it will be answered there, but why is the PERFILE so much worse here, can't you use the same read technique there?

It is due to the same reason, and I tried to apply this new Avro reader to PERFILE locally, but it got a little worse perf for reading local files. Since we already have the multi-threaded reader for cloud cases, I am not sure it would be good to do this.

Local:
            Files     CPU        PERFILE-new   PERFILE-old
            400       25.023     10.172        9.582
            1000      27.141     16.441        12.722
            2000      27.179     26.146        21.882

Cloud:
            Data Size CPU        PERFILE-new   PERFILE-old
            128GB     94.134     88.834        830.753
            1GB       402.612    341.626       3632.736

We can discuss more in this tracking issue #5458

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

Resolve the conflicts.

@firestarman
Collaborator Author

build

@firestarman
Collaborator Author

@tgravescs Could you review this again?

Collaborator

@tgravescs tgravescs left a comment

Main concern here is compression on the avro files and our reading estimation? Maybe I'm missing something there.
Have we tested with different avro files with different compression codecs?

@@ -282,6 +287,10 @@ trait GpuAvroReaderBase extends Arm with Logging { self: FilePartitionReaderBase

def readDataSchema: StructType

def conf: Configuration

val cacheBufferSize = conf.getInt("avro.read.allocation.size", 8 * 1024 * 1024)
Collaborator

comment where this default came from and why 8MB makes sense

Collaborator Author

Just copied it from the Parquet reader code.
val copyBufferSize = conf.getInt("parquet.read.allocation.size", 8 * 1024 * 1024)

@@ -221,10 +216,20 @@ case class GpuAvroMultiFilePartitionReaderFactory(
/**
* Build the PartitionReader for cloud reading
*/
Collaborator

it would be nice to have a high level description of the approach it took, similar to what we describe in #5458. Really we should have done it for all the readers, about how each goes about filtering and copying. Either here or in the GpuMultiFileCloudAvroPartitionReader class. The reason I put it here is that the coalescing one filters the blocks in buildBaseColumnarReaderForCoalescing

Collaborator Author

Added

AvroBlockMeta(reader.header, reader.headerSize, filteredBlocks)
}
val reader = closeOnExcept(in) { _ => AvroFileReader.openMetaReader(in) }
withResource(reader) { _ =>
Collaborator

this is odd, just do withResource(AvroFileReader.openMetaReader(in))

Collaborator Author

This is to ensure the opened in will be closed if an exception is thrown when calling AvroFileReader.openMetaReader.

}

/**
* AvroDataFileReader reads the Avro file data in the iterator pattern.
Collaborator

this is only used by the multifile cloud reader, correct? It might be good to expand the comment on what we mean by iterator pattern.

Collaborator Author

Yes, added

*/
def readNextRawBlock(out: OutputStream): Unit = {
// This is designed to reduce the data copy as much as possible.
// Currently it leverages the BinaryDecoder, and data will be copied twice.
Collaborator

specify where its copied twice, ie once to x, again to y

Collaborator Author

Done

val startingBytesRead = fileSystemBytesRead()
val in = new FsInput(new Path(new URI(partFile.filePath)), config)
val reader = closeOnExcept(in) { _ => AvroFileReader.openDataReader(in) }
withResource(reader) { _ =>
Collaborator

just use withResource(AvroFileReader.openDataReader(in)).. or am I missing something you were trying to catch with the closeOnExcept here?

Collaborator Author

This is to ensure the opened in will be closed if an exception is thrown when calling AvroFileReader.openMetaReader.

Collaborator

I don't understand the comment, I don't see you using reader outside of withResource and withResource has a finally block that will close it as well.

Collaborator Author

@firestarman firestarman May 19, 2022

First, it opens an FsInput as in, which should be closed.
Next, it tries to open a reader with this in. If that succeeds, a reader is created, and this in will be closed when the reader is closed by withResource(reader). If it fails, closeOnExcept(in) makes sure this in gets closed.

withResource(AvroFileReader.openDataReader(in)) would ensure both the reader and in are closed only when everything succeeds, but in would leak if an exception were thrown while opening the reader.
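The ownership hand-off being described can be sketched as a Java analog (illustrative names only, not the plugin's Scala API): `in` is closed when opening the reader throws, and on success closing the reader also closes `in`.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

// Java analog of the closeOnExcept pattern: `in` must not leak if the
// reader's constructor throws; once the reader opens, it owns `in`.
class CloseOnExceptSketch {
    // Input stream that records whether it was closed (test helper).
    static class TrackedInput extends ByteArrayInputStream {
        boolean closed = false;
        TrackedInput() { super(new byte[0]); }
        @Override public void close() { closed = true; }
    }

    static class Reader implements Closeable {
        final InputStream in;
        Reader(InputStream in, boolean failOnOpen) throws IOException {
            if (failOnOpen) throw new IOException("bad header");
            this.in = in;
        }
        @Override public void close() throws IOException { in.close(); }
    }

    // Mirrors closeOnExcept(in) { _ => openDataReader(in) }.
    static Reader open(InputStream in, boolean failOnOpen) throws IOException {
        try {
            return new Reader(in, failOnOpen);
        } catch (IOException e) {
            in.close(); // avoid leaking `in` when opening the reader fails
            throw e;
        }
    }
}
```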

Collaborator

Ah, I see, thanks I missed that, this is still a weird way to do that because if something fails between the closeOnExcept and the withResources then reader is leaked. To me it makes more sense to have the openDatareader handle closing in if an exception is thrown. What if you changed openDataReader to pass in the filepath and have it create the FSInput and have the closeOnException?

Collaborator

actually I think you can pass the file path all the way into AvroFileReader and have it deal with it. That way its close method can skip closing it if was never opened.

Collaborator Author

Thanks a lot, I will take it as a follow up.

Collaborator Author

Here it is #5554

* Better to check its existence by calling 'hasNextBlock' first.
* This will not move the reader position forward.
*/
def peekBlock(reuse: MutableBlockInfo): MutableBlockInfo = {
Collaborator

why is this mutable? that is definitely not a normal scala thing to do. did you specifically see issues with it creating new ones? If not please make it immutable

Collaborator Author

@firestarman firestarman May 17, 2022

No issue has been seen yet, but this greatly reduces the number of temporary objects in the JVM, which can cut down GC time, just like the handling below, which is from Avro's DataFileStream.

  public D next(D reuse) throws IOException {
    if (!hasNext())
      throw new NoSuchElementException();
    D result = reader.read(reuse, datumIn);
    if (0 == --blockRemaining) {
      blockFinished();
    }
    return result;
  }

Without this reuse, each block will create a temporary object in the JVM. In our tests, that is about 1000 objects for a file, but the number can be reduced to 2 with this reuse. Besides, we can reuse an instance because we do not need all the info objects at the same time, and an object is no longer needed right after a new call to peekBlock.

Anyway I am fine to make it immutable if you prefer.
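The reuse idea above can be sketched minimally (Java, illustrative names only): one mutable holder is refilled per block instead of allocating a fresh info object each time.

```java
// Sketch of the peekBlock(reuse) pattern: refill the caller-supplied holder
// when one is given, allocate only when there is none, so a loop over many
// blocks creates O(1) temporary objects instead of one per block.
class BlockInfoReuseSketch {
    static final class MutableBlockInfo {
        long count;     // rows in the block
        long blockSize; // bytes in the block
    }

    static MutableBlockInfo peek(long count, long blockSize,
                                 MutableBlockInfo reuse) {
        MutableBlockInfo out = (reuse != null) ? reuse : new MutableBlockInfo();
        out.count = count;
        out.blockSize = blockSize;
        return out;
    }
}
```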

// block meta through the file is quite expensive for files in cloud. So we do
// not know the target buffer size ahead. Then we have to do an estimation.
// "the estimated total block size = partFile.length + additional space"
// Letting "additional space = one block length * 1.2" is we may move the
Collaborator

not exactly sure what is meant here. perhaps "is we may" should be "allows us to" ?

Collaborator Author

Sorry for the confusion, updated. It should be "is because we may".
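The estimate described in the snippet can be sketched as simple arithmetic (illustrative names): since the blocks' metadata is not known ahead of time for cloud files, the target buffer is sized as the partition length plus roughly 1.2x one block of headroom, because sync(start) may land the reader up to one block before the partition.

```java
// Sketch of: estimated total block size = partFile.length + additional space,
// with additional space = one block length * 1.2.
class BufferEstimateSketch {
    static long estimateBufferSize(long partitionLength, long oneBlockLength) {
        return partitionLength + (long) (oneBlockLength * 1.2);
    }
}
```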

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

firestarman commented May 17, 2022

Main concern here is compression on the avro files and our reading estimation? Maybe I'm missing something there.
Have we tested with different avro files with different compression codecs?

Not yet, but this will not be a problem. The partFile.length is the compressed size, and the reader also copies the compressed data to batch buffers as is.

@firestarman
Collaborator Author

build

@tgravescs
Collaborator

ok, can we make sure we add tests for all the compression types, we should have them in parquet and orc, especially want to make sure if its a type we don't support that we fail and don't return wrong data or something like that. If that is a followup thats fine since avro is off by default.

@firestarman
Collaborator Author

ok, can we make sure we add tests for all the compression types, we should have them in parquet and orc, especially want to make sure if its a type we don't support that we fail and don't return wrong data or something like that. If that is a followup thats fine since avro is off by default.

Sure, added it as a follow-up in #4831

Collaborator

@wbo4958 wbo4958 left a comment

Overall, LGTM

*
* When reading a file, it
* - seeks to the start position of the first block located in this partition.
* - next, parses the meta and sync, rewirtes the meta and sync, and copies the data to a
Collaborator

typo: rewirtes -> rewrites

Collaborator Author

nice catch

Collaborator Author

updated

// Search for the sequence of bytes in the stream using Knuth-Morris-Pratt
var i = 0L
var j = 0
var b = in.read()
Collaborator

maybe we can create a followup for improving this. in.read() reads only 1 byte each time; maybe reading into some buffer could be better.

Collaborator Author

@firestarman firestarman May 18, 2022

IIUC, the underlying StreamSource in BinaryDecoder does this.
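A runnable sketch of the Knuth-Morris-Pratt scan in the snippet above (illustrative names; the real code searches the stream for the file's 16-byte sync marker), reading one byte at a time the way in.read() does:

```java
import java.io.IOException;
import java.io.InputStream;

// KMP search over a byte stream: returns the 0-based offset where the
// pattern starts, or -1 if it is not found before end of stream.
class SyncMarkerSearch {
    static long kmpFind(InputStream in, byte[] pattern) throws IOException {
        // Build the KMP failure table for the pattern.
        int[] fail = new int[pattern.length];
        for (int p = 1, k = 0; p < pattern.length; p++) {
            while (k > 0 && pattern[p] != pattern[k]) k = fail[k - 1];
            if (pattern[p] == pattern[k]) k++;
            fail[p] = k;
        }
        long i = 0; // bytes consumed from the stream
        int j = 0;  // pattern bytes matched so far
        int b;
        while ((b = in.read()) != -1) {
            i++;
            while (j > 0 && (byte) b != pattern[j]) j = fail[j - 1];
            if ((byte) b == pattern[j]) j++;
            if (j == pattern.length) return i - pattern.length; // match start
        }
        return -1;
    }
}
```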

withResource(reader) { _ =>
// Go to the start of the first block after the start position
reader.sync(partFile.start)
if (!reader.hasNextBlock || isDone) {
Collaborator

Looks like this piece of code is redundant, since the "while" below can cover this?

Collaborator Author

No, this is for the following estimation of the buffer size. Besides, it checks the isDone flag here at the same time.

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

build

1 similar comment
@firestarman
Collaborator Author

build

Collaborator

@wbo4958 wbo4958 left a comment

LGTM

@firestarman firestarman merged commit aabe71a into NVIDIA:branch-22.06 May 20, 2022
@firestarman firestarman deleted the avro-multi branch May 20, 2022 02:12
Successfully merging this pull request may close these issues.

  • [FEA] Optimize remote Avro reading for a PartitionFile
  • Add the MULTI-THREADED reading support for avro
5 participants