多线程的基础知识,温故知新

在 Java 5.0 提供了 java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,
用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;还提供了设计用于多线程上下文中
的 Collection 实现等;

多线程基础

多线程书写模板

1.在高内聚低耦合的前提下考虑三要素
线程 操作(对外暴露的调用方法) 资源类

线程的创建采用匿名内部类
new Thread(runable target,String name)

lock的使用

//可复用锁
Lock lock = new ReentrantLock();

//在操作的方法中
lock.lock();
try {
//具体操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}

mysql默认端口3306
redis默认端口6379

接口之前不允许有默认实现,1.8后面有default实现,只允许一个默认实现

静态方法实现(属于整个类,类名调用)

当接口中就一个方法时可以采用lambda表达式

//启动一个线程 new Thread(runable target,String name)
new Thread(()->{
try {
//具体操作资源类
} catch (InterruptedException e) {
e.printStackTrace();
}
},"ThreadName").start();

线程间的通信

关键词:判断/干活/通知

wait()
notifyAll()

在多线程的交互中,要避免虚假唤醒
判断要用while,不能用if

生产者消费者防止虚假唤醒

synchronized wait notify

lock 和 condition配合使用
condition = lock.newCondition()
await signal signalAll

//声明
Condition condition = lock.newCondition();

//方法
condition.await();
condition.signalAll();

lock新功能解决synchronize没解决的问题
多线程之间按顺序调用实现A->B->C
A打印5次B打印10次C打印15次 10轮

//首先要为 A B C 标志
private int number = 1;//1:A 2:B 3:C`

Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();

//比如在打印5次的方法中 其他方法类似1唤醒2 2唤醒3 3唤醒1
public void print5() throws InterruptedException{

lock.lock();
try {
while(number != 1){
condition1.await();}
for(int i =1;i<=5;i++){
System.out.println(Thread.currentThread().getName()+"\t"+i);
}
number=2;//将标志位更改为B
condition2.signal();//唤醒代表操作B的线程
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

注意标志位的设置

锁的思考

//基础资源类
class Phone{
public synchronized void sendEmail(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("---sendEmail");
}
public synchronized void sendSMS(){
System.out.println("---sendSMS");
}
public void sayHello(){
System.out.println("---Hello");
}
}
//main
public class Lock8 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
try {
phone.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
},"B").start();
new Thread(()->{
try {
phone.sayHello();
} catch (Exception e) {
e.printStackTrace();
}
},"C").start();
}
}

image-20191118211152259

  1. 不知道 随机
  2. 先邮件
  3. 先Hello
  4. 先短信
  5. 先邮件
  6. 先邮件
  7. 先短信
  8. 先短信

5,6两锁静态同步(static synchronized)方法锁的是.class文件,不是实例对象

list线程不安全

ConcurrentModificationException并发修改异常

原因

解决方案一

Vector线程安全 实现加了synchronized (不允许用,淘汰)

小数据量使用List <String> list = Collection.synchronizedList(new ArrayList())

HashMap HashSet 线程不安全 也可以使用Collection

解决方案二(juc)

CopyOnWriteArrayList

原理

写时复制,读写分离思想

image-20191118221307322

set线程不安全 同理

HashMap HashSet 线程不安全 也可以使用Collection

List <String> list = Collection.synchronizedSet(new HashSet())

CopyOnWriteArraySet **结构严格意义来说是一个集合,它的底层实现是利用数组,它的上层实现是CopyOnWriteArrayList**。

HashSet底层HashMap

HashSet入值只有一个为什么

底层调用的HashMap的put,键(Key)为Set入值,值(Value)为Object常量

image-20191118222740799

image-20191118222748828

补充

HashMap底层 Node类型数组+链表+红黑树

初始容量16,负载因子0.75

即默认情况下数组长度是16*0.75=12时,触发扩容操作。

当map添加12个元素到的时候就发生扩容,创建新的数组的大小2*16=32,然后重新计算每个元素在新数组中的位置

Map的线程安全实现为: ConcurrentHashMap

Callable接口

第三种获取多线程得方式

和Runable接口区别

  1. 方法为call()
  2. call()抛出异常
  3. call()方法有返回值

image-20191119204531149

使用

FutureTask futureTask = new FutureTask();
new Thread(futureTask,"A").start();

futureTask.get();//获取返回值

使用细节

  1. get方法一般请放在最后一行

ReadWriteLock 读写锁

写时加锁 读时共享

未加锁时,事务原子性不满足

image-20191122151327071

加锁

//定义语法
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//写锁
readWriteLock.writeLock().lock();
readWriteLock.writeLock().unlock();
//读锁
readWriteLock.readLock().lock();
readWriteLock.readLock().unlock();

效果

image-20191122152959235

//模拟缓存
class myCache{
private volatile Map<String,Object> map = new HashMap<>();
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

public void put(String key,Object object){

readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"---写入数据");
try { TimeUnit.MILLISECONDS.sleep(300); } catch (Exception e) { e.printStackTrace(); }
map.put(key,object);
System.out.println(Thread.currentThread().getName()+"---写入完成");
} catch (Exception e) {
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}

public void get(String key){
readWriteLock.readLock().lock();
try{
System.out.println(Thread.currentThread().getName()+"+++读取数据");
try { TimeUnit.MILLISECONDS.sleep(300); } catch (Exception e) { e.printStackTrace(); }
Object object = map.get(key);
System.out.println(Thread.currentThread().getName()+"+++读取完成"+object);
}catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();
}
}

}
/*----------------------------*/
public class ReadWriteDemo {
public static void main(String[] args) {
myCache myCache = new myCache();

for(int i = 1;i<=5;i++){
String tmp = String.valueOf(i);
new Thread(()->{
myCache.put(tmp,tmp);
},String.valueOf(i)).start();
}

for(int i = 1;i<=5;i++){
String tmp = String.valueOf(i);
new Thread(()->{
myCache.get(tmp);
},String.valueOf(i)).start();
}
}
}

BlockingQueue 阻塞队列

image-20191122154331386

队列为空时,从队列中获取元素阻塞

队列为满时,从队列中增加元素阻塞

7个实现类

主要了解

ArrayBlockingQueue

LinkedBlockingQueue

synchronousQueue 单个元素阻塞队列

参数为队列容量

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);

核心方法

image-20191122162722963

线程池

ThreadPool线程池

优势

线程复用,控制最大并发数,线程管理。

ThreadPoolExecutor类是关键!

代码api调用

//定义指定数量线程
ExecutorService executorService = Executors.newFixedThreadPool(5);

try{
for(int i = 1;i<=10;i++){
executorService.execute(()->{
System.out.println(Thread.currentThread().getName()+"办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
executorService.shutdown();
}

效果

image-20191122210214897

线程复用

//单一线程线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
//可扩充线程池
ExecutorService executorService = Executors.newCachedThreadPool();

三种实现的原理

底层是阻塞队列

//固定数量线程池子
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

//单一线程线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//可扩容线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

线程池的7个参数

ThreadPoolExecutor类构造方法

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  1. corePoolSize 线程池中常住核心线程数
  2. maximumPoolSize 线程池中能够容纳同时执行的最大线程数
  3. keepAliveTime 多余的空闲线程存活时间
  4. unit keepAliveTime的单位
  5. workQueue 任务队列,被提交但是尚未执行的队列
  6. threadFactory 创建线程的线程工厂 默认即可
  7. handler 拒绝策略,当队列满了,并且工作线程大于等于maximumPoolSize时如何拒绝请求

线程池底层工作原理图

image-20191122220626521

线程池参数合理设置

问:实际中,三个实现线程池的方法哪个用的最多?

​ 三个都不用,用自定义的

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式|这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors返回的线程池对象的弊端如下:

  1. FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
  2. CachedThreadPool 和 ScheduledThregPool: 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM

自定义线程池

ExecutorService executorService = new ThreadPoolExecutor(2,
5,
2l,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

四种策略

策略1: ThreadPoolExecutor.AbortPolicy

超出线程数时抛出RejectedExecutionException异常

策略2:ThreadPoolExecutor.CallerRunsPolicy
用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。

策略3: ThreadPoolExecutor.DiscardOldestPolicy;

丢弃任务队列中最旧任务

策略4:ThreadPoolExecutor.DiscardPolicy
用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。

补充:

cpu密集型 maximumPoolSize 通常为cpu核数+1