Skip to content

Commit

Permalink
Fix: src.ctf.lttng-live: emitting stream end msg with no stream
Browse files Browse the repository at this point in the history
Background
==========
When a stream hangs up on the `src.ctf.lttng-live` component, we make
sure we send a stream end message to ensure we honor The Contract which
states that any stream beginning must eventually be followed by its
stream end counterpart. We do this by calling
`ctf_msg_iter_get_next_message()` one last time to emit any missing
messages.

Using the upcoming lttng clear feature in conjunction with a per-pid
session makes it highly likely that a live stream hangs up on the
`src.ctf.lttng-live` component between the moment we learn about it and
the moment we first ask for its live index.

In such event, the live stream iterator and its `ctf_msg_it` are both
created but the corresponding stream is uninitialized.

When the component realized that a live stream has hung up, it calls
`ctf_msg_iter_get_next_message()` to respect The Contract but then
errors out here:
  CAUSED BY [lttng-live: 'source.ctf.lttng-live'] (msg-iter.c:2474)
    Cannot create stream end message because stream is NULL:
    msg-it-addr=0x555fba864010

The `stream` field is null because we never got the chance to received
any index for this stream.

Issue
=====
It's possible for a `ctf_msg` state machine to pass by the
`STATE_EMIT_MSG_STREAM_END` state without having passed by the
`STATE_EMIT_MSG_STREAM_BEGINNING` state.

Solution
========
Keep track of the fact that we sent a stream beginning message
downstream and that we need to send its respective stream end message.
If no message were send for a particular stream, we can omit sending a
stream end message.

Signed-off-by: Francis Deslauriers <francis.deslauriers@efficios.com>
Change-Id: If7f52f43162e7263785713c01c226907fe475d94
Reviewed-on: https://review.lttng.org/c/babeltrace/+/2719
CI-Build: Simon Marchi <simon.marchi@efficios.com>
Tested-by: jenkins <jenkins@lttng.org>
Reviewed-by: Simon Marchi <simon.marchi@efficios.com>
  • Loading branch information
frdeso authored and jgalar committed Jan 20, 2020
1 parent 62988c5 commit 901f0ce
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
35 changes: 33 additions & 2 deletions src/plugins/ctf/common/msg-iter/msg-iter.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ enum state {
STATE_EMIT_MSG_PACKET_END_MULTI,
STATE_EMIT_MSG_PACKET_END_SINGLE,
STATE_EMIT_QUEUED_MSG_PACKET_END,
STATE_CHECK_EMIT_MSG_STREAM_END,
STATE_EMIT_MSG_STREAM_END,
STATE_DONE,
};
Expand Down Expand Up @@ -181,6 +182,14 @@ struct ctf_msg_iter {
*/
bool emit_stream_beginning_message;

/*
* True if we need to emit a stream end message at the end of the
* current stream. A live stream may never receive any data and thus
* never send a stream beginning message which removes the need to emit
* a stream end message.
*/
bool emit_stream_end_message;

/* Database of current dynamic scopes */
struct {
bt_field *stream_packet_context;
Expand Down Expand Up @@ -318,6 +327,8 @@ const char *state_string(enum state state)
return "EMIT_MSG_PACKET_END_SINGLE";
case STATE_EMIT_QUEUED_MSG_PACKET_END:
return "EMIT_QUEUED_MSG_PACKET_END";
case STATE_CHECK_EMIT_MSG_STREAM_END:
return "CHECK_EMIT_MSG_STREAM_END";
case STATE_EMIT_MSG_STREAM_END:
return "EMIT_MSG_STREAM_END";
case STATE_DONE:
Expand Down Expand Up @@ -725,7 +736,7 @@ enum ctf_msg_iter_status switch_packet_state(struct ctf_msg_iter *msg_it)
medium_status = msg_it->medium.medops.switch_packet(msg_it->medium.data);
if (medium_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) {
/* No more packets. */
msg_it->state = STATE_EMIT_MSG_STREAM_END;
msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
status = CTF_MSG_ITER_STATUS_OK;
goto end;
} else if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) {
Expand Down Expand Up @@ -803,7 +814,7 @@ enum ctf_msg_iter_status read_packet_header_begin_state(
break;
case CTF_MSG_ITER_STATUS_EOF:
status = CTF_MSG_ITER_STATUS_OK;
msg_it->state = STATE_EMIT_MSG_STREAM_END;
msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END;
goto end;
default:
goto end;
Expand Down Expand Up @@ -1657,6 +1668,20 @@ enum ctf_msg_iter_status check_emit_msg_discarded_packets(
return CTF_MSG_ITER_STATUS_OK;
}

static inline
enum state check_emit_msg_stream_end(struct ctf_msg_iter *msg_it)
{
enum state next_state;

if (msg_it->emit_stream_end_message) {
next_state = STATE_EMIT_MSG_STREAM_END;
} else {
next_state = STATE_DONE;
}

return next_state;
}

static inline
enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it)
{
Expand Down Expand Up @@ -1755,6 +1780,9 @@ enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it)
case STATE_EMIT_QUEUED_MSG_PACKET_END:
msg_it->state = STATE_EMIT_MSG_PACKET_END_SINGLE;
break;
case STATE_CHECK_EMIT_MSG_STREAM_END:
msg_it->state = check_emit_msg_stream_end(msg_it);
break;
case STATE_EMIT_MSG_STREAM_END:
msg_it->state = STATE_DONE;
break;
Expand Down Expand Up @@ -1817,6 +1845,7 @@ void ctf_msg_iter_reset(struct ctf_msg_iter *msg_it)
msg_it->prev_packet_snapshots.beginning_clock = UINT64_C(-1);
msg_it->prev_packet_snapshots.end_clock = UINT64_C(-1);
msg_it->emit_stream_beginning_message = true;
msg_it->emit_stream_end_message = false;
}

static
Expand Down Expand Up @@ -2947,6 +2976,7 @@ enum ctf_msg_iter_status ctf_msg_iter_get_next_message(
/* create_msg_stream_beginning() logs errors */
*message = create_msg_stream_beginning(msg_it);
msg_it->emit_stream_beginning_message = false;
msg_it->emit_stream_end_message = true;

if (!*message) {
status = CTF_MSG_ITER_STATUS_ERROR;
Expand All @@ -2956,6 +2986,7 @@ enum ctf_msg_iter_status ctf_msg_iter_get_next_message(
case STATE_EMIT_MSG_STREAM_END:
/* create_msg_stream_end() logs errors */
*message = create_msg_stream_end(msg_it);
msg_it->emit_stream_end_message = false;

if (!*message) {
status = CTF_MSG_ITER_STATUS_ERROR;
Expand Down
4 changes: 4 additions & 0 deletions src/plugins/ctf/lttng-live/lttng-live.c
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,10 @@ enum lttng_live_iterator_status lttng_live_iterator_close_stream(
"Error getting the next message from CTF message iterator");
live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
goto end;
} else if (status == CTF_MSG_ITER_STATUS_EOF) {
BT_COMP_LOGI("Reached the end of the live stream iterator.");
live_status = LTTNG_LIVE_ITERATOR_STATUS_END;
goto end;
}

BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);
Expand Down

0 comments on commit 901f0ce

Please sign in to comment.