《Java并发编程的艺术》读书笔记四

《Java并发编程的艺术》读书笔记

并发容器

ConcurrentHashMap

ConcurrentHashMap是线程安全的HashMap。HashMap在多线程下会导致死循环,而线程安全的HashTable则由于每个方法都使用了synchronized而导致效率底下。ConcurrentHashMap通过将数据分成一段一段地存储,然后给每段数据都配一把锁,当一个线程占用锁访问其中一个数据段的时候,其他线程也能同时访问其他数据段的数据,从而大大地提高了效率。

ConcurrentHashMap由Segment数组结构和HashEntry数据结构组成,Segment是一种可重入锁,HashEntry存储键值对数据。每个Segment守护着一个HashEntry数组中的元素,要对HashEntry进行修改,需要获取对应的Segment锁。

ConcurrentHashMap的get方法不需要加锁就可以获取,put方法先定位Segment,获取到Segment锁后,先判断是否要扩容HashEntry数组,然后添加,size方法要统计所有的Segment下的HashEntry的大小才能获得总的大小,ConcurrentHashMap先尝试2次不锁住所有的Segment的方式来统计,如果统计过程中容器的大小发生变化(通过modCount检测)则再使用锁住Segment的方式来统计。

ConcurrentLinkedQueue

ConcurrentLinkedQueue是使用非阻塞的方式来实现的线程安全队列。

ConcurrentLinkedQueue的入队操作分两步,先定位尾节点,然后将入队节点设为尾节点。出队通过获取头结点并通过CAS设置头结点为null来实现。

阻塞队列

阻塞队列是支持阻塞入队和阻塞出队的队列。

插入和移除的4种处理方式

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
检查方法 element() peek() 不可用 不可用

JDK7中又7种阻塞队列:

  • ArrayBlockingQueue : 一个数组结构组成的有界阻塞队列
  • LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列
  • PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列
  • DelayQueue : 一个使用优先级队列实现的无界阻塞队列
  • SynchronousQueue : 一个不存储元素的阻塞队列
  • LinkedTransferQueue : 一个由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque : 一个由链表结构组成的双向阻塞队列

DelayQueue是支持延时获取元素的无界阻塞队列,其中的元素必须实现Delay接口,实现方式可以参考ScheduledThreadPoolExecutor里ScheduledFutureTask类的实现。

SynchronousQueue是一个不存储元素的阻塞队列,每个put操作必须等待一个take操作,否则不能继续添加元素。

Fork/Join框架

Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

使用Fork/Join框架时需要清楚如何分割任务,在什么时候停止分割,如何汇总结果,有结果的任务需要继承RecursiveTask类。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.*;

public class CountTask extends RecursiveTask<Integer> {

private static final int THRESHOLD = 2; // 阀值
private int start;
private int end;

public CountTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
//如果任务大于阀值,就分割成两个子任务来计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务执行完成,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合并子任务的结果
sum = leftResult + rightResult;
}
return sum;
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务, 负责计算 1 + 2 + 3 + 4
CountTask task = new CountTask(1, 4);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException ignore) {}
}
}

Fork/Join框架提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了。

1
2
3
if (task.isCompletedAbnormally()) {
System.out.println(task.getException());
}

getException方法在任务被取消了返回CancellationException,抛出异常则返回Throwable对象,任务没有完成或者没有抛出异常则返回null。

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组存放提交给ForkJoinPool的任务,ForkJoinWorkerThread数组负责执行这些任务。

原子操作类

在java.util.concurrent.atomic包中提供了13类,用来提供原子操作。

操作基本类型

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

三个类提供的方法类似,有addAndGet、compareAndSet、getAndIncrement、lazySet和getAndSet等。

操作数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

操作引用类型

  • AtomicReference
  • AtomicReferenceFieldUpdater
  • AtomicMarkableReference

操作字段

  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater
  • AtomicStampedReference