Skip to content

Commit

Permalink
NIO demo
Browse files Browse the repository at this point in the history
  • Loading branch information
xuchang1 committed Jul 8, 2019
1 parent d3420a8 commit 9ac14f6
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 0 deletions.
93 changes: 93 additions & 0 deletions src/main/java/nio_demo/demo1/NIOServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package nio_demo.demo1;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NIOServer {

public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8081));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
selector.select();
//获取注册的channel
Set<SelectionKey> keys = selector.selectedKeys();
//遍历所有的key
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isValid()) {
//如果通道上有事件发生
if (key.isAcceptable()) {
//获取该通道
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println(System.currentTimeMillis() + " Accept request from : " + socketChannel.getRemoteAddress());
//同时将SelectionKey标记为可读,以便读取。
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
//利用SelectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor
//Processor对象为自定义处理请求的类
readKey.attach(new Processor());
} else if (key.isReadable()) {
System.out.println(((SocketChannel)key.channel()).getRemoteAddress() + "触发读事件");
Processor processor = (Processor) key.attachment();
processor.process(key);
}
}
}
}
}
}

/**
* Processor类中设置一个线程池来处理请求,
* 这样就可以充分利用多线程的优势
*/
class Processor {
private static final ExecutorService service = Executors.newFixedThreadPool(2);

public void process(final SelectionKey selectionKey) {
service.submit(new Runnable() {
@Override
public void run() {
ByteBuffer buffer = null;
SocketChannel socketChannel = null;
try {
buffer = ByteBuffer.allocate(1024);
socketChannel = (SocketChannel) selectionKey.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
selectionKey.cancel();
System.out.println("Read ended");
} else if (count == 0) {
}
} catch (IOException e) {
try {
socketChannel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
selectionKey.cancel();
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + " 接收消息 : " + new String(buffer.array()));
}
});
}
}
46 changes: 46 additions & 0 deletions src/main/java/nio_demo/demo1/NioClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package nio_demo.demo1;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NioClient {
public static void main(String[] args) throws IOException, InterruptedException {
Selector selector = Selector.open();
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress("127.0.0.1", 8081));

channel.register(selector, SelectionKey.OP_CONNECT);

while (true) {
selector.select();

Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isConnectable()) {
if (channel.isConnectionPending()) {
if (channel.finishConnect()) {
/*channel.write(ByteBuffer.wrap("1234".getBytes()));
System.out.println(System.currentTimeMillis() + "发送数据 : 1234");
Thread.sleep(5000);
System.out.println(System.currentTimeMillis() + "发送数据 : 5678");
channel.write(ByteBuffer.wrap("5678".getBytes()));*/
}
} else {
channel.connect(new InetSocketAddress("127.0.0.1", 8081));
}
}

}
}

}
}

0 comments on commit 9ac14f6

Please sign in to comment.