注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

0与1构筑世界,程序员创造时代

软件架构设计 Java编程

 
 
 

日志

 
 

Reactor模式与非阻塞I/O | Reactor Pattern and NIO  

2013-11-04 19:23:40|  分类: Java |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
Reactor模式描述:The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

网络请求响应的整个流程,根据其职责进行划分,可以拆分成如下几个步骤:
  1. 接收请求(Acceptor)。
  2. 读取请求数据(Reader)
  3. 解析请求数据(Decoder)
  4. 处理业务逻辑(Process Service | Compute)
  5. 封装响应数据(Encoder)
  6. 发送响应(Writer | Sender
有些童鞋可能要说了,Netty就这是这样划分职责,分成多个组件联合处理网络请求和响应。没错,Netty实现了Reactor模式,通过事件驱动机制(也可以说是好莱坞原则:不要给我们打电话,我们会打电话通知你),非常高效。
前一篇文章“Java网络处理模型-非阻塞I/O+单线程”介绍了如何用NIO实现一个Echo Server。在这里,对它进行重构,按照上面的6个处理步骤,将NioEchoServer拆分成几个组件,分工协作:
  • Reactor:启动、停止服务;分派ACCEPT事件给Acceptor;分派READ事件给Reader。
  • Acceptor:处理新的客户端连接,将Reader注册并关注READ事件。
  • Reader:读取请求数据。调用Decoder解析数据。
  • Decoder:解码器。LineDecoder是一个按"行"解析的解码器。
  • ProcessService:业务逻辑处理。
  • Writer:发送响应数据。调用Encoder封装响应数据。
  • Encoder:编码器。LineEncoder是一个将响应数据转换成"行"的编码器。
Reactor模式与非阻塞I/O | Reactor Pattern and NIO - 傲风 - 宝剑锋从磨砺出 梅花香自苦寒来

代码示例:

注:下面的代码未完整地实现Reactor模式,只是实现了职责划分,未实现事件驱动。
Reactor.java 源代码:
package cn.aofeng.demo.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 负责Echo Server启动和停止 ,ACCEPT和READ事件的分派。
 
 @author <a href="mailto:aofengblog@163.com">NieYong </a>
 */
public class Reactor {
    
    private final static Logger logger = Logger.getLogger(Reactor.class.getName());
    
    // 监听端口
    private int _port;
    
    // {@link Selector}检查通道就绪状态的超时时间(单位:毫秒)
    private int _selectTimeout = 3000;
    
    // 服务运行状态
    private volatile boolean _isRun = true;
    
    /**
     @param port 服务监听端口。
     */
    public Reactor(int port) {
        this._port = port;
    }
    
    public void setSelectTimeout(int selectTimeout) {
        this._selectTimeout = selectTimeout;
    }
    
    /**
     * 启动服务。
     */
    public void start() {
        ServerSocketChannel serverChannel = null;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket serverSocket = serverChannel.socket();
            serverSocket.bind(new InetSocketAddress(_port));
            _isRun = true;
            if (logger.isLoggable(Level.INFO)) {
                logger.info("NIO echo网络服务启动完毕,监听端口:" +_port);
            }
            
            Selector selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, serverChannel));
            
            while (_isRun) {
                int selectNum = selector.select(_selectTimeout);
                if (== selectNum) {
                    continue;
                }
                
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = (SelectionKeyit.next();
                    
                    // 接受新的Socket连接
                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                         Acceptor acceptor = (AcceptorselectionKey.attachment();
                         acceptor.accept();
                    }
                    
                    // 读取并处理Socket的数据
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        Reader reader = (ReaderselectionKey.attachment();
                        reader.read();
                    }
                    
                    // 移除已经处理过的Key
                    it.remove();
                // end of while iterator
            }
        catch (IOException e) {
            logger.log(Level.SEVERE, "处理网络连接出错", e);
        }
    }
    
    /**
     * 停止服务。
     */
    public void stop() {
        _isRun = false;
    }
    
    public static void main(String[] args) {
        if (!= args.length) {
            logger.severe("无效参数。使用示例:\n    java cn.aofeng.demo.reactor.Reactor 9090");
            System.exit(-1);
        }
        int port = Integer.parseInt(args[0]);
        int selectTimeout = 1000;
        
        Reactor reactor = new Reactor(port);
        reactor.setSelectTimeout(selectTimeout);
        reactor.start();
    }

}

Acceptor.java 源代码:
package cn.aofeng.demo.reactor;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 负责处理新连入的客户端Socket连接。
 
 @author <a href="mailto:aofengblog@163.com">NieYong </a>
 */
public class Acceptor {

    private final static Logger _logger = Logger.getLogger(Acceptor.class.getName());
    
    protected Selector _selector;
    
    protected ServerSocketChannel _serverChannel;
    
    public Acceptor(Selector selector, ServerSocketChannel serverChannel) {
        this._selector = selector;
        this._serverChannel = serverChannel;
    }
    
    /**
     * 接收一个新连入的客户端Socket连接,交给{@link Reader}处理:{@link Reader}{@link Selector}注册并关注READ事件。
     
     @throws IOException
     */
    public void accept() throws IOException {
        SocketChannel clientChannel = _serverChannel.accept();
        if (null != clientChannel) {
            if (_logger.isLoggable(Level.INFO)) {
                _logger.info("收到一个新的连接,客户端IP:"+clientChannel.socket().getInetAddress().getHostAddress()
                        +",客户端Port:"+clientChannel.socket().getPort());
            }
            clientChannel.configureBlocking(false);
            Reader reader = new Reader(_selector, clientChannel);
            reader.setDecoder(new LineDecoder());
            clientChannel.register(_selector, SelectionKey.OP_READ, reader);
        }
    }

}

Reader.java 源代码:
package cn.aofeng.demo.reactor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 负责读取客户端的请求数据并解析。
 
 @author <a href="mailto:aofengblog@163.com">NieYong </a>
 */
public class Reader {

    private final static Logger _logger = Logger.getLogger(Reader.class.getName());
    
    private SocketChannel _clientChannel;
    
    private Decoder _decoder;
    
    private final static int BUFFER_SIZE = 512;
    
    private ByteBuffer _buffer = ByteBuffer.allocate(BUFFER_SIZE);
    
    public Reader(Selector selector, SocketChannel clientChannel) {
        this._clientChannel = clientChannel;
    }
    
    public void setDecoder(Decoder decoder) {
        this._decoder = decoder;
    }
    
    public void read() throws IOException {
        int readCount = _clientChannel.read(_buffer);
        if (-== readCount) {
            _clientChannel.close();
        }
        
        _buffer.flip();
        int oldLimit = _buffer.limit();
        String line = null;
        while( (line = (String_decoder.decode(_buffer)) != null ) { // 处理一次多行发送过来的情况
            if (_logger.isLoggable(Level.FINE)) {
                _logger.fine("收到的数据:"+line);
            }
            
            // 处理业务逻辑
            ProcessService service= new ProcessService(_clientChannel, line);
            String result = service.execute();
            
            // 发送响应
            Writer writer = new Writer(_clientChannel, result);
            writer.setEncoder(new LineEncoder());
            writer.write();
            
            // 重建临时数据缓冲区
            rebuildBuffer(line.length());
        }
        
        // 缓冲区数据还没有符合一个decode数据的条件,重置数据缓冲区的状态方便append数据
        if (oldLimit  == _buffer.limit()) {
            resetBuffer();
        }
    }
    
    private void resetBuffer() {
        _buffer.position(_buffer.limit());
        _buffer.limit(_buffer.capacity());
    }

    /**
     * 重建临时数据缓冲区。
     
     @param lineSize 收到的一行数据(不包括行结束符)的长度 
     */
    private void rebuildBuffer(int lineSize) {
        if (_buffer.limit() == lineSize) {
            // 数据刚好是一行
            _buffer = ByteBuffer.allocate(BUFFER_SIZE);
        else if (_buffer.limit() > lineSize) {
            // 数据多于一行
            byte[] temp = new byte[_buffer.limit() - lineSize];
            System.arraycopy(_buffer.array(), lineSize, temp, 0, temp.length);
            _buffer = ByteBuffer.allocate(BUFFER_SIZE);
            _buffer.put(temp);
            _buffer.flip();
        else {
            // nothing
        }
    }

}

Decoder.java 源代码:
package cn.aofeng.demo.reactor;

import java.nio.ByteBuffer;

/**
 * 请求数据解析器接口定义。
 
 @author <a href="mailto:aofengblog@163.com">NieYong </a>
 */
public interface Decoder {

    /**
     * 解析请求数据,不影响源数据的状态和内容。
     
     @param source {@link Reader}读取到的源数据字节数组
     @return 如果解析到符合要求的数据,则返回解析到的数据;否则返回null。
     */
    public Object decode(ByteBuffer source);

}

LineDecoder.java 源代码:
package cn.aofeng.demo.reactor;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 行数据解析器。
 
 @author <a href="mailto:aofengblog@163.com">NieYong </a>
 */
public class LineDecoder implements Decoder {

    private final static Logger _logger = Logger.getLogger(LineDecoder.class.getName());
    
    /**
     * 从字节缓冲区中获取"一行"。
     
     @param buffer 输入缓冲区
     @return 有遇到行结束符,返回不包括行结束符的字符串;否则返回null。
     */
    @Override
    public String decode(ByteBuffer source) {
        int index = 0;
        boolean findCR = false;
        int len = source.limit();
        byte[] bytes = source.array();
        while(index < len) {
            index ++;
            
            byte temp = bytes[index-1];
            if (Constant.CR == temp) {
                findCR = true;
            }
            if (Constant.LF == temp && findCR) { // 找到了行结束符
                byte[] copy = new byte[index];
                System.arraycopy(bytes, 0, copy, 0, index);
                try {
                    return new String(copy, Constant.CHARSET_UTF8);
                catch (UnsupportedEncodingException e) {
                    _logger.log(Level.SEVERE, "将解析完成的请求数据转换成字符串出错", e);
                }
            }
        }
        
        return null;
    }

}

ProcessService.java 源代码:
package cn.aofeng.demo.reactor;

import java.io.IOException;
import java.nio.channels.SocketChannel;

/**
 * 业务逻辑处理。
 
 @author <a href="mailto:aofengblog@163.com">NieYong </a>
 */
public class ProcessService {

    private SocketChannel _clientChannel;
    
    private String _line;
    
    public ProcessService(SocketChannel clientChannel, String line) {
        this._clientChannel = clientChannel;
        this._line = line;
    }
    
    public String execute() {
        // 判断客户端是否发送了退出指令
        String content = _line.substring(0, _line.length()-2);
        if (isCloseClient(content)) {
            try {
                _clientChannel.close();
            catch (IOException e) {
                // nothing
            }
        }
        
        return _line;
    }
    
    /**
     * 客户端是否发送了退出指令("quit" | "exit")。
     
     @param str 收到的客户端数据
     @return 返回true表示收到了退出指令;否则返回false。
     */
    private boolean isCloseClient(String str) {
        return "exit".equalsIgnoreCase(str|| "quit".equalsIgnoreCase(str);
    }

}

Writer.java 源代码:
package cn.aofeng.demo.reactor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * 负责向客户端发送响应数据。
 
 @author <a href="mailto:aofengblog@163.com">NieYong </a>
 */
public class Writer {

    private SocketChannel _clientChannel;
    
    private Object _data;
    
    private Encoder _encoder;
    
    public Writer(SocketChannel clientChannel, Object data) {
        this._clientChannel = clientChannel;
        this._data = data;
    }
    
    public void setEncoder(Encoder encoder) {
        this._encoder = encoder;
    }
    
    public void write() throws IOException {
        if (null == _data || !_clientChannel.isOpen()) {
            return;
        }
        
        ByteBuffer buffer = _encoder.encode(_data);
        if (null == buffer) {
            return;
        }
        _clientChannel.write(buffer);
    }

}

Encoder.java 源代码:
package cn.aofeng.demo.reactor;

import java.nio.ByteBuffer;

/**
 * 响应数据封装接口定义。
 
 @author <a href="mailto:aofengblog@163.com">NieYong </a>
 */
public interface Encoder {

    /**
     * 将源数据转换成{@link ByteBuffer}
     
     @param source 源数据
     @return {@link ByteBuffer}对象。
     */
    public ByteBuffer encode(Object source);

}

LineEncoder.java 源代码:
package cn.aofeng.demo.reactor;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 将字符串转换成{@link ByteBuffer}并加上行结束符。
 
 @author <a href="mailto:aofengblog@163.com">NieYong </a>
 */
public class LineEncoder implements Encoder {

    private final static Logger logger = Logger.getLogger(LineEncoder.class.getName());
    
    @Override
    public ByteBuffer encode(Object source) {
        String line = (Stringsource;
        try {
            ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(Constant.CHARSET_UTF8));
            
            return buffer;
        catch (UnsupportedEncodingException e) {
            logger.log(Level.SEVERE, "将响应数据转换成ByteBuffer出错", e);
        }
        
        return null;
    }

}

<正文结束>
文章声明


作者:傲风(aofengblog@163.com)       编写时间:2013年11月03日

网址:http://aofengblog.blog.163.com

作者保留所有权利,转载请保留文章全部内容或者说明原作者和转载地址!

  评论这张
 
阅读(2734)| 评论(6)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017