Preface
CAS实现 、volatile语意 -> AQS(AbstractQueuedSynchronizer)
java.util.concurrent
Utility classes commonly useful in concurrent programming.This package includes a few small standardized extensible frameworks, as well as some classes that provide useful functionality and are otherwise tedious or difficult to implement.
实用程序类通常用于并发编程。这个包, 包括一些小型的标准化可扩展框架,以及一些提供有用功能的类,通常这些类是单调乏味或难以实现的。
Executors
Interfaces
Executor
Executor
is a simple standardized interface for defining custom thread-like subsystems, including thread pools, asynchronous I/O, and lightweight task frameworks.
Depending on which concrete Executor class is being used, tasks may execute in a newly created thread, an existing task-execution thread, or the thread calling execute
, and may execute sequentially or concurrently.
public interface Executor {
void execute(Runnable command);
}
ExecutorService
ExecutorService
provides a more complete asynchronous task execution framework. An ExecutorService manages queuing and scheduling of tasks, and allows controlled shutdown.
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
不同于submit
方法invokeAny,invokeAll具有阻塞性。
- invokeAny取得第一个方法的返回值,当第一个任务结束后,会调用interrupt方法中断其它任务。
- invokeAll等线程任务执行完毕后,取得全部任务的结果值。
ScheduledExecutorService
The ScheduledExecutorService
subinterface and associated interfaces add support for delayed and periodic task execution.
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
Callable
ExecutorServices provide methods arranging asynchronous execution of any function expressed as Callable
, the result-bearing analog of Runnable
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Future
A Future
returns the results of a function, allows determination of whether execution has completed, and provides a means to cancel execution.
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
RunnableFuture
A RunnableFuture
is a Future
that possesses a run
method that upon execution, sets its results.
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
ThreadFactory
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
构造新的线程,初始化 优先级 、名字 等
Implementations
ThreadPoolExecutor & ScheduledThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{...}
public static class xxx implements rejectedExecution{...}
implements Executor{..}
implements ExecutorService{..}
...
}
这个类很有意思呀,包含了运行状态,worker数量的计算, Worker的实现,拒绝策略的实现,Executor的实现,另外注意构造函数,注意理解各个参数的含义⚠️
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {
/*
* This class specializes ThreadPoolExecutor implementation by
*
* 1. Using a custom task type, ScheduledFutureTask for
* tasks, even those that don't require scheduling (i.e.,
* those submitted using ExecutorService execute, not
* ScheduledExecutorService methods) which are treated as
* delayed tasks with a delay of zero.
*
* 2. Using a custom queue (DelayedWorkQueue), a variant of
* unbounded DelayQueue. The lack of capacity constraint and
* the fact that corePoolSize and maximumPoolSize are
* effectively identical simplifies some execution mechanics
* (see delayedExecute) compared to ThreadPoolExecutor.
*
* 3. Supporting optional run-after-shutdown parameters, which
* leads to overrides of shutdown methods to remove and cancel
* tasks that should NOT be run after shutdown, as well as
* different recheck logic when task (re)submission overlaps
* with a shutdown.
*
* 4. Task decoration methods to allow interception and
* instrumentation, which are needed because subclasses cannot
* otherwise override submit methods to get this effect. These
* don't have any impact on pool control logic though.
*/
...
}
与ThreadPoolExecutor不同的是,使用的是task,延迟队列,重写了onshutdown
Executors
The Executors
class provides factory methods for the most common kinds and configurations of Executors, as well as a few utility methods for using them.
类似于Collections, Executors类为最常见的executor类型和配置提供工厂方法,以及一些使用它们的实用程序方法。
FutureTask & ExecutorCompletionService
Other utilities based on Executors
include the concrete class FutureTask
providing a common extensible implementation of Futures, and ExecutorCompletionService
, that assists in coordinating the processing of groups of asynchronous tasks.
基于executor的其他实用程序包括提供future公共可扩展实现的具体类FutureTask和ExecutorCompletionService,后者帮助协调异步任务组的处理。
ForkJoinPool & ForkJoinTask & ForkJoinWorkerThread(@since 1.7)
ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。
Queues
BlockingQueue
A Queue
that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
BlockingQueue
methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future:
one throws an exception,
the second returns a special value (either null
or false
, depending on the operation),
the third blocks the current thread indefinitely until the operation can succeed,
and the fourth blocks for only a given maximum time limit before giving up.
These methods are summarized in the following table:
Throws exception | Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
Remove | remove() |
poll() |
take() |
poll(time, unit) |
Examine | element() |
peek() |
not applicable | not applicable |
public interface BlockingQueue<E> extends Queue<E> {
/*
* imple Queue
*/
boolean add(E e);
boolean offer(E e);
/*
* imple Collection
*/
boolean remove(Object o);
public boolean contains(Object o);
/*
* BlockingQueue methods put take
*/
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/*
* Retrieves and removes the head of this queue
*/
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
}
实现了Queue接口 , 而Queue接口又实现了Collection, 而BlockingQueue 新增的有阻塞式的 put和take ,
提供阻塞超时机制的, offer、poll(扩展了Queue的方法)
BlockingDeque(@since 1.6) & TransferQueue(@since 1.7)
BlockingDeque与BlockingQueue类似
LinkedBlockingDeque
The BlockingDeque
interface extends BlockingQueue
to support both FIFO and LIFO (stack-based) operations.
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
...
void putFirst(E e) throws InterruptedException;
void putLast(E e) throws InterruptedException;
E takeFirst() throws InterruptedException;
E takeLast() throws InterruptedException;
...
}
TransferQueue
LinkedTransferQueue
introduce a synchronous transfer
method (along with related features) in which a producer may optionally block awaiting its consumer
TransferQueue是一个聪明的队列,它是ConcurrentLinkedQueue, SynchronousQueue (在公平模式下), 无界的LinkedBlockingQueues等的超集。
🌟这是个很厉害的队列~ 只有在消费者等待的情况下才会生产, 解决消费者和生产者速度不匹配的问题, 例如 out of memory .
@see https://segmentfault.com/a/1190000011266361
Implementations.
Five implementations in java.util.concurrent
support the extended BlockingQueue
interface, that defines blocking versions of put and take: LinkedBlockingQueue
, ArrayBlockingQueue
, SynchronousQueue
, PriorityBlockingQueue
, and DelayQueue
. The different classes cover the most common usage contexts for producer-consumer, messaging, parallel tasking, and related concurrent designs.
LinkedBlockingQueue
有限阻塞队列,底层存储链表结构, 可以不指定大小, 默认Integer.MAX_VALUE
ArrayBlockingQueue
有限阻塞队列,底层存储是数组,需要指定大小初始化 , 另一个是可以指定ReentrantLock公平锁或者非公平锁
SynchronousQueue
是一个空队列,用于传递数据
SynchronousQueue使用两个队列(一个用于正在等待的生产者、另一个用于正在等待的消费者)和一个用来保护两个队列的锁。而LinkedTransferQueue使用CAS操作实现一个非阻塞的方法,这是避免序列化处理任务的关键。
@see https://segmentfault.com/a/1190000011207824
PriorityBlockingQueue
优先级队列, 实现上使用了数组, 默认初始化大小是 DEFAULT_INITIAL_CAPACITY = 11
使用PriorityQueue,用于Serializable
DelayQueue.
延迟队列,实现上使用了PriorityQueue,默认初始化大小是 DEFAULT_INITIAL_CAPACITY = 11(没有实现java.io.Serializable)
Timing
The TimeUnit
class provides multiple granularities (including nanoseconds) for specifying and controlling time-out based operations. Most classes in the package contain operations based on time-outs in addition to indefinite waits. In all cases that time-outs are used, the time-out specifies the minimum time that the method should wait before indicating that it timed-out. Implementations make a “best effort” to detect time-outs as soon as possible after they occur. However, an indefinite amount of time may elapse between a time-out being detected and a thread actually executing again after that time-out. All methods that accept timeout parameters treat values less than or equal to zero to mean not to wait at all. To wait “forever”, you can use a value of Long.MAX_VALUE
.
TimeUnit类提供了多个粒度(包括纳秒),用于指定和控制基于超时的操作。
包中的大多数类除了不确定的等待之外,还包含基于超时的操作。在所有使用超时的情况下,超时指定方法在指示超时之前应该等待的最小时间。实现会在超时发生后“尽最大努力”尽快检测超时。然而,在检测到超时与实际在超时之后再次执行的线程之间可能会经过一段不确定的时间。
所有接受超时参数的方法都将小于或等于零的值视为根本不等待。要等待“永远”,可以使用Long.MAX_VALUE值。
public enum TimeUnit {
/**
* Time unit representing one thousandth of a microsecond
*/
NANOSECONDS {
public long toNanos(long d) { return d; }
public long toMicros(long d) { return d/(C1/C0); }
public long toMillis(long d) { return d/(C2/C0); }
public long toSeconds(long d) { return d/(C3/C0); }
public long toMinutes(long d) { return d/(C4/C0); }
public long toHours(long d) { return d/(C5/C0); }
public long toDays(long d) { return d/(C6/C0); }
public long convert(long d, TimeUnit u) { return u.toNanos(d); }
int excessNanos(long d, long m) { return (int)(d - (m*C2)); }
},
MICROSECONDS {
...
},
...
public long convert(long sourceDuration, TimeUnit sourceUnit) {
throw new AbstractMethodError();
}
abstract int excessNanos(long d, long m);
public void timedWait(Object obj, long timeout)
throws InterruptedException {
if (timeout > 0) {
long ms = toMillis(timeout);
int ns = excessNanos(timeout, ms);
obj.wait(ms, ns);
}
}
public void timedJoin(Thread thread, long timeout)
throws InterruptedException {
if (timeout > 0) {
long ms = toMillis(timeout);
int ns = excessNanos(timeout, ms);
thread.join(ms, ns);
}
}
public void sleep(long timeout) throws InterruptedException {
if (timeout > 0) {
long ms = toMillis(timeout);
int ns = excessNanos(timeout, ms);
Thread.sleep(ms, ns);
}
}
}
Synchronizers
Five classes aid common special-purpose synchronization idioms.
Semaphore
is a classic concurrency tool.CountDownLatch
is a very simple yet very common utility for blocking until a given number of signals, events, or conditions hold.- A
CyclicBarrier
is a resettable multiway synchronization point useful in some styles of parallel programming. - A
Phaser
provides a more flexible form of barrier that may be used to control phased computation among multiple threads. - An
Exchanger
allows two threads to exchange objects at a rendezvous point, and is useful in several pipeline designs.
Semaphore
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.
Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource.
计数信号量。从概念上讲,信号量维护一组许可证。
如果有必要,每个acquire()块都会被阻塞,直到获得许可证为止,然后获取许可证。
每个release()都会添加一个许可证,潜在地释放一个阻塞的收购者。
但是,没有使用实际的许可证对象;信号量只保留可用数量的计数,并相应地执行操作。
信号量通常用于限制能够访问某些(物理或逻辑)资源的线程数。
public class Semaphore implements java.io.Serializable {
private final Sync sync;
/**
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
...
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||compareAndSetState(available, remaining))
return remaining;
}
}
}
static final class NonfairSync extends Sync {
...
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
...
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||compareAndSetState(available, remaining))
return remaining;
}
}
}
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
...
}
基于AbstractQueuedSynchronizer 实现,主要有acquire 和 release方法
CountDownLatch
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
一种同步辅助,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
}
CyclicBarrier
与 CountDownLatch 类似, 最大的不同点在于调用时点是否阻塞 ,注意实现也是不同的
从使用场景上来说,CyclicBarrier是让多个线程互相等待某一事件的发生,然后同时被唤醒。而上文讲的CountDownLatch是让某一线程等待多个线程的状态,然后该线程被唤醒。
CountDownLatch | CyclicBarrier |
---|---|
减计数方式 | ReentrantLock |
计数为0时,无法重置 | 计数置为0,重新开始 |
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 | 调用await()方法计数count减1,减1后的值不等于构造方法的值,则线程阻塞,否则执行barrierCommand |
public class CyclicBarrier {
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
...
}
Phaser(@since 1.7)
A reusable synchronization barrier, similar in functionality to CyclicBarrier
and CountDownLatch
but supporting more flexible usage.
Registration
Synchronization
Termination
Tiering (分层)
Monitoring
CountDownLatch和CyclicBarrier都是JDK 1.5引入的,而Phaser是JDK 1.7引入的。Phaser的功能与CountDownLatch和CyclicBarrier有部分重叠,同时也提供了更丰富的语义和更灵活的用法。
Phaser顾名思义,与阶段相关。Phaser比较适合这样一种场景,一种任务可以分为多个阶段,现希望多个线程去处理该批任务,对于每个阶段,多个线程可以并发进行,但是希望保证只有前面一个阶段的任务完成之后才能开始后面的任务。
这种场景可以使用多个CyclicBarrier来实现,每个CyclicBarrier负责等待一个阶段的任务全部完成。但是使用CyclicBarrier的缺点在于,需要明确知道总共有多少个阶段,同时并行的任务数需要提前预定义好,且无法动态修改。而Phaser可同时解决这两个问题。
Exchanger
A ==synchronization== point at which threads can pair and swap elements within pairs.
Each thread presents some object on entry to the exchange
method, matches with a partner thread, and receives its partner’s object on return.
An Exchanger may be viewed as a bidirectional form of a SynchronousQueue
.
Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
在以上的描述中,有几个要点:
- 此类提供对外的操作是同步的;
- 用于成对出现的线程之间交换数据;
- 可以视作双向的同步队列;
- 可应用于基因算法、流水线设计等场景。
public class Exchanger<V> {
@sun.misc.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // Number of CAS failures at current bound
int hash; // Pseudo-random for spins
Object item; // This thread's current item
volatile Object match; // Item provided by releasing thread
volatile Thread parked; // Set to this thread when parked, else null
}
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
public Exchanger() {
participant = new Participant();
}
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
Object v;
Object item = (x == null) ? NULL_ITEM : x;
long ns = unit.toNanos(timeout);
if ((arena != null ||
(v = slotExchange(item, true, ns)) == null) &&
((Thread.interrupted() ||
(v = arenaExchange(item, true, ns)) == null)))
throw new InterruptedException();
if (v == TIMED_OUT)
throw new TimeoutException();
return (v == NULL_ITEM) ? null : (V)v;
}
}
有点类似于生产者/消费者
Concurrent Collections
用于并发访问的集合(List、Map、Set)
Besides Queues, this package supplies Collection implementations designed for use in multithreaded contexts: ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriteArraySet
When many threads are expected to access a given collection, a ConcurrentHashMap
is normally preferable to a synchronized HashMap
, and a ConcurrentSkipListMap
is normally preferable to a synchronized TreeMap
. A CopyOnWriteArrayList
is preferable to a synchronized ArrayList
when the expected number of reads and traversals greatly outnumber the number of updates to a list.
与synchronized封装的类不同, Concurrent开头的支持并发访问
The “Concurrent” prefix used with some classes in this package is a shorthand indicating several differences from similar “synchronized” classes.
For example java.util.Hashtable
and Collections.synchronizedMap(new HashMap())
are synchronized. But ConcurrentHashMap
is “concurrent”. A concurrent collection is thread-safe, but not governed by a single exclusion lock. In the particular case of ConcurrentHashMap, it safely permits any number of concurrent reads as well as a tunable number of concurrent writes.
“Synchronized” classes can be useful when you need to prevent all access to a collection via a single lock, at the expense of poorer scalability. In other cases in which multiple threads are expected to access a common collection, “concurrent” versions are normally preferable. And unsynchronized collections are preferable when either collections are unshared, or are accessible only when holding other locks.
提供弱一致的遍历,而不是快速失败
Most concurrent Collection implementations (including most Queues) also differ from the usual java.util
conventions in that their Iterators and Spliterators provide weakly consistent rather than fast-fail traversal:
- they may proceed concurrently with other operations
- they will never throw
ConcurrentModificationException
- they are guaranteed to traverse elements as they existed upon construction exactly once, and may (but are not guaranteed to) reflect any modifications subsequent to construction.
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriteArraySet
Memory Consistency Properties (内存一致性)
Java语言规范第17章定义了内存操作上的happens-before关系,比如共享变量的读写。只有当写操作发生时(在读操作之前),一个线程的写操作的结果才保证对另一个线程的读操作可见。同步和volatile构造,以及Thread.start()和Thread.join()方法,可以形成happens-before关系。
Chapter 17 of the Java Language Specification defines the happens-before relation on memory operations such as reads and writes of shared variables. The results of a write by one thread are guaranteed to be visible to a read by another thread only if the write operation happens-before the read operation. The synchronized and volatile constructs, as well as the Thread.start() and Thread.join() methods, can form happens-before relationships. In particular:
- Each action in a thread happens-before every action in that thread that comes later in the program’s order.
- An unlock (
synchronized
block or method exit) of a monitor happens-before every subsequent lock (synchronized
block or method entry) of that same monitor. And because the happens-before relation is transitive, all actions of a thread prior to unlocking happen-before all actions subsequent to any thread locking that monitor. - A write to a
volatile
field happens-before every subsequent read of that same field. Writes and reads ofvolatile
fields have similar memory consistency effects as entering and exiting monitors, but do not entail mutual exclusion locking. - A call to
start
on a thread happens-before any action in the started thread. - All actions in a thread happen-before any other thread successfully returns from a
join
on that thread.
在juc中,所有classes的 happens-before 关系
The methods of all classes in java.util.concurrent
- Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.
- Actions in a thread prior to the submission of a
Runnable
to anExecutor
happen-before its execution begins. Similarly forCallables
submitted to anExecutorService
. - Actions taken by the asynchronous computation represented by a
Future
happen-before actions subsequent to the retrieval of the result viaFuture.get()
in another thread. - Actions prior to “releasing” synchronizer methods such as
Lock.unlock
,Semaphore.release
, andCountDownLatch.countDown
happen-before actions subsequent to a successful “acquiring” method such asLock.lock
,Semaphore.acquire
,Condition.await
, andCountDownLatch.await
on the same synchronizer object in another thread. - For each pair of threads that successfully exchange objects via an
Exchanger
, actions prior to theexchange()
in each thread happen-before those subsequent to the correspondingexchange()
in another thread. - Actions prior to calling
CyclicBarrier.await
andPhaser.awaitAdvance
(as well as its variants) happen-before actions performed by the barrier action, and actions performed by the barrier action happen-before actions subsequent to a successful return from the correspondingawait
in other threads.
java.util.concurrent.atomic
一个支持单变量无锁线程安全编程的类的小工具包。本质上,这个包中的类将volatile值、字段和数组元素的概念扩展到那些还提供 compareAndSet 的原子条件更新操作的类
A small toolkit of classes that support lock-free thread-safe programming on single variables.
In essence, the classes in this package extend the notion of volatile
values, fields, and array elements to those that also provide an atomic conditional update operation of the form:
boolean compareAndSet(expectedValue, updateValue);
This method (which varies in argument types across different classes) atomically sets a variable to the updateValue
if it currently holds the expectedValue
, reporting true
on success.
The classes in this package also contain methods to get and unconditionally set values, as well as a weaker conditional atomic update operation weakCompareAndSet
described below.
The memory effects for accesses and updates of atomics generally follow the rules for volatiles, as stated in The Java Language Specification (17.4 Memory Model):
get
has the memory effects of reading avolatile
variable.set
has the memory effects of writing (assigning) avolatile
variable.lazySet
has the memory effects of writing (assigning) avolatile
variable except that it permits reorderings with subsequent (but not previous) memory actions that do not themselves impose reordering constraints with ordinary non-volatile
writes. Among other usage contexts,lazySet
may apply when nulling out, for the sake of garbage collection, a reference that is never accessed again.weakCompareAndSet
atomically reads and conditionally writes a variable but does not create any happens-before orderings, so provides no guarantees with respect to previous or subsequent reads and writes of any variables other than the target of theweakCompareAndSet
.compareAndSet
and all other read-and-update operations such asgetAndIncrement
have the memory effects of both reading and writingvolatile
variables.
Atomic Values
Instances of classes AtomicBoolean
, AtomicInteger
, AtomicLong
, and AtomicReference
each provide access and updates to a single variable of the corresponding type. Each class also provides appropriate utility methods for that type. For example, classes AtomicLong
and AtomicInteger
provide atomic increment methods.
public class AtomicInteger extends Number implements java.io.Serializable {
private volatile int value;
/**
* Creates a new AtomicInteger with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
/**
* Creates a new AtomicInteger with initial value {@code 0}.
*/
public AtomicInteger() {
}
}
Atomic Fields
In addition to classes representing single values, this package contains Updater classes that can be used to obtain compareAndSet
operations on any selected volatile
field of any selected class. AtomicReferenceFieldUpdater
, AtomicIntegerFieldUpdater
, and AtomicLongFieldUpdater
are reflection-based utilities that provide access to the associated field types. These are mainly of use in atomic data structures in which several volatile
fields of the same node (for example, the links of a tree node) are independently subject to atomic updates. These classes enable greater flexibility in how and when to use atomic updates, at the expense of more awkward reflection-based setup, less convenient usage, and weaker guarantees.
Atomic Arrays
The AtomicIntegerArray
, AtomicLongArray
, and AtomicReferenceArray
classes further extend atomic operation support to arrays of these types. These classes are also notable in providing volatile
access semantics for their array elements, which is not supported for ordinary arrays.
Class Summary
Class | Description |
---|---|
AtomicBoolean | A boolean value that may be updated atomically. |
AtomicInteger | An int value that may be updated atomically. |
AtomicIntegerArray | An int array in which elements may be updated atomically. |
AtomicIntegerFieldUpdater |
A reflection-based utility that enables atomic updates to designated volatile int fields of designated classes. |
AtomicLong | A long value that may be updated atomically. |
AtomicLongArray | A long array in which elements may be updated atomically. |
AtomicLongFieldUpdater |
A reflection-based utility that enables atomic updates to designated volatile long fields of designated classes. |
AtomicMarkableReference |
An AtomicMarkableReference maintains an object reference along with a mark bit, that can be updated atomically. |
AtomicReference |
An object reference that may be updated atomically. |
AtomicReferenceArray |
An array of object references in which elements may be updated atomically. |
AtomicReferenceFieldUpdater<T,V> | A reflection-based utility that enables atomic updates to designated volatile reference fields of designated classes. |
AtomicStampedReference |
An AtomicStampedReference maintains an object reference along with an integer “stamp”, that can be updated atomically. |
DoubleAccumulator | One or more variables that together maintain a running double value updated using a supplied function. |
DoubleAdder | One or more variables that together maintain an initially zero double sum. |
LongAccumulator | One or more variables that together maintain a running long value updated using a supplied function. |
LongAdder | One or more variables that together maintain an initially zero long sum. |
Designed
原子类主要设计为用于实现非阻塞数据结构和相关基础结构类的构建块。该compareAndSet
方法不是锁定的一般替代方法。仅当对象的关键更新仅限于单个变量时,它才适用。
原子类不是通用替换 java.lang.Integer
和相关类。他们没有 定义方法,如equals
,hashCode
和 compareTo
。(因为预期原子变量会发生变异,所以它们对于散列表键的选择很差。)此外,只为那些在预期应用程序中常用的类型提供类。例如,没有用于表示的原子类byte
。在您想要这样做的不常见的情况下,您可以使用a AtomicInteger
来保存 byte
值,并适当地进行投射。您还可以使用Float.floatToRawIntBits(float)
和 Float.intBitsToFloat(int)
转换使用浮点数 ,并使用Double.doubleToRawLongBits(double)
和 Double.longBitsToDouble(long)
转换加倍。
原文 :
Atomic classes are designed primarily as building blocks for implementing non-blocking data structures and related infrastructure classes. The
compareAndSet
method is not a general replacement for locking. It applies only when critical updates for an object are confined to a single variable.Atomic classes are not general purpose replacements for
java.lang.Integer
and related classes. They do not define methods such asequals
,hashCode
andcompareTo
. (Because atomic variables are expected to be mutated, they are poor choices for hash table keys.) Additionally, classes are provided only for those types that are commonly useful in intended applications. For example, there is no atomic class for representingbyte
. In those infrequent cases where you would like to do so, you can use anAtomicInteger
to holdbyte
values, and cast appropriately. You can also hold floats usingFloat.floatToRawIntBits(float)
andFloat.intBitsToFloat(int)
conversions, and doubles usingDouble.doubleToRawLongBits(double)
andDouble.longBitsToDouble(long)
conversions.
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
java.util.concurrent.locks
注意ConditionObject (不知道为啥没有显示为AbstractQueuedSynchronizer inner class)
接口和类提供了一个框架,用于锁定和等待,与内置同步和监视器不同的条件。该框架允许在使用锁和条件方面具有更大的灵活性,但代价是语法更加笨拙。
Interfaces and classes providing a framework for locking and waiting for conditions that is distinct from built-in synchronization and monitors. The framework permits much greater flexibility in the use of locks and conditions, at the expense of more awkward syntax.
Locks(锁)
The Lock
interface supports locking disciplines that differ in semantics (reentrant, fair, etc), and that can be used in non-block-structured contexts including hand-over-hand and lock reordering algorithms. The main implementation is ReentrantLock
.
The ReadWriteLock
interface similarly defines locks that may be shared among readers but are exclusive to writers. Only a single implementation, ReentrantReadWriteLock
, is provided, since it covers most standard usage contexts. But programmers may create their own implementations to cover nonstandard requirements.
Lock接口支持语义不同的锁定规则(可重入、公平等),并且可以在非块结构上下文中使用,包括手动和锁重排序算法。主要实现是ReentrantLock。
ReadWriteLock接口也类似地定义了锁,这些锁可以在readers之间共享,但只能由writers使用。只提供了一个实现ReentrantReadWriteLock,因为它涵盖了大多数标准使用上下文。但是程序员可以创建他们自己的实现来满足非标准需求。
Lock
Lock与synchronized的异同
- 内存语义相同
Memory Synchronization
All Lock
implementations must enforce the same memory synchronization semantics as provided by the built-in monitor lock, as described in The Java Language Specification (17.4 Memory Model):
- A successful
lock
operation has the same memory synchronization effects as a successful Lock action. - A successful
unlock
operation has the same memory synchronization effects as a successful Unlock action.
Unsuccessful locking and unlocking operations, and reentrant locking/unlocking operations, do not require any memory synchronization effects.
synchronization是关键字(可重入,非公平锁,锁的是对象Object), Lock是接口
synchronization 使用简单,无需手动释放锁,避免错误
When locking and unlocking occur in different scopes, care must be taken to ensure that all code that is executed while the lock is held is protected by try-finally or try-catch to ensure that the lock is released when necessary.
Lock 更加灵活, 支持非阻塞获取 ,可中断获取,和可超时获取.
Lock
implementations provide additional functionality over the use ofsynchronized
methods and statements by providing a non-blocking attempt to acquire a lock (tryLock()
), an attempt to acquire the lock that can be interrupted (lockInterruptibly()
, and an attempt to acquire the lock that can timeout (tryLock(long, TimeUnit)
).
Implementation Considerations
三种形式的锁获取(可中断、不可中断和定时)在性能特征、顺序保证或其他实现质量方面可能有所不同。
此外,中断正在进行的锁获取的功能可能在给定的锁类中不可用。因此,实现不需要为所有三种形式的锁获取定义完全相同的保证或语义,也不需要支持中断正在进行的锁获取。
需要一个实现来清楚地记录每种锁定方法提供的语义和保证。它还必须遵守这个接口中定义的中断语义,只要支持锁获取中断:要么完全支持,要么只支持方法入口。
由于中断通常意味着取消,并且对中断的检查通常不频繁,因此实现更倾向于响应中断而不是正常的方法返回。即使可以显示在另一个操作之后发生的中断可能已经解除了线程阻塞,这也是正确的。实现应该记录这种行为。
The three forms of lock acquisition (interruptible, non-interruptible, and timed) may differ in their performance characteristics, ordering guarantees, or other implementation qualities.
Further, the ability to interrupt the ongoing acquisition of a lock may not be available in a given
Lock
class. Consequently, an implementation is not required to define exactly the same guarantees or semantics for all three forms of lock acquisition, nor is it required to support interruption of an ongoing lock acquisition.An implementation is required to clearly document the semantics and guarantees provided by each of the locking methods. It must also obey the interruption semantics as defined in this interface, to the extent that interruption of lock acquisition is supported: which is either totally, or only on method entry.
As interruption generally implies cancellation, and checks for interruption are often infrequent, an implementation can favor responding to an interrupt over normal method return. This is true even if it can be shown that the interrupt occurred after another action may have unblocked the thread. An implementation should document this behavior.
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
ReentrantLock
Lock的标准实现, 一种可重入互斥锁,具有与使用同步方法和语句访问的隐式监视器锁相同的基本行为和语义,但具有扩展功能。(获取等待队列,当前线程是否获得锁等监控,见 AbstractOwnableSynchronizer
)
ReentrantLock归最后一个成功锁定的线程所有,但还没有解锁它。当锁不属于其他线程时,调用锁的线程将返回并成功获取锁。如果当前线程已经拥有锁,该方法将立即返回。可以使用isHeldByCurrentThread()和getHoldCount()方法检查这一点。
A reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities.
A ReentrantLock is owned by the thread last successfully locking, but not yet unlocking it. A thread invoking lock will return, successfully acquiring the lock, when the lock is not owned by another thread. The method will return immediately if the current thread already owns the lock. This can be checked using methods isHeldByCurrentThread(), and getHoldCount().
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
}
static final class NonfairSync extends Sync{}
static final class FairSync extends Sync{}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public int getHoldCount() {
return sync.getHoldCount();
}
protected Thread getOwner() {
return sync.getOwner();
}
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
/**
* 获取队列的监控,省略了获取waitting的
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
public final int getQueueLength() {
return sync.getQueueLength();
}
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
...
}
ReadWriteLock
读写锁维护一对关联的锁,一个用于只读操作,另一个用于写操作。读锁可以由多个读线程同时持有,只要没有写线程。写锁是独占的。
A ReadWriteLock
maintains a pair of associated locks
, one for read-only operations and one for writing. The read lock
may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock
is exclusive.
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
ReentrantReadWriteLock
可重入读写锁, ReadWriteLock
实现,类似ReentrantLock
.
This class has the following properties:
- Acquisition order 请求顺序(公平、非公平)
- Reentrancy 可重入
- Lock downgrading 锁降级
- 可重入性还允许从写锁降级为读锁,方法是获取写锁,然后是读锁,然后释放写锁。
- Interruption of lock acquisition 可中断锁
- Condition support
- Instrumentation 监控
/*
* This lock supports a maximum of 65535 recursive write locks and 65535 read locks.
*/
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/*
* ReadWriteLock impl
*/
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
/**
* Synchronization implementation for ReentrantReadWriteLock.
* Subclassed into fair and nonfair versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
...
}
static final long getThreadId(Thread thread) {
return UNSAFE.getLongVolatile(thread, TID_OFFSET);
}
...
public static class ReadLock implements Lock, java.io.Serializable {
...
}
public static class WriteLock implements Lock, java.io.Serializable {
...
}
}
Conditions(监视器)
条件将对象监视方法(wait、notify和notifyAll) 分解为不同的对象,通过将它们与任意锁实现相结合,从而实现每个对象具有多个等待集的效果。锁替换同步方法和语句的使用,条件替换对象监视器方法的使用。
Condition
factors out the Object
monitor methods (wait
, notify
and notifyAll
) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock
implementations.
Where a Lock
replaces the use of synchronized
methods and statements,
a Condition
replaces the use of the Object monitor methods.
public interface Condition {
void await() throws InterruptedException;
/*
* Causes the current thread to wait until it is signalled.
*/
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
主要实现 : java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
AbstractQueuedSynchronizers(阻塞队列)
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer类是一个有用的超类,用于定义依赖于排队阻塞线程的锁和其他同步器。
Usage
To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState()
, setState(int)
and/or compareAndSetState(int, int)
:
Each of these methods by default throws UnsupportedOperationException
onlyfinal
You may also find the inherited methods from AbstractOwnableSynchronizer
useful to keep track of the thread owning an exclusive synchronizer. You are encouraged to use them – this enables monitoring and diagnostic tools to assist users in determining which threads hold locks.
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
}
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
}
这个类比较关键, 复杂度太高了,涉及的细节太多,另单独分析
AbstractQueuedLongSynchronizer
AbstractQueuedLongSynchronizer类提供了相同的功能,但是扩展了对64位同步状态的支持。
AbstractOwnableSynchronizer
它们都扩展了AbstractOwnableSynchronizer类,这是一个简单的类,帮助记录当前持有独占同步的线程。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;
/**
* Empty constructor for use by subclasses.
*/
protected AbstractOwnableSynchronizer() { }
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
/**
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* Returns the thread last set by {@code setExclusiveOwnerThread},
* or {@code null} if never set. This method does not otherwise
* impose any synchronization or {@code volatile} field accesses.
* @return the owner thread
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
LockSupports
The LockSupport
class provides lower-level blocking and unblocking support that is useful for those developers implementing their own customized lock classes.
LockSupport类提供了较低级别的阻塞和解除阻塞支持,这对于实现自定义锁类的开发人员非常有用。
Record📝
TimeUnit 为啥不都抽象方法?而是有些需要实现,有些需要抽象?
java.util.concurrent.Semaphore.Sync#nonfairTryAcquireShared 和java.util.concurrent.Semaphore.Sync#reducePermits 为啥需要重复实现?
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#newCondition ?
/** * Throws {@code UnsupportedOperationException} because * {@code ReadLocks} do not support conditions. * * @throws UnsupportedOperationException always */
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 951488791@qq.com