-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support group concat #2539
Merged
Merged
support group concat #2539
Changes from all commits
Commits
Show all changes
28 commits
Select commit
Hold shift + click to select a range
2cc18bb
rebase master
fzhedu a033f74
init run ok
fzhedu 7b40cb9
remove tuple with any of null
fzhedu e4cf89c
support distinct and sort
fzhedu 58dd6c2
null is ok
fzhedu bce81bf
runs ok after test null
fzhedu f540e6e
compile ok
fzhedu b741a86
remove useless comments
fzhedu 9c93031
set group_concat_max_len
fzhedu b354a7d
add tests
fzhedu d3105ab
add more tests
fzhedu 337f949
reformate code
fzhedu 7efdff3
update tests
fzhedu 9fdbe71
cherry pick aggregate.function->setCollators(arg_collators);
windtalker 83210e9
update tests
fzhedu c7ca166
update test
fzhedu 63a407c
update test
fzhedu c382d16
add test with '' separator
fzhedu 9ae03a9
refactor code
fzhedu 856e74b
address comments from xufei
fzhedu fee493f
remove useless commets
fzhedu e4a81ae
add a TODO for delivering arguments of group_concat
fzhedu ff6a9bd
rename
fzhedu 5d6bbde
rename
fzhedu 05882d5
add more comments
fzhedu f00e570
address comments
fzhedu b5e21f2
address comments
fzhedu 61c9aa4
format code
fzhedu File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Submodule tipb
updated
4 files
+367 −308 | go-tipb/expression.pb.go | |
+69 −71 | go-tipb/topsql_agent.pb.go | |
+1 −0 | proto/expression.proto | |
+1 −1 | proto/topsql_agent.proto |
271 changes: 271 additions & 0 deletions
271
dbms/src/AggregateFunctions/AggregateFunctionGroupConcat.h
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,271 @@ | ||
#include <AggregateFunctions/AggregateFunctionGroupUniqArray.h> | ||
#include <AggregateFunctions/AggregateFunctionNull.h> | ||
|
||
|
||
namespace DB | ||
{ | ||
|
||
/// a warp function on the top of groupArray and groupUniqArray, like the AggregateFunctionNull | ||
/// | ||
/// the input argument is in following two types: | ||
/// 1. only one column with original data type and without order_by items, for example: group_concat(c) | ||
/// 2. one column combined with more than one columns including concat items and order-by items, it should be like tuple(concat0, concat1... order0, order1 ...), for example: | ||
/// all columns = concat items + order-by items | ||
/// (c0,c1,o0,o1) = group_concat(c0,c1 order by o0,o1) | ||
/// group_concat(distinct c0,c1 order by b0,b1) = groupUniqArray(tuple(c0,c1,b0,b1)) -> distinct (c0, c1) , i.e., remove duplicates further | ||
|
||
template <bool result_is_nullable, bool only_one_column> | ||
class AggregateFunctionGroupConcat final : public AggregateFunctionNullBase<result_is_nullable, AggregateFunctionGroupConcat<result_is_nullable, only_one_column>> | ||
{ | ||
using State = AggregateFunctionGroupUniqArrayGenericData; | ||
|
||
public: | ||
AggregateFunctionGroupConcat(AggregateFunctionPtr nested_function, const DataTypes & input_args, const String& sep, const UInt64& max_len_, const SortDescription & sort_desc_, const NamesAndTypes& all_columns_names_and_types_, const TiDB::TiDBCollators& collators_, const bool has_distinct) | ||
: AggregateFunctionNullBase<result_is_nullable, AggregateFunctionGroupConcat<result_is_nullable, only_one_column>>(nested_function), | ||
separator(sep),max_len(max_len_), sort_desc(sort_desc_), | ||
all_columns_names_and_types(all_columns_names_and_types_), collators(collators_) | ||
{ | ||
if (input_args.size() != 1) | ||
throw Exception("Logical error: more than 1 arguments are passed to AggregateFunctionGroupConcat", ErrorCodes::LOGICAL_ERROR); | ||
nested_type = std::make_shared<DataTypeArray>(removeNullable(input_args[0])); | ||
|
||
number_of_concat_items = all_columns_names_and_types.size() - sort_desc.size(); | ||
|
||
is_nullable.resize(number_of_concat_items); | ||
for (size_t i = 0; i < number_of_concat_items; ++i) | ||
{ | ||
is_nullable[i] = all_columns_names_and_types[i].type->isNullable(); | ||
/// the inputs of a nested agg reject null, but for more than one args, tuple(args...) is already not nullable, | ||
/// so here just remove null for the only_one_column case | ||
if constexpr (only_one_column) | ||
{ | ||
all_columns_names_and_types[i].type = removeNullable(all_columns_names_and_types[i].type); | ||
} | ||
} | ||
|
||
/// remove redundant rows excluding extra sort items (which do not occur in the concat list) or considering collation | ||
if(has_distinct) | ||
{ | ||
for (auto & desc : sort_desc) | ||
{ | ||
bool is_extra = true; | ||
for (size_t i = 0; i < number_of_concat_items; ++i) | ||
{ | ||
if (desc.column_name == all_columns_names_and_types[i].name) | ||
{ | ||
is_extra = false; | ||
break; | ||
} | ||
} | ||
if (is_extra) | ||
{ | ||
to_get_unique = true; | ||
break; | ||
} | ||
} | ||
/// because GroupUniqArray does consider collations, so if there are collations, | ||
/// we should additionally remove redundant rows with consideration of collations | ||
if(!to_get_unique) | ||
{ | ||
bool has_collation = false; | ||
for (size_t i = 0; i < number_of_concat_items; ++i) | ||
{ | ||
if (collators[i] != nullptr) | ||
{ | ||
has_collation = true; | ||
break; | ||
} | ||
} | ||
to_get_unique = has_collation; | ||
} | ||
} | ||
} | ||
|
||
DataTypePtr getReturnType() const override | ||
{ | ||
return result_is_nullable | ||
? makeNullable(ret_type) | ||
: ret_type; | ||
} | ||
|
||
/// reject nulls before add() of nested agg | ||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override | ||
{ | ||
if constexpr (only_one_column) | ||
{ | ||
if(is_nullable[0]) | ||
{ | ||
const ColumnNullable * column = static_cast<const ColumnNullable *>(columns[0]); | ||
if (!column->isNullAt(row_num)) | ||
{ | ||
this->setFlag(place); | ||
const IColumn * nested_column = &column->getNestedColumn(); | ||
this->nested_function->add(this->nestedPlace(place), &nested_column, row_num, arena); | ||
} | ||
return; | ||
} | ||
} | ||
else | ||
{ | ||
/// remove the row with null, except for sort columns | ||
const ColumnTuple & tuple = static_cast<const ColumnTuple &>(*columns[0]); | ||
for (size_t i = 0; i < number_of_concat_items; ++i) | ||
{ | ||
if (is_nullable[i]) | ||
{ | ||
const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(tuple.getColumn(i)); | ||
if (nullable_col.isNullAt(row_num)) | ||
{ | ||
/// If at least one column has a null value in the current row, | ||
/// we don't process this row. | ||
return; | ||
} | ||
} | ||
} | ||
} | ||
this->setFlag(place); | ||
this->nested_function->add(this->nestedPlace(place), columns, row_num, arena); | ||
} | ||
|
||
void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override | ||
{ | ||
ColumnString * col_str= nullptr; | ||
ColumnNullable * col_null= nullptr; | ||
if constexpr (result_is_nullable) | ||
{ | ||
col_null = &static_cast<ColumnNullable &>(to); | ||
col_str = &static_cast<ColumnString &>(col_null->getNestedColumn()); | ||
} | ||
else | ||
{ | ||
col_str = & static_cast<ColumnString &>(to); | ||
} | ||
|
||
if (this->getFlag(place)) | ||
{ | ||
if constexpr (result_is_nullable) | ||
{ | ||
col_null->getNullMapData().push_back(0); | ||
} | ||
|
||
/// get results from nested function, named nested_results | ||
auto mutable_nested_cols = nested_type->createColumn(); | ||
this->nested_function->insertResultInto(this->nestedPlace(place), *mutable_nested_cols, arena); | ||
const auto column_array = checkAndGetColumn<ColumnArray>(mutable_nested_cols.get()); | ||
|
||
/// nested_columns are not nullable, because the nullable rows are removed in add() | ||
Columns nested_cols; | ||
if constexpr (only_one_column) | ||
{ | ||
nested_cols.push_back(column_array->getDataPtr()); | ||
} | ||
else | ||
{ | ||
auto & cols = checkAndGetColumn<ColumnTuple>(&column_array->getData())->getColumns(); | ||
nested_cols.insert(nested_cols.begin(),cols.begin(),cols.end()); | ||
} | ||
|
||
/// sort the nested_col of Array type | ||
if(!sort_desc.empty()) | ||
sortColumns(nested_cols); | ||
|
||
/// get unique flags | ||
std::vector<bool> unique; | ||
if (to_get_unique) | ||
getUnique(nested_cols, unique); | ||
|
||
writeToStringColumn(nested_cols,col_str, unique); | ||
|
||
} | ||
else | ||
{ | ||
if constexpr (result_is_nullable) | ||
col_null->insertDefault(); | ||
else | ||
col_str->insertDefault(); | ||
} | ||
} | ||
|
||
bool allocatesMemoryInArena() const override | ||
{ | ||
return this->nested_function->allocatesMemoryInArena(); | ||
} | ||
|
||
private: | ||
/// construct a block to sort in the case with order-by requirement | ||
void sortColumns(Columns& nested_cols) const | ||
{ | ||
Block res; | ||
int concat_size = nested_cols.size(); | ||
for(int i = 0 ; i < concat_size; ++i ) | ||
{ | ||
res.insert(ColumnWithTypeAndName(nested_cols[i], all_columns_names_and_types[i].type, all_columns_names_and_types[i].name)); | ||
} | ||
/// sort a block with collation | ||
sortBlock(res, sort_desc); | ||
nested_cols = res.getColumns(); | ||
} | ||
|
||
/// get unique argument columns by inserting the unique of the first N of (N + M sort) internal columns within tuple | ||
void getUnique(const Columns & cols, std::vector<bool> & unique) const | ||
{ | ||
std::unique_ptr<State> state = std::make_unique<State>(); | ||
Arena arena1; | ||
auto size = cols[0]->size(); | ||
unique.resize(size); | ||
std::vector<String> containers(collators.size()); | ||
for (size_t i = 0; i < size; ++i) | ||
{ | ||
bool inserted=false; | ||
State::Set::LookupResult it; | ||
const char * begin = nullptr; | ||
size_t values_size = 0; | ||
for (size_t j = 0; j< number_of_concat_items; ++j) | ||
values_size += cols[j]->serializeValueIntoArena(i, arena1, begin, collators[j],containers[j]).size; | ||
|
||
StringRef str_serialized= StringRef(begin, values_size); | ||
state->value.emplace(str_serialized, it, inserted); | ||
unique[i] = inserted; | ||
} | ||
} | ||
|
||
/// write each column cell to string with separator | ||
void writeToStringColumn(const Columns& cols, ColumnString * const col_str, const std::vector<bool> & unique) const | ||
{ | ||
WriteBufferFromOwnString write_buffer; | ||
auto size = cols[0]->size(); | ||
for (size_t i = 0; i < size; ++i) | ||
{ | ||
if(unique.empty() || unique[i]) | ||
{ | ||
if (i != 0) | ||
{ | ||
writeString(separator, write_buffer); | ||
} | ||
for (size_t j = 0; j < number_of_concat_items; ++j) | ||
{ | ||
all_columns_names_and_types[j].type->serializeText(*cols[j], i, write_buffer); | ||
} | ||
} | ||
/// TODO(FZH) output just one warning ("Some rows were cut by GROUPCONCAT()") if this happen | ||
if(write_buffer.count() >=max_len) | ||
{ | ||
break; | ||
} | ||
} | ||
col_str->insertData(write_buffer.str().c_str(),std::min(max_len,write_buffer.count())); | ||
} | ||
|
||
bool to_get_unique =false; | ||
DataTypePtr ret_type = std::make_shared<DataTypeString>(); | ||
DataTypePtr nested_type; | ||
size_t number_of_concat_items = 0; | ||
String separator =","; | ||
UInt64 max_len; | ||
SortDescription sort_desc; | ||
NamesAndTypes all_columns_names_and_types; | ||
TiDB::TiDBCollators collators; | ||
BoolVec is_nullable; | ||
}; | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
strange behavior, could you explain in comment here, i.e. why the two functions are specially treated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, added more comments