并发 - Java并发工具 线程池

并发 - Java并发工具 线程池

线程池有点像数据库连接池, 调用连接池的关闭方法, 实际上那个方法被重写了, 会将连接返回连接池. 线程池也是类似, 找到了一篇美团技术团队的文章, 详细解释了线程池. 简单的说, 如果是反复重复的任务需要并行, 由于自行编写并发程序很麻烦, 所以就可以交给线程池来进行操作. 线程池 ThreadP

线程池有点像数据库连接池, 调用连接池的关闭方法, 实际上那个方法被重写了, 会将连接返回连接池. 线程池也是类似, 找到了一篇美团技术团队的文章, 详细解释了线程池. 简单的说, 如果是反复重复的任务需要并行, 由于自行编写并发程序很麻烦, 所以就可以交给线程池来进行操作.
  1. 线程池 ThreadPoolExecutor
  2. 设置线程池参数
  3. 扩展ThreadPoolExecutor
  4. 线程池的一些相关方法

线程池 ThreadPoolExecutor

这个类位于java.util.concurrent中, 包括其父类和子类. Executor类是线程池的基础, 实际使用的ThreadPoolExecutor就是继承这个类, 将Runnable类型的任务提交给线程池就可以被线程池执行. 创建线程池的工厂叫做Executors, 通过工厂类的静态方法可以创建不同类型的线程池, 都是ExecutorService类型的对象, ExecutorService继承自Executor, 常见的有如下:
  1. Executors.newFixedThreadPool(int nThreads), 创建一个固定线程数量的线程池
  2. Executors.newSingleThreadExecutor(), 创建一个只有一个线程的线程池, 所有任务会保存在一个队列中依次执行
  3. Executors.newCachedThreadPool(), 返回自行控制的线程池, 也是比较常见的用法
  4. Executors.newSingleThreadScheduledExecutor(), 返回一个ScheduledExecutorService对象, 单线程, 可以指定定时任务
  5. Executors.newScheduledThreadPool(int poolSize), 返回带有计划功能的, 可指定线程多少的线程池对象.
只要理解了Executors与Executor之间的关系, 一个是工厂, 一个是工厂创建出来的池子, 继承关系就容易理解了. 如果想要微调参数, 则就可以使用ThreadPoolExecutor的构造函数来进行微调. 因为返回的池子其实内部都是使用了ThreadPoolExecutor类. 让线程执行任务只需要调用.submit()方法即可, 方法接受Runnable和Callable对象. 写个简单小例子:
//一个线程类
public class MyThread implements Runnable {
    @Override
    public void run() {
        System.out.println("Start at " + System.currentTimeMillis() + " " + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolTest {
    //创建一个4个大小的线程池
    private static ExecutorService newPool = Executors.newFixedThreadPool(4);

    //提交10个任务
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            newPool.submit(new MyThread());
        }

        newPool.shutdown();
    }

}
可见线程一次提交4个, 如果改成其他种类, 会发现每次提交的任务数量会有变化. 注意最后的shutdown(), 如果不关闭线程池, 执行线程池的主线程不会退出. 也可以提交Callable对象, 不过目前还都是无法获取结果, 只能操作共享变量的Thread对象, 暂且先这么用着.

设置线程池参数

使用工厂方法, 基本上就是将策略交给了内置的策略. 如果想精确控制线程池的参数, 就可以在ThreadPoolExecutor的构造器中进行设置. 源码中参数最多的重载构造函数如下:
public ThreadPoolExecutor(int corePoolSize,//线程池中的线程数量
                          int maximumPoolSize,//线程池中的最大线程数量
                          long keepAliveTime,//线程数量超过corePoolSize之后, 线程的最大活动时间
                          TimeUnit unit,//上一个参数的单位
                          BlockingQueue<Runnable> workQueue,//阻塞任务队列, 所谓阻塞队列, 队列为空时候读取会阻塞, 队列满的时候添加任务会阻塞
                          ThreadFactory threadFactory, // 自定义的创建线程的工厂
                          RejectedExecutionHandler handler // 拒绝策略)
这其中BlockingQueue可以自行控制, 决定传入什么样子的队列, 比如直接提交队列, 有界任务队列, 无界任务队列, 优先任务队列等等, 都需要实现BlockingQueue接口. ThreadFactory是如何创建线程工厂, 如果需要自定义每次线程如何创建, 或者在创建线程之前做一些事情, 就可以传入一个自定义的ThreadFactory对象. 拒绝策略是指当一些情况出现的时候, 要如何分配任务(拒绝任务也是任务分配的一种). JDK内置有四种策略, 都实现了RejectedExecutionHandler接口, 详细如下:
  1. AbortPolicy, 直接抛出异常, 也是默认的方式
  2. CallerRunsPolicy, 只要线程池没有关闭, 直接在调用者线程里运行任务
  3. DiscardOldestPolicy, 丢弃最老的一个请求, 也就是即将被执行的任务, 然后再次提交新任务
  4. DiscardPolicy, 丢弃新任务, 不做任何提示.
四种情况要根据实际情况进行判断. 也可以自定义RejectedExecutionHandler对象.

扩展ThreadPoolExecutor

ThreadPoolExecutor中有三个方法可以自定义:
  1. beforeExecute()
  2. afterExecute()
  3. terminated()
前两个方法会在每个线程被执行的时候, 在线程开始和结束的时候调用, 最后一个方法, 则会在线程池结束的时候被调用. 尝试一下上边一节和这次的各种方法来自定义一些东西玩玩:
import java.util.concurrent.*;

public class CustomizeThreadPoolTest {

    public static void main(String[] args) {

        ThreadPoolExecutor pool = new ThreadPoolExecutor(5,
                10, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() {
        }, new ThreadFactory() {
            //自定义ThreadFactory
            @Override
            public Thread newThread(Runnable r) {
                System.out.println("创建一个新线程");
                Thread t = new Thread(r);
                System.out.println("创建新线程: " + t.getName() + " 成功");
                return t;
            }
        }, new ThreadPoolExecutor.AbortPolicy()){
            //自定义ThreadPoolExecutor的三个生命周期方法
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行 "+ r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完毕 "+ r);
            }

            @Override
            protected void terminated() {
                System.out.println("线程池运行结束.");
            }
        };


        for (int i = 0; i < 10; i++) {
            pool.submit(new MyThread());
        }
        pool.shutdown();
    }

}
运行结果如下:
创建一个新线程
创建新线程: Thread-0 成功
创建一个新线程
创建新线程: Thread-1 成功
创建一个新线程
创建新线程: Thread-2 成功
创建一个新线程
创建新线程: Thread-3 成功
创建一个新线程
创建新线程: Thread-4 成功
准备执行 java.util.concurrent.FutureTask@66910c95[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@3b37d803[Wrapped task = multithread.ThreadPool.MyThread@44e1d57b]]
准备执行 java.util.concurrent.FutureTask@73bc7a9d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@38f343ca[Wrapped task = multithread.ThreadPool.MyThread@37876d91]]
准备执行 java.util.concurrent.FutureTask@7cfb39a5[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4a949d85[Wrapped task = multithread.ThreadPool.MyThread@64bb913d]]
准备执行 java.util.concurrent.FutureTask@7eaee205[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@31dd3324[Wrapped task = multithread.ThreadPool.MyThread@4395df41]]
准备执行 java.util.concurrent.FutureTask@4e8eae8a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@5825056d[Wrapped task = multithread.ThreadPool.MyThread@6458e989]]
Start at 1593584674017 Thread-1
Start at 1593584674017 Thread-2
Start at 1593584674016 Thread-0
Start at 1593584674017 Thread-3
Start at 1593584674017 Thread-4
执行完毕 java.util.concurrent.FutureTask@7cfb39a5[Completed normally]
准备执行 java.util.concurrent.FutureTask@524b5414[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@b0f7684[Wrapped task = multithread.ThreadPool.MyThread@dfad302]]
Start at 1593584675035 Thread-1
执行完毕 java.util.concurrent.FutureTask@66910c95[Completed normally]
执行完毕 java.util.concurrent.FutureTask@73bc7a9d[Completed normally]
准备执行 java.util.concurrent.FutureTask@3ce1e52a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4a606b02[Wrapped task = multithread.ThreadPool.MyThread@2065a844]]
Start at 1593584675036 Thread-0
准备执行 java.util.concurrent.FutureTask@e5f4084[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@76121ff2[Wrapped task = multithread.ThreadPool.MyThread@70aa8d0e]]
Start at 1593584675036 Thread-2
执行完毕 java.util.concurrent.FutureTask@4e8eae8a[Completed normally]
准备执行 java.util.concurrent.FutureTask@1830ccdb[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@296a1f26[Wrapped task = multithread.ThreadPool.MyThread@5ffa45c8]]
执行完毕 java.util.concurrent.FutureTask@7eaee205[Completed normally]
Start at 1593584675037 Thread-4
准备执行 java.util.concurrent.FutureTask@f3d475a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@44438c8f[Wrapped task = multithread.ThreadPool.MyThread@4f4db265]]
Start at 1593584675037 Thread-3
执行完毕 java.util.concurrent.FutureTask@524b5414[Completed normally]
执行完毕 java.util.concurrent.FutureTask@e5f4084[Completed normally]
执行完毕 java.util.concurrent.FutureTask@3ce1e52a[Completed normally]
执行完毕 java.util.concurrent.FutureTask@f3d475a[Completed normally]
执行完毕 java.util.concurrent.FutureTask@1830ccdb[Completed normally]
线程池运行结束.
可以看到, 虽然有10个任务, 但是只创建了5个线程, 然后在每次执行任务之前, 都会打印出相关信息. 最后在线程池结束的时候, terminated()被运行.

线程池的一些相关方法

这个关系到ThreadPoolExecutor的运行状态, 一共有5种:
  1. RUNNING, 能够接受新任务, 也在处理新任务
  2. SHUTDOWN, 不再接受新任务, 但会把遗留任务执行完毕
  3. STOP, 不接受新任务也不再处理尚未处理的任务, 同时中断正在处理的任务
  4. TIDYING, 所有任务都终止, 逐个干掉线程, 线程数量为0, 实际上就是清理阶段
  5. TERMINATED, 调用terminated()方法之后进入该状态, 其实就是彻底结束
所以要注意的是如下方法:
  1. shutdown(), 最常用的, 完成工作提交之后调用该方法, 转入SHUTDOWN状态, 线程池会在处理完毕之后结束
  2. shutdownNow(), 直接进入STOP状态, 相当于突然中止线程池的工作
  3. isShutdown(), 判断是不是在SHUTDOWN状态
  4. isTerminated(), 判断是不是TERMINATED
  5. execute(Runnable command), 这个也是提交任务的方法, 但是不返回任何值
  6. submit(Runnable task), 这个也是提交任务的方法, 返回Future对象, 这个就后边再看了.
SHUTDOWN和STOP之后, 在线程数量为0后, 都会进入TIDYING然后是TERMINATED阶段. 好了, 现在可以把线程池加入到自己的工具中来了.
LICENSED UNDER CC BY-NC-SA 4.0
Comment