• 欧洲杯直播比赛

NIO之多线程协作处理数据读写

关键词:NIO,之多,线程,协作,处理,数据,读写,经过,前面,

经过前面几章的学习,我们已经 能够掌握了JDK NIO的开发方式,我们来总结一下NIO开发的流程: 创建一个服务端通道 ServerSocketChannel 创建一个选择器 Selector 将服务端通道注册到选择器上

  • 经过前面几章的学习,我们已经 能够掌握了JDK NIO的开发方式,我们来总结一下NIO开发的流程:

    创建一个服务端通道 ServerSocketChannel 创建一个选择器 Selector 将服务端通道注册到选择器上,并且关注我们感兴趣的事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); 绑定服务管道的地址 serverSocketChannel.bind(new InetSocketAddress(8989)); 开始进行事件选择,选择我们感兴趣的事件做对应的操作!

    具体的代码信息请参照第一章:多路复用模型章节,这里不做太多的赘述!

    有关多路复用的概念,我们也在第一章进行了分析。多路复用模型能够最大限度的将一个线程的执行能力榨干,一条线程执行所有的数据,包括新连接的接入、数据的读取、计算与回写,但是假设,我们的数据计算及其缓慢,那么该任务的执行就势必影响下一个新链接的接入!

    传统NIO单线程模型

    单线程的NIO模型

    如图,我们能了解到,单线程情况下,读事件因为要做一些业务性操作(数据库连接、图片、文件下载)等操作,导致线程阻塞再,读事件的处理上,此时单线程程序无法进行下一次新链接的处理!我们对该线程模型进行优化,select事件处理封装为任务,提交到线程池!

    NIO多线程模型

    上面的这种数据结构能够解决掉因为计算任务耗时过长,导致新链接接入阻塞的问题,我们能否再次进行一次优化呢?

    我们能否创建多个事件选择器,每个事件选择器,负责不同的Socket连接,就像下面这种:

    NIO多线程优化模型

    这样我们就可以每一个Select选择器负责多个客户端Socket连接,主线程只需要将客户端新连接选择一个选择器注册到select选择器上就可以了!所以我们的架构图,就变成了下图这样:

    我们在select选择器内部处理计算任务的时候,也可以将任务封装为task,提交到线程池里面去,彻底将新连接接入和读写事件处理分离开,互不影响!事实上,这也是Netty的核心思想之一,我们可以根据上面的图例,自己简单写一个:

    代码实现

    构建一个事件执行器 对应上图的select选择器

    /**  * Nio事件处理器  *  * @author huangfu  * @date  */ public class MyNioEventLoop implements Runnable {     static final ByteBuffer ALLOCATE = ByteBuffer.allocate(128);     private final Selector selector;     private final LinkedBlockingQueue<Runnable> linkedBlockingQueue;     public MyNioEventLoop(Selector selector) {         this.selector = selector;         linkedBlockingQueue = new LinkedBlockingQueue<>();     }      public Selector getSelector() {         return selector;     }      public LinkedBlockingQueue<Runnable> getLinkedBlockingQueue() {         return linkedBlockingQueue;     }      //忽略  hashCode和eques      /**      * 任务处理器      */     @Override     public void run() {         while (!Thread.currentThread().isInterrupted()) {             try {                 //进行事件选择  这里我们只处理读事件                 if (selector.select() > 0) {                     Set<SelectionKey> selectionKeys = selector.selectedKeys();                     Iterator<SelectionKey> iterator = selectionKeys.iterator();                     //处理读事件                     while (iterator.hasNext()) {                         SelectionKey next = iterator.next();                         iterator.remove();                         if (next.isReadable()) {                             SocketChannel channel = (SocketChannel) next.channel();                             int read = channel.read(ALLOCATE);                             if(read > 0) {                                 System.out.printf("线程%s【%s】发来消-息:",Thread.currentThread().getName(), channel.getRemoteAddress());                                 System.out.println(new String(ALLOCATE.array(), StandardCharsets.UTF_8));                             }else if(read == -1) {                                 System.out.println("连接断开");                                 channel.close();                             }                             ALLOCATE.clear();                         }                     }                     selectionKeys.clear();                 }else {                     //处理异步任务  进行注册                     while (!linkedBlockingQueue.isEmpty()) {                         Runnable take = linkedBlockingQueue.take();                         //异步事件执行                         take.run();                     }                 }             } catch (IOException | InterruptedException e) {                 e.printStackTrace();             }         }     } } 

    构建一个选择器组

    /**  * 选择器组  *  * @author huangfu  * @date 2021年3月12日09:44:37  */ public class SelectorGroup {     private final List<MyNioEventLoop> SELECTOR_GROUP = new ArrayList<>(8);     private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();     private final static AtomicInteger IDX = new AtomicInteger();      /**      * 初始化选择器      * @param count 处理器数量      * @throws IOException 异常欣喜      */     public SelectorGroup(int count) throws IOException {          for (int i = 0; i < count; i++) {             Selector open = Selector.open();             MyNioEventLoop myNioEventLoop = new MyNioEventLoop(open);             SELECTOR_GROUP.add(myNioEventLoop);         }     }      public SelectorGroup() throws IOException {         this(AVAILABLE_PROCESSORS << 1);     }      /**      * 轮询获取一个选择器      * @return 返回一个选择器      */     public MyNioEventLoop next(){         int andIncrement = IDX.getAndIncrement();         int length = SELECTOR_GROUP.size();          return SELECTOR_GROUP.get(Math.abs(andIncrement % length));     } } 

    构建一个执行器记录器

    /**  * @author huangfu  * @date  */ public class ThreadContext {     /**      * 记录当前使用过的选择器      */     public static final Set<MyNioEventLoop> RUN_SELECT = new HashSet<>(); } 

    构建一个新连接接入选择器

    /**  * 连接器  *  * @author huangfu  * @date 2021年3月12日10:15:37  */ public class Acceptor implements Runnable {     private final ServerSocketChannel serverSocketChannel;     private final SelectorGroup selectorGroup;      public Acceptor(ServerSocketChannel serverSocketChannel, SelectorGroup selectorGroup) {         this.serverSocketChannel = serverSocketChannel;         this.selectorGroup = selectorGroup;     }       @Override     public void run() {         try {             SocketChannel socketChannel = serverSocketChannel.accept();             MyNioEventLoop next = selectorGroup.next();              //向队列追加一个注册任务             next.getLinkedBlockingQueue().offer(() -> {                 try {                     //客户端注册为非阻塞                     socketChannel.configureBlocking(false);                     //注册到选择器 关注一个读事件                     socketChannel.register(next.getSelector(), SelectionKey.OP_READ);                 } catch (Exception e) {                     e.printStackTrace();                 }             });             //唤醒对应的任务,让其处理异步任务             next.getSelector().wakeup();               System.out.println("检测到连接:" + socketChannel.getRemoteAddress());             //当当前选择器已经被使用过了  就不再使用了,直接注册就行了             if (ThreadContext.RUN_SELECT.add(next)) {                 //启动任务                 new Thread(next).start();             }           } catch (IOException e) {             e.printStackTrace();         }     } } 
    创建启动器
    /**  * @author huangfu  * @date  */ public class TestMain {      public static void main(String[] args) throws IOException {         //创建一个选择器组   传递选择器组的大小 决定使用多少选择器来实现         SelectorGroup selectorGroup = new SelectorGroup(2);         //开启一个服务端管道         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();         //开启一个服务端专用的选择器         Selector selector = Selector.open();         //设置非阻塞         serverSocketChannel.configureBlocking(false);         //创建一个连接器         Acceptor acceptor = new Acceptor(serverSocketChannel, selectorGroup);         //将服务端通道注册到服务端选择器上  这里会绑定一个新连接接入器         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, acceptor);         //绑定端口         serverSocketChannel.bind(new InetSocketAddress(8989));         //启动处理器         new Reactor(selector).run();     } } 
    总结

    单线程下的NIO存在性能瓶颈,当某一计算过程缓慢的时候会阻塞住整个线程,导致影响其他事件的处理!

    为了解决这一缺陷,我们提出了使用异步线程的方式去操作任务,将耗时较长的业务,封装为一个异步任务,提交到线程池执行!

    为了使业务操作和新连接接入完全分离开,我们做了另外一重优化,我们封装了一个选择器组,轮询的方式获取选择器,每一个选择器都能够处理多个新连接, socket连接->selector选择器 = 多 -> 1,在每一个选择器里面又可以使用线程池来处理任务,进一步提高吞吐量!

    【编辑推荐】

    鸿蒙官方战略合作共建——HarmonyOS技术社区 Redisson 分布式锁源码之一:可重入锁加锁 大厂经典面试题:Redis为什么这么快? Java项目实战上卷[SpringBoot/SpringCloud/RabbitMQ/Redis] Redisson 分布式锁源码之二:看门狗 10+款Redis容器化技术选型对比,K8S并非万金油
发表时间:2021-06-29 | 评论 () | 复制本页地址 | 打印

相关文章