Skip to content

Commit

Permalink
libsubprocess: cover subprocess on_credit
Browse files Browse the repository at this point in the history
Problem: There is no coverage for the new libsubprocess on_credit
callback.

Add unit tests in libsubprocess/test/stdio.c and libsubprocess/test/iostress.c.
  • Loading branch information
chu11 committed Oct 7, 2024
1 parent 7372eed commit d29b711
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 1 deletion.
91 changes: 90 additions & 1 deletion src/common/libsubprocess/test/iostress.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
enum {
WRITE_API,
WRITE_DIRECT,
WRITE_CREDIT,
};

struct iostress_ctx {
Expand All @@ -40,12 +41,15 @@ struct iostress_ctx {
pid_t pid;
size_t linesize;
char *buf;
int buf_index;
int linerecv;
int batchcount;
int batchlines;
int batchcursor;
int batchlinescursor;
int outputcount;
int write_type;
int stdin_credits;
const char *name;
};

Expand Down Expand Up @@ -105,7 +109,12 @@ static void iostress_state_cb (flux_subprocess_t *p,
case FLUX_SUBPROCESS_INIT:
case FLUX_SUBPROCESS_RUNNING:
ctx->pid = flux_subprocess_pid (p);
flux_watcher_start (ctx->source); // start sourcing data
/* if credit based write, writing will be taken care of in
* iostress_credit_cb()
*/
if (ctx->write_type == WRITE_API
|| ctx->write_type == WRITE_DIRECT)
flux_watcher_start (ctx->source); // start sourcing data
break;
case FLUX_SUBPROCESS_STOPPED:
break;
Expand Down Expand Up @@ -197,10 +206,67 @@ static void iostress_source_cb (flux_reactor_t *r,
iostress_start_doomsday (ctx, 2.);
}

static void iostress_credit_cb (flux_subprocess_t *p,
const char *stream,
int bytes)
{
struct iostress_ctx *ctx = flux_subprocess_aux_get (p, "ctx");

if (ctx->write_type != WRITE_CREDIT)
return;

// diag ("%s credit cb stream=%s bytes=%d", ctx->name, stream, bytes);

ctx->stdin_credits += bytes;

while (ctx->batchcursor < ctx->batchcount) {
while (ctx->batchlinescursor < ctx->batchlines) {
int wlen, len;

if ((ctx->linesize - ctx->buf_index) < ctx->stdin_credits)
wlen = ctx->linesize - ctx->buf_index;
else
wlen = ctx->stdin_credits;
len = flux_subprocess_write (ctx->p,
"stdin",
&ctx->buf[ctx->buf_index],
wlen);
if (len < 0) {
diag ("%s: source: %s", ctx->name, strerror (errno));
goto error;
}
ctx->stdin_credits -= len;
ctx->buf_index += len;
if (ctx->buf_index == ctx->linesize) {
ctx->buf_index = 0;
ctx->batchlinescursor++;
}
if (ctx->stdin_credits == 0)
break;
}
if (ctx->batchlinescursor == ctx->batchlines) {
ctx->batchlinescursor = 0;
if (++ctx->batchcursor == ctx->batchcount) {
if (flux_subprocess_close (ctx->p, "stdin") < 0) {
diag ("%s: source: %s", ctx->name, strerror (errno));
goto error;
}
break;
}
}
if (ctx->stdin_credits == 0)
break;
}
return;
error:
iostress_start_doomsday (ctx, 2.);
}

flux_subprocess_ops_t iostress_ops = {
.on_completion = iostress_completion_cb,
.on_state_change = iostress_state_cb,
.on_stdout = iostress_output_cb,
.on_credit = iostress_credit_cb,
};

bool iostress_run_check (flux_t *h,
Expand All @@ -226,6 +292,7 @@ bool iostress_run_check (flux_t *h,
ctx.linesize = linesize;
ctx.name = name;
ctx.write_type = write_type;
ctx.stdin_credits = 0;

if (!(ctx.buf = malloc (ctx.linesize)))
BAIL_OUT ("out of memory");
Expand Down Expand Up @@ -344,6 +411,28 @@ int main (int argc, char *argv[])
4096),
"tinystdin-direct failed as expected");

// remote stdin buffer managed via credits should work
ok (iostress_run_check (h,
"tinystdin-credit",
WRITE_CREDIT,
128,
0,
1,
1,
4096),
"tinystdin-direct failed as expected");

// credits w/ more data
ok (iostress_run_check (h,
"tinystdin-credit2",
WRITE_CREDIT,
128,
0,
8,
8,
4096),
"tinystdin-direct failed as expected");

test_server_stop (h);
flux_close (h);
done_testing ();
Expand Down
120 changes: 120 additions & 0 deletions src/common/libsubprocess/test/stdio.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ int multiple_lines_stderr_output_cb_count;
int stdin_closed_stdout_cb_count;
int stdin_closed_stderr_cb_count;
int timer_cb_count;
int credit_cb_count;
int stdin_credit;
char inputbuf[1024];
int inputbuf_index;
int inputbuf_len;
char outputbuf[1024];
int outputbuf_len;

Expand Down Expand Up @@ -1414,6 +1419,119 @@ void test_long_line (flux_reactor_t *r)
flux_cmd_destroy (cmd);
}

void credit_output_cb (flux_subprocess_t *p, const char *stream)
{
const char *buf = NULL;
int len;

if (strcasecmp (stream, "stdout")) {
ok (false, "unexpected stream %s", stream);
return;
}

len = flux_subprocess_read (p, stream, &buf);
ok (len >= 0,
"flux_subprocess_read on %s success", stream);

if (len > 0) {
memcpy (outputbuf + outputbuf_len, buf, len);
outputbuf_len += len;
}
else {
char cmpbuf[1024];

ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

sprintf (cmpbuf, "abcdefghijklmnopqrstuvwxyz0123456789\n");
ok (streq (outputbuf, cmpbuf),
"flux_subprocess_read returned correct data");
/* 26 (ABCs) + 10 (1-10) + 1 for `\n' */
ok (outputbuf_len == (26 + 10 + 1),
"flux_subprocess_read returned correct amount of data");
}
stdout_output_cb_count++;
}

void credit_cb (flux_subprocess_t *p, const char *stream, int bytes)
{
int *credits = flux_subprocess_aux_get (p, "credits");
int len;
int ret;

assert (credits);

diag ("on_credit: credit of %d bytes", bytes);

(*credits) += bytes;

if ((inputbuf_len - inputbuf_index) > 0) {
if ((inputbuf_len - inputbuf_index) > (*credits))
len = (*credits);
else
len = (inputbuf_len - inputbuf_index);

ret = flux_subprocess_write (p, "stdin", &inputbuf[inputbuf_index], len);
ok (ret == len,
"flux_subprocess_write success");

(*credits) -= ret;
inputbuf_index += ret;
}
else {
ok (flux_subprocess_close (p, "stdin") == 0,
"flux_subprocess_close success");
}
credit_cb_count++;
}

void test_on_credit (flux_reactor_t *r)
{
char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-O", NULL };
flux_cmd_t *cmd;
flux_subprocess_t *p = NULL;
int credits = 0;
int ret;

ok ((cmd = flux_cmd_create (2, av, environ)) != NULL, "flux_cmd_create");
ok (flux_cmd_setopt (cmd, "stdin_BUFSIZE", "8") == 0,
"set stdin buffer size to 1024 bytes");

flux_subprocess_ops_t ops = {
.on_completion = completion_cb,
.on_stdout = credit_output_cb,
.on_credit = credit_cb
};
completion_cb_count = 0;
stdout_output_cb_count = 0;
credit_cb_count = 0;
sprintf (inputbuf, "abcdefghijklmnopqrstuvwxyz0123456789");
inputbuf_index = 0;
inputbuf_len = (26 + 10);
memset (outputbuf, '\0', sizeof (outputbuf));
outputbuf_len = 0;
p = flux_local_exec (r, 0, cmd, &ops);
ok (p != NULL, "flux_local_exec");
ret = flux_subprocess_aux_set (p, "credits", &credits, NULL);
ok (ret == 0, "flux_subprocess_aux_set works");

ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING,
"subprocess state == RUNNING after flux_local_exec");

errno = 0;
ret = flux_subprocess_write (p, "stdin", &inputbuf[inputbuf_index], 10);
ok (ret < 0 && errno == ENOSPC,
"flux_subprocess_write fails with too much data");

int rc = flux_reactor_run (r, 0);
ok (rc == 0, "flux_reactor_run returned zero status");
ok (completion_cb_count == 1, "completion callback called 1 time");
ok (stdout_output_cb_count >= 2, "stdout output callback called >= 2 times");
ok (credit_cb_count == 6, "credit callback called 6 times");
flux_subprocess_destroy (p);
flux_cmd_destroy (cmd);
}

int main (int argc, char *argv[])
{
flux_reactor_t *r;
Expand Down Expand Up @@ -1475,6 +1593,8 @@ int main (int argc, char *argv[])
test_stream_start_stop_mid_stop (r);
diag ("long_line");
test_long_line (r);
diag ("on_credit");
test_on_credit (r);

end_fdcount = fdcount ();

Expand Down

0 comments on commit d29b711

Please sign in to comment.