《Java并发编程的艺术》读书笔记五

《Java并发编程的艺术》读书笔记

并发工具类

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

CountDownLatch的构造函数接受一个int类型的参数作为计数器,在调用countDown方法后,计数器会减1,在计数器减为0时,在CountDownLatch上因为调用await方法而阻塞的线程会被唤醒继续执行。

CountDownLatch只能使用一次,无法重新初始化或修改计数器的值。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
new Thread(() -> {
System.out.println(Thread.currentThread());
latch.countDown();
}).start();
new Thread(() -> {
System.out.println(Thread.currentThread());
latch.countDown();
}).start();
latch.await();
System.out.println(Thread.currentThread());
}

}

main线程会在latch.await()阻塞,等待latch的计数器减为0后继续往下执行,上面两个线程在打印出线程信息后会将计数器减1,在上面两个线程执行完毕后,计数器减为0,main线程才能继续往下执行。

CyclicBarrier

CyclicBarrier让一组线程到达一个屏障时被阻塞,直到所有的线程都到达屏障时,屏障才回放行所有线程,被拦截的线程才能继续往下执行。

CyclicBarrier的构造函数带又一个int的参数,表示需要拦截多少线程,每个线程调用await方法告诉CyclicBarrier它到达屏障了,然后阻塞。CyclicBarrier还有一个带Runnable参数的构造函数,在所有线程都到达了屏障,会先执行这个Runnable。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CyclicBarrierTest implements Runnable {

private CyclicBarrier barrier = new CyclicBarrier(3, this);
private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

private void doWork() {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
map.put(Thread.currentThread().getName(), ThreadLocalRandom.current().nextInt(100));
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}

@Override
public void run() {
int sum = 0;
for (Integer i : map.values()) {
sum += i;
}
System.out.println(sum);
}

public static void main(String[] args) {
CyclicBarrierTest cyclicBarrierTest = new CyclicBarrierTest();
cyclicBarrierTest.doWork();
}
}

Semaphore

Semaphore用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用公共资源。

Semaphore的构造函数带有一个int的参数,表示同时允许多少线程访问资源,线程在访问资源之前需要获取Semaphore许可,获得了许可的线程才能访问资源,未获得许可的线程将被阻塞。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SemaphoreTest {

private static Semaphore semaphore = new Semaphore(2);

public static void main(String[] args) {
for (int i = 0; i < 2; i++) {

new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}

main线程在获取许可的时候将会阻塞,因为之前两个线程已经获取了许可在执行操作并没有多余的许可了。

Exchanger

Exchanger是一个用于线程间协作的工具类,用于进行线程间的数据交换。Exchanger只提供了两个exchange方法,包括一个支持超时的exchange方法。两个线程通过exchange方法交换数据,如果第一个线程先执行exchange方法,它会一直等待第二个线程执行exchange方法。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ExchangerTest {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
System.out.println(exchanger.exchange("bbbbb"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(1);
String v1 = exchanger.exchange("aaa");
System.out.println(v1);
new Thread(() -> {
try {
System.out.println(exchanger.exchange("ccccc"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(1);
String v2 = exchanger.exchange("bbb");
System.out.println(v2);
}
}

线程池

使用线程池的好处:降低资源消耗、提高响应速度和提高线程的可管理性。

ThreadPoolExecutor的execute方法的执行过程:

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获取全局锁)
  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获取全局锁)
  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution方法。

线程池的创建可以指定这么几个参数:

  • corePoolSize: 线程池的基本大小,当提交一个任务到线程池,如果线程池线程数量没有达到corePoolSize,即使又空闲线程,线程池也会创建新的线程来执行该任务
  • runnableTaskQueue: 任务队列,用于保存等待执行的任务的阻塞队列。可以有如下几个队列可选:
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • SynchronousQueue
    • PriorityBlockingQueue
  • ThreadFactory:用于创建线程的工程
  • RejectedExecutionHandler: 饱和策略,当队列和线程池满了,使用该策略来处理新任务,JDK1.5又如下的策略:
    • AbortPolicy:直接抛出异常
    • CallerRunsPolicy: 只用调用者所在线程来执行任务
    • DiscardOldestPolicy: 丢弃队头的任务,并执行当前任务
    • DiscardPolicy: 不处理,丢弃。
  • keepAliveTime:线程活动保持时间,超出线程池corePoolSize的工作线程空闲后,保持存活的时间。
  • TimeUnit:keepAliveTime的单位

线程池通过execute方法提交一个无返回值的任务,使用submit提交一个有返回值的任务,该方法返回一个Future。

线程池通过shutdown和shutdownNow方法来关闭线程池,它们通过遍历线程池中的线程,调用线程的interrupt方法来中断线程,所以无法响应中断的任务无法终止。shutdownNow会中断正在执行或暂停的任务,而shutdown只中断没有执行的任务。

线程池的配置需要根据任务的特点和系统的配置来进行。CPU密集型的任务可以将线程池中的线程数量配置得和CPU数量的大小,IO密集型的任务则可以配置更多的线程,混合型的可以将任务分成CPU密集和IO秘籍两类来进行。阻塞队列推荐使用有界队列。

Executor框架

Executor框架包括3大部分:

  • 任务:包括执行任务需要实现的接口 Runnable接口和Callable接口
  • 任务的执行:包括任务执行机制的核心接口Executor,以及继承Executor的ExecutorService接口,其实现类有ThreadPoolExecutor和ScheduledThreadPoolExecutor。
  • 异步计算的结果:包括Future接口和FutureTask实现类

Executors工具类可以用来创建ThreadPoolExecutor和ScheduledThreadPoolExecutor实例。

ThreadPoolExecutor有3种类型

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

ScheduledThreadPoolExecutor有2种类型

  • ScheduledThreadPoolExecutor
  • SingleThreadScheduledExecturor