diff --git a/common.cpp b/common.cpp index 749f48b..2322d88 100644 --- a/common.cpp +++ b/common.cpp @@ -416,3 +416,12 @@ int create_new_udp(int &new_udp_fd,int remote_address_uint32,int remote_port) } return 0; }*/ +void ip_port_t::from_u64(u64_t u64) +{ + ip=get_u64_h(u64); + port=get_u64_l(u64); +} +u64_t ip_port_t::to_u64() +{ + return pack_u64(ip,port); +} diff --git a/common.h b/common.h index 30ddb1c..aa65d7b 100644 --- a/common.h +++ b/common.h @@ -124,6 +124,37 @@ typedef u64_t padding_t; typedef u64_t anti_replay_seq_t; +typedef u64_t fd64_t; + +enum dest_type{none=0,type_ip_port,type_fd64,type_fd}; + + +struct ip_port_t +{ + u32_t ip; + int port; + void from_u64(u64_t u64); + u64_t to_u64(); +}; + +union inner_t +{ + ip_port_t ip_port; + int fd; + fd64_t fd64; +}; +struct dest_t +{ + dest_type type; + inner_t inner; +}; + +struct fd_info_t +{ + ip_port_t ip_port; +}; + + u64_t get_current_time(); u64_t get_current_time_us(); u64_t pack_u64(u32_t a,u32_t b); diff --git a/connection.cpp b/connection.cpp index cd32e36..5325bca 100644 --- a/connection.cpp +++ b/connection.cpp @@ -166,7 +166,7 @@ conv_manager_t::~conv_manager_t() { ready_num=0; mp.reserve(100007); - fd64_mp.reserve(100007); + //fd64_mp.reserve(100007); clear_it=mp.begin(); last_clear_time=0; } @@ -209,6 +209,7 @@ conv_manager_t::~conv_manager_t() } return *mp[u64]; } + /* int conn_manager_t::exist_fd64(fd64_t fd64) { return fd64_mp.find(fd64)!=fd64_mp.end(); @@ -225,7 +226,7 @@ conv_manager_t::~conv_manager_t() ip_port_t res; res.from_u64(fd64_mp[fd64]); return res; - } + }*/ int conn_manager_t::erase(unordered_map::iterator erase_it) { /* @@ -310,17 +311,8 @@ void server_clear_function(u64_t u64)//used in conv_manager in server mode.for s { int fd64=u64; int ret; - assert(fd_manager.fd64_exist(fd64)); - int fd=fd_manager.fd64_to_fd(fd64); + assert(fd_manager.exist(fd64)); + int fd=fd_manager.to_fd(fd64); - fd_manager.remove_fd64(fd64); - ret= close(fd); //closed fd should be auto removed from epoll - if (ret!=0) - { - mylog(log_fatal,"close fd %d failed !!!!\n",fd); - myexit(-1); //this shouldnt happen - } - //mylog(log_fatal,"size:%d !!!!\n",conn_manager.udp_fd_mp.size()); - assert(conn_manager.fd64_mp.find(fd)!=conn_manager.fd64_mp.end()); - conn_manager.fd64_mp.erase(fd); + fd_manager.close(fd64); } diff --git a/connection.h b/connection.h index 36ca1e2..14c6b13 100644 --- a/connection.h +++ b/connection.h @@ -79,7 +79,7 @@ struct conn_manager_t //manager for connections. for client,we dont need conn_m u32_t ready_num; - unordered_map fd64_mp; +// unordered_map fd64_mp; unordered_map mp;// to conn_info_t; //put it at end so that it de-consturcts first @@ -91,9 +91,10 @@ struct conn_manager_t //manager for connections. for client,we dont need conn_m int exist_ip_port(ip_port_t); conn_info_t *& find_insert_p(ip_port_t); //be aware,the adress may change after rehash conn_info_t & find_insert(ip_port_t) ; //be aware,the adress may change after rehash + /* int exist_fd64(fd64_t fd64); void insert_fd64(fd64_t fd64,ip_port_t); - ip_port_t find_by_fd64(fd64_t fd64); + ip_port_t find_by_fd64(fd64_t fd64);*/ int erase(unordered_map::iterator erase_it); diff --git a/fd_manager.cpp b/fd_manager.cpp index e1a8d6d..99ee673 100644 --- a/fd_manager.cpp +++ b/fd_manager.cpp @@ -11,20 +11,22 @@ int fd_manager_t::fd_exist(int fd) { return fd_to_fd64_mp.find(fd)!=fd_to_fd64_mp.end(); } -int fd_manager_t::fd64_exist(fd64_t fd64) +int fd_manager_t::exist(fd64_t fd64) { return fd64_to_fd_mp.find(fd64)!=fd64_to_fd_mp.end(); } +/* fd64_t fd_manager_t::fd_to_fd64(int fd) { assert(fd_exist(fd)); return fd_to_fd64_mp[fd]; -} -int fd_manager_t::fd64_to_fd(fd64_t fd64) +}*/ +int fd_manager_t::to_fd(fd64_t fd64) { - assert(fd64_exist(fd64)); + assert(exist(fd64)); return fd64_to_fd_mp[fd64]; } +/* void fd_manager_t::remove_fd(int fd) { assert(fd_exist(fd)); @@ -32,27 +34,34 @@ void fd_manager_t::remove_fd(int fd) fd_to_fd64_mp.erase(fd); fd64_to_fd_mp.erase(fd64); //return 0; -} -void fd_manager_t::remove_fd64(fd64_t fd64) +}*/ +void fd_manager_t::close(fd64_t fd64) { - assert(fd64_exist(fd64)); + assert(exist(fd64)); int fd=fd64_to_fd_mp[fd64]; fd64_to_fd_mp.erase(fd64); fd_to_fd64_mp.erase(fd); + if(exist_info(fd64)) + { + fd_info_mp.erase(fd64); + } + close(fd); //return 0; } void fd_manager_t::reserve() { - fd_to_fd64_mp.reserve(100007); - fd64_to_fd_mp.reserve(100007); + fd_to_fd64_mp.reserve(10007); + fd64_to_fd_mp.reserve(10007); + fd_info_mp.reserve(10007); //return 0; } -u64_t fd_manager_t::insert_fd(int fd) +u64_t fd_manager_t::create(int fd) { assert(!fd_exist(fd)); fd64_t fd64=counter++; fd_to_fd64_mp[fd]=fd64; fd64_to_fd_mp[fd64]=fd; + //fd_info_mp[fd64]; return fd64; } fd_manager_t::fd_manager_t() @@ -60,3 +69,12 @@ fd_manager_t::fd_manager_t() counter=u32_t(-1); counter+=2; } +fd_info_t & fd_manager_t::get_info(fd64_t fd64) +{ + assert(exist(fd64)); + return fd_info_mp[fd64]; +} +int fd_manager_t::exist_info(fd64_t fd64) +{ + return fd_info_mp.find(fd64)!=fd_info_mp.end(); +} diff --git a/fd_manager.h b/fd_manager.h index 4ad55da..adaf0ca 100644 --- a/fd_manager.h +++ b/fd_manager.h @@ -9,24 +9,29 @@ #define FD_MANAGER_H_ #include "common.h" +#include "packet.h" + -typedef u64_t fd64_t; struct fd_manager_t //conver fd to a uniq 64bit number,avoid fd value conflict caused by close and re-create //not used currently { - u64_t counter; - unordered_map fd_to_fd64_mp; - unordered_map fd64_to_fd_mp; - int fd_exist(int fd); - int fd64_exist(fd64_t fd64); - fd64_t fd_to_fd64(int fd); - int fd64_to_fd(fd64_t); - void remove_fd(int fd); - void remove_fd64(fd64_t fd64); + fd_info_t & get_info(fd64_t fd64); + int exist_info(fd64_t); + int exist(fd64_t fd64); + int to_fd(fd64_t); + void close(fd64_t fd64); void reserve(); - u64_t insert_fd(int fd); + u64_t create(int fd); fd_manager_t(); +private: + u64_t counter; + unordered_map fd_to_fd64_mp; + unordered_map fd64_to_fd_mp; + unordered_map fd_info_mp; + int fd_exist(int fd); + //void remove_fd(int fd); + //fd64_t fd_to_fd64(int fd); }; extern fd_manager_t fd_manager; diff --git a/main.cpp b/main.cpp index a63deab..76ebdca 100644 --- a/main.cpp +++ b/main.cpp @@ -186,16 +186,10 @@ int client_event_loop() get_conv(conv,data,data_len,new_data,new_len); if(!conn_info.conv_manager.is_conv_used(conv))continue; u64_t u64=conn_info.conv_manager.find_conv_by_u64(conv); - u32_t ip=get_u64_h(u64); - int port=get_u64_l(u64); dest_t dest; + dest.inner.ip_port.from_u64(u64); dest.type=type_ip_port; - dest.inner.ip_port.ip=ip; - dest.inner.ip_port.port=port; my_send(dest,new_data,new_len); - //sendto_ip_port(ip,port,new_data,new_len,0); - - //////////////////todo } /* else if(events[idx].data.u64 ==(u64_t)timer_fd) @@ -226,7 +220,10 @@ int client_event_loop() mylog(log_trace,"Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), ntohs(udp_new_addr_in.sin_port),data_len); - u64_t u64=((u64_t(udp_new_addr_in.sin_addr.s_addr))<<32u)+ntohs(udp_new_addr_in.sin_port); + ip_port_t ip_port; + ip_port.ip=udp_new_addr_in.sin_addr.s_addr; + ip_port.port=udp_new_addr_in.sin_port; + u64_t u64=ip_port.to_u64(); u32_t conv; if(!conn_info.conv_manager.is_u64_used(u64)) @@ -244,7 +241,6 @@ int client_event_loop() { conv=conn_info.conv_manager.find_conv_by_u64(u64); } - conn_info.conv_manager.update_active_time(conv); char *new_data; @@ -371,23 +367,23 @@ int server_event_loop() if (!conn_info.conv_manager.is_conv_used(conv)) { int new_udp_fd; - new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port); + ret=new_connected_socket(new_udp_fd,remote_ip_uint32,remote_port); if (ret != 0) { - mylog(log_warn, "[%s:%d]add udp_fd error\n",my_ntoa(ip_port.ip),ip_port.port); - close(new_udp_fd); - return -1; + mylog(log_warn, "[%s:%d]new_connected_socket failed\n",my_ntoa(ip_port.ip),ip_port.port); + continue; } - fd64_t fd64 = fd_manager.insert_fd(new_udp_fd); + fd64_t fd64 = fd_manager.create(new_udp_fd); ev.events = EPOLLIN; ev.data.u64 = fd64; ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_udp_fd, &ev); conn_info.conv_manager.insert_conv(conv, fd64); - assert(!conn_manager.exist_fd64(fd64)); + fd_manager.get_info(fd64).ip_port=ip_port; + //assert(!conn_manager.exist_fd64(fd64)); - conn_manager.insert_fd64(fd64,ip_port); + //conn_manager.insert_fd64(fd64,ip_port); } fd64_t fd64= conn_info.conv_manager.find_u64_by_conv(conv); //int fd=fd_manager.fd64_to_fd(fd64); @@ -438,37 +434,27 @@ int server_event_loop() char data[buf_len]; int data_len; fd64_t fd64=events[idx].data.u64; - if(!fd_manager.fd64_exist(fd64)) + if(!fd_manager.exist(fd64)) //fd64 has been closed { continue; } - int fd=fd_manager.fd64_to_fd(fd64); - if(!conn_manager.exist_fd64(fd64)) //this can happen,when fd is a just closed fd - { - mylog(log_debug,"fd no longer exists in udp_fd_mp,udp fd64 %lld\n",fd64); - recv(fd,0,0,0); - continue; - } - ip_port_t ip_port=conn_manager.find_by_fd64(fd64); - conn_info_t* p_conn_info=conn_manager.find_insert_p(ip_port); - if(!conn_manager.exist_ip_port(ip_port))//TODO remove this for peformance - { - mylog(log_fatal,"ip port no longer exits 2!!!this shouldnt happen\n"); - myexit(-1); - } + //assert(conn_manager.exist_fd64(fd64)); + + assert(fd_manager.exist_info(fd64)); + ip_port_t ip_port=fd_manager.get_info(fd64).ip_port; + + assert(conn_manager.exist_ip_port(ip_port)); + + conn_info_t* p_conn_info=conn_manager.find_insert_p(ip_port); conn_info_t &conn_info=*p_conn_info; - if(!conn_info.conv_manager.is_u64_used(fd)) - { - mylog(log_debug,"conv no longer exists,udp fd %d\n",fd); - int recv_len=recv(fd,0,0,0); ///////////TODO ,delete this - continue; - } + assert(conn_info.conv_manager.is_u64_used(fd64)); - u32_t conv_id=conn_info.conv_manager.find_conv_by_u64(fd); + u32_t conv=conn_info.conv_manager.find_conv_by_u64(fd64); + int fd=fd_manager.to_fd(fd64); data_len=recv(fd,data,max_data_len,0); mylog(log_trace,"received a packet from udp_fd,len:%d\n",data_len); @@ -485,6 +471,13 @@ int server_event_loop() mylog(log_warn,"huge packet,data len=%d (>=%d).strongly suggested to set a smaller mtu at upper level,to get rid of this warn\n ",data_len,mtu_warn); } + char *new_data; + int new_len; + put_conv(conv,data,data_len,new_data,new_len); + dest_t dest; + dest.type=type_ip_port; + dest.inner.ip_port=ip_port; + my_send(dest,new_data,new_len); ////////todo send data } else diff --git a/packet.cpp b/packet.cpp index 3db5ab1..e912152 100644 --- a/packet.cpp +++ b/packet.cpp @@ -26,6 +26,8 @@ char key_string[1000]= "secret key"; int local_listen_fd=-1; + + struct anti_replay_t { u64_t max_packet_received; @@ -244,8 +246,8 @@ int my_send(dest_t &dest,char *data,int len) } case type_fd64: { - if(!fd_manager.fd64_exist(dest.inner.fd64)) return -1; - int fd=fd_manager.fd64_to_fd(dest.inner.fd64); + if(!fd_manager.exist(dest.inner.fd64)) return -1; + int fd=fd_manager.to_fd(dest.inner.fd64); return send_fd(fd,data,len,0); break; } diff --git a/packet.h b/packet.h index f30bfad..772bd11 100644 --- a/packet.h +++ b/packet.h @@ -24,36 +24,6 @@ extern int random_drop; extern int local_listen_fd; -enum dest_type{none=0,type_ip_port,type_fd64,type_fd}; - - -struct ip_port_t -{ - u32_t ip; - int port; - void from_u64(u64_t u64) - { - ip=get_u64_h(u64); - port=get_u64_l(u64); - } - u64_t to_u64() - { - return pack_u64(ip,port); - } - -}; -union inner_t -{ - ip_port_t ip_port; - int fd; - fd64_t fd64; -}; -struct dest_t -{ - dest_type type; - inner_t inner; -}; - int my_send(dest_t &dest,char *data,int len); void encrypt_0(char * input,int &len,char *key);