注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

0与1构筑世界,程序员创造时代

软件架构设计 Java编程

 
 
 

日志

 
 

Java网络处理模型-非阻塞I/O+单线程  

2013-10-26 21:24:53|  分类: Java |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
"阻塞I/O+线程池"网络模型虽然比"阻塞I/O+多线程"网络模型在性能方面有提升,但这两种模型都存在一个共同的问题:面对大并发(持续大量连接同时请求)的场景,需要消耗大量的线程维持连接。CPU在大量的线程之间频繁切换,性能损耗很大。一旦单机的连接超过1万,甚至达到几万的时候,服务器的性能会急剧下降。
而NIO的Selector却很好地解决了这个问题,用“一个线程或者是CPU个数的线程”(主线程)hold住所有的连接,管理和读取客户端连接的数据,将读取的数据交给后面的线程池处理,线程池处理完业务逻辑后,将结果交给主线程发送响应给客户端,少量的线程就可以处理大量连接的请求。

Java网络处理模型-非阻塞+单线程 - 傲风 - 宝剑锋从磨砺出 梅花香自苦寒来 
单个线程hold住所有连接并处理请求和响应

代码示例

示例代码只实现一个线程负责所有的事务:接收请求,解析请求,业务逻辑处理,响应数据封闭,发送响应。
NioEchoServer.java 源代码:
package cn.aofeng.demo.nio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 用NIO实现的Echo Server。
 @author NieYong <aofengblog@163.com>
 */
public class NioEchoServer {
    
    private final static Logger logger = Logger.getLogger(NioEchoServer.class.getName());
    
    // 换行符
    public final static char CR = '\r';
    
    // 回车符
    public final static char LF = '\n';
    
    /**
     @return 当前系统的行结束符
     */
    private static String getLineEnd() {
        return System.getProperty("line.separator");
    }
    
    /**
     * 重置缓冲区状态标志位:position设置为0,limit设置为capacity的值,所有mark无效。
     * 注:缓冲区原来的内容还在,并没有清除。
     
     @param buffer 字节缓冲区
     */
    private static void clear(ByteBuffer buffer) {
        if (null != buffer) {
            buffer.clear();
        }
    }
    
    /**
     * 将字节缓冲区的每一个字节转换成ASCII字符。
     @param buffer 字节缓冲区
     @return 转换后的字节数组字符串
     */
    private static String toDisplayChar(ByteBuffer buffer) {
        if (null == buffer) {
            return "null";
        }
        
        return Arrays.toString(buffer.array());
    }
    
    /**
     * 将字节缓冲区用utf8编码,转换成字符串。
     
     @param buffer 字节缓冲区
     @return utf8编码转换的字符串
     @throws UnsupportedEncodingException
     */
    private static String convert2String(ByteBuffer bufferthrows UnsupportedEncodingException {
        return new String(buffer.array()"utf8");
    }
    
    /**
     * 去掉尾末的行结束符(\r\n),并转换成字符串。
     
     @param buffer 字节缓冲区
     @return 返回去掉行结束符后的字符串。
     @throws UnsupportedEncodingException
     @see #convert2String(ByteBuffer)
     */
    private static String getLineContent(ByteBuffer bufferthrows UnsupportedEncodingException {
        if (null == buffer) {
            return null;
        }
        
        byte[] result = new byte[buffer.limit()-2];
        System.arraycopy(buffer.array()0, result, 0, result.length);
        return convert2String(ByteBuffer.wrap(result));
    }
    
    /**
     * 顺序合并两个{@link ByteBuffer}的内容,且不改变{@link ByteBuffer}原来的标志位。即:
     <pre>
     * 合并后的ByteBuffer = first + second
     </pre>
     @param first 第一个待合并的{@link ByteBuffer},合并后其内容在前面
     @param second 第二个待合并的{@link ByteBuffer},合并后其内容在后面
     @return 合并后的内容。如果两个{@link ByteBuffer}都为null,返回null。
     */
    private static ByteBuffer merge(ByteBuffer first, ByteBuffer second) {
        if (null == first && null == second) {
            return null;
        }
        
        int oneSize = null != first ? first.limit() 0;
        int twoSize = null != second ? second.limit() 0;
        ByteBuffer result = ByteBuffer.allocate(oneSize+twoSize);
        if (null != first) {
            result.put(Arrays.copyOfRange(first.array()0, oneSize));
        }
        if (null != second) {
            result.put(Arrays.copyOfRange(second.array()0, twoSize));
        }
        result.rewind();
        
        return result;
    }
    
    /**
     * 从字节缓冲区中获取"一行",即获取包括行结束符及其前面的内容。
     
     @param buffer 输入缓冲区
     @return 有遇到行结束符,返回包括行结束符在内的字节缓冲区;否则返回null。
     */
    private static ByteBuffer getLine(ByteBuffer buffer) {
        int index = 0;
        boolean findCR = false;
        int len = buffer.limit();
        while(index < len) {
            index ++;
            
            byte temp = buffer.get();
            if (CR == temp) {
                findCR = true;
            }
            if (LF == temp && findCR && index > 0) { // 找到了行结束符
                byte[] copy = new byte[index];
                System.arraycopy(buffer.array()0, copy, 0, index);
                buffer.rewind()// 位置复原
                return ByteBuffer.wrap(copy);
            }
        }
        buffer.rewind()// 位置复原
        
        return null;
    }
    
    private static void readData(Selector selector, SelectionKey selectionKeythrows IOException {
        SocketChannel socketChannel = (SocketChannelselectionKey.channel();
        
        // 获取上次已经读取的数据
        ByteBuffer oldBuffer = (ByteBufferselectionKey.attachment();
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("上一次读取的数据:"+oldBuffer+getLineEnd()+toDisplayChar(oldBuffer));
        }
        
        // 读新的数据
        int readNum = 0;
        ByteBuffer newBuffer = ByteBuffer.allocate(1024);
        if ( (readNum = socketChannel.read(newBuffer)) <= ) {
            return;
        }
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("这次读取的数据:"+newBuffer+getLineEnd()+toDisplayChar(newBuffer));
        }
        
        newBuffer.flip();
        ByteBuffer lineRemain = getLine(newBuffer);
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("解析的行数据剩余部分:"+lineRemain+getLineEnd()+toDisplayChar(lineRemain));
        }
        if (null != lineRemain) { // 获取到行结束符
            ByteBuffer completeLine = merge(oldBuffer, lineRemain);
            if (logger.isLoggable(Level.FINE)) {
                logger.fine("准备输出的数据:"+completeLine+getLineEnd()+toDisplayChar(completeLine));
            }
            while (completeLine.hasRemaining()) { // 有可能一次没有写完,需多次写
                socketChannel.write(completeLine);
            }
            
            // 清除数据
            selectionKey.attach(null);
            clear(oldBuffer);
            clear(lineRemain);
            
            // 判断是否退出
            String lineStr = getLineContent(completeLine);
            if (logger.isLoggable(Level.FINE)) {
                logger.fine("判断是否退出的行数据:"+lineStr);
            }
            if ("exit".equalsIgnoreCase(lineStr|| "quit".equalsIgnoreCase(lineStr)) {
                socketChannel.close();
            }
            
            // FIXME 行结束符后面是否还有数据? 此部分代码尚未测试
            if (lineRemain.limit()+< newBuffer.limit()) {
                byte[] temp = new byte[newBuffer.limit() - lineRemain.limit()];
                newBuffer.get(temp, lineRemain.limit(), temp.length);
                
                selectionKey.attach(temp);
            }
        else // 没有读到一个完整的行,继续读并且带上已经读取的部分数据
            ByteBuffer temp = merge(oldBuffer, newBuffer);
            socketChannel.register(selector, SelectionKey.OP_READ, temp)
            
            if (logger.isLoggable(Level.FINE)) {
                logger.fine("暂存到SelectionKey的数据:"+temp+getLineEnd()+toDisplayChar(temp));
            }
        }
    }

    /**
     * 接受新的Socket连接。
     
     @param selector 选择器
     @param selectionKey 
     @return
     @throws IOException
     @throws ClosedChannelException
     */
    private static SocketChannel acceptNew(Selector selector,
            SelectionKey selectionKeythrows IOException,
            ClosedChannelException {
        ServerSocketChannel server = (ServerSocketChannelselectionKey.channel();
        SocketChannel socketChannel = server.accept();
        if (null != socketChannel) {
            if (logger.isLoggable(Level.INFO)) {
                logger.info("收到一个新的连接,客户端IP:"+socketChannel.socket().getInetAddress().getHostAddress()+",客户端Port:"+socketChannel.socket().getPort());
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        }
        
        return socketChannel;
    }
    
    /**
     * 启动服务器。
     
     @param port 服务监听的端口
     @param selectTimeout {@link Selector}检查通道就绪状态的超时时间(单位:毫秒)
     */
    private static void startServer(int port, int selectTimeout) {
        ServerSocketChannel serverChannel = null;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket serverSocket = serverChannel.socket();
            serverSocket.bind(new InetSocketAddress(port));
            if (logger.isLoggable(Level.INFO)) {
                logger.info("NIO echo网络服务启动完毕,监听端口:" +port);
            }
            
            Selector selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            
            while (true) {
                int selectNum = selector.select(selectTimeout);
                if (== selectNum) {
                    continue;
                }
                
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = (SelectionKeyit.next();
                    
                    // 接受新的Socket连接
                    if (selectionKey.isAcceptable()) {
                        acceptNew(selector, selectionKey);
                    }
                    
                    // 读取并处理Socket的数据
                    if (selectionKey.isReadable()) {
                        readData(selector, selectionKey);
                    }
                    
                    it.remove();
                // end of while iterator
            }
        catch (IOException e) {
            logger.log(Level.SEVERE, "处理网络连接出错", e);
        }
    }
    
    public static void main(String[] args) {
        int port = 9090;
        int selectTimeout = 1000;
        
        startServer(port, selectTimeout);
    }

}

验证

1、启动服务。
java cn.aofeng.demo.nio.NioEchoServer 9090
执行上面的命令,启动服务,输出信息:
2013-10-26 20:38:42 cn.aofeng.demo.nio.NioEchoServer startServer
信息: NIO echo网络服务启动完毕,监听端口:9090

2、打开三个终端窗口,执行命令:
telnet 192.168.56.102 9090
服务输出如下信息:
2013-10-26 20:40:55 cn.aofeng.demo.nio.NioEchoServer acceptNew
信息: 收到一个新的连接,客户端IP:192.168.56.101,客户端Port:1211
2013-10-26 20:40:58 cn.aofeng.demo.nio.NioEchoServer acceptNew
信息: 收到一个新的连接,客户端IP:192.168.56.101,客户端Port:1212
2013-10-26 20:41:00 cn.aofeng.demo.nio.NioEchoServer acceptNew
信息: 收到一个新的连接,客户端IP:192.168.56.101,客户端Port:1215
注:服务所在机器的IP地址是192.168.56.102。

3、连接后,三个客户端均发送一段时间的数据,然后发送exit或quit指令,服务端关闭客户端连接。其线程列表及其状态 如下图所示:
Java网络处理模型-非阻塞+单线程 - 傲风 - 宝剑锋从磨砺出 梅花香自苦寒来
可以看到,当有三个客户端连接上来后,NioEchoServer并没有生成其他线程来处理连接,而是全部由main线程完成。

<正文结束>
文章声明


作者:傲风(aofengblog@163.com)       编写时间:2013年10月26日

网址:http://aofengblog.blog.163.com

作者保留所有权利,转载请保留文章全部内容或者说明原作者和转载地址!

  评论这张
 
阅读(2963)| 评论(2)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017