Skip to content

Commit

Permalink
broker/overlay: add RPC tracking for TBON children
Browse files Browse the repository at this point in the history
Problem: when TBON children become lost, any pending RPCs
passing through them may go unanswered, leading to hangs
in other parts of the system.

Track pending RPCs for each TBON child.  When a child's
state transitions from an online state to offline/lost,
responses are generated for these RPCs.

RPCs are considered terminated when the RPC request has:
- the NORESPONSE flag is set
- the STREAMING flag is set, and a matching error response is received
- neither flag set, and any matching response is received
- the same sending UUID as a disconnect request

Note: this ony affects RPCs where the next hop is in the
downstream/leaves direction.  Each broker along the path
of a multi-hop RPC tracks RPCs routed to its downstream peer,
but only the broker whose downstream peer transitions to
lost or offline sends an error response.

This PR does not address loss of the parent.

Fixes flux-framework#3800
  • Loading branch information
garlick committed Aug 15, 2021
1 parent 620a43d commit 657570d
Showing 1 changed file with 37 additions and 3 deletions.
40 changes: 37 additions & 3 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "src/common/libutil/fsd.h"
#include "src/common/libutil/errno_safe.h"
#include "src/common/libutil/monotime.h"
#include "src/common/librouter/rpc_track.h"

#include "overlay.h"
#include "attr.h"
Expand Down Expand Up @@ -75,6 +76,7 @@ struct child {
enum subtree_status status;
struct timespec status_timestamp;
bool idle;
struct rpc_track *tracker;
};

struct parent {
Expand Down Expand Up @@ -291,6 +293,9 @@ int overlay_set_geometry (struct overlay *ov, uint32_t size, uint32_t rank)
child->rank = kary_childof (ov->fanout, size, rank, i);
child->status = SUBTREE_STATUS_OFFLINE;
monotime (&child->status_timestamp);
child->tracker = rpc_track_create (MSG_HASH_TYPE_UUID_MATCHTAG);
if (!child->tracker)
return -1;
}
ov->status = SUBTREE_STATUS_PARTIAL;
}
Expand Down Expand Up @@ -505,7 +510,7 @@ int overlay_sendmsg (struct overlay *ov,
flux_msg_t *cpy = NULL;
const char *uuid;
uint32_t nodeid;
struct child *child;
struct child *child = NULL;
int rc;

if (flux_msg_get_type (msg, &type) < 0
Expand Down Expand Up @@ -544,8 +549,17 @@ int overlay_sendmsg (struct overlay *ov,
}
if (where == OVERLAY_UPSTREAM)
rc = overlay_sendmsg_parent (ov, msg);
else
else {
rc = overlay_sendmsg_child (ov, msg);
if (rc == 0) {
if (!child) {
if ((uuid = flux_msg_route_last (msg)))
child = child_lookup (ov, ov->uuid);
}
if (child)
rpc_track_update (child->tracker, msg);
}
}
if (rc < 0)
goto error;
break;
Expand Down Expand Up @@ -615,6 +629,19 @@ const char *overlay_get_bind_uri (struct overlay *ov)
return ov->bind_uri;
}

static void fail_child_rpcs (const flux_msg_t *msg, void *arg)
{
struct overlay *ov = arg;
flux_msg_t *rep;

if (!(rep = flux_response_derive (msg, EHOSTUNREACH))
|| flux_msg_route_delete_last (rep) < 0
|| flux_msg_route_delete_last (rep) < 0
|| flux_send (ov->h, rep, 0) < 0)
flux_log_error (ov->h, "tracker: error sending EHOSTUNREACH response");
flux_msg_destroy (rep);
}

static void overlay_child_status_update (struct overlay *ov,
struct child *child,
int status)
Expand All @@ -623,6 +650,7 @@ static void overlay_child_status_update (struct overlay *ov,
if (subtree_is_online (child->status)
&& !subtree_is_online (status)) {
zhashx_delete (ov->child_hash, child->uuid);
rpc_track_purge (child->tracker, fail_child_rpcs, ov);
}
else if (!subtree_is_online (child->status)
&& subtree_is_online (status)) {
Expand Down Expand Up @@ -744,6 +772,7 @@ static void child_cb (flux_reactor_t *r, flux_watcher_t *w,
*/
(void)flux_msg_route_delete_last (msg); // child id from ROUTER
(void)flux_msg_route_delete_last (msg); // my id
rpc_track_update (child->tracker, msg);
break;
case FLUX_MSGTYPE_EVENT:
break;
Expand Down Expand Up @@ -1499,7 +1528,12 @@ void overlay_destroy (struct overlay *ov)
flux_watcher_destroy (ov->bind_w);

zhashx_destroy (&ov->child_hash);
free (ov->children);
if (ov->children) {
int i;
for (i = 0; i < ov->child_count; i++)
rpc_track_destroy (ov->children[i].tracker);
free (ov->children);
}
free (ov);
errno = saved_errno;
}
Expand Down

0 comments on commit 657570d

Please sign in to comment.