多线程的基础知识,温故知新 在 Java 5.0 提供了 java.util.concurrent
(简称JUC)包,在此包中增加了在并发编程中很常用的工具类, 用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;还提供了设计用于多线程上下文中 的 Collection 实现等;
多线程基础 多线程书写模板 1.在高内聚低耦合的前提下考虑三要素线程 操作(对外暴露的调用方法) 资源类
线程的创建采用匿名内部类new Thread(runable target,String name)
lock的使用
Lock lock = new ReentrantLock(); lock.lock(); try { } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); }
mysql默认端口3306 redis默认端口6379
接口之前不允许有默认实现,1.8后面有default实现,只允许一个默认实现
静态方法实现(属于整个类,类名调用)
当接口中就一个方法时可以采用lambda表达式
new Thread(()->{ try { } catch (InterruptedException e) { e.printStackTrace(); } },"ThreadName" ).start();
线程间的通信 关键词:判断/干活/通知
wait() notifyAll()
在多线程的交互中,要避免虚假唤醒 判断要用while,不能用if
生产者消费者防止虚假唤醒
synchronized wait notify
lock 和 condition配合使用 condition = lock.newCondition() await signal signalAll
Condition condition = lock.newCondition(); condition.await(); condition.signalAll();
lock新功能解决synchronize没解决的问题 多线程之间按顺序调用实现A->B->C A打印5次B打印10次C打印15次 10轮
private int number = 1 ; Lock lock = new ReentrantLock(); Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); public void print5 () throws InterruptedException { lock.lock(); try { while (number != 1 ){ condition1.await();} for (int i =1 ;i<=5 ;i++){ System.out.println(Thread.currentThread().getName()+"\t" +i); } number=2 ; condition2.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
注意标志位的设置
锁的思考 class Phone { public synchronized void sendEmail () { try { TimeUnit.SECONDS.sleep(4 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("---sendEmail" ); } public synchronized void sendSMS () { System.out.println("---sendSMS" ); } public void sayHello () { System.out.println("---Hello" ); } } public class Lock8 { public static void main (String[] args) { Phone phone = new Phone(); new Thread(()->{ try { phone.sendEmail(); } catch (Exception e) { e.printStackTrace(); } },"A" ).start(); new Thread(()->{ try { phone.sendSMS(); } catch (Exception e) { e.printStackTrace(); } },"B" ).start(); new Thread(()->{ try { phone.sayHello(); } catch (Exception e) { e.printStackTrace(); } },"C" ).start(); } }
不知道 随机
先邮件
先Hello
先短信
先邮件
先邮件
先短信
先短信
5,6两锁静态同步(static synchronized)方法锁的是.class文件,不是实例对象
list线程不安全 ConcurrentModificationException并发修改异常
原因
解决方案一
Vector线程安全 实现加了synchronized (不允许用,淘汰)
小数据量使用List <String> list = Collection.synchronizedList(new ArrayList())
HashMap HashSet 线程不安全 也可以使用Collection
解决方案二(juc)
CopyOnWriteArrayList
原理
写时复制,读写分离思想
set线程不安全 同理 HashMap HashSet 线程不安全 也可以使用Collection
List <String> list = Collection.synchronizedSet(new HashSet())
CopyOnWriteArraySet **结构严格意义来说是一个集合,它的底层实现是利用 数组,它的上层实现是 CopyOnWriteArrayList**。
HashSet底层HashMap
HashSet入值只有一个为什么
底层调用的HashMap的put,键(Key)为Set入值,值(Value)为Object常量
补充
HashMap底层 Node类型数组+链表+红黑树
初始容量16,负载因子0.75
即默认情况下数组长度是16*0.75=12时,触发扩容操作。
当map添加12个元素到的时候就发生扩容,创建新的数组的大小2*16=32,然后重新计算每个元素在新数组中的位置
Map的线程安全实现为: ConcurrentHashMap
Callable接口 第三种获取多线程得方式
和Runable接口区别
方法为call()
call()抛出异常
call()方法有返回值
使用
FutureTask futureTask = new FutureTask(); new Thread(futureTask,"A" ).start();futureTask.get();
使用细节
get方法一般请放在最后一行
ReadWriteLock 读写锁 写时加锁 读时共享
未加锁时,事务原子性不满足
加锁
ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); readWriteLock.writeLock().lock(); readWriteLock.writeLock().unlock(); readWriteLock.readLock().lock(); readWriteLock.readLock().unlock();
效果
class myCache { private volatile Map<String,Object> map = new HashMap<>(); ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void put (String key,Object object) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+"---写入数据" ); try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (Exception e) { e.printStackTrace(); } map.put(key,object); System.out.println(Thread.currentThread().getName()+"---写入完成" ); } catch (Exception e) { e.printStackTrace(); }finally { readWriteLock.writeLock().unlock(); } } public void get (String key) { readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName()+"+++读取数据" ); try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (Exception e) { e.printStackTrace(); } Object object = map.get(key); System.out.println(Thread.currentThread().getName()+"+++读取完成" +object); }catch (Exception e){ e.printStackTrace(); }finally { readWriteLock.readLock().unlock(); } } } public class ReadWriteDemo { public static void main (String[] args) { myCache myCache = new myCache(); for (int i = 1 ;i<=5 ;i++){ String tmp = String.valueOf(i); new Thread(()->{ myCache.put(tmp,tmp); },String.valueOf(i)).start(); } for (int i = 1 ;i<=5 ;i++){ String tmp = String.valueOf(i); new Thread(()->{ myCache.get(tmp); },String.valueOf(i)).start(); } } }
BlockingQueue 阻塞队列
队列为空时,从队列中获取元素阻塞
队列为满时,从队列中增加元素阻塞
7个实现类
主要了解
ArrayBlockingQueue
LinkedBlockingQueue
synchronousQueue 单个元素阻塞队列
参数为队列容量
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);
核心方法
线程池 ThreadPool线程池
优势
线程复用,控制最大并发数,线程管理。
ThreadPoolExecutor类是关键!
代码api调用 ExecutorService executorService = Executors.newFixedThreadPool(5 ); try { for (int i = 1 ;i<=10 ;i++){ executorService.execute(()->{ System.out.println(Thread.currentThread().getName()+"办理业务" ); }); } }catch (Exception e){ e.printStackTrace(); }finally { executorService.shutdown(); }
效果
线程复用
ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newCachedThreadPool();
三种实现的原理 底层是阻塞队列
public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
线程池的7个参数 ThreadPoolExecutor类构造方法
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
corePoolSize 线程池中常住核心线程数
maximumPoolSize 线程池中能够容纳同时执行的最大线程数
keepAliveTime 多余的空闲线程存活时间
unit keepAliveTime的单位
workQueue 任务队列,被提交但是尚未执行的队列
threadFactory 创建线程的线程工厂 默认即可
handler 拒绝策略,当队列满了,并且工作线程大于等于maximumPoolSize时如何拒绝请求
线程池底层工作原理图
线程池参数合理设置
问:实际中,三个实现线程池的方法哪个用的最多?
三个都不用,用自定义的
【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式|这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors返回的线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
CachedThreadPool 和 ScheduledThregPool: 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM
自定义线程池
ExecutorService executorService = new ThreadPoolExecutor(2 , 5 , 2l , TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
四种策略
策略1: ThreadPoolExecutor.AbortPolicy
超出线程数时抛出RejectedExecutionException异常
策略2:ThreadPoolExecutor.CallerRunsPolicy 用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。
策略3: ThreadPoolExecutor.DiscardOldestPolicy;
丢弃任务队列中最旧任务
策略4:ThreadPoolExecutor.DiscardPolicy 用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。
补充:
cpu密集型 maximumPoolSize 通常为cpu核数+1