[FLINK-17118][python] Add Cython support for primitive data types #11718
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit f08d03b (Wed Apr 15 11:40:15 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
        self._value_coder = value_coder

    cpdef encode_to_stream(self, value, OutputStream out_stream, bint nested):
        self._value_coder.encode_to_stream(value, out_stream, False)
False -> nested
cdef class FlattenRowCoderImpl(StreamCoderImpl):
    def __cinit__(self, field_coders):
        self._output_field_coders = field_coders
What about removing the prefix _output?
        self._init_attribute()

    cpdef decode_from_stream(self, InputStream in_stream, bint nested):
        cdef WrapperInputElement wrapper_input_element
Rename to InputStreamWrapper?
import datetime

cdef class WrapperFuncInputElement:
Rename to InputStreamAndFunctionWrapper?
import datetime

cdef class WrapperFuncInputElement:
Add some description about this class.
    cdef encode_row_result(self, WrapperFuncInputElement wrapper_func_input_element,
                           OutputStream out_stream):
        cdef list result
        self._before_encode(wrapper_func_input_element, out_stream)
_before_encode -> _prepare_encode
        while self._input_buffer_size > self._input_pos:
            self._load_row()
            result = self.func(self.row)
            self._write_data(result)
_write_data -> _encode_one_row
            return self._load_bytes().decode("utf-8")
        elif field_type == DATE:
            # Date
            return datetime.date.fromordinal(self._load_int() + 719163)
Could you add an explanation of how 719163 is computed?
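For reference, the constant matches the proleptic Gregorian ordinal of the Unix epoch date: `datetime.date(1970, 1, 1).toordinal()` returns 719163. Adding it to a days-since-epoch value gives an ordinal that `fromordinal` accepts. A minimal sketch, assuming the encoded int is the number of days since 1970-01-01; the names `EPOCH_ORDINAL` and `date_from_epoch_days` are hypothetical and not taken from the PR:

```python
import datetime

# Ordinal of the Unix epoch date. Ordinal 1 is 0001-01-01 in the
# proleptic Gregorian calendar used by datetime.date.
EPOCH_ORDINAL = datetime.date(1970, 1, 1).toordinal()


def date_from_epoch_days(days):
    """Convert a count of days since 1970-01-01 to a datetime.date."""
    return datetime.date.fromordinal(days + EPOCH_ORDINAL)


print(EPOCH_ORDINAL)                # 719163
print(date_from_epoch_days(0))      # 1970-01-01
print(date_from_epoch_days(18367))  # 2020-04-15
```

Deriving the constant from `toordinal()` in a named module-level value, or stating the derivation in a comment next to the literal, would both answer the question.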
            self._output_field_type[i] = self._output_field_coders[i].type_name()
            self._output_coder_type[i] = self._output_field_coders[i].coder_type()

    cdef void _consume_input_data(self, WrapperInputElement wrapper_input_element, size_t size):
_wrapInputStream
            self._output_row_data = <char*> libc.stdlib.realloc(self._output_row_data,
                                                                self._output_row_buffer_size)
        self._output_row_data[self._output_row_pos] = <unsigned char> (v >> 8)
        self._output_row_data[self._output_row_pos + 1] = <unsigned char> (v)
Remove the parentheses
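For context, the two assignments in this hunk write a 16-bit value in big-endian order, high byte first. A pure-Python sketch of the same byte layout, checked against `struct.pack`; `encode_short` is a hypothetical stand-in for the Cython method, not code from the PR:

```python
import struct


def encode_short(v):
    """Encode a signed 16-bit integer as two big-endian bytes."""
    # Masking with 0xFF plays the role of the <unsigned char> cast
    # in the Cython code: it keeps only the low 8 bits.
    return bytes([(v >> 8) & 0xFF, v & 0xFF])


assert encode_short(258) == b"\x01\x02"
assert encode_short(-2) == struct.pack(">h", -2)
```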
@dianfu Thanks a lot for the review. I have addressed the comments in the latest commit.
        out_stream.buffer_size = self._output_buffer_size

    cdef void _encode_byte(self, unsigned char val):
        if self._output_row_buffer_size < self._output_row_pos + 1:
Could you refactor this a bit and make it reusable for all the _encode_xxx functions?
        cdef libc.stdint.int32_t length = strlen(b)
        self._encode_int(length)
        if self._output_row_buffer_size < self._output_row_pos + length:
            self._output_row_buffer_size *= 2
It is possible that the buffer is still not large enough after _output_row_buffer_size *= 2.
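One way to address this is to keep doubling until the required size fits, rather than doubling once. A sketch in plain Python under that assumption; `grown_capacity` is a hypothetical helper, and the fix actually adopted in the PR may differ:

```python
def grown_capacity(capacity, pos, length):
    """Return a capacity that can hold `length` more bytes written at `pos`.

    Doubles repeatedly, so one large value (for example a long string)
    cannot overflow the buffer after a single doubling.
    Assumes capacity > 0, otherwise the loop would never terminate.
    """
    required = pos + length
    while capacity < required:
        capacity *= 2
    return capacity


print(grown_capacity(1024, 1000, 10))    # 1024, already large enough
print(grown_capacity(1024, 1000, 5000))  # 8192, a single doubling (2048) would not fit
```

A helper of this shape could be called from each `_encode_xxx` function before it writes, followed by one `realloc` to the returned size.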
        self._encode_int(milliseconds)

    # write 0x00 as end message of udtf
    cdef void _encode_end_message(self):
Move this method to TableFunctionRowCoderImpl?
        self._output_remaining_bits_num = self._output_field_count % 8
        self._output_row_buffer_size = 1024
        self._output_row_pos = 0
        self._output_row_data = <char*> libc.stdlib.malloc(self._output_row_buffer_size)
Rename it to something like _tmp_output_buffer to make it more explicit that this is a temporary buffer?
        self._output_data[self._output_pos + 1] = 0x00
        self._output_pos += 2

    cdef void _copy_row_buffer_to_output_buffer(self):
Rename to _copy_to_output_buffer?
        out_stream.flush()
        self._output_pos = 0

    cdef void _map_output_data_to_output_stream(self, OutputStream out_stream):
Add some comments describing why we need to establish this map.
@@ -36,65 +36,230 @@ def check_coder(self, coder, *values):
            else:
                self.assertEqual(v, coder.decode(coder.encode(v)))

    def check_cython_coder(self, python_field_coders, cython_field_coders, data):
Rename test_coders_common to test_coders.py and move the coders for Cython to test_fast_coders.py?
    # decide whether two floats are equal
    @staticmethod
    def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0):
        return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)

    def skip_python_test(self):
Use unittest.skipIf
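A minimal sketch of what the suggestion could look like, using the standard `unittest.skipIf` decorator. The flag `CYTHON_AVAILABLE` and the test class are hypothetical; the real tests would need their own way of detecting whether the Cython extensions were compiled:

```python
import unittest

# Hypothetical flag. A real test module might set it by trying to
# import the compiled extension and catching ImportError.
CYTHON_AVAILABLE = False


class CoderTest(unittest.TestCase):

    @unittest.skipIf(not CYTHON_AVAILABLE, "Cython coders are not compiled")
    def test_cython_only(self):
        self.fail("not reached when the test is skipped")

    def test_always_runs(self):
        self.assertEqual(1 + 1, 2)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(CoderTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
print(len(result.skipped), result.wasSuccessful())  # 1 True
```

Compared with a hand-written `skip_python_test` method, the decorator records the skip and its reason in the test report instead of silently returning.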
@HuangXingBo Thanks a lot for the update. The test time has increased a lot; could you take a look?
5fd6151 to 271588a
It seems that this is because we also run the UDF-related tests in the Cython cases.
LGTM.
What is the purpose of the change
This pull request adds Cython support for primitive DataTypes.
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation