[TOC]
转载地址:https://www.jianshu.com/p/6130557eccfb
BlockingQueue(自定义阻塞队列)
这个类通过一个队列,设置为一个生产者和消费者的模式,先开始等待任务过来, 也就是直接wait。然后等任务放入到队列中的时候, 然后在唤醒等待的几个线程,唤醒后,获取到任务, 然后进行执行。
需要注意, notifyAll不是马上就进行唤醒了。 下面代码需要主要,不一定就是放入一个任务, 那边就能获取到一个任务出来, 有可能是放入了多个任务后, 那边才被唤醒,然后一次拿出来的。
import java.util.LinkedList;
import java.util.List;
public class BlockingQueue<T> {
/**
* 使用链表实现一个阻塞队列(数据结构定义数据存储和获取方式,
所以只要满足这两点,阻塞队列可以用链表,也可以使用数组等来实现)
*/
private volatile List<T> queue = new LinkedList();
/**
* limit用来限制提交任务的最大数,默认10
*/
private int limit = 10;
public BlockingQueue(int limit) {
this.limit = limit;
}
/**
*
* @param item enqueue是一个同步方法,当任务到达上限,便会调用wait方法进行阻塞,
否则将任务放入队列中,并唤醒dequeue()任务线程
*/
public synchronized void enqueue(T item) {
while (this.queue.size() == this.limit) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (this.queue.size() <= limit) {
this.notifyAll();
}
this.queue.add(item);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
*
* dequeue也是一个同步方法,当队列中没有任务时便会调用wait方法进入阻塞,
当任务到达最大容量是唤醒其他dequeue()线程 ,并出列一个任务。
*/
public synchronized T dequeue() {
while (this.queue.size() == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (this.queue.size() == this.limit) {
this.notifyAll();
}
T t = this.queue.remove(0);
return t;
}
public synchronized int size() {
return queue.size();
}
}
PoolThread
取出一个任务,进行任务业务代码具体执行。
public class PoolThread extends Thread {
private BlockingQueue<MyWork> taskQueue = null;
private boolean isStopped = false;
public PoolThread(BlockingQueue<MyWork> taskQueue) {
this.taskQueue = taskQueue;
}
public void run() {
// 这里通过判断线程没有被中断的判断, 进行线程的死循环。
while (!isStopped() && !Thread.currentThread().isInterrupted()) {
try {
// 从任务队列获取任务并执行
MyWork myWork = (MyWork) taskQueue.dequeue();
myWork.work();
} catch (Exception e) {
isStopped = true;
break;
}
System.out.println("当前线程状态:-->"+Thread.currentThread().getState());
}
}
public synchronized void doStop() {
isStopped = true;
this.interrupt();
}
public synchronized boolean isStopped() {
return isStopped;
}
}
ThreadPool
初始化线程容器, 启动具体的多少条线程。然后启动线程, 线程去获取自定义队列中的任务,在任务那边的拿取时候,会进行wait的等待,直到从任务容器中拿到任务,然后在进行执行任务具体业务代码。
public class ThreadPool implements Service {
/**
* 任务队列,用来存储提交的任务
*/
private BlockingQueue<MyWork> taskQueue = null;
/**
* 线程池中存储线程的容器。
*/
private Queue<PoolThread> threads = new ArrayDeque<PoolThread>();
private boolean isShutdown = false;
public ThreadPool(final int initSize, final int maxNoOfTasks) {
taskQueue = new BlockingQueue<MyWork>(maxNoOfTasks);
// 初始化线程池
for (int i = 0; i < initSize; i++) {
threads.add(new PoolThread(taskQueue));
}
// 启动线程池线程
threads.forEach(thread -> thread.start());
}
@Override
public synchronized void execute(MyWork task) {
if (this.isShutdown()) {
throw new IllegalStateException("ThreadPool is stopped");
}
// 任务入列
taskQueue.enqueue(task);
}
@Override
public void execute(List<MyWork> myWorkList) {
if (this.isShutdown()) {
throw new IllegalStateException("ThreadPool is stopped");
}
// 任务入列
if(myWorkList != null && myWorkList.size() > 0) {
for (MyWork myWork : myWorkList) {
taskQueue.enqueue(myWork);
}
}
}
@Override
public synchronized void shutdown() {
this.isShutdown = true;
threads.forEach(thread -> thread.doStop());
}
@Override
public boolean isShutdown() {
return isShutdown;
}
}
Service
public interface Service {
// 关闭线程池
void shutdown();
// 查看线程池是否已经被shutdown
boolean isShutdown();
// 提交任务到线程池
void execute(MyWork myWork);
// 提交任务到线程池
void execute(List<MyWork> myWorkList);
}
MyWork(抽象任务类)
public abstract class MyWork {
private void beforeWork() {
System.out.println("----------------------->"+Thread.currentThread().getName() + " 开始工作");
}
public void work(){
beforeWork();
doWork();
afterWork();
}
protected abstract void doWork();
private void afterWork() {
System.out.println("----------------------->"+Thread.currentThread().getName() + " 结束工作");
}
}
PlayWork
public class PlayWork extends MyWork {
private String name;
public PlayWork(String name) {
this.name = name;
}
@Override
public void doWork() {
System.out.println(this.name+"-> 夺取冠军");
}
}
TeacherWork
public class TeacherWork extends MyWork {
private String name;
public TeacherWork(String name) {
this.name = name;
}
@Override
public void doWork() {
System.out.println(this.name + "-> 教书");
}
}
StudentWork
public class StudentWork extends MyWork {
private String name;
public StudentWork(String name) {
this.name = name;
}
@Override
public void doWork() {
System.out.println(this.name+"-> 读书");
}
}
ThreadPoolTest
public class ThreadPoolTest {
public static void main(String[] args) {
// 1. 启动5条线程, 从队列里面去获取任务, 如果没有任务的话,就进行wait等待。
final ThreadPool threadPool = new ThreadPool(5, 20);
List<MyWork> workers = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
MyWork studentWork = new StudentWork(i + "号小明同学");
workers.add(studentWork);
}
for (int i = 1; i <= 5; i++) {
MyWork teacherWork = new TeacherWork(i + "号高老师");
workers.add(teacherWork);
}
for (int i = 1; i <= 5; i++) {
MyWork playWork = new PlayWork(i + "号太空易");
workers.add(playWork);
}
// 添加任务
threadPool.execute(workers);
}
}
运行结果
----------------------->Thread-4 开始工作
1号小明同学-> 读书
----------------------->Thread-4 结束工作
当前线程状态:-->RUNNABLE
----------------------->Thread-3 开始工作
2号小明同学-> 读书
----------------------->Thread-3 结束工作
当前线程状态:-->RUNNABLE
----------------------->Thread-2 开始工作
3号小明同学-> 读书
----------------------->Thread-2 结束工作
当前线程状态:-->RUNNABLE
----------------------->Thread-1 开始工作
4号小明同学-> 读书
----------------------->Thread-1 结束工作
当前线程状态:-->RUNNABLE
----------------------->Thread-0 开始工作
5号小明同学-> 读书
----------------------->Thread-0 结束工作
当前线程状态:-->RUNNABLE
----------------------->Thread-0 开始工作
----------------------->Thread-3 开始工作
----------------------->Thread-2 开始工作
3号高老师-> 教书
----------------------->Thread-2 结束工作
----------------------->Thread-1 开始工作
2号高老师-> 教书
----------------------->Thread-1 结束工作
当前线程状态:-->RUNNABLE
当前线程状态:-->RUNNABLE
4号高老师-> 教书
----------------------->Thread-3 结束工作
当前线程状态:-->RUNNABLE
----------------------->Thread-3 开始工作
----------------------->Thread-4 开始工作
5号高老师-> 教书
----------------------->Thread-4 结束工作
当前线程状态:-->RUNNABLE
----------------------->Thread-4 开始工作
4号太空易-> 夺取冠军
----------------------->Thread-4 结束工作
当前线程状态:-->RUNNABLE
----------------------->Thread-4 开始工作
5号太空易-> 夺取冠军
----------------------->Thread-4 结束工作
当前线程状态:-->RUNNABLE
1号高老师-> 教书
----------------------->Thread-0 结束工作
当前线程状态:-->RUNNABLE
3号太空易-> 夺取冠军
----------------------->Thread-3 结束工作
当前线程状态:-->RUNNABLE
----------------------->Thread-2 开始工作
2号太空易-> 夺取冠军
----------------------->Thread-2 结束工作
当前线程状态:-->RUNNABLE
----------------------->Thread-1 开始工作
1号太空易-> 夺取冠军
----------------------->Thread-1 结束工作
当前线程状态:-->RUNNABLE
自动扩容线程数
新增两个控制线程池线程数量的参数。线程池自动扩充时最大的线程池数量max,线程池空闲时需要释放线程但是也要维护一定数量的活跃线程数量或者核心数量core。有了这init , max , core三个参数就能很好的控制线程池中线程数量,三者之间的关系init <= core <= max。
public ThreadPool(final int activeCount, final int maxNoOfTasks) {
this.activeCount = activeCount;
taskQueue = new BlockingQueue<MyWork>(maxNoOfTasks);
// 初始化线程池
for (int i = 0; i < initSize; i++) {
threads.add(new PoolThread(taskQueue,i));
}
// 如果初始化的线程,没有到达活跃的线程数,继续添加
if (initSize < activeCount) {
for (int i = initSize; i < activeCount; i++) {
threads.add(new PoolThread(taskQueue,i));
}
}
// 启动线程池线程
threads.forEach(thread -> thread.start());
}
任务取出来,完成后, 保留2条存活的线程。
public void run() {
System.out.println("---->" + ativeThread);
// 这里通过判断线程没有被中断的判断, 进行线程的死循环。
while (!isStopped() && !Thread.currentThread().isInterrupted()) {
try {
// 从任务队列获取任务并执行
MyWork myWork = (MyWork) taskQueue.dequeue();
myWork.work();
} catch (Exception e) {
isStopped = true;
break;
}
// 当拿出一个任务完成后, 最后默认保存2条存活线程,在wait中。
System.out.println("队列当前数:-->" + this.taskQueue.size());
if (threadCount - ativeThread > 2) {
doStop();
}
}
System.out.println(Thread.currentThread().getName() + " 线程已经退出");
}
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付