Skip to content

Commit

Permalink
libflux: support setting prep watcher priority
Browse files Browse the repository at this point in the history
Problem: Watcher priorities are only configurable in check
watchers, but would be useful in prepare watchers as well.

Add support for prepare watcher priorities.  Add unit tests.
  • Loading branch information
chu11 committed Oct 11, 2024
1 parent 50f8dc2 commit d41f7cc
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 17 deletions.
7 changes: 7 additions & 0 deletions src/common/libflux/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,12 @@ double flux_watcher_next_wakeup (flux_watcher_t *w)

/* Prepare
*/

static void prepare_set_priority (flux_watcher_t *w, int priority)
{
ev_set_priority ((ev_prepare *)w->data, priority);
}

static void prepare_start (flux_watcher_t *w)
{
ev_prepare_start (w->r->loop, (ev_prepare *)w->data);
Expand All @@ -543,6 +549,7 @@ static void prepare_cb (struct ev_loop *loop, ev_prepare *pw, int revents)
}

static struct flux_watcher_ops prepare_watcher = {
.set_priority = prepare_set_priority,
.start = prepare_start,
.stop = prepare_stop,
.destroy = NULL,
Expand Down
93 changes: 76 additions & 17 deletions src/common/libflux/test/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -636,21 +636,73 @@ static void test_reactor_flags (flux_reactor_t *r)

static char cblist[6] = {0};
static int cblist_index = 0;

static void priority_prepare_watcher_prep_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
char *s = arg;
/* stick the char name of this watcher into the array, we'll
* compare later
*/
cblist[cblist_index++] = s[0];
flux_watcher_stop (w);
}

static void test_priority_prepare_watcher (flux_reactor_t *r)
{
flux_watcher_t *a, *b, *c, *d, *e;

memset (cblist, '\0', sizeof (cblist));
cblist_index = 0;

a = flux_prepare_watcher_create (r, priority_prepare_watcher_prep_cb, "A");
b = flux_prepare_watcher_create (r, priority_prepare_watcher_prep_cb, "B");
c = flux_prepare_watcher_create (r, priority_prepare_watcher_prep_cb, "C");
d = flux_prepare_watcher_create (r, priority_prepare_watcher_prep_cb, "D");
e = flux_prepare_watcher_create (r, priority_prepare_watcher_prep_cb, "E");
ok (a != NULL && b != NULL && c != NULL && d != NULL && e != NULL,
"prepare watcher create worked");
// Don't set priority of 'a', it'll be default
flux_watcher_set_priority (b, 1);
flux_watcher_set_priority (c, -2);
flux_watcher_set_priority (d, -1);
flux_watcher_set_priority (e, 2);
flux_watcher_start (a);
flux_watcher_start (b);
flux_watcher_start (c);
flux_watcher_start (d);
flux_watcher_start (e);
ok (flux_reactor_run (r, 0) == 0,
"reactor ran to completion");
/* given priorities, callbacks should be called in the following order
* EBADC
*/
ok (memcmp (cblist, "EBADC", 5) == 0,
"prepare callbacks called in the correct order");
flux_watcher_destroy (a);
flux_watcher_destroy (b);
flux_watcher_destroy (c);
flux_watcher_destroy (d);
flux_watcher_destroy (e);
}

static flux_watcher_t *priority_prep = NULL;
static flux_watcher_t *priority_idle = NULL;

static void priority_prep_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
static void priority_check_watcher_prep_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
flux_watcher_start (priority_idle);
}

static void priority_check_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
static void priority_check_watcher_check_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
char *s = arg;
/* stick the char name of this watcher into the array, we'll
Expand All @@ -664,20 +716,26 @@ static void priority_check_cb (flux_reactor_t *r,
flux_watcher_stop (w);
}

static void test_priority (flux_reactor_t *r)
static void test_priority_check_watcher (flux_reactor_t *r)
{
flux_watcher_t *a, *b, *c, *d, *e;
priority_prep = flux_prepare_watcher_create (r, priority_prep_cb, NULL);

memset (cblist, '\0', sizeof (cblist));
cblist_index = 0;

priority_prep = flux_prepare_watcher_create (r,
priority_check_watcher_prep_cb,
NULL);
ok (priority_prep != NULL,
"prep watcher create worked");
priority_idle = flux_idle_watcher_create (r, NULL, NULL);
ok (priority_idle != NULL,
"idle watcher create worked");
a = flux_check_watcher_create (r, priority_check_cb, "A");
b = flux_check_watcher_create (r, priority_check_cb, "B");
c = flux_check_watcher_create (r, priority_check_cb, "C");
d = flux_check_watcher_create (r, priority_check_cb, "D");
e = flux_check_watcher_create (r, priority_check_cb, "E");
a = flux_check_watcher_create (r, priority_check_watcher_check_cb, "A");
b = flux_check_watcher_create (r, priority_check_watcher_check_cb, "B");
c = flux_check_watcher_create (r, priority_check_watcher_check_cb, "C");
d = flux_check_watcher_create (r, priority_check_watcher_check_cb, "D");
e = flux_check_watcher_create (r, priority_check_watcher_check_cb, "E");
ok (a != NULL && b != NULL && c != NULL && d != NULL && e != NULL,
"check watcher create worked");
// Don't set priority of 'a', it'll be default
Expand All @@ -697,7 +755,7 @@ static void test_priority (flux_reactor_t *r)
* DCAEB
*/
ok (memcmp (cblist, "DCAEB", 5) == 0,
"callbacks called in the correct order");
"check callbacks called in the correct order");
flux_watcher_destroy (a);
flux_watcher_destroy (b);
flux_watcher_destroy (c);
Expand Down Expand Up @@ -731,7 +789,8 @@ int main (int argc, char *argv[])
test_stat (reactor);
test_active_ref (reactor);
test_reactor_flags (reactor);
test_priority (reactor);
test_priority_prepare_watcher (reactor);
test_priority_check_watcher (reactor);

flux_reactor_destroy (reactor);

Expand Down

0 comments on commit d41f7cc

Please sign in to comment.