什么是JUC JUC是java.util.concurrent
包的缩写,这个包是Java的并发编程的工具类,所以JUC编程可以理解为Java的并发编程。
一些基本的概念 程序,进程与线程
程序是一组计算机能识别和执行的指令,只是一个静态的概念。
进程是操作系统资源分配的基本单位,可以理解为一个程序的运行就为一个进程,它是一个动态的实体。
线程可以理解为进程之内为了完成某一个任务的线路,一个进程为了完成任务,可能就会将该任务拆分为很多子任务,并开辟多个线程来完成这些子任务,从而加快完成总任务的速度。
开启线程的方式
继承Thread
类,重写run()
方法,调用start()
方法
实现Runnable
接口,重写run()
方法,创建Thread
实例,调用其中的start()
方法。
实现Calleble
接口
串行,并发和并行
串行很好理解,每个任务都会排队执行,上一个任务没有结束,下一个任务就无法开始。
并发就是不用等待上一个任务执行完毕,下一个任务就能开始执行,每个任务交替执行,形成每个任务同时进行的假象。
并行就是任务同时进行,而不是像并发一样交替执行。
简单理解为并发就是交替做一件事,并行就是同时做不同的事 。
即使是单核的CPU也能进行多线程的操作,但是做不到并行。
上下文切换 即使是单核的CPU也是支持多线程执行的,CPU通过给每个线程分配CPU时间片来实现这个机制,这个时间片非常短,所以我们觉得多个线程是一起执行的,但其实是CPU在不断的切换线程执行。
在切换线程前会保存这个线程任务的状态,然后加载下个线程任务的状态,这个过程就是一次上下文切换。
并发是否比串行执行速度快 刚刚提到了上下文切换,可以知道并发其实并不一定会比串行的速度快,因为并发会消耗一部分时间在上下文切换上,所以当并发执行的操作不超过百万次时,速度会比串行执行操作要慢。
Java进程的状态 从Java的Thread
类中就可以看到Java进程的全部状态了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }
synchronized 首先来看个简单的例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class Test implements Runnable { private int ticket = 30 ; private boolean flag = true ; public static void main (String[] args) { Test test = new Test (); new Thread (test,"第一个人" ).start(); new Thread (test,"第二个人" ).start(); new Thread (test,"第三个人" ).start(); } public void buyTicket () { if (ticket <= 0 ) { flag = false ; return ; } try { Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "获得了第" + ticket-- + "张票" ); } @Override public void run () { while (flag) { buyTicket(); } } }
从上面这个例子就会出现两种不正常的情况。
先买到了第n张票之后才买到第n-1张票
出现负数的票
因为多线程难以统一资源所以会出现的这些异常情况,这是我们不想看到的。处理并发的有效的手段就是加锁,所以我们需要用加锁来解决这个问题。
这里使用了synchronized
来对买票的方法进行加锁,可以解决一些简单的并发问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.yw;public class Test implements Runnable { private int ticket = 30 ; private boolean flag = true ; public static void main (String[] args) { Test test = new Test (); new Thread (test,"第一个人" ).start(); new Thread (test,"第二个人" ).start(); new Thread (test,"第三个人" ).start(); } public synchronized void buyTicket () { if (ticket <= 0 ) { flag = false ; return ; } try { Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "获得了第" + ticket-- + "张票" ); } @Override public void run () { while (flag) { buyTicket(); } } }
synchronized分为三种使用情况。
在普通方法上使用synchronized关键字,锁住的是实例对象,每个实例都各自有自己的锁,互不干扰。
在静态方法上使用synchronized关键字,锁住的是类,即每个实例对象共用这个锁。
同步方法块,锁的对象不定,看个人行为。
Lock锁 也可以不用synchronized
来加锁,这里就来讲讲第二种方式。
这里就只学习Lock类的ReentrantLock
这个实现类。
先探究一下这个实现类的构造方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public ReentrantLock () { sync = new NonfairSync (); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); }
由此可以看出,不加参数使用该实现类,这个锁就是非公平锁
,加了true的参数,就是公平锁
。
简单的解释一下公平锁与非公平锁。
公平锁,表示先来先服务,按照申请锁的顺序去获得锁,每个申请锁的线程都会形成一个队列,一个个来获得锁。
非公平锁,表示每个线程都可以去申请获得锁,获取不到就等待,获取得到就可以获得锁。
很多情况默认都是是非公平锁,synchronized也算是非公平锁,谁先抢到,谁就进行。
现在就用该实现类来解决一下上面买票的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class Test implements Runnable { private int ticket = 30 ; private boolean flag = true ; Lock lock = new ReentrantLock (); public static void main (String[] args) { Test test = new Test (); new Thread (test,"第一个人" ).start(); new Thread (test,"第二个人" ).start(); new Thread (test,"第三个人" ).start(); } public void buyTicket () { lock.lock(); try { if (ticket <= 0 ) { flag = false ; return ; } Thread.sleep(100 ); System.out.println(Thread.currentThread().getName() + "获得了第" + ticket-- + "张票" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } @Override public void run () { while (flag) { buyTicket(); } } }
这样就可以进行简单的加锁来解决一些线程出现的问题。
synchronized和ReentrantLock区别
synchronized是一个关键字,是JVM层面的,而ReentrantLock是一个来实现类,是API层面的。
synchronized是自动加锁和释放锁的,而ReentrantLock是手动加锁的,所以也需要手动释放锁。
synchronized的锁是不可中断的,除非是临界区内的代码出现异常或者是正常结束释放锁,而ReentrantLock的锁是可以中断的,可以使用trylock
方法设置超时方法。
synchronized是非公平锁,而ReentrantLock默认是非公平锁,但是可以通过构造方法设置成公平锁。
synchronized的线程阻塞后是只能随机唤醒进程,而ReentrantLock通过Condition类可以精准唤醒被绑定的线程。
生产者和消费者的问题 synchronzied 我们计划,当商品数量为1的时候,消费者进行消费,当商品数量为0的时候,生产者进行生产。
所以这会涉及到线程之间的通信,商品数量为0的时候消费者就会进入等待,并通知生成者进入就绪状态。商品数量为1的时候生成者就会进入等待,并通知消费者进入就绪状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class Test { public static void main (String[] args) { Shop shop = new Shop (); new Thread (()->{ for (int i = 0 ; i < 50 ; i++) { try { shop.buyShop(); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者" ).start(); new Thread (()->{ for (int i = 0 ; i < 50 ; i++) { try { shop.AddShop(); } catch (InterruptedException e) { e.printStackTrace(); } } },"生成者" ).start(); } } class Shop { private int shopnum = 1 ; public synchronized void buyShop () throws InterruptedException { if (shopnum == 0 ) { this .wait(); } System.out.println(Thread.currentThread().getName() + shopnum--); this .notifyAll(); } public synchronized void AddShop () throws InterruptedException { if (shopnum == 1 ) { this .wait(); } System.out.println(Thread.currentThread().getName() + shopnum++); this .notifyAll(); } }
这个代码还会有一个虚假唤醒
的问题,当线程数增加到了两个以上,就会出现一些异常情况,比如票数会到达0与1之外的范围,这是因为使用if来进行判断等待状态,只会判断一次,所以可能会出现两个线程都判断成功的情况,所以必须要使用while来进行判断,这样可以防止虚假唤醒的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class Test { public static void main (String[] args) { Shop shop = new Shop (); new Thread (()->{ for (int i = 0 ; i < 50 ; i++) { try { shop.buyShop(); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者" ).start(); new Thread (()->{ for (int i = 0 ; i < 50 ; i++) { try { shop.AddShop(); } catch (InterruptedException e) { e.printStackTrace(); } } },"生产者" ).start(); } } class Shop { private int shopnum = 1 ; public synchronized void buyShop () throws InterruptedException { while (shopnum == 0 ) { this .wait(); } System.out.println(Thread.currentThread().getName() + shopnum--); this .notifyAll(); } public synchronized void AddShop () throws InterruptedException { while (shopnum == 1 ) { this .wait(); } System.out.println(Thread.currentThread().getName() + shopnum++); this .notifyAll(); } }
Lock锁 这里的Lock接口是没办法实现线程等待和唤醒的,所以就使用了一个Condition
类的await
方法和signal
方法来代替wait和notifyall方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public class Test { public static void main (String[] args) { Shop shop = new Shop (); new Thread (()->{ for (int i = 0 ; i < 50 ; i++) { try { shop.buyShop(); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者1" ).start(); new Thread (()->{ for (int i = 0 ; i < 50 ; i++) { try { shop.AddShop(); } catch (InterruptedException e) { e.printStackTrace(); } } },"生产者1" ).start(); new Thread (()->{ for (int i = 0 ; i < 50 ; i++) { try { shop.buyShop(); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者2" ).start(); new Thread (()->{ for (int i = 0 ; i < 50 ; i++) { try { shop.AddShop(); } catch (InterruptedException e) { e.printStackTrace(); } } },"生产者2" ).start(); } } class Shop { private int shopnum = 1 ; Lock lock = new ReentrantLock (); Condition condition = lock.newCondition(); public void buyShop () throws InterruptedException { lock.lock(); try { while (shopnum == 0 ) { condition.await(); } System.out.println(Thread.currentThread().getName() + shopnum--); condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void AddShop () throws InterruptedException { lock.lock(); try { while (shopnum == 1 ) { condition.await(); } System.out.println(Thread.currentThread().getName() + shopnum++); condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
读写锁 读写锁,这个类其实和上面两个加锁方法差不多,该锁可以将封锁粒度减少一点。读写锁其实就是之前学到的排他锁和共享锁。
线程对一个资源加上排它锁,表示只允许该线程进行读写资源,不允许其他线程读写,直到该线程释放了排它锁。
线程对一个资源加上共享锁,表示该线程可以对该资源进行读操作,但不能修改资源,其他的线程可以对该资源加共享锁,但不能加排它锁进行修改该资源,直到所有线程释放了对该资源的共享锁才能为该资源加排它锁进行修改。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class Test { public static void main (String[] args) { Mycache mycache = new Mycache (); for (int i = 1 ; i <= 10 ; i++) { final int temp = i; new Thread (()->{ mycache.put(temp + "" ,temp + "" ); },String.valueOf(i)).start(); } for (int i = 1 ; i <= 10 ; i++) { final int temp = i; new Thread (()->{ mycache.get(temp + "" ); },String.valueOf(i)).start(); } } } class Mycache { private volatile Map<String,String> map = new HashMap <>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock (); public void put (String key,String value) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "写入" + key); map.put(key,value); System.out.println(Thread.currentThread().getName() + "写入完成" ); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } public void get (String key) { readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "读取" + key); String value = map.get(key); System.out.println(Thread.currentThread().getName() + "读取完成" ); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } }
集合类 经常使用的一些集合类在并发操作下很多都是不安全的,多个线程进行写入操作就会出现并发修改异常ConcurrentModificationException
。所以现在来探讨一下解决的方案。
List
使用Vector
实现类,这个实现类是线程安全的,为什么安全可以看一下它的add方法,它是使用了synchronized
关键字的。
1 2 3 4 5 6 public synchronized boolean add (E e) { modCount++; ensureCapacityHelper(elementCount + 1 ); elementData[elementCount++] = e; return true ; }
使用Collections工具类将Arraylist转换成安全的实现类。
1 2 3 4 5 6 7 8 9 10 11 public class Test { public static void main (String[] args) { List<String> list = Collections.synchronizedList(new ArrayList <>()); for (int i = 1 ; i <= 20 ; i++) { new Thread (()->{ list.add("1111" ); System.out.println(list); },String.valueOf(i)).start(); } } }
使用CopyOnWriteArrayList
实现类。
1 2 3 4 5 6 7 8 9 10 11 public class Test { public static void main (String[] args) { List<String> list = new CopyOnWriteArrayList <>(); for (int i = 1 ; i <= 20 ; i++) { new Thread (()->{ list.add("1111" ); System.out.println(list); },String.valueOf(i)).start(); } } }
Set HashSet也和ArrayList一样不安全,所以需要进行一些操作。
使用Collections工具类将HashSet转换成安全的实现类。
1 2 3 4 5 6 7 8 9 10 11 public class Test { public static void main (String[] args) { Set<String> set = Collections.synchronizedSet(new HashSet <>()); for (int i = 1 ; i <= 20 ; i++) { new Thread (()->{ set.add(UUID.randomUUID().toString()); System.out.println(set); },String.valueOf(i)).start(); } } }
使用CopyOnWriteArrayList
实现类。
1 2 3 4 5 6 7 8 9 10 11 public class Test { public static void main (String[] args) { Set<String> set = new CopyOnWriteArraySet <>(); for (int i = 1 ; i <= 20 ; i++) { new Thread (()->{ set.add(UUID.randomUUID().toString()); System.out.println(set); },String.valueOf(i)).start(); } } }
Map
使用Collections工具类将HashMap转换成安全的实现类。
1 2 3 4 5 6 7 8 9 10 11 public class Test { public static void main (String[] args) { Map<String,String> map = Collections.synchronizedMap(new HashMap <>()); for (int i = 1 ; i <= 20 ; i++) { new Thread (()->{ map.put(Thread.currentThread().getName(),UUID.randomUUID().toString()); System.out.println(map); },String.valueOf(i)).start(); } } }
使用ConcurrentHashMap
实现类。
1 2 3 4 5 6 7 8 9 10 11 public class Test { public static void main (String[] args) { Map<String,String> map = new ConcurrentHashMap <>(); for (int i = 1 ; i <= 20 ; i++) { new Thread (()->{ map.put(Thread.currentThread().getName(),UUID.randomUUID().toString()); System.out.println(map); },String.valueOf(i)).start(); } } }
Callable Callable是第三种开启线程的方式,但是线程的开启都是通过Thread类的start方法来开启的,而Callable的实现类是无法传入Thread类中的,所以需要借用中间类来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { HelloTest helloTest = new HelloTest (); FutureTask futureTask = new FutureTask (helloTest); new Thread (futureTask,"线程" ).start(); String msg = (String) futureTask.get(); System.out.println(msg); } } class HelloTest implements Callable <String>{ @Override public String call () throws Exception { System.out.println("call方法调用" ); return "hello from Callable" ; } }
其中的FutureTask是实现了RunnableFuture,而RunnableFuture接口继承了Runnable,其实借用FutureTask类打开线程本质就是接用Runnable来开启线程的。
常用辅助类 CountDownLatch 这是一个减法计数器,可以使一个线程等很多线程执行完毕后在执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class Test { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (10 ); for (int i = 1 ; i <= 10 ; i++) { new Thread (()->{ System.out.println(Thread.currentThread().getName() + "线程执行" ); countDownLatch.countDown(); },String.valueOf(i)).start(); } countDownLatch.await(); System.out.println("程序结束" ); } }
CyclicBarrier 这是一个加法计数器,可以使很多线程到达一个目的地后在执行线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Test { public static void main (String[] args) throws InterruptedException, BrokenBarrierException { CyclicBarrier cyclicBarrier = new CyclicBarrier (10 ,()->{ System.out.println("所有线程开启成功" ); }); for (int i = 1 ; i <= 10 ; i++) { new Thread (()->{ System.out.println(Thread.currentThread().getName() + "线程启动" ); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }
Semaphore 一个计数信号量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class Test { public static void main (String[] args) { Semaphore semaphore = new Semaphore (2 ); for (int i = 0 ; i <= 10 ; i++) { new Thread (() ->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "开启了线程" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException interruptedException) { interruptedException.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "结束了线程" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } },String.valueOf(i)).start(); } } }
阻塞队列 阻塞队列顾名思义就是会阻塞的队列,是队列的子类,其类为BlockingQueue
,数组的实现方式是ArrayBlockingQueue
和链表的实现方式是LinkedBlockingQueue
。
有返回值会抛出异常 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class Test { public static void main (String[] args) { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue <>(3 ); arrayBlockingQueue.add("1" ); arrayBlockingQueue.add("2" ); arrayBlockingQueue.add("3" ); arrayBlockingQueue.add("4" ); System.out.println(arrayBlockingQueue.element()); arrayBlockingQueue.remove(); arrayBlockingQueue.remove(); arrayBlockingQueue.remove(); arrayBlockingQueue.remove(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public boolean add (E e) { return super .add(e); } public boolean add (E e) { if (offer(e)) return true ; else throw new IllegalStateException ("Queue full" ); } public E remove () { E x = poll(); if (x != null ) return x; else throw new NoSuchElementException (); } public E element () { E x = peek(); if (x != null ) return x; else throw new NoSuchElementException (); }
这里可以看出这三个方法都是调用了下面不抛出异常的三个方法,加以修饰让其抛出异常而已。
有返回值不抛出异常 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class Test { public static void main (String[] args) { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue <>(3 ); System.out.println(arrayBlockingQueue.offer("1" )); System.out.println(arrayBlockingQueue.offer("2" )); System.out.println(arrayBlockingQueue.offer("3" )); System.out.println(arrayBlockingQueue.offer("4" )); System.out.println(arrayBlockingQueue.peek()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public boolean offer (E e) { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lock(); try { if (count == items.length) return false ; else { enqueue(e); return true ; } } finally { lock.unlock(); } public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { return (count == 0 ) ? null : dequeue(); } finally { lock.unlock(); } } public E peek () { final ReentrantLock lock = this .lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } }
由此可以看出这三个方法都是用到了锁的,使得上面抛出异常的三个方法也是加锁了的。
阻塞等待 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Test { public static void main (String[] args) throws InterruptedException { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue <>(3 ); arrayBlockingQueue.put("1" ); arrayBlockingQueue.put("2" ); arrayBlockingQueue.put("3" ); arrayBlockingQueue.put("4" ); System.out.println(arrayBlockingQueue.take()); System.out.println(arrayBlockingQueue.take()); System.out.println(arrayBlockingQueue.take()); System.out.println(arrayBlockingQueue.take()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
超时等待 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Test { public static void main (String[] args) throws InterruptedException { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue <>(3 ); arrayBlockingQueue.offer("1" ); arrayBlockingQueue.offer("2" ); arrayBlockingQueue.offer("3" ); arrayBlockingQueue.offer("4" ,2 , TimeUnit.SECONDS); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll(2 ,TimeUnit.SECONDS)); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public boolean offer (E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0 ) return false ; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true ; } finally { lock.unlock(); } } public E poll (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) { if (nanos <= 0 ) return null ; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
总结
方法
有返回值抛出异常
有返回值不抛出异常
阻塞等待
超时等待
入队
add
offer
put
带参数的offer
出队
remove
poll
take
带参数的poll
返回队首元素
element
peek
-
-
同步队列 BlockingQueue
还有一个实现类为同步队列SynchronousQueue
,这个队列只能容纳一个元素,当元素满了再入队就会等待,当元素空了再出队也会等待,类似于上述的阻塞等待,但是只能容纳一个元素。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class Test { public static void main (String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue <>(); new Thread (()->{ try { System.out.println(Thread.currentThread().getName() + "put 1" ); blockingQueue.put("1" ); System.out.println(Thread.currentThread().getName() + "put 2" ); blockingQueue.put("2" ); System.out.println(Thread.currentThread().getName() + "put 3" ); blockingQueue.put("3" ); } catch (InterruptedException e) { e.printStackTrace(); } },"线程一" ).start(); new Thread (()->{ try { TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName() + "take 1" ); System.out.println(blockingQueue.take()); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName() + "take 2" ); System.out.println(blockingQueue.take()); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName() + "take 3" ); System.out.println(blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"线程二" ).start(); } }
线程池 四个方法
创建线程池的方法
1 2 3 4 5 6 7 8 ExecutorService pool1 = Executors.newSingleThreadExecutor(); ExecutorService pool2 = Executors.newFixedThreadPool(5 ); ExecutorService pool3 = Executors.newCachedThreadPool(); ExecutorService pool4 = Executors.newScheduledThreadPool(5 );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); } public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); } public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); } public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue ()); }
阿里巴巴编程规范不建议用这种方式开启线程池,因为可能会因为创建线程过多或者请求队列的线程过多而产生OOM,所以建议使用原生的ThreadPoolExecutor来开辟。
使用线程池里的线程
1 2 3 4 5 6 pool1.execute(); pool1.execute(()->{ System.out.println(Thread.currentThread().getName()); });
使用完记得关闭线程池
七大参数 七大参数其实就是ThreadPoolExecutor
的参数,先看一下构造方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
参数
含义
int corePoolSize
核心线程数
int maximumPoolSize
最大并发的线程数,只有阻塞队列满了才会开启更多的线程直到最大并发数
long keepAliveTime
超时等候时间,时间到了就会释放线程
TimeUnit unit
设置时间长度,与上一个参数搭配
BlockingQueue workQueue
阻塞队列
ThreadFactory threadFactory
线程工厂,一般固定
RejectedExecutionHandler handler
拒绝策略,即线程数多到使阻塞队列满了而且最大线程数不够用了才执行的策略
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Test { public static void main (String[] args) { ExecutorService pool = new ThreadPoolExecutor ( 2 , 5 , 3 , TimeUnit.SECONDS, new ArrayBlockingQueue <>(5 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .AbortPolicy() ); } }
四个拒绝策略 拒绝策略共有四个,都是当线程数大于最大并发数加阻塞队列容量的时候,才会采取的策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 * <ol> * * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the * handler throws a runtime {@link RejectedExecutionException} upon * rejection. </li> * * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread * that invokes {@code execute} itself runs the task. This provides a * simple feedback control mechanism that will slow down the rate that * new tasks are submitted. </li> * * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that * cannot be executed is simply dropped. </li> * * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the * executor is not shut down, the task at the head of the work queue * is dropped, and then execution is retried (which can fail again, * causing this to be repeated.) </li>* * </ol>
AbortPolicy
丢弃任务并抛出RejectedExecutionException
异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException ("Task " + r.toString() + " rejected from " + e.toString()); } }
这是线程池默认的拒绝策略,在类中也可以看到。
1 2 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy ();
CallerRunsPolicy
调用任务的run()
方法绕过线程池直接执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
DiscardPolicy
丢弃任务,但是不会抛出异常,这是不推荐使用的拒绝策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy
抛弃队列中等待最久的任务,然后把当前任务加入队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
关于最大并发线程数的设置
CPU密集型 CPU有几核就设置为多少个线程。
1 2 Runtime.getRuntime().availableProcessors();
IO密集型 根据具体情况确定,若有15个任务,那最大并发线程数可以设置为大于15,比如设置为30