===== server ===== package nio;// $Id$ import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class NioServer { private int ports[]; private ByteBuffer echoBuffer = ByteBuffer.allocate(1024); public NioServer(int ports[]) throws IOException { this.ports = ports; go(); } static public void main(String args[]) throws Exception { // 同时监听 10000,10001端口 int ports[] = new int[]{10000, 10001}; new NioServer(ports); } private void go() throws IOException { // Create a new selector Selector selector = Selector.open(); // Open a listener on each port, and register each one // with the selector for (int i = 0; i < ports.length; ++i) { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ServerSocket ss = ssc.socket(); InetSocketAddress address = new InetSocketAddress(ports[i]); ss.bind(address); SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT); System.out.println("Going to listen on " + ports[i]); } while (true) { // 获取事件、阻塞到有事件到来为止(可能同时有多个事件到来) selector.select(); Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ); System.out.println("Got connection from " + sc); } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); // Echo data int bytesEchoed = 0; while (true) { echoBuffer.clear(); int r = sc.read(echoBuffer); if (r <= 0) { break; } echoBuffer.flip(); byte b[] = new byte[r]; echoBuffer.get(b); System.out.println("Echoed " + new String(b)); sc.write(ByteBuffer.wrap(b)); bytesEchoed += r; } } it.remove(); } } } } ===== client ===== package nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; public class NioClient extends Thread{ //管道管理器 private Selector selector; private String host; private int port; private String msg; public NioClient(String host, int port, String msg){ this.msg = msg; this.host = host; this.port = port; } public void run(){ try { init(host,port).listen(); } catch (IOException e) { e.printStackTrace(); } } public NioClient init(String serverIp, int port) throws IOException { //获取socket通道 SocketChannel channel = SocketChannel.open(); //设置为非阻塞 channel.configureBlocking(false); //创建通道管理器,可以管理多个通道 selector=Selector.open(); //客户端连接服务器,需要调用channel.finishConnect();才能实际完成连接。 channel.connect(new InetSocketAddress(serverIp, port)); //为该通道注册SelectionKey.OP_CONNECT事件 channel.register(selector, SelectionKey.OP_CONNECT); return this; } public void listen() throws IOException{ System.out.println("客户端启动"); //轮询访问selector while(true){ //选择注册过的io操作的事件,这是一个阻塞操作,直到有事件发生,可能有多个事件同时发生 selector.select(); //获取事件列表 Iterator ite = selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = ite.next(); if(key.isConnectable()){//(第一次为SelectionKey.OP_CONNECT) SocketChannel channel=(SocketChannel)key.channel(); //如果正在连接,则完成连接 if(channel.isConnectionPending()){ channel.finishConnect(); } channel.configureBlocking(false); //向服务器发送消息 channel.write(ByteBuffer.wrap(new String(msg).getBytes())); //连接成功后,注册接收服务器消息的事件 channel.register(selector, SelectionKey.OP_READ); System.out.println("客户端连接成功"); }else if(key.isReadable()){ //有可读数据事件。 SocketChannel channel = (SocketChannel)key.channel(); ByteBuffer buffer = ByteBuffer.allocate(msg.length()); channel.read(buffer); byte[] data = buffer.array(); String message = new String(data); System.out.println("recevie: " + message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } ByteBuffer outbuffer = ByteBuffer.wrap(data); channel.write(outbuffer); } //删除已选的key,防止重复处理 ite.remove(); } } } public static void main(String[] args) throws IOException { Thread thread1 = new Thread(new NioClient("127.0.0.1", 10000, "123")); Thread thread2 = new Thread(new NioClient("127.0.0.1", 10001, "abcdefg")); thread1.start(); thread2.start(); } }