Skip to content

Commit

Permalink
job-manager: fix indent of multi-line pack/unpack
Browse files Browse the repository at this point in the history
Problem: some job-manager multi-line pack/unpack function calls
are broken and indented inconsistently.

Make them consistent with modern project norms.
  • Loading branch information
garlick committed Jul 6, 2023
1 parent 72c7b57 commit b04941c
Show file tree
Hide file tree
Showing 18 changed files with 215 additions and 170 deletions.
92 changes: 55 additions & 37 deletions src/modules/job-manager/alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ static void requeue_pending (struct alloc *alloc, struct job *job)
}
annotations_sched_clear (job, &cleared);
if (cleared) {
if (event_job_post_pack (ctx->event, job, "annotations",
if (event_job_post_pack (ctx->event,
job,
"annotations",
EVENT_NO_COMMIT,
"{s:n}", "annotations") < 0)
"{s:n}",
"annotations") < 0)
flux_log_error (ctx->h,
"%s: event_job_post_pack",
__FUNCTION__);
Expand Down Expand Up @@ -184,8 +187,7 @@ int cancel_request (struct alloc *alloc, struct job *job)
FLUX_NODEID_ANY,
FLUX_RPC_NORESPONSE,
"{s:I}",
"id",
job->id))) {
"id", job->id))) {
flux_log_error (h, "sending sched.cancel id=%s", idf58 (job->id));
return -1;
}
Expand All @@ -210,11 +212,12 @@ static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh,

if (flux_response_decode (msg, NULL, NULL) < 0)
goto teardown; // ENOSYS here if scheduler not loaded/shutting down
if (flux_msg_unpack (msg, "{s:I s:i s?s s?o}",
"id", &id,
"type", &type,
"note", &note,
"annotations", &annotations) < 0)
if (flux_msg_unpack (msg,
"{s:I s:i s?s s?o}",
"id", &id,
"type", &type,
"note", &note,
"annotations", &annotations) < 0)
goto teardown;
if (!(job = zhashx_lookup (ctx->active_jobs, &id))) {
flux_log (h, LOG_ERR, "sched.alloc-response: id=%s not active",
Expand Down Expand Up @@ -251,8 +254,11 @@ static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh,
alloc->alloc_pending_count--;
job->alloc_pending = 0;
if (job->annotations) {
if (event_job_post_pack (ctx->event, job, "alloc", 0,
"{ s:O }",
if (event_job_post_pack (ctx->event,
job,
"alloc",
0,
"{s:O}",
"annotations", job->annotations) < 0)
goto teardown;
}
Expand All @@ -279,9 +285,12 @@ static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh,
}
annotations_clear (job, &cleared);
if (cleared) {
if (event_job_post_pack (ctx->event, job, "annotations",
if (event_job_post_pack (ctx->event,
job,
"annotations",
EVENT_NO_COMMIT,
"{s:n}", "annotations") < 0)
"{s:n}",
"annotations") < 0)
flux_log_error (ctx->h,
"%s: event_job_post_pack: id=%s",
__FUNCTION__,
Expand Down Expand Up @@ -309,9 +318,12 @@ static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh,
}
job->alloc_pending = 0;
if (cleared) {
if (event_job_post_pack (ctx->event, job, "annotations",
if (event_job_post_pack (ctx->event,
job,
"annotations",
EVENT_NO_COMMIT,
"{s:n}", "annotations") < 0)
"{s:n}",
"annotations") < 0)
flux_log_error (ctx->h,
"%s: event_job_post_pack: id=%s",
__FUNCTION__,
Expand Down Expand Up @@ -345,12 +357,13 @@ int alloc_request (struct alloc *alloc, struct job *job)

if (!(msg = flux_request_encode ("sched.alloc", NULL)))
return -1;
if (flux_msg_pack (msg, "{s:I s:I s:I s:f s:O}",
"id", job->id,
"priority", (json_int_t)job->priority,
"userid", (json_int_t) job->userid,
"t_submit", job->t_submit,
"jobspec", job->jobspec_redacted) < 0)
if (flux_msg_pack (msg,
"{s:I s:I s:I s:f s:O}",
"id", job->id,
"priority", (json_int_t)job->priority,
"userid", (json_int_t) job->userid,
"t_submit", job->t_submit,
"jobspec", job->jobspec_redacted) < 0)
goto error;
if (flux_send (alloc->ctx->h, msg, 0) < 0)
goto error;
Expand Down Expand Up @@ -383,7 +396,8 @@ static void hello_cb (flux_t *h, flux_msg_handler_t *mh,
job = zhashx_first (ctx->active_jobs);
while (job) {
if (job->has_resources) {
if (flux_respond_pack (h, msg,
if (flux_respond_pack (h,
msg,
"{s:I s:I s:I s:f}",
"id", job->id,
"priority", job->priority,
Expand Down Expand Up @@ -416,9 +430,11 @@ static void ready_cb (flux_t *h, flux_msg_handler_t *mh,
struct job *job;
const char *sender;

if (flux_request_unpack (msg, NULL, "{s:s s?i}",
"mode", &mode,
"limit", &limit) < 0)
if (flux_request_unpack (msg,
NULL,
"{s:s s?i}",
"mode", &mode,
"limit", &limit) < 0)
goto error;
if (streq (mode, "limited")) {
if (limit <= 0) {
Expand Down Expand Up @@ -541,8 +557,11 @@ static void check_cb (flux_reactor_t *r, flux_watcher_t *w,
/* Post event for debugging if job was submitted FLUX_JOB_DEBUG flag.
*/
if ((job->flags & FLUX_JOB_DEBUG))
(void)event_job_post_pack (ctx->event, job,
"debug.alloc-request", 0, NULL);
(void)event_job_post_pack (ctx->event,
job,
"debug.alloc-request",
0,
NULL);
}

/* called from event_job_action() FLUX_JOB_STATE_CLEANUP */
Expand All @@ -554,8 +573,11 @@ int alloc_send_free_request (struct alloc *alloc, struct job *job)
return -1;
job->free_pending = 1;
if ((job->flags & FLUX_JOB_DEBUG))
(void)event_job_post_pack (alloc->ctx->event, job,
"debug.free-request", 0, NULL);
(void)event_job_post_pack (alloc->ctx->event,
job,
"debug.free-request",
0,
NULL);
alloc->free_pending_count++;
}
return 0;
Expand Down Expand Up @@ -709,14 +731,10 @@ static void alloc_query_cb (flux_t *h,
if (flux_respond_pack (h,
msg,
"{s:i s:i s:i s:i}",
"queue_length",
zlistx_size (alloc->queue),
"alloc_pending",
alloc->alloc_pending_count,
"free_pending",
alloc->free_pending_count,
"running",
alloc->ctx->running_jobs) < 0)
"queue_length", zlistx_size (alloc->queue),
"alloc_pending", alloc->alloc_pending_count,
"free_pending", alloc->free_pending_count,
"running", alloc->ctx->running_jobs) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
return;
}
Expand Down
10 changes: 6 additions & 4 deletions src/modules/job-manager/annotate.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,12 @@ void annotate_memo_request (flux_t *h,
int no_commit = 0;
json_t *tmp = NULL;

if (flux_request_unpack (msg, NULL, "{s:I s?b s:o}",
"id", &id,
"volatile", &no_commit,
"memo", &memo) < 0
if (flux_request_unpack (msg,
NULL,
"{s:I s?b s:o}",
"id", &id,
"volatile", &no_commit,
"memo", &memo) < 0
|| flux_msg_get_cred (msg, &cred) < 0)
goto error;
if (!(job = zhashx_lookup (ctx->active_jobs, &id))) {
Expand Down
5 changes: 3 additions & 2 deletions src/modules/job-manager/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,8 @@ static int event_submit_context_decode (json_t *context,
uint32_t *userid,
int *flags)
{
if (json_unpack (context, "{ s:i s:i s:i }",
if (json_unpack (context,
"{s:i s:i s:i}",
"urgency", urgency,
"userid", userid,
"flags", flags) < 0) {
Expand Down Expand Up @@ -499,7 +500,7 @@ static int event_exception_context_decode (json_t *context,
const char *type;

if (json_unpack (context,
"{ s:i s:s }",
"{s:i s:s}",
"severity", severity,
"type", &type) < 0) {
errno = EPROTO;
Expand Down
7 changes: 4 additions & 3 deletions src/modules/job-manager/job-manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ void getinfo_handle_request (flux_t *h,
if (flux_respond_pack (h,
msg,
"{s:I}",
"max_jobid",
ctx->max_jobid) < 0)
"max_jobid", ctx->max_jobid) < 0)
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
return;
error:
Expand All @@ -77,7 +76,9 @@ static void stats_cb (flux_t *h, flux_msg_handler_t *mh,
{
struct job_manager *ctx = arg;
int journal_listeners = journal_listeners_count (ctx->journal);
if (flux_respond_pack (h, msg, "{s:{s:i} s:i s:i s:I}",
if (flux_respond_pack (h,
msg,
"{s:{s:i} s:i s:i s:I}",
"journal",
"listeners", journal_listeners,
"active_jobs", zhashx_size (ctx->active_jobs),
Expand Down
19 changes: 10 additions & 9 deletions src/modules/job-manager/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ static int jobspec_redacted_parse_queue (struct job *job)
if (json_unpack (job->jobspec_redacted,
"{s?{s?{s?s}}}",
"attributes",
"system",
"queue", &job->queue) < 0) {
"system",
"queue", &job->queue) < 0) {
errno = EINVAL;
return -1;
}
Expand Down Expand Up @@ -266,13 +266,14 @@ struct job *job_create_from_json (json_t *o)

if (!(job = job_create ()))
return NULL;
if (json_unpack (o, "{s:I s:i s:i s:f s:i s:O}",
"id", &job->id,
"urgency", &job->urgency,
"userid", &job->userid,
"t_submit", &job->t_submit,
"flags", &job->flags,
"jobspec", &job->jobspec_redacted) < 0) {
if (json_unpack (o,
"{s:I s:i s:i s:f s:i s:O}",
"id", &job->id,
"urgency", &job->urgency,
"userid", &job->userid,
"t_submit", &job->t_submit,
"flags", &job->flags,
"jobspec", &job->jobspec_redacted) < 0) {
errno = EPROTO;
job_decref (job);
return NULL;
Expand Down
36 changes: 23 additions & 13 deletions src/modules/job-manager/jobtap.c
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,9 @@ static int jobtap_conf_entry (struct jobtap *jobtap,
const char *remove = NULL;
json_t *conf = NULL;

if (json_unpack_ex (entry, &json_err, 0,
if (json_unpack_ex (entry,
&json_err,
0,
"{s?s s?o s?s}",
"load", &load,
"conf", &conf,
Expand Down Expand Up @@ -674,7 +676,8 @@ int jobtap_get_priority (struct jobtap *jobtap,
/*
* A priority.get callback was run. Try to unpack a new priority
*/
if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_OUT,
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_OUT,
"{s?I}",
"priority", &priority) < 0) {
flux_log (jobtap->ctx->h, LOG_ERR,
Expand Down Expand Up @@ -763,7 +766,8 @@ static int jobtap_call_early (struct jobtap *jobtap,
* If plugin did not provide an error message, then construct
* a generic error "rejected by plugin".
*/
if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_OUT,
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_OUT,
"{s:s}",
"errmsg", &errmsg) < 0)
errmsg = "rejected by job-manager plugin";
Expand Down Expand Up @@ -874,7 +878,8 @@ static int jobtap_check_dependency (struct jobtap *jobtap,
* a generic error "rejected by plugin".
*/
const char *errmsg;
if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_OUT,
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_OUT,
"{s:s}",
"errmsg", &errmsg) < 0) {
errmsg = "rejected by job-manager dependency plugin";
Expand All @@ -892,11 +897,13 @@ static int dependencies_unpack (struct jobtap * jobtap,
json_t *dependencies = NULL;
json_error_t error;

if (json_unpack_ex (job->jobspec_redacted, &error, 0,
if (json_unpack_ex (job->jobspec_redacted,
&error,
0,
"{s:{s?{s?o}}}",
"attributes",
"system",
"dependencies", &dependencies) < 0) {
"system",
"dependencies", &dependencies) < 0) {
error_asprintf (jobtap, job, errp,
"unable to unpack dependencies: %s",
error.text);
Expand Down Expand Up @@ -1041,7 +1048,8 @@ int jobtap_call (struct jobtap *jobtap,
"jobtap: %s: callback returned error",
topic);
}
if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_OUT,
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_OUT,
"{s?I s?o}",
"priority", &priority,
"annotations", &note) < 0) {
Expand Down Expand Up @@ -1387,8 +1395,9 @@ static void jobtap_handle_list_req (flux_t *h,
json_t *o = jobtap_plugin_list (jobtap);
if (o == NULL)
flux_respond_error (h, msg, ENOMEM, "Failed to create plugin list");
else if (flux_respond_pack (h, msg,
"{ s:o }",
else if (flux_respond_pack (h,
msg,
"{s:o}",
"plugins", o) < 0)
flux_log_error (h, "jobtap_handle_list: flux_respond");
}
Expand Down Expand Up @@ -1918,8 +1927,7 @@ int flux_jobtap_job_set_flag (flux_plugin_t *p,
"set-flags",
0,
"{s:[s]}",
"flags",
flag);
"flags", flag);
}

static int jobtap_job_vraise (struct jobtap *jobtap,
Expand Down Expand Up @@ -2020,7 +2028,9 @@ int flux_jobtap_get_job_result (flux_plugin_t *p,
errno = EINVAL;
return -1;
}
if (json_unpack_ex (job->end_event, &error, 0,
if (json_unpack_ex (job->end_event,
&error,
0,
"{s:s s:{s?i s?s s?i}}",
"name", &name,
"context",
Expand Down
10 changes: 7 additions & 3 deletions src/modules/job-manager/journal.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ int journal_process_event (struct journal *journal,
msg = flux_msglist_first (journal->listeners);
while (msg) {
if (allow_deny_check (msg, name)) {
if (flux_respond_pack (journal->ctx->h, msg,
"{s:[O]}", "events", wrapped_entry) < 0)
if (flux_respond_pack (journal->ctx->h,
msg,
"{s:[O]}",
"events", wrapped_entry) < 0)
flux_log_error (journal->ctx->h, "%s: flux_respond_pack",
__FUNCTION__);
}
Expand Down Expand Up @@ -153,7 +155,9 @@ static void journal_handle_request (flux_t *h,

if (!(filter = calloc (1, sizeof (*filter))))
goto error;
if (flux_request_unpack (msg, NULL, "{s?o s?o}",
if (flux_request_unpack (msg,
NULL,
"{s?o s?o}",
"allow", &filter->allow,
"deny", &filter->deny) < 0
|| flux_msg_aux_set (msg, "filter", filter,
Expand Down
Loading

0 comments on commit b04941c

Please sign in to comment.