Skip to content

Commit

Permalink
Use ThreadedMotoServer instead of subprocess in spinning up s3
Browse files Browse the repository at this point in the history
…server (#10822)

This is a follow-up PR to address review comments from here: #10769 (review)

This PR:

- [x] Uses `ThreadedMotoServer` instead of using `subprocess.open` to create a new server, this way it is guaranteed to close the server upon exit.
- [x] Add's IP address fixture instead of having it hard-coded at multiple places.

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Charles Blackmon-Luca (https://github.com/charlesbluca)

URL: #10822
  • Loading branch information
galipremsagar authored May 10, 2022
1 parent 4539e5e commit dc0c3cd
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 51 deletions.
40 changes: 14 additions & 26 deletions python/cudf/cudf/tests/test_s3.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

import os
import shlex
import socket
import subprocess
import time
from contextlib import contextmanager
from io import BytesIO

Expand All @@ -21,9 +18,15 @@

moto = pytest.importorskip("moto", minversion="3.1.6")
boto3 = pytest.importorskip("boto3")
requests = pytest.importorskip("requests")
s3fs = pytest.importorskip("s3fs")

ThreadedMotoServer = pytest.importorskip("moto.server").ThreadedMotoServer


@pytest.fixture(scope="session")
def endpoint_ip():
return "127.0.0.1"


@pytest.fixture(scope="session")
def endpoint_port():
Expand Down Expand Up @@ -51,7 +54,7 @@ def ensure_safe_environment_variables():


@pytest.fixture(scope="session")
def s3_base(endpoint_port):
def s3_base(endpoint_ip, endpoint_port):
"""
Fixture to set up moto server in separate process
"""
Expand All @@ -65,35 +68,20 @@ def s3_base(endpoint_port):
# Launching moto in server mode, i.e., as a separate process
# with an S3 endpoint on localhost

endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"

proc = subprocess.Popen(
shlex.split(f"moto_server s3 -p {endpoint_port}"),
)
endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/"

timeout = 5
while timeout > 0:
try:
# OK to go once server is accepting connections
r = requests.get(endpoint_uri)
if r.ok:
break
except Exception:
pass
timeout -= 0.1
time.sleep(0.1)
server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port)
server.start()
yield endpoint_uri

proc.terminate()
proc.wait()
server.stop()


@pytest.fixture()
def s3so(endpoint_port):
def s3so(endpoint_ip, endpoint_port):
"""
Returns s3 storage options to pass to fsspec
"""
endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"
endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/"

return {"client_kwargs": {"endpoint_url": endpoint_uri}}

Expand Down
38 changes: 13 additions & 25 deletions python/dask_cudf/dask_cudf/io/tests/test_s3.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

import os
import shlex
import socket
import subprocess
import time
from contextlib import contextmanager
from io import BytesIO

Expand All @@ -16,8 +13,13 @@

moto = pytest.importorskip("moto", minversion="3.1.6")
boto3 = pytest.importorskip("boto3")
requests = pytest.importorskip("requests")
s3fs = pytest.importorskip("s3fs")
ThreadedMotoServer = pytest.importorskip("moto.server").ThreadedMotoServer


@pytest.fixture(scope="session")
def endpoint_ip():
return "127.0.0.1"


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -46,7 +48,7 @@ def ensure_safe_environment_variables():


@pytest.fixture(scope="session")
def s3_base(endpoint_port):
def s3_base(endpoint_ip, endpoint_port):
"""
Fixture to set up moto server in separate process
"""
Expand All @@ -59,35 +61,21 @@ def s3_base(endpoint_port):

# Launching moto in server mode, i.e., as a separate process
# with an S3 endpoint on localhost
endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"

proc = subprocess.Popen(
shlex.split(f"moto_server s3 -p {endpoint_port}"),
)
endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/"

timeout = 5
while timeout > 0:
try:
# OK to go once server is accepting connections
r = requests.get(endpoint_uri)
if r.ok:
break
except Exception:
pass
timeout -= 0.1
time.sleep(0.1)
server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port)
server.start()
yield endpoint_uri

proc.terminate()
proc.wait()
server.stop()


@pytest.fixture()
def s3so(endpoint_port):
def s3so(endpoint_ip, endpoint_port):
"""
Returns s3 storage options to pass to fsspec
"""
endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"
endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/"

return {"client_kwargs": {"endpoint_url": endpoint_uri}}

Expand Down

0 comments on commit dc0c3cd

Please sign in to comment.