Skip to content

Commit

Permalink
Merge pull request cloudwu#135 from cloudwu/dev
Browse files Browse the repository at this point in the history
release v0.4.2
  • Loading branch information
cloudwu committed Jul 14, 2014
2 parents e60fb1d + 4cdf034 commit a8a683b
Show file tree
Hide file tree
Showing 33 changed files with 468 additions and 68 deletions.
8 changes: 4 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
*.o
*.a
./skynet
./skynet.pid
/skynet
/skynet.pid
3rd/lua/lua
3rd/lua/luac
./cservice
./luaclib
/cservice
/luaclib
*.so
*.dSYM
12 changes: 12 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
v0.4.2 (2014-7-14)
-----------
* Bugfix : invalid negative socket id
* Add optional TCP_NODELAY support
* Add worker thread weight
* Add skynet.queue
* Bugfix: socketchannel
* cluster can throw error
* Add readline and writeline to clientsocket lib
* Add cluster.reload to reload config file
* Add datacenter.wait

v0.4.1 (2014-7-7)
-----------
* Add SERVICE_NAME in loader
Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
For linux, install autoconf first for jemalloc

```
git clone git@github.com:cloudwu/skynet.git
git clone https://github.com/cloudwu/skynet.git
cd skynet
make 'PLATFORM' # PLATFORM can be linux, macosx, freebsd now
```
Expand Down Expand Up @@ -38,5 +38,4 @@ You can also use the offical lua version , edit the makefile by yourself .

* http://blog.codingnow.com/2012/09/the_design_of_skynet.html
* http://blog.codingnow.com/2012/08/skynet.html
* http://blog.codingnow.com/2012/08/skynet_harbor_rpc.html
* http://blog.codingnow.com/eo/skynet/
2 changes: 1 addition & 1 deletion examples/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ end

while true do
dispatch()
local cmd = socket.readline()
local cmd = socket.readstdin()
if cmd then
local args = {}
string.gsub(cmd, '[^ ]+', function(v) table.insert(args, v) end )
Expand Down
1 change: 1 addition & 0 deletions examples/main.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ skynet.start(function()
port = 8888,
maxclient = max_client,
})
print("Watchdog listen on ", 8888)

skynet.exit()
end)
1 change: 1 addition & 0 deletions examples/watchdog.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ function SOCKET.data(fd, msg)
end

function CMD.start(conf)
skynet.call(gate, "lua", "nodelay", true)
skynet.call(gate, "lua", "open" , conf)
end

Expand Down
85 changes: 79 additions & 6 deletions lualib-src/lua-clientsocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,19 @@ unpack(lua_State *L, uint8_t *buffer, int sz, int n) {
boolean (true: data, false: block, nil: close)
string last
*/

struct socket_buffer {
void * buffer;
int sz;
};

static int
lrecv(lua_State *L) {
recv_socket(lua_State *L, char *tmp, struct socket_buffer *result) {
int fd = luaL_checkinteger(L,1);
size_t sz = 0;
const char * last = lua_tolstring(L,2,&sz);
luaL_checktype(L, 3, LUA_TTABLE);

char tmp[CACHE_SIZE];
char * buffer;
int r = recv(fd, tmp, CACHE_SIZE, 0);
if (r == 0) {
Expand Down Expand Up @@ -163,10 +168,64 @@ lrecv(lua_State *L) {
lua_pushnil(L);
lua_rawseti(L, 3, i);
}
result->buffer = buffer;
result->sz = r + sz;
return -1;
}

return unpack(L, (uint8_t *)buffer, r+sz, 0);
static int
lrecv(lua_State *L) {
struct socket_buffer sb;
char tmp[CACHE_SIZE];
int ret = recv_socket(L, tmp, &sb);
if (ret < 0) {
return unpack(L, sb.buffer, sb.sz, 0);
} else {
return ret;
}
}

static int
unpack_line(lua_State *L, uint8_t *buffer, int sz, int n) {
if (sz == 0)
goto _block;
if (buffer[0] == '\n') {
return unpack_line(L, buffer+1, sz-1, n);
}
int i;
for (i=1;i<sz;i++) {
if (buffer[i] == '\n') {
++n;
lua_pushlstring(L, (const char *)buffer, i);
lua_rawseti(L, 3, n);
buffer += i + 1;
sz -= i + 1;
return unpack_line(L, buffer, sz, n);
}
}
_block:
lua_pushboolean(L, n==0 ? 0:1);
if (sz == 0) {
lua_pushnil(L);
} else {
lua_pushlstring(L, (const char *)buffer, sz);
}
return 2;
}

static int
lreadline(lua_State *L) {
struct socket_buffer sb;
char tmp[CACHE_SIZE];
int ret = recv_socket(L, tmp, &sb);
if (ret < 0) {
return unpack_line(L, sb.buffer, sb.sz, 0);
} else {
return ret;
}
}


static int
lusleep(lua_State *L) {
int n = luaL_checknumber(L, 1);
Expand Down Expand Up @@ -219,7 +278,7 @@ readline_stdin(void * arg) {
}

static int
lreadline(lua_State *L) {
lreadstdin(lua_State *L) {
struct queue *q = lua_touserdata(L, lua_upvalueindex(1));
LOCK(q);
if (q->head == q->tail) {
Expand All @@ -236,6 +295,18 @@ lreadline(lua_State *L) {
return 1;
}

static int
lwriteline(lua_State *L) {
size_t sz = 0;
int fd = luaL_checkinteger(L,1);
const char * msg = luaL_checklstring(L, 2, &sz);
block_send(L, fd, msg, sz);
char nl[1] = { '\n' };
block_send(L, fd, nl, 1);

return 0;
}

int
luaopen_clientsocket(lua_State *L) {
luaL_checkversion(L);
Expand All @@ -245,14 +316,16 @@ luaopen_clientsocket(lua_State *L) {
{ "send", lsend },
{ "close", lclose },
{ "usleep", lusleep },
{ "readline", lreadline },
{ "writeline", lwriteline },
{ NULL, NULL },
};
luaL_newlib(L, l);

struct queue * q = lua_newuserdata(L, sizeof(*q));
memset(q, 0, sizeof(*q));
lua_pushcclosure(L, lreadline, 1);
lua_setfield(L, -2, "readline");
lua_pushcclosure(L, lreadstdin, 1);
lua_setfield(L, -2, "readstdin");

pthread_t pid ;
pthread_create(&pid, NULL, readline_stdin, q);
Expand Down
31 changes: 22 additions & 9 deletions lualib-src/lua-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
uint32_t next_session
*/

#define TEMP_LENGTH 0x10002
#define TEMP_LENGTH 0x10007

static void
fill_uint32(uint8_t * buf, uint32_t n) {
Expand Down Expand Up @@ -146,6 +146,7 @@ lunpackrequest(lua_State *L) {

/*
int session
boolean ok
lightuserdata msg
int sz
return string response
Expand All @@ -155,15 +156,27 @@ lpackresponse(lua_State *L) {
uint32_t session = luaL_checkunsigned(L,1);
// clusterd.lua:command.socket call lpackresponse,
// and the msg/sz is return by skynet.rawcall , so don't free(msg)
void * msg = lua_touserdata(L,2);
size_t sz = luaL_checkunsigned(L, 3);
int ok = lua_toboolean(L,2);
void * msg;
size_t sz;

if (lua_type(L,3) == LUA_TSTRING) {
msg = (void *)lua_tolstring(L, 3, &sz);
if (sz > 0x1000) {
sz = 0x1000;
}
} else {
msg = lua_touserdata(L,3);
sz = luaL_checkunsigned(L, 4);
}

uint8_t buf[TEMP_LENGTH];
fill_header(L, buf, sz+4, msg);
fill_header(L, buf, sz+5, msg);
fill_uint32(buf+2, session);
memcpy(buf+6,msg,sz);
buf[6] = ok;
memcpy(buf+7,msg,sz);

lua_pushlstring(L, (const char *)buf, sz+6);
lua_pushlstring(L, (const char *)buf, sz+7);

return 1;
}
Expand All @@ -178,13 +191,13 @@ static int
lunpackresponse(lua_State *L) {
size_t sz;
const char * buf = luaL_checklstring(L, 1, &sz);
if (sz < 4) {
if (sz < 5) {
return 0;
}
uint32_t session = unpack_uint32((const uint8_t *)buf);
lua_pushunsigned(L, session);
lua_pushboolean(L, 1);
lua_pushlstring(L, buf+4, sz-4);
lua_pushboolean(L, buf[4]);
lua_pushlstring(L, buf+5, sz-5);

return 3;
}
Expand Down
2 changes: 1 addition & 1 deletion lualib-src/lua-mongo.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ op_insert(lua_State *L) {
int i;
for (i=1;i<=s;i++) {
lua_rawgeti(L,3,i);
document doc = lua_touserdata(L,3);
document doc = lua_touserdata(L,-1);
luaL_addlstring(&b, (const char *)doc, get_length(doc));
lua_pop(L,1);
}
Expand Down
15 changes: 15 additions & 0 deletions lualib-src/lua-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#define BACKLOG 32
// 2 ** 12 == 4096
#define LARGE_PAGE_NODE 12
#define BUFFER_LIMIT (256 * 1024)

struct buffer_node {
char * msg;
Expand All @@ -22,6 +23,7 @@ struct buffer_node {
};

struct socket_buffer {
int limit;
int size;
int offset;
struct buffer_node *head;
Expand Down Expand Up @@ -64,6 +66,7 @@ lnewpool(lua_State *L, int sz) {
static int
lnewbuffer(lua_State *L) {
struct socket_buffer * sb = lua_newuserdata(L, sizeof(*sb));
sb->limit = luaL_optint(L,1,BUFFER_LIMIT);
sb->size = 0;
sb->offset = 0;
sb->head = NULL;
Expand Down Expand Up @@ -126,6 +129,9 @@ lpushbuffer(lua_State *L) {
sb->size += sz;

lua_pushinteger(L, sb->size);
if (sb->limit > 0 && sb->size > sb->limit) {
return luaL_error(L, "buffer overflow (limit = %d, size = %d)", sb->limit, sb->size);
}

return 1;
}
Expand Down Expand Up @@ -476,6 +482,14 @@ lstart(lua_State *L) {
return 0;
}

static int
lnodelay(lua_State *L) {
struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
int id = luaL_checkinteger(L, 1);
skynet_socket_nodelay(ctx,id);
return 0;
}

int
luaopen_socketdriver(lua_State *L) {
luaL_checkversion(L);
Expand All @@ -502,6 +516,7 @@ luaopen_socketdriver(lua_State *L) {
{ "lsend", lsendlow },
{ "bind", lbind },
{ "start", lstart },
{ "nodelay", lnodelay },
{ NULL, NULL },
};
lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
Expand Down
4 changes: 4 additions & 0 deletions lualib/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ function cluster.open(port)
end
end

function cluster.reload()
skynet.call(clusterd, "lua", "reload")
end

skynet.init(function()
clusterd = skynet.uniqueservice("clusterd")
end)
Expand Down
4 changes: 4 additions & 0 deletions lualib/datacenter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@ function datacenter.set(...)
return skynet.call("DATACENTER", "lua", "UPDATE", ...)
end

function datacenter.wait(...)
return skynet.call("DATACENTER", "lua", "WAIT", ...)
end

return datacenter

2 changes: 2 additions & 0 deletions lualib/mqueue.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
-- This is a deprecated module, use skynet.queue instead.

local skynet = require "skynet"
local c = require "skynet.c"

Expand Down
Loading

0 comments on commit a8a683b

Please sign in to comment.