[SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core #29471
Can we add a few more words for `areSQLRootPaths`? Seeing SQL in core is already a bit strange, so it's nicer to let developers quickly get a better idea just from reading the doc.
Yes, this is unfortunate. I think this parameter doesn't have to be visible to callers, though, as it is set to true on the initial call and false on subsequent recursive calls. We can potentially add another overloaded method without this parameter and make this one private.
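The "overload + private" idea might look roughly like the sketch below. This is a hedged illustration in Java, not Spark's actual Scala code; the method name echoes the PR's `parallelListLeafFiles`, but the signatures and body are invented:

```java
import java.util.List;

// Sketch: hide an internal recursion flag behind a public overload so
// callers never see it. The flag-bearing variant becomes private.
public class LeafFileLister {

    // Public entry point: the flag is always true on the initial call,
    // so callers never need to pass it.
    public static List<String> parallelListLeafFiles(List<String> paths) {
        return parallelListLeafFiles(paths, /* areRootPaths = */ true);
    }

    // Private worker: recursive calls would pass areRootPaths = false.
    private static List<String> parallelListLeafFiles(List<String> paths,
                                                      boolean areRootPaths) {
        // Real code would walk the filesystem here; this stub just returns
        // the input so the shape of the API is visible.
        return paths;
    }
}
```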
I like that idea @sunchao
Putting `HiveCatalogMetrics` here looks strange. This is a "Metrics for access to the hive external catalog". Should we skip this or create another metric?
Good catch. It does seem inappropriate here. I'm not sure what the best way is to plug in Hive/SQL metrics here, but I'll think it over later.
We could pass in a callback for listing metrics?
Or call `HiveCatalogMetrics.incrementParallelListingJobCount(1)` on the SQL side before calling `parallelListLeafFiles`?
So the problem with that is that if we have a root directory with many sub-directories in it, we may initially choose to do non-parallel listing and then, as the sub-directories build up, switch to parallel listing.
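That mid-recursion switch is why the metric can't simply be incremented at the SQL-side call site. The decision could be sketched as a threshold check like the one below; the class, method, and threshold value are all invented for illustration, not Spark's actual logic:

```java
import java.util.List;

// Hypothetical sketch: list sequentially for small inputs, but switch to
// parallel listing once recursion has discovered enough sub-directories.
public class ListingStrategy {
    static final int PARALLELISM_THRESHOLD = 32; // invented value

    public static String chooseStrategy(List<String> subDirs) {
        // On the initial call the directory count may be small...
        if (subDirs.size() < PARALLELISM_THRESHOLD) {
            return "sequential";
        }
        // ...but deep in the recursion, parallel listing (e.g. a
        // distributed job) becomes worthwhile; only here would the
        // parallel-listing metric actually be incremented.
        return "parallel";
    }
}
```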
Yeah, moving this to SQL will not work. I think we can perhaps add a callback later, just like @holdenk suggested (if you don't mind leaving it here in this PR).
On the other hand, I think `HiveCatalogMetrics` is already misleading: `InMemoryFileIndex` is used by non-Hive data sources like file-based ones, and this particular metric itself has nothing to do with the Hive catalog. Perhaps a better approach is to move the metrics to some place in core that is for storage-specific things. In the future we can include more, such as total listing time, etc.
A callback sounds good. We can do it later.
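A listing-metrics callback could look something like this. The interface name and hook method are invented for illustration; this is not an actual Spark API:

```java
// Hypothetical sketch: core code reports listing events through a small
// callback interface instead of depending on SQL/Hive metrics classes.
public class ListingCallbackDemo {

    interface ListingMetricsCallback {
        void onParallelListingJobStarted();
    }

    // Stand-in counter; the SQL side would instead forward to
    // HiveCatalogMetrics.incrementParallelListingJobCount(1).
    static int parallelListingJobs = 0;

    // Core-side lister accepts the callback and fires it whenever it
    // decides to launch a parallel listing job, even deep in recursion.
    static void parallelListLeafFiles(ListingMetricsCallback callback) {
        callback.onParallelListingJobStarted();
        // ... actual parallel listing would happen here ...
    }

    public static void main(String[] args) {
        parallelListLeafFiles(() -> parallelListingJobs++);
        System.out.println(parallelListingJobs); // prints 1
    }
}
```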
Sounds good to me. @sunchao, want to file a JIRA for switching this to a callback?
Sure, filed https://issues.apache.org/jira/browse/SPARK-32880 for the follow-ups.
shall we have a consistent name here?
This is copied from the current implementation in `InMemoryFileIndex`. I can resolve this in a follow-up PR if you like (as this PR is limited to refactoring).
So the is/are is because listLeafFiles takes a single name and this method takes in a list of files.
Recent Hadoop builds declare `Path` Serializable. `FileStatus` should be too, but there's always the risk some subclass isn't.
Yeah, I plan to remove this in a follow-up. And since `FileStatus` is serializable, shouldn't any subclass of it also be?
"Should"; there are no tests for it, and nothing in the Java language stops you from adding references to non-serializable objects. `Exception` has this exact issue.
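The pitfall is easy to reproduce in a few lines. This is a self-contained illustration, not Spark or Hadoop code: `Base` and `Child` are invented stand-ins for `FileStatus` and a hypothetical subclass:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// A Serializable base class doesn't guarantee every subclass serializes:
// a subclass can add a field referencing a non-serializable object, and
// the failure only shows up at runtime as a NotSerializableException.
public class SerializationPitfall {
    static class Base implements Serializable {}   // like FileStatus
    static class NotSerializableThing {}           // e.g. a client handle
    static class Child extends Base {
        NotSerializableThing handle = new NotSerializableThing();
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // NotSerializableException is an IOException
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Base()));   // true
        System.out.println(serializes(new Child()));  // false: compiles fine,
                                                      // fails only at runtime
    }
}
```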
This seems to be a new parameter that did not exist before. Why do we need this? If people want parallelized listing, they can invoke `parallelListLeafFiles` above.
This is because `listLeafFiles` also recursively calls `parallelListLeafFiles` inside, so we need a way to pass down the arguments. These used to be read from the conf in `SparkSession`, but are now made explicit as parameters, as we no longer have a session object.
I'd question that. And it's much better to use the incremental `listLocatedStatus` for better performance on paged object-store listings, especially if you can do useful work while it takes place.
I think we should add `S3AFileSystem` here (in a follow-up), and perhaps a few others, since they also support pagination. The fallback part of the comments confuses me, since I don't really see a fallback here for `DistributedFileSystem` etc.
Also, do you think we can add something to `CommonPathCapabilities` for this, so that we don't have to enumerate all these here?
I would just switch to the incremental one everywhere
HDFS likes it because when you have many, many files in a path, it can release the lock on the NN; object stores always have to page in (S3, Azure, GCS)... which gives the implementors an opportunity to move from the default implementation to exposing the incremental one. Add this and we can just let the relevant teams know.
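Incremental consumption via Hadoop's `RemoteIterator` looks roughly like the sketch below. To keep it self-contained, the `RemoteIterator` interface is re-declared locally (mirroring `org.apache.hadoop.fs.RemoteIterator`) and the paged listing is simulated rather than hitting a real `FileSystem`:

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

// Results come back one at a time, so a paged object-store listing can
// fetch the next page lazily instead of materializing the whole tree.
public class IncrementalListing {

    // Local stand-in for org.apache.hadoop.fs.RemoteIterator<T>.
    interface RemoteIterator<T> {
        boolean hasNext() throws IOException;
        T next() throws IOException;
    }

    // Simulated paged listing; a real FileSystem.listLocatedStatus would
    // issue one HTTP request per page behind this iterator.
    static RemoteIterator<String> listLocatedStatus(List<String> page) {
        Iterator<String> it = page.iterator();
        return new RemoteIterator<>() {
            public boolean hasNext() { return it.hasNext(); }
            public String next() { return it.next(); }
        };
    }

    public static void main(String[] args) throws IOException {
        RemoteIterator<String> files =
            listLocatedStatus(List.of("a.parquet", "b.parquet"));
        while (files.hasNext()) {
            // Useful work (e.g. reading footers) can overlap with listing.
            System.out.println(files.next());
        }
    }
}
```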
w.r.t CommonPathCapabilities, I suppose we could add one which declares that the listing ops are paged. But do you really want code to try and be that clever? I'm trying to use that feature to mark up
If you want a performance option, we could add one.
BTW, PathCapabilities is in hadoop-3.2.x now and will be in the next release. I might do it for 3.1 too... it makes a good way to programmatically/CLI probe for the s3a dir marker policy, see.
Also, cloudstore has some CLI commands for the list calls (and pathcapabilities) to help explore what's going on, including, on s3a, listing cost in # of HTTP requests. Worth playing with to see what is good/bad, though as Mukund has been doing lots of work on the 3.3.x s3a list, it will look worse than you'd expect on a dir treewalk.
https://github.com/steveloughran/cloudstore
I think changing this in a follow-up PR sounds fine to me. I'd like us to use the faster method by default and fall back on exception, but that could be a follow-on. Want to file a JIRA for broadening the scope of using the faster method?
w.r.t. the faster (incremental) calls, yes, something to consider next. At the very least, you will be able to collect and report/aggregate stats. For example, here is me getting the stats on a large/deep list just by calling toString on the RemoteIterator after it's done its work.
Now, who wouldn't like to know things like that? And ideally, collect across threads and merge back in.
As an aside, I looked at `org.apache.hadoop.mapred.LocatedFileStatusFetcher`. This does multithreaded status fetching and collects those stats. Although it's tagged Private, I've noticed Parquet uses it, so my next PR will convert it to public/evolving and document that fact.
If you could use that, we could look at evolving it better, especially returning a RemoteIterator of results which we could incrementally fill in across threads rather than block for the final results. Anything which makes it possible for app code to process data (read footers, etc.) while the listing goes on has a significant benefit in the world of object stores.
Yes, stats will be nice. Looking forward to that 👍 (is it tracked by this PR?)
w.r.t. `LocatedFileStatusFetcher`, yes, it will be great if this is exposed as a public API. I think we should also consider moving it to another module such as hadoop-common, and perhaps adding an option to turn locality on/off (so that we don't have to get the block locations if they are not needed). Pairing it with batch listing can also potentially help (although this is only available in HDFS currently).
Instead of multiple threads, Spark currently uses a distributed job, so I'm not sure whether there would be any regression by doing that, but we can explore this later.
I'd worry about the effect of moving the package as-is. It'd have to be a copy-and-paste, or a move-and-subclass.
@sunchao, yes, that's the stats API. Now is the time to review it and tell me where it doesn't suit your (perceived future) needs. I've been playing with a Parquet branch which uses it, not in Spark itself.
Thanks for the info, @steveloughran. I'll check out the PR (it is a very big one, though).