用户工具


server

package nio;// $Id$

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NioServer {
    private int ports[];
    private ByteBuffer echoBuffer = ByteBuffer.allocate(1024);

    public NioServer(int ports[]) throws IOException {
        this.ports = ports;
        go();
    }

    static public void main(String args[]) throws Exception {
        // 同时监听 10000,10001端口
        int ports[] = new int[]{10000, 10001};
        new NioServer(ports);
    }

    private void go() throws IOException {
        // Create a new selector
        Selector selector = Selector.open();

        // Open a listener on each port, and register each one
        // with the selector
        for (int i = 0; i < ports.length; ++i) {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ServerSocket ss = ssc.socket();
            InetSocketAddress address = new InetSocketAddress(ports[i]);
            ss.bind(address);

            SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("Going to listen on " + ports[i]);
        }

        while (true) {
            // 获取事件、阻塞到有事件到来为止(可能同时有多个事件到来)
            selector.select();
            Set selectedKeys = selector.selectedKeys();
            Iterator it = selectedKeys.iterator();

            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();

                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                    // Accept the new connection
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);

                    // Add the new connection to the selector
                    SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);

                    System.out.println("Got connection from " + sc);
                } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                    // Read the data
                    SocketChannel sc = (SocketChannel) key.channel();

                    // Echo data
                    int bytesEchoed = 0;
                    while (true) {
                        echoBuffer.clear();
                        int r = sc.read(echoBuffer);

                        if (r <= 0) {
                            break;
                        }

                        echoBuffer.flip();
                        byte b[] = new byte[r];
                        echoBuffer.get(b);
                        System.out.println("Echoed " + new String(b));

                        sc.write(ByteBuffer.wrap(b));
                        bytesEchoed += r;
                    }
                }
                it.remove();
            }
        }
    }
}

client

package nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioClient extends Thread{
    //管道管理器
    private Selector selector;
    private String host;
    private int port;
    private String msg;

    public NioClient(String host, int port, String msg){
        this.msg = msg;
        this.host = host;
        this.port = port;
    }

    public void run(){
        try {
            init(host,port).listen();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public NioClient init(String serverIp, int port) throws IOException {
        //获取socket通道
        SocketChannel channel = SocketChannel.open();
        //设置为非阻塞
        channel.configureBlocking(false);
        //创建通道管理器,可以管理多个通道
        selector=Selector.open();
        //客户端连接服务器,需要调用channel.finishConnect();才能实际完成连接。
        channel.connect(new InetSocketAddress(serverIp, port));
        //为该通道注册SelectionKey.OP_CONNECT事件
        channel.register(selector, SelectionKey.OP_CONNECT);
        return this;
    }

    public void listen() throws IOException{
        System.out.println("客户端启动");
        //轮询访问selector
        while(true){
            //选择注册过的io操作的事件,这是一个阻塞操作,直到有事件发生,可能有多个事件同时发生
            selector.select();
            //获取事件列表
            Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
            while(ite.hasNext()){
                SelectionKey key = ite.next();
                if(key.isConnectable()){//(第一次为SelectionKey.OP_CONNECT)
                    SocketChannel channel=(SocketChannel)key.channel();
                    //如果正在连接,则完成连接
                    if(channel.isConnectionPending()){
                        channel.finishConnect();
                    }

                    channel.configureBlocking(false);
                    //向服务器发送消息
                    channel.write(ByteBuffer.wrap(new String(msg).getBytes()));

                    //连接成功后,注册接收服务器消息的事件
                    channel.register(selector, SelectionKey.OP_READ);
                    System.out.println("客户端连接成功");
                }else if(key.isReadable()){ //有可读数据事件。
                    SocketChannel channel = (SocketChannel)key.channel();

                    ByteBuffer buffer = ByteBuffer.allocate(msg.length());
                    channel.read(buffer);
                    byte[] data = buffer.array();
                    String message = new String(data);
                    System.out.println("recevie: " + message);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    ByteBuffer outbuffer = ByteBuffer.wrap(data);
                    channel.write(outbuffer);
                }
                //删除已选的key,防止重复处理
                ite.remove();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Thread thread1 = new Thread(new NioClient("127.0.0.1", 10000, "123"));
        Thread thread2 = new Thread(new NioClient("127.0.0.1", 10001, "abcdefg"));

        thread1.start();
        thread2.start();
    }
}