Skip to content

Commit

Permalink
Pack per frame metadata into one message
Browse files Browse the repository at this point in the history
To send fewer and larger messages, pack both which frames are on device
and how large each frame is into one message.
  • Loading branch information
jakirkham committed Apr 21, 2020
1 parent 7b3cecd commit 98d82dd
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions distributed/comm/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,9 @@ async def write(

# Send meta data
await self.ep.send(struct.pack("Q", nframes))
await self.ep.send(struct.pack(nframes * "?", *cuda_frames))
await self.ep.send(struct.pack(nframes * "Q", *sizes))
await self.ep.send(
struct.pack(nframes * "?" + nframes * "Q", *cuda_frames, *sizes)
)

# Send frames

Expand Down Expand Up @@ -226,15 +227,11 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
await self.ep.recv(nframes)
(nframes,) = struct.unpack(nframes_fmt, nframes)

cuda_frames_fmt = nframes * "?"
cuda_frames = host_array(struct.calcsize(cuda_frames_fmt))
await self.ep.recv(cuda_frames)
cuda_frames = struct.unpack(cuda_frames_fmt, cuda_frames)

sizes_fmt = nframes * "Q"
sizes = host_array(struct.calcsize(sizes_fmt))
await self.ep.recv(sizes)
sizes = struct.unpack(sizes_fmt, sizes)
header_fmt = nframes * "?" + nframes * "Q"
header = host_array(struct.calcsize(header_fmt))
await self.ep.recv(header)
header = struct.unpack(header_fmt, header)
cuda_frames, sizes = header[:nframes], header[nframes:]
except (ucp.exceptions.UCXBaseException, CancelledError):
self.abort()
raise CommClosedError("While reading, the connection was closed")
Expand Down

0 comments on commit 98d82dd

Please sign in to comment.