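What is JUC

JUC is java.util.concurrent, the concurrency utilities package of the Java standard library.

Some basic concepts

Programs, processes, and threads

Put simply, concurrency means taking turns: several tasks make progress by interleaving on the same resource. Parallelism means different tasks are literally running at the same instant.

Context switching

Even a single-core CPU supports multithreaded execution. The CPU does this by giving each thread a time slice. The slices are so short that the threads appear to run together, but in fact the CPU is constantly switching from one thread to the next.

Is concurrent execution faster than serial execution?

Given the cost of context switching, concurrency is not necessarily faster than serial execution: part of the time goes into creating threads and switching between them. For very cheap operations, a concurrent version can be slower than the serial one until the amount of work is large (on the order of a million simple operations). The exact threshold depends on the workload and the hardware.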
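Java thread states

The states of a Java thread are defined by the State enum inside the Thread class:

```java
public enum State {
    NEW,            // created but not yet started
    RUNNABLE,       // running in the JVM, or ready and waiting for CPU time
    BLOCKED,        // waiting to acquire a monitor lock
    WAITING,        // waiting indefinitely for another thread (wait(), join(), park())
    TIMED_WAITING,  // waiting with a timeout (sleep(), wait(timeout), ...)
    TERMINATED;     // finished executing
}
```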
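To make the states concrete, here is a minimal sketch that observes one thread in three of them. The class name `ThreadStateDemo` and the method `observe` are hypothetical names chosen for illustration. The worker parks on a CountDownLatch, which is one of several ways a thread can end up in WAITING.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ThreadStateDemo {
    // Returns the state seen before start(), while the worker is parked, and after join().
    static Thread.State[] observe() {
        CountDownLatch release = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                release.await();                       // parks the worker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread.State before = worker.getState();       // not started yet
        worker.start();

        // Poll (for at most ~5 s) until the worker has actually parked on the latch.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (worker.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.yield();
        }
        Thread.State during = worker.getState();

        release.countDown();                           // let the worker finish
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Thread.State after = worker.getState();
        return new Thread.State[] {before, during, after};
    }

    public static void main(String[] args) {
        for (Thread.State s : observe()) {
            System.out.println(s);
        }
    }
}
```

BLOCKED and TIMED_WAITING can be observed the same way by having the worker contend for a synchronized block or call Thread.sleep.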
synchronized

Let's start with a simple example.

```java
public class Test implements Runnable {
    private int ticket = 30;
    private boolean flag = true;

    public static void main(String[] args) {
        Test test = new Test();
        new Thread(test, "Person 1").start();
        new Thread(test, "Person 2").start();
        new Thread(test, "Person 3").start();
    }

    // Not thread-safe: the check and the decrement are not atomic, so
    // several threads can sell the same ticket or push the count below 1.
    public void buyTicket() {
        if (ticket <= 0) {
            flag = false;
            return;
        }
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " got ticket " + ticket--);
    }

    @Override
    public void run() {
        while (flag) {
            buyTicket();
        }
    }
}
```
```java
package com.yw;

public class Test implements Runnable {
    private int ticket = 30;
    private boolean flag = true;

    public static void main(String[] args) {
        Test test = new Test();
        new Thread(test, "Person 1").start();
        new Thread(test, "Person 2").start();
        new Thread(test, "Person 3").start();
    }

    // synchronized: only one thread at a time can run this method on the
    // same Test instance, so check-then-decrement is now atomic.
    public synchronized void buyTicket() {
        if (ticket <= 0) {
            flag = false;
            return;
        }
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " got ticket " + ticket--);
    }

    @Override
    public void run() {
        while (flag) {
            buyTicket();
        }
    }
}
```
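Lock

Instead of synchronized, you can also use a Lock. The usual implementation is ReentrantLock, whose constructors choose between a fair and a non-fair lock:

```java
// java.util.concurrent.locks.ReentrantLock
public ReentrantLock() {
    sync = new NonfairSync();   // non-fair by default
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
```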
```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Test implements Runnable {
    private int ticket = 30;
    private boolean flag = true;
    private final Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        Test test = new Test();
        new Thread(test, "Person 1").start();
        new Thread(test, "Person 2").start();
        new Thread(test, "Person 3").start();
    }

    public void buyTicket() {
        lock.lock();
        try {
            if (ticket <= 0) {
                flag = false;
                return;
            }
            Thread.sleep(100);
            System.out.println(Thread.currentThread().getName() + " got ticket " + ticket--);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();   // always release, even on return or exception
        }
    }

    @Override
    public void run() {
        while (flag) {
            buyTicket();
        }
    }
}
```
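One thing a Lock offers that synchronized does not is a bounded attempt to acquire it. The sketch below is an illustration rather than part of the ticket example, and `TryLockDemo` and `contendedTryLock` are hypothetical names. The calling thread holds the lock while a second thread calls tryLock with a timeout and gives up instead of blocking forever.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    // Returns whether a second thread obtained the lock while the caller was holding it.
    static boolean contendedTryLock(long timeoutMillis) {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean acquired = new AtomicBoolean(false);

        lock.lock();                                   // the calling thread owns the lock
        try {
            Thread other = new Thread(() -> {
                try {
                    // Waits at most timeoutMillis, then returns false instead of blocking.
                    boolean ok = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
                    acquired.set(ok);
                    if (ok) {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            other.start();
            other.join();                              // still holding the lock here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return acquired.get();
    }

    public static void main(String[] args) {
        System.out.println("second thread got the lock: " + contendedTryLock(100));
    }
}
```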
The producer-consumer problem

synchronized

The plan: when the stock is 1, the consumer consumes; when the stock is 0, the producer produces.

```java
public class Test {
    public static void main(String[] args) {
        Shop shop = new Shop();
        new Thread(() -> {
            for (int i = 0; i < 50; i++) {
                try {
                    shop.buyShop();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer").start();
        new Thread(() -> {
            for (int i = 0; i < 50; i++) {
                try {
                    shop.AddShop();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Producer").start();
    }
}

class Shop {
    private int shopnum = 1;

    public synchronized void buyShop() throws InterruptedException {
        // "if" re-checks nothing after waking up, so it is vulnerable to
        // spurious wakeups and breaks once there are more than two threads.
        if (shopnum == 0) {
            this.wait();
        }
        System.out.println(Thread.currentThread().getName() + ": " + shopnum--);
        this.notifyAll();
    }

    public synchronized void AddShop() throws InterruptedException {
        if (shopnum == 1) {
            this.wait();
        }
        System.out.println(Thread.currentThread().getName() + ": " + shopnum++);
        this.notifyAll();
    }
}
```
```java
public class Test {
    public static void main(String[] args) {
        Shop shop = new Shop();
        new Thread(() -> {
            for (int i = 0; i < 50; i++) {
                try {
                    shop.buyShop();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Consumer").start();
        new Thread(() -> {
            for (int i = 0; i < 50; i++) {
                try {
                    shop.AddShop();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Producer").start();
    }
}

class Shop {
    private int shopnum = 1;

    public synchronized void buyShop() throws InterruptedException {
        // "while" re-checks the condition every time the thread wakes up.
        while (shopnum == 0) {
            this.wait();
        }
        System.out.println(Thread.currentThread().getName() + ": " + shopnum--);
        this.notifyAll();
    }

    public synchronized void AddShop() throws InterruptedException {
        while (shopnum == 1) {
            this.wait();
        }
        System.out.println(Thread.currentThread().getName() + ": " + shopnum++);
        this.notifyAll();
    }
}
```
Lock

The Lock interface itself has no way to make a thread wait or to wake it up, so a Condition obtained from the lock is used instead.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Test {
    public static void main(String[] args) {
        Shop shop = new Shop();
        Runnable consumer = () -> {
            for (int i = 0; i < 50; i++) {
                try {
                    shop.buyShop();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable producer = () -> {
            for (int i = 0; i < 50; i++) {
                try {
                    shop.AddShop();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(consumer, "Consumer 1").start();
        new Thread(producer, "Producer 1").start();
        new Thread(consumer, "Consumer 2").start();
        new Thread(producer, "Producer 2").start();
    }
}

class Shop {
    private int shopnum = 1;
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public void buyShop() throws InterruptedException {
        lock.lock();
        try {
            while (shopnum == 0) {
                condition.await();
            }
            System.out.println(Thread.currentThread().getName() + ": " + shopnum--);
            // signalAll, not signal: producers and consumers share one condition,
            // so signal() could wake another consumer and leave every thread waiting.
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void AddShop() throws InterruptedException {
        lock.lock();
        try {
            while (shopnum == 1) {
                condition.await();
            }
            System.out.println(Thread.currentThread().getName() + ": " + shopnum++);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```
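A lock can hand out more than one Condition, which allows targeted wake-ups that a single monitor with wait/notify cannot express. The following is a minimal sketch under that idea, not the original author's code: `TwoConditionShop`, `notEmpty`, `notFull`, and `run` are hypothetical names. Producers wait only on `notFull` and consumers only on `notEmpty`, so a plain signal() always reaches a thread of the right kind.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TwoConditionShop {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Condition notFull = lock.newCondition();  // producers wait here
    private int shopnum = 0;
    private int sold = 0;

    void addShop() throws InterruptedException {
        lock.lock();
        try {
            while (shopnum == 1) {
                notFull.await();
            }
            shopnum++;
            notEmpty.signal();      // wake one consumer
        } finally {
            lock.unlock();
        }
    }

    void buyShop() throws InterruptedException {
        lock.lock();
        try {
            while (shopnum == 0) {
                notEmpty.await();
            }
            shopnum--;
            sold++;
            notFull.signal();       // wake one producer
        } finally {
            lock.unlock();
        }
    }

    // Runs 2 producers and 2 consumers, each doing `rounds` operations; returns items sold.
    static int run(int rounds) {
        TwoConditionShop shop = new TwoConditionShop();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            boolean producer = (t % 2 == 0);
            threads[t] = new Thread(() -> {
                try {
                    for (int i = 0; i < rounds; i++) {
                        if (producer) shop.addShop(); else shop.buyShop();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return shop.sold;           // read after join(), so the value is visible
    }

    public static void main(String[] args) {
        System.out.println("sold: " + run(50));
    }
}
```

This is the same two-condition layout that ArrayBlockingQueue uses internally (its `notEmpty` and `notFull` fields appear in the put/take source later in this post).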
Read-write lock

A read-write lock is used much like the two locking approaches above, but it makes the locking finer-grained. It is the pair of exclusive and shared locks covered earlier: the write lock is exclusive, while the read lock can be held by many readers at once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Test {
    public static void main(String[] args) {
        Mycache mycache = new Mycache();
        // Ten writer threads
        for (int i = 1; i <= 10; i++) {
            final int temp = i;
            new Thread(() -> {
                mycache.put(temp + "", temp + "");
            }, String.valueOf(i)).start();
        }
        // Ten reader threads
        for (int i = 1; i <= 10; i++) {
            final int temp = i;
            new Thread(() -> {
                mycache.get(temp + "");
            }, String.valueOf(i)).start();
        }
    }
}

class Mycache {
    private volatile Map<String, String> map = new HashMap<>();
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    // Write lock: exclusive, so "writing" and "write done" are never interleaved.
    public void put(String key, String value) {
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " writing " + key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + " write done");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    // Read lock: shared, so several readers may be inside at the same time.
    public void get(String key) {
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " reading " + key);
            String value = map.get(key);
            System.out.println(Thread.currentThread().getName() + " read done");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}
```
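The shared/exclusive distinction can be checked directly with non-blocking tryLock calls. In this small sketch (`ReadWriteLockDemo` and `probeWhileReading` are hypothetical names), the caller holds the read lock while a second thread tries to take first the read lock and then the write lock.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteLockDemo {
    // Returns {readAcquired, writeAcquired} as seen by a second thread
    // while the calling thread is holding the read lock.
    static boolean[] probeWhileReading() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];

        rw.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                result[0] = rw.readLock().tryLock();    // shared: another reader is allowed
                if (result[0]) {
                    rw.readLock().unlock();
                }
                result[1] = rw.writeLock().tryLock();   // exclusive: refused while a reader exists
                if (result[1]) {
                    rw.writeLock().unlock();
                }
            });
            other.start();
            other.join();       // join() makes the other thread's writes visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = probeWhileReading();
        System.out.println("read lock acquired: " + r[0] + ", write lock acquired: " + r[1]);
    }
}
```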
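Collections

Many commonly used collection classes are not thread-safe. When several threads write to one of them concurrently (while it is also being read or iterated), the usual symptom is a ConcurrentModificationException.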
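The exception comes from the collection's fail-fast iterator noticing that the collection changed underneath it. Because the multithreaded case is timing-dependent, the sketch below reproduces the same mechanism deterministically on a single thread. `CmeDemo` and `modifyWhileIterating` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CmeDemo {
    // Adds to the list while iterating over it.
    // Returns true if the iterator detected the modification and threw.
    static boolean modifyWhileIterating(List<String> list) {
        list.add("a");
        list.add("b");
        try {
            for (String s : list) {
                list.add(s + "!");      // structural modification during iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ArrayList threw: " + modifyWhileIterating(new ArrayList<>()));
        System.out.println("CopyOnWriteArrayList threw: "
                + modifyWhileIterating(new CopyOnWriteArrayList<>()));
    }
}
```

ArrayList fails on the second call to the iterator's next(), whereas CopyOnWriteArrayList iterates over a snapshot of the array and never throws this exception.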
```java
// java.util.Vector: add() is thread-safe because the whole method is synchronized
public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}
```
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Test {
    public static void main(String[] args) {
        // Wraps the ArrayList so that every method call is synchronized
        List<String> list = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                list.add("1111");
                System.out.println(list);
            }, String.valueOf(i)).start();
        }
    }
}
```

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class Test {
    public static void main(String[] args) {
        // Every write copies the underlying array; reads need no lock
        List<String> list = new CopyOnWriteArrayList<>();
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                list.add("1111");
                System.out.println(list);
            }, String.valueOf(i)).start();
        }
    }
}
```
Set

HashSet is just as unsafe as ArrayList, so it needs the same kind of treatment.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class Test {
    public static void main(String[] args) {
        Set<String> set = Collections.synchronizedSet(new HashSet<>());
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString());
                System.out.println(set);
            }, String.valueOf(i)).start();
        }
    }
}
```

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;

public class Test {
    public static void main(String[] args) {
        Set<String> set = new CopyOnWriteArraySet<>();
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString());
                System.out.println(set);
            }, String.valueOf(i)).start();
        }
    }
}
```
```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class Test {
    public static void main(String[] args) {
        // The same approach works for HashMap
        Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString());
                System.out.println(map);
            }, String.valueOf(i)).start();
        }
    }
}
```

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class Test {
    public static void main(String[] args) {
        Map<String, String> map = new ConcurrentHashMap<>();
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString());
                System.out.println(map);
            }, String.valueOf(i)).start();
        }
    }
}
```
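A thread-safe map only guarantees that each individual call is safe. A sequence such as get followed by put is still a race. For read-modify-write updates, ConcurrentHashMap provides atomic methods such as merge. The sketch below counts hits from several threads. `MergeCountDemo` and `count` are hypothetical names for this illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MergeCountDemo {
    // `threads` workers each increment the same key `perThread` times; returns the final count.
    static int count(int threads, int perThread) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // Atomic in ConcurrentHashMap: inserts 1, or adds 1 to the current value
                    counts.merge("hits", 1, Integer::sum);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counts.getOrDefault("hits", 0);
    }

    public static void main(String[] args) {
        System.out.println(count(8, 1000));
    }
}
```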
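Callable

Callable is the third way to define the work of a thread. Threads are always started through the start method of the Thread class, but a Callable implementation cannot be passed to Thread directly, so an intermediary class is needed. FutureTask fills that role: it wraps a Callable and implements Runnable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        HelloTest helloTest = new HelloTest();
        FutureTask<String> futureTask = new FutureTask<>(helloTest);
        new Thread(futureTask, "worker").start();
        String msg = futureTask.get();   // blocks until call() has returned
        System.out.println(msg);
    }
}

class HelloTest implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("call() invoked");
        return "hello from Callable";
    }
}
```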
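A FutureTask runs its Callable at most once and caches the result, even if the same task object is handed to several threads. The sketch below illustrates this. `FutureTaskOnceDemo` and `runShared` are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class FutureTaskOnceDemo {
    // Starts `threads` threads on the SAME FutureTask; returns how many times call() ran.
    static int runShared(int threads) {
        AtomicInteger calls = new AtomicInteger();
        FutureTask<String> task = new FutureTask<>(() -> {
            calls.incrementAndGet();
            return "hello from Callable";
        });

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(task);
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        try {
            task.get();                 // already completed; returns the cached result
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("call() ran " + runShared(3) + " time(s)");
    }
}
```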
Common helper classes

CountDownLatch

This is a count-down counter. It lets one thread wait until a number of other threads have finished before it continues.

```java
import java.util.concurrent.CountDownLatch;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(10);

        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " running");
                countDownLatch.countDown();   // count - 1
            }, String.valueOf(i)).start();
        }

        countDownLatch.await();               // blocks until the count reaches 0
        System.out.println("Done");
    }
}
```
CyclicBarrier

Conceptually this is a count-up counter. A group of threads wait for each other at a common barrier point, and only once all of them have arrived do they continue.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Test {
    public static void main(String[] args) {
        // The barrier action runs once, after the 10th thread has arrived
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> {
            System.out.println("All threads have started");
        });

        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " started");
                try {
                    cyclicBarrier.await();    // wait for the other threads
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
    }
}
```
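Unlike a CountDownLatch, a CyclicBarrier resets itself after each trip and can be reused, which is where the "cyclic" in its name comes from. A minimal sketch follows, with the hypothetical names `BarrierReuseDemo` and `run`. The same barrier is crossed several times and the barrier action is counted.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {
    // `parties` threads meet at the same barrier `rounds` times.
    // Returns how many times the barrier action ran.
    static int run(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int t = 0; t < parties; t++) {
            workers[t] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await();    // the barrier resets after every trip
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + run(3, 2) + " times");
    }
}
```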
Semaphore

A counting semaphore. It maintains a set of permits: acquire() blocks until a permit is available, and release() gives it back.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        // At most 2 threads may hold a permit at the same time
        Semaphore semaphore = new Semaphore(2);

        for (int i = 0; i <= 10; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;   // no permit was acquired, so there is nothing to release
                }
                try {
                    System.out.println(Thread.currentThread().getName() + " started");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + " finished");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }
    }
}
```
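The effect of the permit count can be measured rather than read off the console. The sketch below tracks how many threads are inside the guarded section at once. `SemaphoreLimitDemo` and `maxConcurrent` are hypothetical names, and the 10 ms sleep only stands in for real work.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitDemo {
    // Runs `threads` workers through a semaphore with `permits` permits and returns
    // the largest number of workers observed inside the guarded section at once.
    static int maxConcurrent(int permits, int threads) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                 // nothing acquired, nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    Thread.sleep(10);       // placeholder for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent: " + maxConcurrent(2, 8));
    }
}
```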
Blocking queues

As the name suggests, a blocking queue is a queue whose operations can block. It is a subtype of Queue: BlockingQueue is an interface that extends Queue.

Returns a value, throws an exception on failure

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;

public class Test {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.add("1");
        arrayBlockingQueue.add("2");
        arrayBlockingQueue.add("3");
        try {
            arrayBlockingQueue.add("4");                    // the queue is already full
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());             // Queue full
        }

        System.out.println(arrayBlockingQueue.element());   // 1 (head of the queue, not removed)

        arrayBlockingQueue.remove();
        arrayBlockingQueue.remove();
        arrayBlockingQueue.remove();
        try {
            arrayBlockingQueue.remove();                    // the queue is already empty
        } catch (NoSuchElementException e) {
            System.out.println("queue is empty");
        }
    }
}
```

```java
// java.util.concurrent.ArrayBlockingQueue
public boolean add(E e) {
    return super.add(e);
}

// java.util.AbstractQueue
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

public E element() {
    E x = peek();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}
```
Returns a value, does not throw

```java
import java.util.concurrent.ArrayBlockingQueue;

public class Test {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.offer("1"));   // true
        System.out.println(arrayBlockingQueue.offer("2"));   // true
        System.out.println(arrayBlockingQueue.offer("3"));   // true
        System.out.println(arrayBlockingQueue.offer("4"));   // false: the queue is full

        System.out.println(arrayBlockingQueue.peek());       // 1 (head, not removed)

        System.out.println(arrayBlockingQueue.poll());       // 1
        System.out.println(arrayBlockingQueue.poll());       // 2
        System.out.println(arrayBlockingQueue.poll());       // 3
        System.out.println(arrayBlockingQueue.poll());       // null: the queue is empty
    }
}
```

```java
// java.util.concurrent.ArrayBlockingQueue
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex);
    } finally {
        lock.unlock();
    }
}
```
Blocks and waits

```java
import java.util.concurrent.ArrayBlockingQueue;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.put("1");
        arrayBlockingQueue.put("2");
        arrayBlockingQueue.put("3");
        // The queue is full and nothing is consuming, so this call blocks indefinitely
        arrayBlockingQueue.put("4");

        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        // On an empty queue, take() likewise blocks until an element arrives
        System.out.println(arrayBlockingQueue.take());
    }
}
```

```java
// java.util.concurrent.ArrayBlockingQueue
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
```
Waits with a timeout

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.offer("1");
        arrayBlockingQueue.offer("2");
        arrayBlockingQueue.offer("3");
        // Waits up to 2 seconds for free space, then gives up and returns false
        arrayBlockingQueue.offer("4", 2, TimeUnit.SECONDS);

        System.out.println(arrayBlockingQueue.poll());   // 1
        System.out.println(arrayBlockingQueue.poll());   // 2
        System.out.println(arrayBlockingQueue.poll());   // 3
        // Waits up to 2 seconds for an element, then returns null
        System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS));
    }
}
```

```java
// java.util.concurrent.ArrayBlockingQueue
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}
```
Synchronous queue

SynchronousQueue is a BlockingQueue implementation that stores no elements: each put has to wait for a matching take.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " put 1");
                blockingQueue.put("1");   // blocks until the other thread takes it
                System.out.println(Thread.currentThread().getName() + " put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName() + " put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Thread 1").start();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " take 1");
                System.out.println(blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " take 2");
                System.out.println(blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " take 3");
                System.out.println(blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Thread 2").start();
    }
}
```
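The "no capacity" property is easiest to see with the non-blocking offer. In the sketch below (`SynchronousQueueDemo`, `offerWithoutTaker`, and `handOff` are hypothetical names), an offer with no waiting consumer is refused immediately, whereas a put succeeds as soon as another thread calls take.

```java
import java.util.concurrent.SynchronousQueue;

class SynchronousQueueDemo {
    // No thread is waiting in take(), so there is nobody to hand the element to.
    static boolean offerWithoutTaker() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("1");
    }

    // A producer thread puts one element; the caller takes it. Returns the element.
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put("1");         // blocks until take() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String value = queue.take();
            producer.join();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer accepted: " + offerWithoutTaker());
        System.out.println("handed off: " + handOff());
    }
}
```

This is also why newCachedThreadPool, shown in the next section, pairs a SynchronousQueue with an unbounded maximum pool size: a task that cannot be handed to an idle thread immediately causes a new thread to be created.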
Thread pools

Four factory methods

```java
// A pool with a single worker thread
ExecutorService pool1 = Executors.newSingleThreadExecutor();
// A pool with a fixed number of threads
ExecutorService pool2 = Executors.newFixedThreadPool(5);
// A pool that creates threads as needed and reuses idle ones
ExecutorService pool3 = Executors.newCachedThreadPool();
// A pool that can run tasks after a delay or periodically
ExecutorService pool4 = Executors.newScheduledThreadPool(5);
```

```java
// java.util.concurrent.Executors
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

// java.util.concurrent.ScheduledThreadPoolExecutor, used by newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
```

```java
// Signature: void execute(Runnable command)
pool1.execute(() -> {
    System.out.println(Thread.currentThread().getName());
});
```
The seven parameters

The seven parameters are simply the parameters of the full ThreadPoolExecutor constructor.

```java
// java.util.concurrent.ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
```
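int corePoolSize: the number of core threads kept in the pool, even when they are idle
int maximumPoolSize: the maximum number of threads allowed in the pool
long keepAliveTime: how long an idle thread beyond the core size waits for new work before it terminates
TimeUnit unit: the time unit of keepAliveTime
BlockingQueue<Runnable> workQueue: the queue that holds tasks before they are executed
ThreadFactory threadFactory: the factory used to create new threads
RejectedExecutionHandler handler: the handler invoked when a task cannot be accepted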
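```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                2,                                      // corePoolSize
                5,                                      // maximumPoolSize
                3,                                      // keepAliveTime
                TimeUnit.SECONDS,                       // unit
                new ArrayBlockingQueue<>(5),            // workQueue
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy()    // handler
        );
        pool.shutdown();   // release the pool once it is no longer needed
    }
}
```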
Four rejection policies

There are four rejection policies. Each one applies only when a task is submitted while the pool is saturated, that is, when the number of outstanding tasks exceeds the maximum pool size plus the capacity of the blocking queue (or when the pool has already been shut down).

```java
 * <ol>
 *
 * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
 * handler throws a runtime {@link RejectedExecutionException} upon
 * rejection. </li>
 *
 * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 * that invokes {@code execute} itself runs the task. This provides a
 * simple feedback control mechanism that will slow down the rate that
 * new tasks are submitted. </li>
 *
 * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
 * cannot be executed is simply dropped. </li>
 *
 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 * executor is not shut down, the task at the head of the work queue
 * is dropped, and then execution is retried (which can fail again,
 * causing this to be repeated.) </li>
 *
 * </ol>
```
```java
// Throws RejectedExecutionException
public static class AbortPolicy implements RejectedExecutionHandler {

    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
```

```java
// The default handler of ThreadPoolExecutor
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
```
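The saturation rule can be checked against a pool configured like the earlier example: 2 core threads, at most 5 threads, and a queue of capacity 5. In the sketch below, `AbortPolicyDemo` and `acceptedBeforeRejection` are hypothetical names, and every task simply blocks on a latch so that no thread or queue slot frees up while tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AbortPolicyDemo {
    // Submits up to `tasks` blocking tasks; returns how many were accepted
    // before AbortPolicy threw RejectedExecutionException.
    static int acceptedBeforeRejection(int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5),
                new ThreadPoolExecutor.AbortPolicy());
        int accepted = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();        // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // All 5 threads are busy and the queue holds 5 tasks: the pool is saturated
        } finally {
            release.countDown();                // let the accepted tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptedBeforeRejection(11));
    }
}
```

With these settings the first 2 tasks start core threads, the next 5 fill the queue, the next 3 start the remaining threads, and the 11th is rejected.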
```java
// Runs the rejected task on the thread that called execute()
public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
```
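A small sketch can confirm that the overflow task really runs on the submitting thread. `CallerRunsDemo` and `overflowThreadName` are hypothetical names, and the pool is deliberately tiny (one thread, one queue slot) so that the third task overflows.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    // Returns the name of the thread that ended up running the overflow task.
    static String overflowThreadName() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicReference<String> ranOn = new AtomicReference<>();
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);      // occupies the only worker thread
            pool.execute(blocker);      // fills the only queue slot
            // Saturated: CallerRunsPolicy runs this task inside execute(), on this thread
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + overflowThreadName());
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work in the meantime, which is the feedback mechanism mentioned in the Javadoc.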
```java
// Silently drops the rejected task
public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
```

```java
// Drops the oldest queued task, then retries the rejected one
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
```
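CPU-bound

Set the number of threads to the number of CPU cores.

```java
// Number of processors available to the JVM
Runtime.getRuntime().availableProcessors();
```

I/O-bound

Decide according to the workload. For example, if there are 15 I/O-heavy tasks, the maximum number of concurrent threads can be set above 15, for instance to 30.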
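The two rules of thumb can be written down as a small helper. This is only a sketch: `PoolSizing`, `cpuBound`, and `ioBound` are hypothetical names, and the factor of 2 for I/O-bound work is an assumption taken from the 15-to-30 example above, not a general rule.

```java
class PoolSizing {
    // CPU-bound work: one thread per available core.
    static int cpuBound() {
        return Runtime.getRuntime().availableProcessors();
    }

    // I/O-bound work: more threads than I/O-heavy tasks.
    // Assumption for illustration: twice the task count.
    static int ioBound(int ioHeavyTasks) {
        return ioHeavyTasks * 2;
    }

    public static void main(String[] args) {
        System.out.println("CPU-bound pool size: " + cpuBound());
        System.out.println("I/O-bound pool size for 15 tasks: " + ioBound(15));
    }
}
```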