Java 并发神器:concurrent工具包

Java 并发神器:concurrent工具包

目录

  1. Executor框架
  2. 并发集合类
  3. 并发控制类
  4. 锁机制
  5. 原子操作类
  6. Future 和 CompletableFuture
  7. ForkJoin框架
  8. ThreadLocal
java.util.concurrent 包是专门用来处理Java并发的工具类,专为多线程环境设计,能帮助简化并发编程中的线程安全问题。除此之外,concurrent 包中还有很多其他非常重要和常用的工具类和机制,可以用来处理不同的并发场景。

在Java并发编程中,**java.util.concurrent**包提供了丰富的工具类和机制,帮助开发者应对各种并发场景:

  • Executor框架简化了线程池的使用。
  • 并发集合类ConcurrentHashMapConcurrentLinkedQueue等解决了多线程下的数据共享问题。
  • 并发控制类CountDownLatchSemaphore等用于线程间的协调和同步。
  • 原子类提供了高效的无锁操作。
  • 锁机制ReentrantLockStampedLock提供了更灵活的锁定机制。
  • ForkJoin框架是用于并行处理任务的强大工具。
  • CompletableFutureFuture为异步编程提供了支持。

下面详细解释其中关键点:

1. Executor框架

Executor框架是Java提供的一种用来简化线程管理的机制,主要通过线程池来执行任务,避免手动创建和管理线程。通过ExecutorService接口和ThreadPoolExecutor类,你可以更高效地管理线程,并控制线程的创建、任务的执行、线程的销毁等。

常见实现

  • ThreadPoolExecutor:一个功能非常强大的线程池类,支持核心线程数、最大线程数、任务队列、线程回收机制、拒绝策略等高度可定制的配置。
  • ScheduledThreadPoolExecutor:支持定时或周期性执行任务的线程池实现,类似于Timer,但功能更加强大和灵活。

Executors 工具类:提供了几个常用的线程池工厂方法,比如newFixedThreadPool(固定大小的线程池)、newCachedThreadPool(按需创建线程的线程池)和newScheduledThreadPool(用于定时任务的线程池)。

代码示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorExample {
    public static void main(String[] args) {
        // 创建一个固定大小为3的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 提交三个任务到线程池执行
        for (int i = 0; i < 3; i++) {
            executor.submit(() -> {
                System.out.println("Thread: " + Thread.currentThread().getName() + " is executing task.");
                try {
                    Thread.sleep(1000); // 模拟任务执行耗时
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown(); // 关闭线程池
    }
}

Executor 框架是 Java 提供的用于简化并发任务管理的工具。通常使用线程池来管理线程资源,比如用 newFixedThreadPool() 创建固定大小的线程池。这样可以控制系统中同时运行的线程数量,避免频繁创建和销毁线程导致的性能开销。在使用线程池时,合理设置线程池的大小非常重要,过大的线程池会增加 CPU 上下文切换的成本,而过小的线程池可能导致任务处理延迟。

2. 并发集合类

  • ConcurrentHashMap:已经讨论过的线程安全的哈希表,支持高效的并发读写。ConcurrentHashMap 是一种线程安全的哈希表,在多线程环境下可以高效地进行读写操作。它通过分段锁(Java 7 及之前)或细粒度的CAS操作(Java 8 及之后)实现了比 Hashtable 更好的性能。
  • ConcurrentLinkedQueue:基于无界链表的非阻塞队列,使用CAS实现的高效无锁队列,适用于高并发环境下的任务队列或缓冲区。
  • CopyOnWriteArrayList:线程安全的ArrayList,通过在写入操作时创建副本来保证线程安全,适用于读操作远远多于写操作的场景。
  • CopyOnWriteArraySet:类似CopyOnWriteArrayList,但用于集合Set,也是在写操作时创建副本。
  • ConcurrentSkipListMapConcurrentSkipListSet:线程安全的有序映射和集合,内部基于跳表实现,支持并发访问和排序操作。
ConcurrentHashMap代码示例:
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapExample {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // 插入元素
        map.put("key1", 1);
        map.put("key2", 2);

        // 多线程读写操作
        new Thread(() -> map.put("key3", 3)).start();
        new Thread(() -> System.out.println("key1 value: " + map.get("key1"))).start();
    }
}

ConcurrentHashMap 提供了比 Hashtable 更好的并发性能,因为它在 Java 7 中使用了分段锁,而在 Java 8 中采用了 CAS 和 synchronized 结合的方式进行并发控制。相比传统的 HashtableConcurrentHashMap 在读操作中几乎是无锁的,而在写操作中也只锁定需要修改的部分结构,因此在高并发场景下性能更好。

BlockingQueue代码示例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        // 生产者线程
        new Thread(() -> {
            try {
                queue.put(1);
                System.out.println("Produced 1");
                queue.put(2);
                System.out.println("Produced 2");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // 消费者线程
        new Thread(() -> {
            try {
                System.out.println("Consumed: " + queue.take());
                System.out.println("Consumed: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

BlockingQueue 是 Java 提供的用于线程安全队列的实现,支持线程间的阻塞式通信。它可以用来轻松实现生产者-消费者模式。在生产者生产太快或者消费者消费太快的情况下,BlockingQueue 会自动进行阻塞,保证数据的正确传递和系统的稳定运行。

3. 并发控制类

  • CountDownLatch:一个允许一个或多个线程等待其他线程完成操作的同步工具。它是通过维护一个计数器来实现的,当计数器归零时,所有等待的线程都会继续执行。非常适合用于让主线程等待多个子线程的完成。
  • CyclicBarrier:与CountDownLatch类似,但CyclicBarrier允许一组线程互相等待,直到所有线程都到达某个屏障点。可以重复使用,即”循环”的屏障。
  • Semaphore:一个用于控制对资源访问的计数信号量。它可以用于限制同时访问某一资源的线程数量。常用于实现连接池、限流器等场景。
  • Exchanger:两个线程可以在某个同步点交换数据。适用于两个线程需要彼此通信或交换数据的场景。
  • Phaser:类似于CyclicBarrierCountDownLatch,但功能更灵活,支持分阶段的任务同步和动态调整参与线程的数量。
CountDownLatch代码示例:
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 3;
        CountDownLatch latch = new CountDownLatch(threadCount);

        // 启动三个线程
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                System.out.println("Thread finished");
                latch.countDown(); // 线程完成工作,计数器减1
            }).start();
        }

        latch.await(); // 等待所有线程完成
        System.out.println("All threads finished.");
    }
}

CountDownLatch 是一种同步工具,可以让一个或多个线程等待其他线程完成操作。它的实现方式是维护一个计数器,每当一个线程完成操作时,调用 countDown() 使计数器减1,当计数器归零时,主线程或者其他等待线程可以继续执行。它非常适合解决主线程等待多个子线程完成的场景。
多说一嘴,Java 的 CountDownLatchGo 语言中的 sync.WaitGroup 确实非常相似,它们的主要功能都是用来协调多个并发任务,确保主线程或者某个操作等待其他协作线程完成后再继续执行。
相似点:

  1. 等待多个任务完成:两者都可以用来等待多个并发任务执行完成,然后再继续执行主线程或其他逻辑。
  2. 计数器机制:两者都维护一个内部计数器,每当一个任务完成时,计数器会减少。当计数器归零时,等待的线程或主线程就会继续执行。
区别:
  1. 重用性
    • CountDownLatch:不能重用。计数器归零后,CountDownLatch 无法重置,必须重新创建一个新的实例。
    • sync.WaitGroup:可以重用。WaitGroup 可以在多个任务执行后再次使用,通过 Add() 方法增加计数。
  2. API设计差异
    • Java CountDownLatch:通过在主线程调用 await() 方法来等待其他线程完成工作,子线程通过 countDown() 来通知计数器减1。
    • Go sync.WaitGroup:通过 Add(n) 来增加任务数量,子任务完成时调用 Done() 减少计数,主线程调用 Wait() 阻塞,直到计数器归零。

Go sync.WaitGroup 示例:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    threadCount := 3

    wg.Add(threadCount) // 增加计数器

    for i := 0; i < threadCount; i++ {
        go func(id int) {
            defer wg.Done() // 任务完成,计数器减1
            fmt.Printf("Task %d finished.\n", id)
        }(i)
    }

    wg.Wait() // 阻塞直到计数器归零
    fmt.Println("All tasks are finished.")
}

CyclicBarrier代码示例:
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> System.out.println("All threads reached the barrier"));

        // 启动三个线程
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " reached the barrier");
                    barrier.await(); // 等待其他线程到达屏障
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

CyclicBarrier 允许多个线程在某个点等待彼此,直到所有线程都到达这个屏障,然后继续执行后续任务。与 CountDownLatch 不同的是,它可以重用,因此适用于多轮次的任务协作场景。比如模拟并发用户请求,所有用户达到某个条件后再继续执行。

Semaphore代码示例:
import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2); // 只允许2个线程同时访问

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println(Thread.currentThread().getName() + " acquired semaphore");
                    Thread.sleep(1000); // 模拟任务执行
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放许可
                }
            }).start();
        }
    }
}

Semaphore 控制同时访问某个资源的线程数,可以用来限制资源的并发访问量,比如连接池。通过 acquire()release() 来获取和释放信号量,它可以保证不会有超过预定义数量的线程同时访问某一资源,避免系统过载。

4. 锁机制

Java 5 引入了 Lock 接口及其实现,用于代替传统的sychronized关键字,提供更灵活的锁机制。

  • ReentrantLock:可重入锁,类似sychronized,但它允许手动加锁和解锁,提供了更灵活的锁定操作,如手动加锁/解锁、可中断锁等待、尝试加锁、锁的公平性。它支持公平锁和非公平锁两种模式。ReentrantLocksynchronized 的更好选择。
  • ReentrantReadWriteLock:读写锁,允许多个读线程并发访问,但写操作是互斥的。适合读多写少的场景,可以显著提高并发性能。
  • StampedLock:Java 8引入的锁,提供了更加细粒度的控制,支持乐观读锁和悲观读锁。乐观读锁在没有写操作的情况下允许读操作无锁进行,进一步提高了并发性能。
ReentrantLock代码示例:
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
    private final ReentrantLock lock = new ReentrantLock();

    public void performTask() {
        lock.lock(); // 手动加锁
        try {
            System.out.println("Thread " + Thread.currentThread().getName() + " is performing task.");
        } finally {
            lock.unlock(); // 手动解锁
        }
    }

    public static void main(String[] args) {
        ReentrantLockExample example = new ReentrantLockExample();
        for (int i = 0; i < 3; i++) {
            new Thread(example::performTask).start();
        }
    }
}

5. 原子操作类

Java 的 原子类AtomicIntegerAtomicLongAtomicBooleanAtomicReference 等)是为了解决并发环境中数据的安全性问题而设计的。它们使用了 CAS(Compare-And-Swap) 操作来保证线程安全,避免了传统锁的开销。原子类适合需要频繁进行自增、自减等操作的高并发场景。

常见的原子类包括:

  • AtomicInteger:用于对 int 类型的变量进行原子操作。
  • AtomicLong:用于对 long 类型的变量进行原子操作。
  • AtomicBoolean:用于对 boolean 类型的变量进行原子操作。
  • AtomicReference:用于对引用类型进行原子操作。

代码示例

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {
    private static AtomicInteger atomicCount = new AtomicInteger(0);

    public static void main(String[] args) {
        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                atomicCount.incrementAndGet(); // 原子自增
            }
        };

        Thread thread1 = new Thread(task);
        Thread thread2 = new Thread(task);

        thread1.start();
        thread2.start();

        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        System.out.println("Final count: " + atomicCount.get());
    }
}

6. Future 和 CompletableFuture

  • Future:是 Java 5 引入的一种机制,用于表示一个异步计算的结果。你可以通过 Future 接口查询任务是否完成、获取结果或取消任务。但 Future 的主要局限是只能阻塞地获取结果,没有提供回调机制。
  • CompletableFuture:Java 8引入的更强大的异步任务处理工具,与 Future 不同,CompletableFuture 支持链式调用、异步执行回调函数,并且可以通过 thenApply()thenAccept() 等方法处理任务的结果而不需要显式等待。它适用于处理复杂的异步任务工作流,避免了阻塞操作。与 Future 相比,CompletableFuture 更加适合处理复杂的异步工作流。
Future 代码示例:
import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(1000); // 模拟任务耗时
            return 42;
        });

        System.out.println("Task submitted, doing other work...");
        Integer result = future.get(); // 阻塞直到任务完成
        System.out.println("Task result: " + result);

        executor.shutdown();
    }
}
CompletableFuture 代码示例:
import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000); // 模拟任务耗时
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 42;
        });

        future.thenAccept(result -> System.out.println("Task result: " + result));
        System.out.println("Task submitted, doing other work...");

        // 为了确保主线程等待异步任务执行完毕
        future.join();
    }
}

7. ForkJoin框架

  • ForkJoinPool:是Java 7引入的一个并行执行框架,适用于将大任务拆分为多个小任务并行处理。内部使用了工作窃取算法,可以最大化地利用CPU的多核资源,非常适合递归任务的并行化处理。
  • RecursiveTaskRecursiveAction:与ForkJoinPool配合使用的任务类,分别用于有返回值和无返回值的任务。

8. ThreadLocal

  • ThreadLocal 是 Java 提供的一个工具类,用于给每个线程保存独立的变量副本。通常用于在多线程环境中避免共享变量的并发问题。它确保每个线程都有自己的一份变量,其他线程无法访问或修改,从而避免数据不一致问题。适合需要在线程间传递数据、或保证线程安全的场景,比如数据库连接、事务管理、用户上下文等。

代码示例:

public class ThreadLocalExample {
    private static ThreadLocal<Integer> threadLocalValue = ThreadLocal.withInitial(() -> 1);

    public static void main(String[] args) {
        Runnable task = () -> {
            System.out.println(Thread.currentThread().getName() + " initial value: " + threadLocalValue.get());
            threadLocalValue.set(threadLocalValue.get() + 1); // 修改当前线程的ThreadLocal变量
            System.out.println(Thread.currentThread().getName() + " updated value: " + threadLocalValue.get());
        };

        Thread thread1 = new Thread(task);
        Thread thread2 = new Thread(task);

        thread1.start();
        thread2.start();
    }
}

输出:

Thread-0 initial value: 1
Thread-1 initial value: 1
Thread-0 updated value: 2
Thread-1 updated value: 2

Comments

No comments yet. Why don’t you start the discussion?

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注