博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
模仿Tomcat的BIO,NIO线程模型
阅读量:6657 次
发布时间:2019-06-25

本文共 5085 字,大约阅读时间需要 16 分钟。

模仿Tomcat的BIO模型,来一个消息,分配一个线程处理.

则主线程池代码如下
package com.guanjian;

import java.util.ArrayList;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**

  • Created by Administrator on 2018/7/10.
    */

public class ThreadPool {

private ExecutorService service;private List
tasks;private int fixedThreadNum = 0;private List
messages;private MessageHandler messageHandler;public ThreadPool(int fixedThreadNum,List
messages,MessageHandler messageHandler) { this.fixedThreadNum = fixedThreadNum; this.messages = messages; this.messageHandler = messageHandler; service = Executors.newFixedThreadPool(fixedThreadNum); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { shutdownGracefully(service); } });}public void shutdownGracefully(ExecutorService ThreadPool) { ShutdownPool.shutdownThreadPool(ThreadPool, "main-pool");}public void startup() { tasks = new ArrayList<>(); MessageTask messageTask = (fixedThreadNum == 0 ? new SequentialMessageTask(messageHandler,messages) : new ConcurrentMessageTask(messageHandler,messages)); for (String message:messages) { tasks.add(messageTask); service.execute(messageTask); }}

}

它是通过线程数fixedThreadNum来区分使用哪种线程模型.
package com.guanjian;

/**

  • Created by Administrator on 2018/7/10.
    */

public interface MessageHandler {

public void execute(String message);

}

package com.guanjian;

/**

  • Created by Administrator on 2018/7/10.
    */

public class MessageHandlerImpl implements MessageHandler {

@Overridepublic void execute(String message) {    System.out.println(message);}

}

以上是消息处理器的接口和实现类
package com.guanjian;

import java.util.List;

/**

  • Created by Administrator on 2018/7/10.
    */

public abstract class MessageTask implements Runnable {

protected MessageHandler messageHandler;protected  List
messages;MessageTask(MessageHandler messageHandler,List
messages) { this.messageHandler = messageHandler; this.messages = messages;}@Overridepublic void run() { for (String message:messages) { handlerMessage(message); }}protected abstract void handlerMessage(String message);

}

消息任务抽象类实现了Runnable线程接口,以不同的子类来实现BIO,NIO线程模型,具体在抽象方法handlerMessage中实现.
package com.guanjian;

import java.util.List;

/**

  • Created by Administrator on 2018/7/10.
    */

public class SequentialMessageTask extends MessageTask {

SequentialMessageTask(MessageHandler messageHandler, List
messages) { super(messageHandler, messages);}@Overrideprotected void handlerMessage(String message) { messageHandler.execute(message);}

}

BIO线程模型子类,通过主线程池来分配线程处理.
package com.guanjian;

import java.util.List;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**

  • Created by Administrator on 2018/7/10.
    */

public class ConcurrentMessageTask extends MessageTask {

private ExecutorService asyncService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);ConcurrentMessageTask(MessageHandler messageHandler, List
messages) { super(messageHandler, messages);}@Overrideprotected void handlerMessage(String message) { asyncService.submit(new Runnable() { @Override public void run() { messageHandler.execute(message); } });}protected void shutdown() { ShutdownPool.shutdownThreadPool(asyncService,"async-pool-" + Thread.currentThread().getId());}

}

NIO线程模型,不再使用主线程池来分配线程,而是异步线程池,类比于Netty中的Worker线程池,从BOSS线程池中接管消息处理.
package com.guanjian;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.TimeUnit;

/**

  • Created by Administrator on 2018/7/10.
    */

public class ShutdownPool {

private static Logger log = LoggerFactory.getLogger(ThreadPool.class);/** * 优雅关闭线程池 * @param threadPool * @param alias */public static void shutdownThreadPool(ExecutorService threadPool, String alias) {    log.info("Start to shutdown the thead pool: {}", alias);    threadPool.shutdown(); // 使新任务无法提交.    try {        // 等待未完成任务结束        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {            threadPool.shutdownNow(); // 取消当前执行的任务            log.warn("Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");            // 等待任务取消的响应            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))                log.error("Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");        }    } catch (InterruptedException ie) {        // 重新取消当前线程进行中断        threadPool.shutdownNow();        log.error("The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconcistent state. Please check the biz logs.");        // 保留中断状态        Thread.currentThread().interrupt();    }    log.info("Finally shutdown the thead pool: {}", alias);}

}

最后是线程池的优雅关闭,无论是主线程池还是异步线程池皆调用该方法实现优雅关闭.

以上只是模型代码,具体可替换成具体需要的业务代码来达到业务性能的提升.

转载地址:http://xlxto.baihongyu.com/

你可能感兴趣的文章
java inputStream ,outputStream
查看>>
系统服务
查看>>
Linux 文件与目录管理+用户管理命令
查看>>
C#中父类和子类之间相互转换
查看>>
《Linux菜鸟入门2》mail服务
查看>>
Mysql DOS: 进入Mysql运行文目录 F:\mysql-8.0.13-winx64\bin
查看>>
中小型企业网络构建之路由的简单配置
查看>>
Create an inbound email action
查看>>
oracle教程之DML事务锁定的机制
查看>>
Oracle RMAN 维护(一)--RMAN的维护
查看>>
centos6.6关闭防火墙和selinux
查看>>
JAVA RMI远程方法调用简单实例
查看>>
Citrix桌面虚拟化解决方案介绍
查看>>
WCF学习2
查看>>
python之潜心研究多线程(thread模块) 建议使用threading模块
查看>>
阵列无法解挂导致VCS双机倒换失败
查看>>
ORACLE中用for in 使用cursor
查看>>
Apache - AH00451
查看>>
vim使用技巧
查看>>
nagios+centreon监控构建
查看>>