目录
java.util.concurrent
包是专门用来处理Java并发的工具类,专为多线程环境设计,能帮助简化并发编程中的线程安全问题。除此之外,concurrent
包中还有很多其他非常重要和常用的工具类和机制,可以用来处理不同的并发场景。
在Java并发编程中,**java.util.concurrent
**包提供了丰富的工具类和机制,帮助开发者应对各种并发场景:
- Executor框架简化了线程池的使用。
- 并发集合类如
ConcurrentHashMap
、ConcurrentLinkedQueue
等解决了多线程下的数据共享问题。 - 并发控制类如
CountDownLatch
、Semaphore
等用于线程间的协调和同步。 - 原子类提供了高效的无锁操作。
- 锁机制如
ReentrantLock
、StampedLock
提供了更灵活的锁定机制。 - ForkJoin框架是用于并行处理任务的强大工具。
- CompletableFuture 和 Future为异步编程提供了支持。
下面详细解释其中关键点:
1. Executor框架
Executor
框架是Java提供的一种用来简化线程管理的机制,主要通过线程池来执行任务,避免手动创建和管理线程。通过ExecutorService
接口和ThreadPoolExecutor
类,你可以更高效地管理线程,并控制线程的创建、任务的执行、线程的销毁等。
常见实现:
ThreadPoolExecutor
:一个功能非常强大的线程池类,支持核心线程数、最大线程数、任务队列、线程回收机制、拒绝策略等高度可定制的配置。ScheduledThreadPoolExecutor
:支持定时或周期性执行任务的线程池实现,类似于Timer
,但功能更加强大和灵活。
Executors
工具类:提供了几个常用的线程池工厂方法,比如newFixedThreadPool
(固定大小的线程池)、newCachedThreadPool
(按需创建线程的线程池)和newScheduledThreadPool
(用于定时任务的线程池)。
代码示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExecutorExample {
public static void main(String[] args) {
// 创建一个固定大小为3的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交三个任务到线程池执行
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
System.out.println("Thread: " + Thread.currentThread().getName() + " is executing task.");
try {
Thread.sleep(1000); // 模拟任务执行耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown(); // 关闭线程池
}
}
Executor
框架是 Java 提供的用于简化并发任务管理的工具。通常使用线程池来管理线程资源,比如用 newFixedThreadPool()
创建固定大小的线程池。这样可以控制系统中同时运行的线程数量,避免频繁创建和销毁线程导致的性能开销。在使用线程池时,合理设置线程池的大小非常重要,过大的线程池会增加 CPU 上下文切换的成本,而过小的线程池可能导致任务处理延迟。
2. 并发集合类
ConcurrentHashMap
:已经讨论过的线程安全的哈希表,支持高效的并发读写。ConcurrentHashMap
是一种线程安全的哈希表,在多线程环境下可以高效地进行读写操作。它通过分段锁(Java 7 及之前)或细粒度的CAS操作(Java 8 及之后)实现了比Hashtable
更好的性能。ConcurrentLinkedQueue
:基于无界链表的非阻塞队列,使用CAS实现的高效无锁队列,适用于高并发环境下的任务队列或缓冲区。CopyOnWriteArrayList
:线程安全的ArrayList
,通过在写入操作时创建副本来保证线程安全,适用于读操作远远多于写操作的场景。CopyOnWriteArraySet
:类似CopyOnWriteArrayList
,但用于集合Set
,也是在写操作时创建副本。ConcurrentSkipListMap
和ConcurrentSkipListSet
:线程安全的有序映射和集合,内部基于跳表实现,支持并发访问和排序操作。
ConcurrentHashMap代码示例:
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 插入元素
map.put("key1", 1);
map.put("key2", 2);
// 多线程读写操作
new Thread(() -> map.put("key3", 3)).start();
new Thread(() -> System.out.println("key1 value: " + map.get("key1"))).start();
}
}
ConcurrentHashMap
提供了比 Hashtable
更好的并发性能,因为它在 Java 7 中使用了分段锁,而在 Java 8 中采用了 CAS 和 synchronized
结合的方式进行并发控制。相比传统的 Hashtable
,ConcurrentHashMap
在读操作中几乎是无锁的,而在写操作中也只锁定需要修改的部分结构,因此在高并发场景下性能更好。
BlockingQueue代码示例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
// 生产者线程
new Thread(() -> {
try {
queue.put(1);
System.out.println("Produced 1");
queue.put(2);
System.out.println("Produced 2");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 消费者线程
new Thread(() -> {
try {
System.out.println("Consumed: " + queue.take());
System.out.println("Consumed: " + queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
BlockingQueue
是 Java 提供的用于线程安全队列的实现,支持线程间的阻塞式通信。它可以用来轻松实现生产者-消费者模式。在生产者生产太快或者消费者消费太快的情况下,BlockingQueue
会自动进行阻塞,保证数据的正确传递和系统的稳定运行。
3. 并发控制类
CountDownLatch
:一个允许一个或多个线程等待其他线程完成操作的同步工具。它是通过维护一个计数器来实现的,当计数器归零时,所有等待的线程都会继续执行。非常适合用于让主线程等待多个子线程的完成。CyclicBarrier
:与CountDownLatch
类似,但CyclicBarrier
允许一组线程互相等待,直到所有线程都到达某个屏障点。可以重复使用,即”循环”的屏障。Semaphore
:一个用于控制对资源访问的计数信号量。它可以用于限制同时访问某一资源的线程数量。常用于实现连接池、限流器等场景。Exchanger
:两个线程可以在某个同步点交换数据。适用于两个线程需要彼此通信或交换数据的场景。Phaser
:类似于CyclicBarrier
和CountDownLatch
,但功能更灵活,支持分阶段的任务同步和动态调整参与线程的数量。
CountDownLatch代码示例:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);
// 启动三个线程
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
System.out.println("Thread finished");
latch.countDown(); // 线程完成工作,计数器减1
}).start();
}
latch.await(); // 等待所有线程完成
System.out.println("All threads finished.");
}
}
CountDownLatch
是一种同步工具,可以让一个或多个线程等待其他线程完成操作。它的实现方式是维护一个计数器,每当一个线程完成操作时,调用 countDown()
使计数器减1,当计数器归零时,主线程或者其他等待线程可以继续执行。它非常适合解决主线程等待多个子线程完成的场景。
多说一嘴,Java 的 CountDownLatch
和 Go 语言中的 sync.WaitGroup
确实非常相似,它们的主要功能都是用来协调多个并发任务,确保主线程或者某个操作等待其他协作线程完成后再继续执行。
相似点:
- 等待多个任务完成:两者都可以用来等待多个并发任务执行完成,然后再继续执行主线程或其他逻辑。
- 计数器机制:两者都维护一个内部计数器,每当一个任务完成时,计数器会减少。当计数器归零时,等待的线程或主线程就会继续执行。
区别:
- 重用性:
CountDownLatch
:不能重用。计数器归零后,CountDownLatch
无法重置,必须重新创建一个新的实例。sync.WaitGroup
:可以重用。WaitGroup
可以在多个任务执行后再次使用,通过Add()
方法增加计数。
- API设计差异:
- Java
CountDownLatch
:通过在主线程调用await()
方法来等待其他线程完成工作,子线程通过countDown()
来通知计数器减1。 - Go
sync.WaitGroup
:通过Add(n)
来增加任务数量,子任务完成时调用Done()
减少计数,主线程调用Wait()
阻塞,直到计数器归零。
- Java
Go sync.WaitGroup
示例:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
threadCount := 3
wg.Add(threadCount) // 增加计数器
for i := 0; i < threadCount; i++ {
go func(id int) {
defer wg.Done() // 任务完成,计数器减1
fmt.Printf("Task %d finished.\n", id)
}(i)
}
wg.Wait() // 阻塞直到计数器归零
fmt.Println("All tasks are finished.")
}
CyclicBarrier代码示例:
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> System.out.println("All threads reached the barrier"));
// 启动三个线程
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " reached the barrier");
barrier.await(); // 等待其他线程到达屏障
} catch (Exception e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
CyclicBarrier
允许多个线程在某个点等待彼此,直到所有线程都到达这个屏障,然后继续执行后续任务。与 CountDownLatch
不同的是,它可以重用,因此适用于多轮次的任务协作场景。比如模拟并发用户请求,所有用户达到某个条件后再继续执行。
Semaphore代码示例:
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2); // 只允许2个线程同时访问
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " acquired semaphore");
Thread.sleep(1000); // 模拟任务执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
}
Semaphore
控制同时访问某个资源的线程数,可以用来限制资源的并发访问量,比如连接池。通过 acquire()
和 release()
来获取和释放信号量,它可以保证不会有超过预定义数量的线程同时访问某一资源,避免系统过载。
4. 锁机制
Java 5 引入了 Lock
接口及其实现,用于代替传统的sychronized
关键字,提供更灵活的锁机制。
ReentrantLock
:可重入锁,类似sychronized
,但它允许手动加锁和解锁,提供了更灵活的锁定操作,如手动加锁/解锁、可中断锁等待、尝试加锁、锁的公平性。它支持公平锁和非公平锁两种模式。ReentrantLock
是synchronized
的更好选择。ReentrantReadWriteLock
:读写锁,允许多个读线程并发访问,但写操作是互斥的。适合读多写少的场景,可以显著提高并发性能。StampedLock
:Java 8引入的锁,提供了更加细粒度的控制,支持乐观读锁和悲观读锁。乐观读锁在没有写操作的情况下允许读操作无锁进行,进一步提高了并发性能。
ReentrantLock代码示例:
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();
public void performTask() {
lock.lock(); // 手动加锁
try {
System.out.println("Thread " + Thread.currentThread().getName() + " is performing task.");
} finally {
lock.unlock(); // 手动解锁
}
}
public static void main(String[] args) {
ReentrantLockExample example = new ReentrantLockExample();
for (int i = 0; i < 3; i++) {
new Thread(example::performTask).start();
}
}
}
5. 原子操作类
Java 的 原子类(AtomicInteger
、AtomicLong
、AtomicBoolean
、AtomicReference
等)是为了解决并发环境中数据的安全性问题而设计的。它们使用了 CAS(Compare-And-Swap) 操作来保证线程安全,避免了传统锁的开销。原子类适合需要频繁进行自增、自减等操作的高并发场景。
常见的原子类包括:
AtomicInteger
:用于对int
类型的变量进行原子操作。AtomicLong
:用于对long
类型的变量进行原子操作。AtomicBoolean
:用于对boolean
类型的变量进行原子操作。AtomicReference
:用于对引用类型进行原子操作。
代码示例
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicExample {
private static AtomicInteger atomicCount = new AtomicInteger(0);
public static void main(String[] args) {
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
atomicCount.incrementAndGet(); // 原子自增
}
};
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Final count: " + atomicCount.get());
}
}
6. Future 和 CompletableFuture
Future
:是 Java 5 引入的一种机制,用于表示一个异步计算的结果。你可以通过Future
接口查询任务是否完成、获取结果或取消任务。但Future
的主要局限是只能阻塞地获取结果,没有提供回调机制。CompletableFuture
:Java 8引入的更强大的异步任务处理工具,与Future
不同,CompletableFuture
支持链式调用、异步执行回调函数,并且可以通过thenApply()
、thenAccept()
等方法处理任务的结果而不需要显式等待。它适用于处理复杂的异步任务工作流,避免了阻塞操作。与Future
相比,CompletableFuture
更加适合处理复杂的异步工作流。
Future
代码示例:
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(() -> {
Thread.sleep(1000); // 模拟任务耗时
return 42;
});
System.out.println("Task submitted, doing other work...");
Integer result = future.get(); // 阻塞直到任务完成
System.out.println("Task result: " + result);
executor.shutdown();
}
}
CompletableFuture
代码示例:
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟任务耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
return 42;
});
future.thenAccept(result -> System.out.println("Task result: " + result));
System.out.println("Task submitted, doing other work...");
// 为了确保主线程等待异步任务执行完毕
future.join();
}
}
7. ForkJoin框架
ForkJoinPool
:是Java 7引入的一个并行执行框架,适用于将大任务拆分为多个小任务并行处理。内部使用了工作窃取算法,可以最大化地利用CPU的多核资源,非常适合递归任务的并行化处理。RecursiveTask
和RecursiveAction
:与ForkJoinPool
配合使用的任务类,分别用于有返回值和无返回值的任务。
8. ThreadLocal
ThreadLocal
是 Java 提供的一个工具类,用于给每个线程保存独立的变量副本。通常用于在多线程环境中避免共享变量的并发问题。它确保每个线程都有自己的一份变量,其他线程无法访问或修改,从而避免数据不一致问题。适合需要在线程间传递数据、或保证线程安全的场景,比如数据库连接、事务管理、用户上下文等。
代码示例:
public class ThreadLocalExample {
private static ThreadLocal<Integer> threadLocalValue = ThreadLocal.withInitial(() -> 1);
public static void main(String[] args) {
Runnable task = () -> {
System.out.println(Thread.currentThread().getName() + " initial value: " + threadLocalValue.get());
threadLocalValue.set(threadLocalValue.get() + 1); // 修改当前线程的ThreadLocal变量
System.out.println(Thread.currentThread().getName() + " updated value: " + threadLocalValue.get());
};
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
thread1.start();
thread2.start();
}
}
输出:
Thread-0 initial value: 1
Thread-1 initial value: 1
Thread-0 updated value: 2
Thread-1 updated value: 2