Add TCP_KEEPALIVE option to http provider #24967

Merged Jul 12, 2022 (1 commit)

@potiuk potiuk commented Jul 11, 2022

Enabling TCP_KEEPALIVE keeps idle connections open
when firewalls in between would otherwise close them.

TCP_KEEPALIVE regularly sends a "no-data" packet (and expects an
ACK from the server). This should not be confused with the Python
requests "keep alive" feature, which reuses open HTTP connections
when you perform several requests to the same server. Both are named
keep alive, but they mean two completely different things and
are implemented at different layers of the OSI network stack: the
Requests Keep Alive is at Layer 7 (application) and TCP Keep Alive
is at Layer 4 (transport).
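
To make the Layer 4 side concrete, here is a minimal sketch using only Python's standard `socket` module. The helper name `enable_tcp_keepalive` and the timing values are illustrative assumptions, not something taken from this PR. The tuning options are not exposed on every platform, so the sketch sets them only when the `socket` module provides them.

```python
import socket


def enable_tcp_keepalive(sock, idle=120, interval=30, count=20):
    """Turn on TCP keepalive probes for an existing socket.

    The default values are illustrative assumptions only.
    """
    # SO_KEEPALIVE switches the feature on at the transport layer.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # The tuning knobs are platform dependent, so set them only if present.
    for name, value in (
        ("TCP_KEEPIDLE", idle),       # seconds of inactivity before the first probe
        ("TCP_KEEPINTVL", interval),  # seconds between unanswered probes
        ("TCP_KEEPCNT", count),       # unanswered probes before the connection is dropped
    ):
        option = getattr(socket, name, None)
        if option is not None:
            sock.setsockopt(socket.IPPROTO_TCP, option, value)
    return sock


if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        enable_tcp_keepalive(s, idle=1, interval=1, count=5)
        enabled = s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
        print("SO_KEEPALIVE enabled:", enabled)
```

None of this touches HTTP: the probes are generated by the operating system's TCP stack, which is why the application-level connection reuse in requests is unrelated to it.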

The "Requests" keep alive is enabled by default in the requests
library while the TCP Keep Alive requires a TCPKeepAliveAdapter
to be used (from requests_toolbelt library).
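
A minimal sketch of mounting such an adapter on a requests session follows. `TCPKeepAliveAdapter` lives in `requests_toolbelt.adapters.socket_options`; the helper name `mount_tcp_keepalive`, the `adapter_cls` parameter and the timing values are assumptions made for illustration, and this is not necessarily how the provider wires it.

```python
def mount_tcp_keepalive(session, idle=120, interval=30, count=20, adapter_cls=None):
    """Mount a TCP keepalive adapter for both URL schemes on a session.

    `adapter_cls` is a hypothetical injection point so the helper can be
    exercised without the optional dependency installed.
    """
    if adapter_cls is None:
        # Imported lazily; requires the requests_toolbelt package.
        from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter

        adapter_cls = TCPKeepAliveAdapter
    adapter = adapter_cls(idle=idle, count=count, interval=interval)
    # Every connection opened for these prefixes gets the keepalive socket options.
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return adapter


# Usage, assuming requests and requests_toolbelt are installed:
#   import requests
#   session = requests.Session()
#   mount_tcp_keepalive(session, idle=120, interval=30, count=20)
```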

Fixes: #21365



@potiuk
Copy link
Member Author

potiuk commented Jul 11, 2022

I think this one is worth a breaking change (which is not really breaking in most cases): enabling TCP Keepalive for SimpleHTTPOperator will have very limited impact in the vast majority of cases (either no or very little increased traffic), but it will help out of the box with a number of problems that people report with long-running idle connections.

The one "drawback" I can think of in making such a "breaking" change is that in some cases firewalls will actually NOT break connections that they previously broke for being idle. However, that should usually not be seen as "bad". The way TCP/HTTP works, firewalls that break idle connections have a rather bad effect on the client: the client does not get a "disconnect" and keeps the connection open on its side, so it does not realise that the connection has already been broken. So even if the connection is broken, Airflow still thinks it is not. Adding Keepalive will actually make it behave much better when the server stops responding, because the lack of an ACK for the Keepalive probes will signal to the client that it should close the connection and error out. So in all cases it seems like better behaviour. And you can still disable it by setting "tcp_keep_alive" to False.
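
As a rough worked example of the "lack of ACK" behaviour: with Linux-style keepalive semantics, a silently dropped connection is noticed after roughly the idle time plus one probe interval per unanswered probe. The function name and the sample values below are assumptions for illustration.

```python
def dead_peer_detection_seconds(idle, interval, count):
    """Approximate time for keepalive to notice a peer that stopped answering.

    idle: seconds of inactivity before the first probe is sent
    interval: seconds between probes that receive no ACK
    count: number of unanswered probes before the kernel gives up
    """
    return idle + interval * count


# Illustrative values: first probe after 120 s, then 20 probes 30 s apart.
print(dead_peer_detection_seconds(idle=120, interval=30, count=20))  # 720
```

Without keepalive there is no such upper bound: a client waiting on a connection that a firewall dropped can wait indefinitely unless it has its own read timeout.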

It is a breaking change though, so we should increase the version of the provider.

It's very difficult to test this one via unit tests, but I tested it in Breeze using `tcpdump`. Setting the values to low ones (keep alives generated every second after 1 second of inactivity) resulted in this:

tcpdump -vv tcp -i lo -n
tcpdump: listening on lo, link-type EN10MB (Ethernet), snapshot length 262144 bytes
15:28:46.111503 IP (tos 0x0, ttl 64, id 5429, offset 0, flags [DF], proto TCP (6), length 60)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [S], cksum 0xfe30 (incorrect -> 0x0b0f), seq 1769816328, win 65495, options [mss 65495,sackOK,TS val 1780041397 ecr 0,nop,wscale 7], length 0
15:28:46.111509 IP (tos 0x0, ttl 64, id 0, offset 0, flags [DF], proto TCP (6), length 60)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [S.], cksum 0xfe30 (incorrect -> 0x8c03), seq 2041795716, ack 1769816329, win 65483, options [mss 65495,sackOK,TS val 1780041397 ecr 1780041397,nop,wscale 7], length 0
15:28:46.111514 IP (tos 0x0, ttl 64, id 5430, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0xb2bf), seq 1, ack 1, win 512, options [nop,nop,TS val 1780041397 ecr 1780041397], length 0
15:28:46.111535 IP (tos 0x0, ttl 64, id 5431, offset 0, flags [DF], proto TCP (6), length 201)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [P.], cksum 0xfebd (incorrect -> 0x33f6), seq 1:150, ack 1, win 512, options [nop,nop,TS val 1780041397 ecr 1780041397], length 149
15:28:46.111537 IP (tos 0x0, ttl 64, id 17233, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0xb22b), seq 1, ack 150, win 511, options [nop,nop,TS val 1780041397 ecr 1780041397], length 0
15:28:47.151984 IP (tos 0x0, ttl 64, id 5432, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0xae1b), seq 149, ack 1, win 512, options [nop,nop,TS val 1780042437 ecr 1780041397], length 0
15:28:47.152031 IP (tos 0x0, ttl 64, id 17234, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0xae1a), seq 1, ack 150, win 512, options [nop,nop,TS val 1780042437 ecr 1780041397], length 0
15:28:50.159130 IP (tos 0x0, ttl 64, id 5433, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0x9e4c), seq 149, ack 1, win 512, options [nop,nop,TS val 1780045444 ecr 1780042437], length 0
15:28:50.159150 IP (tos 0x0, ttl 64, id 17235, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0xa25b), seq 1, ack 150, win 512, options [nop,nop,TS val 1780045444 ecr 1780041397], length 0
15:28:53.167721 IP (tos 0x0, ttl 64, id 5434, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0x86cc), seq 149, ack 1, win 512, options [nop,nop,TS val 1780048453 ecr 1780045444], length 0
15:28:53.167749 IP (tos 0x0, ttl 64, id 17236, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0x969a), seq 1, ack 150, win 512, options [nop,nop,TS val 1780048453 ecr 1780041397], length 0
15:28:56.175674 IP (tos 0x0, ttl 64, id 5435, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0x6f4b), seq 149, ack 1, win 512, options [nop,nop,TS val 1780051461 ecr 1780048453], length 0
15:28:56.175713 IP (tos 0x0, ttl 64, id 17237, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0x8ada), seq 1, ack 150, win 512, options [nop,nop,TS val 1780051461 ecr 1780041397], length 0
15:28:59.181101 IP (tos 0x0, ttl 64, id 5436, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0x57ce), seq 149, ack 1, win 512, options [nop,nop,TS val 1780054466 ecr 1780051461], length 0
15:28:59.181160 IP (tos 0x0, ttl 64, id 17238, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0x7f1d), seq 1, ack 150, win 512, options [nop,nop,TS val 1780054466 ecr 1780041397], length 0
15:29:02.193450 IP (tos 0x0, ttl 64, id 5437, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0x404c), seq 149, ack 1, win 512, options [nop,nop,TS val 1780057479 ecr 1780054466], length 0
15:29:02.193484 IP (tos 0x0, ttl 64, id 17239, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0x7358), seq 1, ack 150, win 512, options [nop,nop,TS val 1780057479 ecr 1780041397], length 0
15:29:05.197792 IP (tos 0x0, ttl 64, id 5438, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0x28cb), seq 149, ack 1, win 512, options [nop,nop,TS val 1780060483 ecr 1780057479], length 0
15:29:05.197797 IP (tos 0x0, ttl 64, id 17240, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0x679c), seq 1, ack 150, win 512, options [nop,nop,TS val 1780060483 ecr 1780041397], length 0
15:29:08.209579 IP (tos 0x0, ttl 64, id 5439, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0x114b), seq 149, ack 1, win 512, options [nop,nop,TS val 1780063495 ecr 1780060483], length 0
15:29:08.209608 IP (tos 0x0, ttl 64, id 17241, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0x5bd8), seq 1, ack 150, win 512, options [nop,nop,TS val 1780063495 ecr 1780041397], length 0
15:29:11.215237 IP (tos 0x0, ttl 64, id 5440, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0xf9c7), seq 149, ack 1, win 512, options [nop,nop,TS val 1780066502 ecr 1780063495], length 0
15:29:11.215267 IP (tos 0x0, ttl 64, id 17242, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0x5018), seq 1, ack 150, win 512, options [nop,nop,TS val 1780066503 ecr 1780041397], length 0
15:29:13.836305 IP (tos 0x0, ttl 64, id 17243, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [F.], cksum 0xfe28 (incorrect -> 0x45da), seq 1, ack 150, win 512, options [nop,nop,TS val 1780069124 ecr 1780041397], length 0
15:29:13.836498 IP (tos 0x0, ttl 64, id 5441, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57584 > 127.0.0.1.2222: Flags [F.], cksum 0xfe28 (incorrect -> 0xd989), seq 150, ack 2, win 512, options [nop,nop,TS val 1780069124 ecr 1780069124], length 0
15:29:13.836517 IP (tos 0x0, ttl 64, id 17244, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], cksum 0xfe28 (incorrect -> 0xd989), seq 2, ack 151, win 512, options [nop,nop,TS val 1780069124 ecr 1780069124], length 0

(continues every second while the long running HTTP request runs)
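
For readers of the capture above: a keepalive probe is a zero-length ACK segment whose sequence number is one below the sender's next sequence number (here `seq 149` after the client sent bytes `1:150`), and the peer answers with a plain ACK. The sketch below counts such probes in `tcpdump -vv` output. The function name and regular expressions are assumptions written for this example, the sample lines are abridged from the capture (cksum and options fields removed), and it relies on tcpdump printing relative sequence numbers as it does above.

```python
import re

HEADER = re.compile(r"^\s*(?P<src>\S+) > (?P<dst>\S+): Flags \[(?P<flags>[^\]]*)\]")
SEQ = re.compile(r"\bseq (?P<start>\d+)(?::(?P<end>\d+))?,")
LENGTH = re.compile(r"\blength (?P<length>\d+)\s*$")


def count_keepalive_probes(lines):
    """Count zero-length ACKs whose seq is one below the sender's next seq."""
    next_seq = {}  # sender address -> next relative sequence number to send
    probes = 0
    for line in lines:
        header, seq, length = HEADER.search(line), SEQ.search(line), LENGTH.search(line)
        if not (header and seq and length):
            continue  # timestamp/IP lines carry no TCP details
        flags = header["flags"]
        if "S" in flags:
            continue  # SYN segments show absolute sequence numbers
        src = header["src"]
        expected = next_seq.get(src, 1)  # relative numbering starts at 1 after the SYN
        if flags == "." and int(length["length"]) == 0 and int(seq["start"]) == expected - 1:
            probes += 1
        if seq["end"] is not None:
            next_seq[src] = int(seq["end"])  # "seq a:b" advances the next seq to b
    return probes


# Abridged lines from the capture above.
SAMPLE = [
    "    127.0.0.1.57584 > 127.0.0.1.2222: Flags [P.], seq 1:150, ack 1, win 512, length 149",
    "    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], seq 1, ack 150, win 511, length 0",
    "    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], seq 149, ack 1, win 512, length 0",
    "    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], seq 1, ack 150, win 512, length 0",
    "    127.0.0.1.57584 > 127.0.0.1.2222: Flags [.], seq 149, ack 1, win 512, length 0",
    "    127.0.0.1.2222 > 127.0.0.1.57584: Flags [.], seq 1, ack 150, win 512, length 0",
]
print(count_keepalive_probes(SAMPLE))  # 2
```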

Whereas setting tcp_keep_alive=False resulted in this (no regular keepalive messages: the initial exchange was the only traffic, and nothing was sent continuously while the request ran):

15:29:52.543116 IP (tos 0x0, ttl 64, id 6929, offset 0, flags [DF], proto TCP (6), length 60)
    127.0.0.1.57900 > 127.0.0.1.2222: Flags [S], cksum 0xfe30 (incorrect -> 0xe043), seq 2766482349, win 65495, options [mss 65495,sackOK,TS val 1780107830 ecr 0,nop,wscale 7], length 0
15:29:52.543126 IP (tos 0x0, ttl 64, id 0, offset 0, flags [DF], proto TCP (6), length 60)
    127.0.0.1.2222 > 127.0.0.1.57900: Flags [S.], cksum 0xfe30 (incorrect -> 0x6a9d), seq 3903183018, ack 2766482350, win 65483, options [mss 65495,sackOK,TS val 1780107830 ecr 1780107830,nop,wscale 7], length 0
15:29:52.543133 IP (tos 0x0, ttl 64, id 6930, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57900 > 127.0.0.1.2222: Flags [.], cksum 0xfe28 (incorrect -> 0x9159), seq 1, ack 1, win 512, options [nop,nop,TS val 1780107830 ecr 1780107830], length 0
15:29:52.543161 IP (tos 0x0, ttl 64, id 6931, offset 0, flags [DF], proto TCP (6), length 201)
    127.0.0.1.57900 > 127.0.0.1.2222: Flags [P.], cksum 0xfebd (incorrect -> 0x1290), seq 1:150, ack 1, win 512, options [nop,nop,TS val 1780107830 ecr 1780107830], length 149
15:29:52.543164 IP (tos 0x0, ttl 64, id 2663, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57900: Flags [.], cksum 0xfe28 (incorrect -> 0x90c5), seq 1, ack 150, win 511, options [nop,nop,TS val 1780107830 ecr 1780107830], length 0
15:30:33.983987 IP (tos 0x0, ttl 64, id 2664, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57900: Flags [F.], cksum 0xfe28 (incorrect -> 0xeee0), seq 1, ack 150, win 512, options [nop,nop,TS val 1780149272 ecr 1780107830], length 0
15:30:33.984321 IP (tos 0x0, ttl 64, id 6932, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.57900 > 127.0.0.1.2222: Flags [F.], cksum 0xfe28 (incorrect -> 0x4cfd), seq 150, ack 2, win 512, options [nop,nop,TS val 1780149272 ecr 1780149272], length 0
15:30:33.984356 IP (tos 0x0, ttl 64, id 2665, offset 0, flags [DF], proto TCP (6), length 52)
    127.0.0.1.2222 > 127.0.0.1.57900: Flags [.], cksum 0xfe28 (incorrect -> 0x4cfd), seq 2, ack 151, win 512, options [nop,nop,TS val 1780149272 ecr 1780149272], length 0

Copy link
Member

@jedcunningham jedcunningham left a comment

Probably should add some testing to make sure the wiring is correct down to session.mount?

@potiuk
Copy link
Member Author

potiuk commented Jul 11, 2022

Probably should add some testing to make sure the wiring is correct down to session.mount?

Yeah @jedcunningham I wanted to do it but I found it difficult to test.

This is very low level and it would actually require us to run HTTP server in process. The problem is that "requests" mock does a not-nice trick of replacing whatever adapters you have with "Mock Adapter" so that you can test the "logic" of requests parameters. The TCPKeepAliveAdapter is rather low-level one. I could likely do some HTTP server and unit testing with it, but I am not sure if this is really worth it?

WDYT ?

@jedcunningham
Copy link
Member

I was more thinking just mocking the session and making sure operator kwargs end up getting all the way there through the hook. If that doesn't work out, I don't think it's worth going further.
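
The kind of wiring test suggested here could look roughly like the sketch below. `SketchHook` and `SketchOperator` are hypothetical stand-ins written for this example, not the provider's classes, and the adapter factory is injected so the test needs neither requests nor requests_toolbelt.

```python
from unittest import mock


class SketchHook:
    """Hypothetical stand-in for an HTTP hook with a keepalive switch."""

    def __init__(self, tcp_keep_alive=True, adapter_factory=None):
        self.tcp_keep_alive = tcp_keep_alive
        self.adapter_factory = adapter_factory

    def configure(self, session):
        if self.tcp_keep_alive:
            adapter = self.adapter_factory()
            session.mount("http://", adapter)
            session.mount("https://", adapter)
        return session


class SketchOperator:
    """Hypothetical operator that forwards its keepalive kwarg to the hook."""

    def __init__(self, tcp_keep_alive=True):
        self.tcp_keep_alive = tcp_keep_alive

    def execute(self, session, adapter_factory):
        hook = SketchHook(tcp_keep_alive=self.tcp_keep_alive, adapter_factory=adapter_factory)
        return hook.configure(session)


def test_keepalive_reaches_session_mount():
    session, factory = mock.MagicMock(), mock.MagicMock()
    SketchOperator(tcp_keep_alive=True).execute(session, factory)
    # The adapter built by the factory must be mounted for both schemes.
    session.mount.assert_has_calls(
        [
            mock.call("http://", factory.return_value),
            mock.call("https://", factory.return_value),
        ]
    )


def test_keepalive_disabled_mounts_nothing():
    session, factory = mock.MagicMock(), mock.MagicMock()
    SketchOperator(tcp_keep_alive=False).execute(session, factory)
    session.mount.assert_not_called()
```

A test of this shape checks only that the flag travels from the operator to `session.mount`; it says nothing about whether probes are actually sent on the wire.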

@potiuk
Copy link
Member Author

potiuk commented Jul 11, 2022

I was more thinking just mocking the session and making sure operator kwargs end up getting all the way there through the hook. If that doesn't work out, I don't think it's worth going further.

I found a better way

@potiuk potiuk force-pushed the add-keepalive-to-http-operator branch from c2ca107 to 4ef7f22 Compare July 11, 2022 19:07
@potiuk
Copy link
Member Author

potiuk commented Jul 11, 2022

I was more thinking just mocking the session and making sure operator kwargs end up getting all the way there through the hook. If that doesn't work out, I don't think it's worth going further.

OK. I think I got a good balance between having tests and not complicating it too much.

@potiuk
Copy link
Member Author

potiuk commented Jul 11, 2022

BTW. The user confirmed that lack of TCP_KEEPALIVE is what's causing SimpleHTTPRequest hangs on long requests: #21365

Copy link
Contributor

@josh-fell josh-fell left a comment

LGTM. I'll defer to Jed on the testing side. Pretty cool to see that Discussion come to a close. It's been hanging around for a long time. Thanks @potiuk!

It might also be cool to have this keep-alive config be part of an HTTP connection too.

@potiuk
Copy link
Member Author

potiuk commented Jul 11, 2022

It might also be cool to have this keep-alive config be part of an HTTP connection too.

There is a lot (IMHO too much) of extra args handling in HTTP connection. We can attempt it as next step :)

@josh-fell
Copy link
Contributor

It might also be cool to have this keep-alive config be part of an HTTP connection too.

There is a lot (IMHO too much) of extra args handling in HTTP connection. We can attempt it as next step :)

Oh absolutely. I wasn't saying it needed to be done here and now 🙂

@potiuk potiuk force-pushed the add-keepalive-to-http-operator branch 2 times, most recently from 385947c to e164679 Compare July 11, 2022 20:42
@potiuk potiuk force-pushed the add-keepalive-to-http-operator branch from e164679 to f0af3d4 Compare July 12, 2022 00:32
@potiuk potiuk merged commit 8c4120c into apache:main Jul 12, 2022
@potiuk potiuk deleted the add-keepalive-to-http-operator branch July 12, 2022 06:38
@@ -91,15 +100,26 @@ def __init__(
        self.headers = headers or {}
        self.extra_options = extra_options or {}
        self.response_check = response_check

        self.hook = HttpHook(method=method, http_conn_id=http_conn_id)
Copy link
Member

@pankajkoti pankajkoti Jul 14, 2022

@potiuk Is there a specific reason for removing this assignment to the instance and only creating the hook later, in the poke method, without keeping it as an instance variable?
The async operator relies on this instance variable: https://github.com/astronomer/astronomer-providers/blob/main/astronomer/providers/http/sensors/http.py#L67 and the RC is failing for the current async implementation.

Should we create a new hook instance in the derived class?

Copy link
Member

This is the error we get in our DAG run with the RC:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/astronomer/providers/http/sensors/http.py", line 67, in execute
    method=self.hook.method,  # TODO: Fix this to directly get method from ctor
AttributeError: 'HttpSensorAsync' object has no attribute 'hook'

Copy link
Member

I have created a PR to reinstantiate the hook in the derived class: astronomer/astronomer-providers#515

Copy link
Member Author

@potiuk potiuk Jul 14, 2022

Well. I did not think too much about this use case when I added it. It just applied the general approach we usually follow, where Hooks are created in the execute() method rather than in the constructor.

But if there are good reasons why it should not be moved, I can remove that one from the batch, make an RC2, and restore self.hook.

I think with the async operators approach we are missing some best practices:

  • should we create the Hook in __init__()?
  • should we have a Hook as an Operator attribute (self.hook)?
  • or should we recreate the hook every time we come back from the deferred state?

I think the last option is the most "reasonable", because the first two might make an (incorrect) assumption that the same hook object is available during the whole lifecycle of all the operator's methods. If the Hook is not "stateless", it might break when the operator is deferred (because the Hook will be recreated when the operator resumes from the deferred state). If you keep the hook as a self. property, you might (unconsciously) rely on the hook being the same object in various methods of the deferred operator.

WDYT? Does it sound like a good practice ? Also others @dstandish maybe? I think we had some discussion about this in the past (can't recall exactly)
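
The third option could be sketched as follows. `SketchHook` and `SketchDeferrableSensor` are hypothetical classes written for this example (they are not Airflow or Astronomer code), and the connection id is just an illustrative default.

```python
class SketchHook:
    """Hypothetical hook; a real one might hold a session or other state."""

    def __init__(self, method, http_conn_id):
        self.method = method
        self.http_conn_id = http_conn_id


class SketchDeferrableSensor:
    """Hypothetical sensor that never stores a hook on the instance."""

    def __init__(self, method="GET", http_conn_id="http_default"):
        # Keep only plain parameters on the instance, never a hook object.
        self.method = method
        self.http_conn_id = http_conn_id

    def _get_hook(self):
        # Build a fresh hook wherever one is needed.
        return SketchHook(method=self.method, http_conn_id=self.http_conn_id)

    def poke(self):
        hook = self._get_hook()
        return hook.method

    def execute_complete(self):
        # Runs after resuming from deferral, so it must not assume that
        # a hook created in an earlier method still exists.
        hook = self._get_hook()
        return hook.http_conn_id
```

Code that needs the HTTP method, like the failing line in the traceback above, can then read `self.method` directly instead of reaching through `self.hook`.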

Copy link
Member

@pankajkoti pankajkoti Jul 14, 2022

Makes sense to me. In that case, I think we can create a clean-up story on our end to adhere to the best practices once this discussion reaches a conclusion. We do not need an RC2 for that then. So that the operator keeps working, I will for the time being add the hack of creating the hook in our operator's __init__() in our PR.

Also, looping in @kaxil @bharanidharan14 @rajaths010494 @pankajastro for the discussion.

Copy link
Member Author

That sounds good to me. It would be great to contribute such "best practices for writing Deferrable Operators" to the community after all the experience Astronomer has had with writing theirs. It would be a great contribution :).

Later on we could even try to automate some of those "best practices" in pre-commits for any future community async operators. It sounds like a doable task to ast-parse an operator and, if it is a Deferrable one, make sure no self.hook is used.
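
A minimal sketch of such a check, using only the standard `ast` module, might look like this. The function name is made up for the example, and treating "the class calls `self.defer(...)`" as the marker of a Deferrable operator is a simplifying assumption; a real pre-commit hook would need a more careful definition.

```python
import ast


def find_self_hook_uses(source):
    """Return (class name, line number) for each self.hook use in classes
    that call self.defer(...), which this sketch treats as deferrable."""
    findings = []
    for cls in ast.walk(ast.parse(source)):
        if not isinstance(cls, ast.ClassDef):
            continue
        nodes = list(ast.walk(cls))
        calls_defer = any(
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "defer"
            and isinstance(node.func.value, ast.Name)
            and node.func.value.id == "self"
            for node in nodes
        )
        if not calls_defer:
            continue  # not deferrable under this heuristic
        for node in nodes:
            if (
                isinstance(node, ast.Attribute)
                and node.attr == "hook"
                and isinstance(node.value, ast.Name)
                and node.value.id == "self"
            ):
                findings.append((cls.name, node.lineno))
    return findings


# Hypothetical operator source used only to exercise the check.
SAMPLE = '''
class HttpSensorAsyncSketch:
    def execute(self, context):
        self.defer(trigger=None, method_name="execute_complete")

    def execute_complete(self, context, event=None):
        return self.hook.method
'''
print(find_self_hook_uses(SAMPLE))  # [('HttpSensorAsyncSketch', 7)]
```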

@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Aug 12, 2022
Labels
area:providers changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..)