server
package nio;// $Id$
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * Single-threaded echo server driven by one {@link Selector}.
 *
 * <p>The server listens on every port handed to the constructor, accepts
 * connections in non-blocking mode and writes every received byte back to the
 * sender. A client that disconnects or fails is closed and deregistered
 * without affecting the other connections.
 *
 * <p>Not thread-safe: all I/O happens on the thread that calls the constructor.
 */
public class NioServer {
    /** Charset used only to render received bytes in the console log. */
    private static final java.nio.charset.Charset LOG_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    private final int[] ports;

    /** Scratch buffer for reads; safe to share because the server is single-threaded. */
    private final ByteBuffer echoBuffer = ByteBuffer.allocate(1024);

    /**
     * Creates the server and immediately enters the event loop.
     *
     * <p>This constructor does not return normally: it blocks in the event
     * loop until an unrecoverable error occurs.
     *
     * @param ports TCP ports to listen on
     * @throws IOException if a port cannot be bound or the selector fails
     */
    public NioServer(int ports[]) throws IOException {
        this.ports = ports.clone();
        go();
    }

    /**
     * Starts an echo server on ports 10000 and 10001.
     *
     * @param args ignored
     * @throws Exception if the server cannot be started
     */
    static public void main(String args[]) throws Exception {
        // Listen on ports 10000 and 10001 at the same time.
        int ports[] = new int[]{10000, 10001};
        new NioServer(ports);
    }

    /**
     * Binds every configured port and runs the select loop.
     *
     * @throws IOException if binding, selecting or accepting fails
     */
    private void go() throws IOException {
        Selector selector = Selector.open();
        try {
            for (int port : ports) {
                listenOn(selector, port);
            }
            while (true) {
                // Block until at least one event arrives (several may arrive together).
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Remove first so the key is not reprocessed even if a handler bails out early.
                    it.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        accept(selector, key);
                    } else {
                        serve(key);
                    }
                }
            }
        } finally {
            shutdown(selector);
        }
    }

    /** Opens a non-blocking listener on {@code port} and registers it for accept events. */
    private void listenOn(Selector selector, int port) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        boolean registered = false;
        try {
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress(port));
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            registered = true;
        } finally {
            if (!registered) {
                closeQuietly(ssc);
            }
        }
        System.out.println("Going to listen on " + port);
    }

    /** Accepts one pending connection, if any, and registers it for read events. */
    private void accept(Selector selector, SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // A non-blocking accept returns null when no connection is pending any more.
            return;
        }
        try {
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            // A connection that cannot be set up must not take the listener down.
            System.err.println("Dropping new connection " + sc + ": " + e);
            closeQuietly(sc);
            return;
        }
        System.out.println("Got connection from " + sc);
    }

    /** Handles a read or write event of a client connection; I/O errors close only that client. */
    private void serve(SelectionKey key) {
        try {
            if (key.isWritable()) {
                flush(key);
            } else if (key.isReadable()) {
                echo(key);
            }
        } catch (IOException e) {
            System.err.println("Closing " + key.channel() + " after I/O error: " + e);
            closeQuietly(key.channel());
        }
    }

    /** Reads everything currently available from the client and echoes it back. */
    private void echo(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        while (true) {
            echoBuffer.clear();
            int r = sc.read(echoBuffer);
            if (r == 0) {
                // Socket drained; wait for the next read event.
                return;
            }
            if (r < 0) {
                // Peer closed the connection. Without closing our side the key
                // would stay readable forever and the select loop would spin.
                closeQuietly(sc);
                return;
            }
            echoBuffer.flip();
            // Copy out of the shared scratch buffer so an unsent remainder can be parked safely.
            byte[] received = new byte[r];
            echoBuffer.get(received);
            System.out.println("Echoed " + new String(received, LOG_CHARSET));
            if (!send(key, ByteBuffer.wrap(received))) {
                // Stop reading until the parked bytes are flushed (back-pressure).
                return;
            }
        }
    }

    /** Continues a write that previously could not be completed. */
    private void flush(SelectionKey key) throws IOException {
        ByteBuffer pending = (ByteBuffer) key.attachment();
        if (pending == null) {
            key.interestOps(SelectionKey.OP_READ);
            return;
        }
        send(key, pending);
    }

    /**
     * Writes as much of {@code out} as the socket accepts right now.
     *
     * <p>If bytes remain, they are attached to the key and the key is switched
     * to write interest; otherwise the key goes back to read interest.
     *
     * @return {@code true} if the whole buffer was written
     */
    private boolean send(SelectionKey key, ByteBuffer out) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        sc.write(out);
        if (out.hasRemaining()) {
            key.attach(out);
            key.interestOps(SelectionKey.OP_WRITE);
            return false;
        }
        key.attach(null);
        key.interestOps(SelectionKey.OP_READ);
        return true;
    }

    /** Closes every channel registered with the selector, then the selector itself. */
    private void shutdown(Selector selector) {
        for (SelectionKey key : selector.keys()) {
            closeQuietly(key.channel());
        }
        try {
            selector.close();
        } catch (IOException ignored) {
            // Best effort: we are already shutting down.
        }
    }

    /** Closes {@code resource}, ignoring failures; closing a channel also cancels its keys. */
    private static void closeQuietly(java.io.Closeable resource) {
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close itself fails.
        }
    }
}
client
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * Non-blocking client that sends one greeting to a server and then keeps
 * sending back whatever the server returns, pausing one second before each reply.
 *
 * <p>Each instance is a thread that owns its own {@link Selector} and a single
 * connection. The thread ends when the server closes the connection, an I/O
 * error occurs, or the thread is interrupted.
 */
public class NioClient extends Thread {
    /** Charset used to encode the greeting and to decode received bytes for display. */
    private static final java.nio.charset.Charset CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    /** Size of the per-read buffer; independent of the greeting length. */
    private static final int READ_BUFFER_SIZE = 1024;

    /** Pause before each reply is sent back. */
    private static final long REPLY_DELAY_MILLIS = 1000L;

    /** Channel manager; created by {@link #init(String, int)}. */
    private Selector selector;
    private final String host;
    private final int port;
    private final String msg;

    /**
     * @param host server host name or IP address
     * @param port server TCP port
     * @param msg  greeting sent once the connection is established
     */
    public NioClient(String host, int port, String msg) {
        this.msg = msg;
        this.host = host;
        this.port = port;
    }

    /** Connects to the configured server and runs the event loop until the connection ends. */
    @Override
    public void run() {
        try {
            init(host, port).listen();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Opens a non-blocking connection to the server and registers it with a new selector.
     *
     * @param serverIp server host name or IP address
     * @param port     server TCP port
     * @return this client, so that {@link #listen()} can be chained
     * @throws IOException if the channel or selector cannot be opened or the connect fails
     */
    public NioClient init(String serverIp, int port) throws IOException {
        SocketChannel channel = SocketChannel.open();
        boolean ready = false;
        try {
            channel.configureBlocking(false);
            // The selector can manage several channels; this client uses just one.
            selector = Selector.open();
            // A non-blocking connect usually returns false and is completed later via
            // finishConnect(), but it may succeed immediately (e.g. for local
            // connections), in which case no connect event is delivered.
            boolean connected = channel.connect(new InetSocketAddress(serverIp, port));
            SelectionKey key = channel.register(selector, connected ? 0 : SelectionKey.OP_CONNECT);
            if (connected) {
                greet(key);
            }
            ready = true;
            return this;
        } finally {
            if (!ready) {
                closeQuietly(channel);
                closeSelector();
            }
        }
    }

    /**
     * Runs the event loop: completes the connection, sends the greeting and
     * echoes every server reply back after a one-second pause.
     *
     * <p>Returns when the server closes the connection or the thread is
     * interrupted; the channel and selector are closed on exit.
     *
     * @throws IOException if selecting, reading or writing fails
     */
    public void listen() throws IOException {
        System.out.println("客户端启动");
        try {
            boolean open = true;
            // select() returns immediately once the thread is interrupted, so the
            // interrupt flag must end the loop or it would spin.
            while (open && !Thread.currentThread().isInterrupted()) {
                // Blocks until at least one registered event occurs (several may occur together).
                selector.select();
                Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
                while (open && ite.hasNext()) {
                    SelectionKey key = ite.next();
                    // Remove the selected key so it is not processed twice.
                    ite.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isConnectable()) {
                        finishConnect(key);
                    } else if (key.isWritable()) {
                        flush(key);
                    } else if (key.isReadable()) {
                        open = echoBack(key);
                    }
                }
            }
        } finally {
            closeSelector();
        }
    }

    /**
     * Starts two clients against a local server on ports 10000 and 10001.
     *
     * @param args ignored
     * @throws IOException never thrown directly; kept for source compatibility
     */
    public static void main(String[] args) throws IOException {
        // NioClient already is a Thread, so it is started directly.
        new NioClient("127.0.0.1", 10000, "123").start();
        new NioClient("127.0.0.1", 10001, "abcdefg").start();
    }

    /** Completes a pending connection and, once established, sends the greeting. */
    private void finishConnect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.isConnectionPending() && !channel.finishConnect()) {
            // Still in progress; another connect event will follow.
            return;
        }
        greet(key);
    }

    /** Sends the greeting and switches the key to wait for the server's reply. */
    private void greet(SelectionKey key) throws IOException {
        send(key, ByteBuffer.wrap(msg.getBytes(CHARSET)));
        System.out.println("客户端连接成功");
    }

    /**
     * Reads one chunk from the server, prints it, waits, and sends it back.
     *
     * @return {@code false} if the connection is finished (server closed it or
     *         this thread was interrupted), {@code true} to keep going
     */
    private boolean echoBack(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int count = channel.read(buffer);
        if (count < 0) {
            // Server closed the connection; staying registered would make select() spin.
            return false;
        }
        if (count == 0) {
            return true;
        }
        // Decode only the bytes actually read, not the whole backing array.
        String message = new String(buffer.array(), 0, count, CHARSET);
        System.out.println("recevie: " + message);
        try {
            Thread.sleep(REPLY_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop the client.
            Thread.currentThread().interrupt();
            return false;
        }
        buffer.flip();
        send(key, buffer);
        return true;
    }

    /** Continues a write that previously could not be completed. */
    private void flush(SelectionKey key) throws IOException {
        ByteBuffer pending = (ByteBuffer) key.attachment();
        if (pending == null) {
            key.interestOps(SelectionKey.OP_READ);
            return;
        }
        send(key, pending);
    }

    /**
     * Writes as much of {@code out} as the socket accepts right now. Unsent
     * bytes are attached to the key and finished on the next write event;
     * otherwise the key waits for the next read event.
     */
    private void send(SelectionKey key, ByteBuffer out) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(out);
        if (out.hasRemaining()) {
            key.attach(out);
            key.interestOps(SelectionKey.OP_WRITE);
        } else {
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    /** Closes all channels registered with the selector and the selector itself. */
    private void closeSelector() {
        if (selector == null || !selector.isOpen()) {
            return;
        }
        for (SelectionKey key : selector.keys()) {
            closeQuietly(key.channel());
        }
        try {
            selector.close();
        } catch (IOException ignored) {
            // Best effort: the client is already terminating.
        }
    }

    /** Closes {@code resource}, ignoring failures. */
    private static void closeQuietly(java.io.Closeable resource) {
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close itself fails.
        }
    }
}