Skip to content

Commit

Permalink
Add a condition variable for waiting on events, and wait for it to be…
Browse files Browse the repository at this point in the history
… signalled if min_nr hasn't been reached
  • Loading branch information
kmeisthax committed Nov 21, 2021
1 parent 0a8c063 commit e8d3c9f
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 21 deletions.
23 changes: 17 additions & 6 deletions fs/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct aioctx *aioctx_new(int events_capacity, pid_t pid) {
memset(aioctx_events, 0, sizeof(struct aioctx_event) * events_capacity);

lock_init(&aioctx->lock);
cond_init(&aioctx->cond);

aioctx->refcount = 1;
aioctx->events_capacity = events_capacity;
Expand All @@ -65,6 +66,7 @@ void aioctx_retain(struct aioctx *ctx) {

static void _aioctx_decrement_ref(struct aioctx *ctx) {
if (--ctx->refcount == 0) {
cond_destroy(&ctx->cond);
free(ctx->events);
free(ctx);
} else {
Expand Down Expand Up @@ -143,6 +145,7 @@ void aioctx_complete_event(struct aioctx *ctx, unsigned int index, int64_t resul
ctx->events[index].data.as_complete = data;
}

notify_once(&ctx->cond);
unlock(&ctx->lock);
}

Expand Down Expand Up @@ -171,6 +174,16 @@ bool aioctx_consume_completed_event(struct aioctx *ctx, uint64_t *user_data, add
return result;
}

int aioctx_wait_for_completion(struct aioctx *ctx, struct timespec *timeout) {
if (ctx == NULL) return _EINVAL;

lock(&ctx->lock);
int err = wait_for(&ctx->cond, &ctx->lock, timeout);
unlock(&ctx->lock);

return err;
}

void aioctx_lock(struct aioctx* ctx) {
if (ctx == NULL) return;

Expand All @@ -194,18 +207,17 @@ signed int aioctx_get_pending_event(struct aioctx *ctx, unsigned int index, stru
return 0;
}

struct aioctx_table *aioctx_table_new(unsigned int capacity) {
struct aioctx_table *tbl = malloc(sizeof(struct aioctx_table));
if (tbl == NULL) return NULL;
signed int aioctx_table_new(struct aioctx_table *tbl, unsigned int capacity) {
if (tbl == NULL) return _EINVAL;

tbl->capacity = 0;
tbl->contexts = NULL;
lock_init(&tbl->lock);

int err = _aioctx_table_ensure(tbl, capacity);
if (err < 0) return ERR_PTR(err);
if (err < 0) return err;

return tbl;
return 0;
}

void aioctx_table_delete(struct aioctx_table *tbl) {
Expand All @@ -218,7 +230,6 @@ void aioctx_table_delete(struct aioctx_table *tbl) {
}
}
free(tbl->contexts);
free(tbl);
}

signed int aioctx_table_insert(struct aioctx_table *tbl, struct aioctx *ctx) {
Expand Down
24 changes: 23 additions & 1 deletion fs/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct aioctx_event {
struct aioctx {
atomic_uint refcount;
lock_t lock;
cond_t cond;

// Indicates if this context is owned by a task.
//
Expand Down Expand Up @@ -128,7 +129,12 @@ struct aioctx_table {
struct aioctx **contexts;
};

struct aioctx_table *aioctx_table_new(unsigned int capacity);
// In-place construct an AIO context table.
//
// Returns an error value if internal table buffers could not be allocated.
signed int aioctx_table_new(struct aioctx_table *tbl, unsigned int capacity);

// In-place destroy an AIO context table.
void aioctx_table_delete(struct aioctx_table *tbl);

// Insert an AIO context into a given table.
Expand Down Expand Up @@ -192,6 +198,9 @@ void aioctx_cancel_event(struct aioctx *ctx, unsigned int index);
//
// This accepts two result parameters, whose meaning is determined solely by
// the event opcode.
//
// This also signals any threads waiting on the context that an event has been
// completed.
void aioctx_complete_event(struct aioctx *ctx, unsigned int index, int64_t result0, int64_t result1);

// Consume a completed I/O event.
Expand All @@ -205,6 +214,19 @@ void aioctx_complete_event(struct aioctx *ctx, unsigned int index, int64_t resul
// from the queue, and the passed-in parameters should not be used.
bool aioctx_consume_completed_event(struct aioctx *ctx, uint64_t *user_data, addr_t *iocbp, struct aioctx_event_complete *completed_data);

// Wait for an event to complete.
//
// This function blocks the current thread until an event completion is posted
// to the context, or the timeout expires. When new events are completed, this
// function will return 0. If the timeout expired, this function will return
// _ETIMEDOUT. Any other error codes should be sent to client code.
//
// Please note that this function returning with 0 is not a guarantee that
// `aioctx_consume_completed_event` will yield data. This function may
// spuriously return 0 or some other thread may have claimed the event in
// between this function returning and the other function being called.
int aioctx_wait_for_completion(struct aioctx *ctx, struct timespec *timeout);

void aioctx_lock(struct aioctx* ctx);
void aioctx_unlock(struct aioctx* ctx);

Expand Down
34 changes: 26 additions & 8 deletions kernel/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "kernel/task.h"
#include "kernel/aio.h"
#include "kernel/fs.h"
#include "kernel/time.h"
#include "fs/aio.h"
#include "fs/fd.h"

Expand Down Expand Up @@ -36,7 +37,7 @@ dword_t sys_io_setup(dword_t nr_events, addr_t ctx_idp) {
if (ctx == NULL) return _ENOMEM;
if (IS_ERR(ctx)) return PTR_ERR(ctx);

int ctx_id = aioctx_table_insert(current->aioctx, ctx);
int ctx_id = aioctx_table_insert(&current->aioctx, ctx);
aioctx_release(ctx);
if (ctx_id < 0) {
return ctx_id;
Expand All @@ -52,20 +53,32 @@ dword_t sys_io_setup(dword_t nr_events, addr_t ctx_idp) {
dword_t sys_io_destroy(dword_t ctx_id) {
STRACE("io_destroy(%d)", ctx_id);

int err = aioctx_table_remove(current->aioctx, ctx_id) < 0;
int err = aioctx_table_remove(&current->aioctx, ctx_id) < 0;
if (err < 0) {
return err;
}

return 0;
}

dword_t sys_io_getevents(dword_t ctx_id, dword_t min_nr, dword_t nr, addr_t events, addr_t timeout) {
STRACE("io_getevents(0x%x, %d, %d, 0x%x, 0x%x)", ctx_id, min_nr, nr, events, timeout);
dword_t sys_io_getevents(dword_t ctx_id, dword_t min_nr, dword_t nr, addr_t events, addr_t timeout_addr) {
STRACE("io_getevents(0x%x, %d, %d, 0x%x, 0x%x)", ctx_id, min_nr, nr, events, timeout_addr);

struct aioctx *ctx = aioctx_table_get_and_retain(current->aioctx, ctx_id);
struct aioctx *ctx = aioctx_table_get_and_retain(&current->aioctx, ctx_id);
if (ctx == NULL) return _EINVAL;
if (events == 0) return _EFAULT;

struct timespec_ guest_timeout;
struct timespec host_timeout;
struct timespec *timeout = &host_timeout;

if (timeout_addr != 0) {
if (user_get(timeout_addr, guest_timeout)) return _EFAULT;
host_timeout.tv_sec = guest_timeout.sec;
host_timeout.tv_nsec = guest_timeout.nsec;
} else {
timeout = NULL;
}

dword_t i = 0;
for (i = 0; i < nr; i += 1) {
Expand All @@ -74,8 +87,13 @@ dword_t sys_io_getevents(dword_t ctx_id, dword_t min_nr, dword_t nr, addr_t even
struct aioctx_event_complete cdata;

if (!aioctx_consume_completed_event(ctx, &user_data, &iocbp, &cdata)) {
//TODO: Block until min_nr events recieved or timeout exceeded
break;
if (i >= min_nr) break;

int err = aioctx_wait_for_completion(ctx, timeout);

if (err == _ETIMEDOUT) break;
if (err < 0) return err;
continue;
}

uint64_t obj = (uint64_t)iocbp;
Expand All @@ -97,7 +115,7 @@ dword_t sys_io_submit(dword_t ctx_id, dword_t u_nr, addr_t iocbpp) {

if (nr < 0) return _EINVAL;

struct aioctx *ctx = aioctx_table_get_and_retain(current->aioctx, ctx_id);
struct aioctx *ctx = aioctx_table_get_and_retain(&current->aioctx, ctx_id);
if (ctx == NULL) return _EINVAL;

sdword_t i;
Expand Down
5 changes: 2 additions & 3 deletions kernel/fork.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,8 @@ static int copy_task(struct task *task, dword_t flags, addr_t stack, addr_t ptid
task->clear_tid = ctid_addr;
task->exit_signal = flags & CSIGNAL_;

task->aioctx = aioctx_table_new(0);
if (IS_ERR(task->aioctx)) {
err = PTR_ERR(task->aioctx);
err = aioctx_table_new(&task->aioctx, 0);
if (err < 0) {
goto fail_free_sighand;
}

Expand Down
4 changes: 3 additions & 1 deletion kernel/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ static struct task *construct_task(struct task *parent) {
task_set_mm(task, mm_new());
task->sighand = sighand_new();
task->files = fdtable_new(3); // why is there a 3 here
task->aioctx = aioctx_table_new(0);

signed int err = aioctx_table_new(&task->aioctx, 0);
if (err < 0) return ERR_PTR(err);

task->fs = fs_info_new();
task->fs->umask = 0022;
Expand Down
2 changes: 1 addition & 1 deletion kernel/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ struct task *task_create_(struct task *parent) {
void task_destroy(struct task *task) {
list_remove(&task->siblings);
pid_get(task->pid)->task = NULL;
aioctx_table_delete(task->aioctx);
aioctx_table_delete(&task->aioctx);
free(task);
}

Expand Down
2 changes: 1 addition & 1 deletion kernel/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct task {
struct fs_info *fs;

// Currently active AIO contexts. Contains internal lock.
struct aioctx_table *aioctx;
struct aioctx_table aioctx;

// locked by sighand->lock
struct sighand *sighand;
Expand Down

0 comments on commit e8d3c9f

Please sign in to comment.