侧边栏壁纸
  • 累计撰写 274 篇文章
  • 累计创建 141 个标签
  • 累计收到 17 条评论

目 录CONTENT

文章目录

Java 线程池注意事项

Sherlock
2021-01-07 / 0 评论 / 0 点赞 / 936 阅读 / 0 字
温馨提示:
本文最后更新于2023-10-09,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

线程池相关前置知识可参考:learning--线程池

大家都知道阿里巴巴开发规范是不建议使用Executors来创建线程池的,具体原因就不再赘述了,可以参考上面的链接。

Tomcat 线程池策略

不同于Java实现的线程池(倾向于优先使用队列,队列满了再开启更多线程),
Tomcat的线程池队列是无限长度的,但是线程池会一直创建到maximumPoolSize,然后才把请求放入等待队列中:

tomcat 任务队列org.apache.tomcat.util.threads.TaskQueue其继承与LinkedBlockingQueue,覆写了offer方法。

    @Override
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent == null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount() < (parent.getPoolSize())) return super.offer(o);
        //线程个数小于MaximumPoolSize会创建新的线程。
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

实现一个更激进的线程池

重写队列的 offer 方法直接返回 false,数据不入队列,并且自定义 RejectedExecutionHandler,触发拒绝策略的时候再把任务加入队列。

看一下 Dubbo 内置的线程池策略

Dubbo 提供了 3 种线程池模型即:FixedThreadPoolCachedThreadPool(客户端默认的)、LimitedThreadPool(服务端默认的),从源码可以看出,其默认的队列长度都是 0,当队列长度为 0 ,其使用是无缓冲的队列SynchronousQueue,当运行线程超过maximumPoolSize 则拒绝请求。

// 此线程池启动时即创建固定大小的线程数,不做任何伸缩
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

// 此线程池一直增长,直到上限,增长后不收缩
public class LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

// 此线程池可伸缩,线程空闲一分钟后回收,新请求重新创建线程
public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

自己用的

直接上一个平时使用的初始化线程池代码(依赖 Guava):

import com.google.common.util.concurrent.ThreadFactoryBuilder;

// 推荐使用 guava 的 ThreadFactoryBuilder 来创建线程池
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("xxx-exxcute-thread-%d")
                // 解决 线程池的 submit() 方法提交任务没有异常抛出
                .setUncaughtExceptionHandler((thread, throwable)-> log.error("XxxPool {} got exception", thread,throwable))
                .build();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                POOL_SIZE,
                POOL_SIZE,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(0),
                namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());

扩展:Java8 的 ParallelStream 背后是一个公共线程池,别把 IO 任务使用 ParallelStream 来处理。

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区