Skip to content

Commit

Permalink
Protect fdb writes with mutex if hbeats are enabled
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Hannum <mhannum72@gmail.com>
  • Loading branch information
markhannum committed Oct 24, 2023
1 parent 342a286 commit a3bb6d4
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 47 deletions.
18 changes: 15 additions & 3 deletions db/fdb_fend.c
Original file line number Diff line number Diff line change
Expand Up @@ -3776,6 +3776,7 @@ static fdb_tran_t *fdb_trans_dtran_get_subtran(struct sqlclntstate *clnt,
free(msg);
return NULL;
}
memcpy(tran->magic, "FDBT", 4);
tran->tid = (char *)tran->tiduuid;
comdb2uuid((unsigned char *)tran->tid);

Expand All @@ -3795,6 +3796,7 @@ static fdb_tran_t *fdb_trans_dtran_get_subtran(struct sqlclntstate *clnt,

/* need hbeats */
Pthread_mutex_init(&tran->hbeats.sb_mtx, NULL);
sbuf2setuserptr(tran->sb, tran);
tran->hbeats.tran = tran;
enable_fdb_heartbeats(&tran->hbeats);

Expand Down Expand Up @@ -3868,8 +3870,6 @@ static void _free_fdb_tran(fdb_distributed_tran_t *dtran, fdb_tran_t *tran)

listc_rfl(&dtran->fdb_trans, tran);

if (tran->sb)
sbuf2close(tran->sb);
if (tran->errstr)
free(tran->errstr);
if (tran->dedup_tbl != NULL) {
Expand All @@ -3879,7 +3879,6 @@ static void _free_fdb_tran(fdb_distributed_tran_t *dtran, fdb_tran_t *tran)
"%s: error closing temptable, rc %d, bdberr %d\n",
__func__, rc, bdberr);
}
Pthread_mutex_destroy(&tran->hbeats.sb_mtx);
disable_fdb_heartbeats_and_free(&tran->hbeats);
}

Expand Down Expand Up @@ -4753,6 +4752,19 @@ int fdb_heartbeats(fdb_hbeats_type *hbeats)
return rc;
}

/**
* Close sbuf2, destroy mutex and free fdb-tran
*
*/
void fdb_heartbeat_free_tran(fdb_hbeats_type *hbeats)
{
if (hbeats->tran->sb) {
sbuf2close(hbeats->tran->sb);
}
Pthread_mutex_destroy(&hbeats->sb_mtx);
free(hbeats->tran);
}

/* check if the mentioned fdb has a preferred node, and get the status of last
* op */
static char *_fdb_get_affinity_node(struct sqlclntstate *clnt, const fdb_t *fdb,
Expand Down
7 changes: 7 additions & 0 deletions db/fdb_fend.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ typedef struct fdb_sqlstat_table fdb_sqlstat_table_t;
typedef struct fdb_sqlstat_cursor fdb_sqlstat_cursor_t;

struct fdb_tran {
char magic[4];
char *tid; /* transaction id */
char *
host; /* what is the remote replicant this transaction is submitted to
Expand Down Expand Up @@ -410,6 +411,12 @@ int fdb_unlock_table(fdb_tbl_ent_t *ent);
*/
int fdb_heartbeats(fdb_hbeats_type *hbeats);

/**
* Close sbuf2, destroy mutex and free fdb-tran
*
*/
void fdb_heartbeat_free_tran(fdb_hbeats_type *hbeats);

/**
* Change association of a cursor to a table (see body note)
*
Expand Down
5 changes: 2 additions & 3 deletions net/net_evbuffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1827,10 +1827,8 @@ static void fdb_heartbeat(int dummyfd, short what, void *data)
check_fdb_thd();

fdb_hbeats_type *hb = data;
Pthread_mutex_lock(&hb->sb_mtx);
logmsg(LOGMSG_INFO, "Sending fdb heartbeat for tran %p\n", hb);
fdb_heartbeats(hb);
Pthread_mutex_unlock(&hb->sb_mtx);
}

static void do_enable_fdb_heartbeats(int dummyfd, short what, void *data)
Expand Down Expand Up @@ -1858,6 +1856,7 @@ int enable_fdb_heartbeats(fdb_hbeats_type *hb)
hb, NULL);
}

extern void fdb_heartbeat_free_tran(fdb_hbeats_type *hb);
static void do_disable_fdb_heartbeats_and_free(int dummyfd, short what, void *data)
{
fdb_hbeats_type *hb= data;
Expand All @@ -1868,7 +1867,7 @@ static void do_disable_fdb_heartbeats_and_free(int dummyfd, short what, void *da
event_free(hb->ev_hbeats);
hb->ev_hbeats = NULL;
}
free(hb->tran);
fdb_heartbeat_free_tran(hb);
}

int disable_fdb_heartbeats_and_free(fdb_hbeats_type *hb)
Expand Down
77 changes: 36 additions & 41 deletions plugins/remsql/fdb_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1682,7 +1682,7 @@ void fdb_msg_print_message(SBUF2 *sb, fdb_msg_t *msg, char *prefix)
}

/* stuff goes as network endian */
static int fdb_msg_write_message(SBUF2 *sb, fdb_msg_t *msg, int flush)
static int fdb_msg_write_message_lk(SBUF2 *sb, fdb_msg_t *msg, int flush)
{
int type;
int tmp;
Expand All @@ -1701,7 +1701,7 @@ static int fdb_msg_write_message(SBUF2 *sb, fdb_msg_t *msg, int flush)
return FDB_ERR_WRITE_IO;
}

assert (msg->hd.type & FD_MSG_FLAGS_ISUUID);
assert(msg->hd.type & FD_MSG_FLAGS_ISUUID);
idsz = sizeof(uuid_t);

send_dk = 0;
Expand Down Expand Up @@ -1819,7 +1819,7 @@ static int fdb_msg_write_message(SBUF2 *sb, fdb_msg_t *msg, int flush)

int haveid;

assert (msg->hd.type & FD_MSG_FLAGS_ISUUID);
assert(msg->hd.type & FD_MSG_FLAGS_ISUUID);
haveid = !comdb2uuid_is_zero((unsigned char *)msg->cc.tid);

if (haveid) {
Expand Down Expand Up @@ -2268,18 +2268,28 @@ static int fdb_msg_write_message(SBUF2 *sb, fdb_msg_t *msg, int flush)
return 0;
}

static void _fdb_extract_source_id(struct sqlclntstate *clnt, SBUF2 *sb,
fdb_msg_t *msg)
static int fdb_msg_write_message(SBUF2 *sb, fdb_msg_t *msg, int flush)
{
fdb_tran_t *tran = (fdb_tran_t *)sbuf2getuserptr(sb);
pthread_mutex_t *sb_mtx =
(tran && !memcmp(tran->magic, "FDBT", 4) && tran->hbeats.tran) ? &tran->hbeats.sb_mtx : NULL;
if (sb_mtx)
Pthread_mutex_lock(sb_mtx);
int rc = fdb_msg_write_message_lk(sb, msg, flush);
if (sb_mtx)
Pthread_mutex_unlock(sb_mtx);
return rc;
}

static void _fdb_extract_source_id(struct sqlclntstate *clnt, SBUF2 *sb, fdb_msg_t *msg)
{
clnt->conninfo.node = -1; /*get_origin_mach_by_fd(sbuf2fileno(sb));*/

/* extract source */
if (msg->co.srcname)
strncpy(clnt->conninfo.pename, msg->co.srcname,
sizeof(clnt->conninfo.pename));
strncpy(clnt->conninfo.pename, msg->co.srcname, sizeof(clnt->conninfo.pename));
else
strncpy(clnt->conninfo.pename, "UNKNOWN",
sizeof(clnt->conninfo.pename));
strncpy(clnt->conninfo.pename, "UNKNOWN", sizeof(clnt->conninfo.pename));
clnt->conninfo.pename[sizeof(clnt->conninfo.pename) - 1] = '\0';
clnt->conninfo.pid = msg->co.srcpid;
}
Expand All @@ -2294,8 +2304,7 @@ int fdb_bend_cursor_open(SBUF2 *sb, fdb_msg_t *msg, svc_callback_arg_t *arg)
assert(msg->hd.type == FDB_MSG_CURSOR_OPEN);

/* create a cursor */
if (!fdb_svc_cursor_open(tid, cid, msg->co.rootpage, msg->co.version,
msg->co.flags, seq, &clnt)) {
if (!fdb_svc_cursor_open(tid, cid, msg->co.rootpage, msg->co.version, msg->co.flags, seq, &clnt)) {
logmsg(LOGMSG_ERROR, "%s: failed to open cursor\n", __func__);
arg->clnt = NULL;
return -1;
Expand Down Expand Up @@ -2336,8 +2345,7 @@ int fdb_bend_cursor_close(SBUF2 *sb, fdb_msg_t *msg, svc_callback_arg_t *arg)
*/
rc = fdb_svc_cursor_close(cid, (clnt) ? &clnt : NULL);
if (rc < 0) {
logmsg(LOGMSG_ERROR, "%s: failed to close cursor rc=%d\n", __func__,
rc);
logmsg(LOGMSG_ERROR, "%s: failed to close cursor rc=%d\n", __func__, rc);
}

return rc;
Expand Down Expand Up @@ -2365,8 +2373,7 @@ static enum svc_move_types move_type(int type)
return -1;
}

int fdb_bend_send_row(SBUF2 *sb, fdb_msg_t *msg, char *cid,
unsigned long long genid, char *data, int datalen,
int fdb_bend_send_row(SBUF2 *sb, fdb_msg_t *msg, char *cid, unsigned long long genid, char *data, int datalen,
char *datacopy, int datacopylen, int ret)
{
int rc;
Expand Down Expand Up @@ -2400,8 +2407,7 @@ int fdb_bend_send_row(SBUF2 *sb, fdb_msg_t *msg, char *cid,
if (rc) {
/* this happens natural for fractured streams */
if (gbl_fdb_track)
logmsg(LOGMSG_USER, "%s: failed send row back, rc=%d\n", __func__,
rc);
logmsg(LOGMSG_USER, "%s: failed send row back, rc=%d\n", __func__, rc);
}

return rc;
Expand All @@ -2416,16 +2422,14 @@ int fdb_bend_cursor_move(SBUF2 *sb, fdb_msg_t *msg, svc_callback_arg_t *arg)
int datalen;
int datacopylen;

rc = fdb_svc_cursor_move(move_type(msg->hd.type), msg->cm.cid, &data,
&datalen, &genid, &datacopy, &datacopylen);
rc = fdb_svc_cursor_move(move_type(msg->hd.type), msg->cm.cid, &data, &datalen, &genid, &datacopy, &datacopylen);
if (rc != IX_FND && rc != IX_NOTFND && rc != IX_PASTEOF && rc != IX_EMPTY) {
logmsg(LOGMSG_ERROR, "%s: failed move rc %d\n", __func__, rc);
/*TODO: notify the other side! */
return rc;
}

rc = fdb_bend_send_row(sb, msg, NULL, genid, data, datalen, datacopy,
datacopylen, rc);
rc = fdb_bend_send_row(sb, msg, NULL, genid, data, datalen, datacopy, datacopylen, rc);

return rc;
}
Expand All @@ -2442,20 +2446,16 @@ int fdb_bend_cursor_find(SBUF2 *sb, fdb_msg_t *msg, svc_callback_arg_t *arg)
int datacopylen;
int rc;

assert(msg->hd.type == FDB_MSG_CURSOR_FIND ||
msg->hd.type == FDB_MSG_CURSOR_FIND_LAST);
assert(msg->hd.type == FDB_MSG_CURSOR_FIND || msg->hd.type == FDB_MSG_CURSOR_FIND_LAST);

rc = fdb_svc_cursor_find(
cid, keylen, key, msg->hd.type == FDB_MSG_CURSOR_FIND_LAST, &genid,
&datalen, &data, &datacopy, &datacopylen);
rc = fdb_svc_cursor_find(cid, keylen, key, msg->hd.type == FDB_MSG_CURSOR_FIND_LAST, &genid, &datalen, &data,
&datacopy, &datacopylen);
if (rc != IX_FND && rc != IX_NOTFND && rc != IX_PASTEOF && rc != IX_EMPTY) {
logmsg(LOGMSG_ERROR, "%s: failed to execute a cursor find rc=%d\n",
__func__, rc);
logmsg(LOGMSG_ERROR, "%s: failed to execute a cursor find rc=%d\n", __func__, rc);
return rc;
}

rc = fdb_bend_send_row(sb, msg, NULL, genid, data, datalen, datacopy,
datacopylen, rc);
rc = fdb_bend_send_row(sb, msg, NULL, genid, data, datalen, datacopy, datacopylen, rc);

return rc;
}
Expand All @@ -2479,35 +2479,30 @@ int fdb_bend_run_sql(SBUF2 *sb, fdb_msg_t *msg, svc_callback_arg_t *arg)
start_localrpc = osql_log_time();
/*fprintf(stderr, "=== Calling appsock %llu\n", start_localrpc);*/

rc = fdb_appsock_work(cid, clnt, version, flags, sql, sqllen, trim_key,
trim_keylen, sb);
rc = fdb_appsock_work(cid, clnt, version, flags, sql, sqllen, trim_key, trim_keylen, sb);
if (rc) {
/* this happens natural for fractured streams */
if (gbl_fdb_track)
logmsg(LOGMSG_USER, "%s: failed to dispatch request rc=%d \"%s\"\n",
__func__, rc, sql);
logmsg(LOGMSG_USER, "%s: failed to dispatch request rc=%d \"%s\"\n", __func__, rc, sql);
return -1;
}

end_localrpc = osql_log_time();

if (gbl_time_fdb) {
logmsg(LOGMSG_USER,
"=== DONE running remsql time %llu [%llu -> %llu] rc=%d\n",
end_localrpc - start_localrpc, start_localrpc, end_localrpc, rc);
logmsg(LOGMSG_USER, "=== DONE running remsql time %llu [%llu -> %llu] rc=%d\n", end_localrpc - start_localrpc,
start_localrpc, end_localrpc, rc);
}

/* was there any error processing? */
int irc;
if ((irc = errstat_get_rc(&clnt->fdb_state.xerr)) != 0) {
/* we need to send back a rc code */
const char *tmp = errstat_get_str(&clnt->fdb_state.xerr);
rc = fdb_svc_sql_row(clnt->fdb_state.remote_sql_sb, cid,
(char *)tmp, /* the actual row is the errstr */
rc = fdb_svc_sql_row(clnt->fdb_state.remote_sql_sb, cid, (char *)tmp, /* the actual row is the errstr */
strlen(tmp) + 1, irc);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: fdb_svc_sql_row failed rc=%d\n", __func__,
rc);
logmsg(LOGMSG_ERROR, "%s: fdb_svc_sql_row failed rc=%d\n", __func__, rc);
}
}

Expand Down

0 comments on commit a3bb6d4

Please sign in to comment.