Java多线程

Java多线程

概述

多线程是Java的重要特性之一,允许程序同时执行多个任务,提高程序的执行效率和响应性。本文档详细介绍Java多线程的核心概念、实现方式和最佳实践。

线程基础

进程与线程

  • 进程:操作系统分配资源的基本单位,拥有独立的内存空间
  • 线程:程序执行的最小单位,同一进程内的线程共享内存空间

线程的生命周期

NEW → RUNNABLE → BLOCKED/WAITING/TIMED_WAITING → TERMINATED
  1. NEW:线程创建但未启动
  2. RUNNABLE:线程正在运行或等待CPU调度
  3. BLOCKED:线程被阻塞,等待获取锁
  4. WAITING:线程等待其他线程的通知
  5. TIMED_WAITING:线程等待指定时间
  6. TERMINATED:线程执行完毕

创建线程的方式

1. 继承Thread类

java
public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程执行:" + Thread.currentThread().getName());
    }
}

// 使用
MyThread thread = new MyThread();
thread.start();
public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程执行:" + Thread.currentThread().getName());
    }
}

// 使用
MyThread thread = new MyThread();
thread.start();

2. 实现Runnable接口

java
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("线程执行:" + Thread.currentThread().getName());
    }
}

// 使用
Thread thread = new Thread(new MyRunnable());
thread.start();
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("线程执行:" + Thread.currentThread().getName());
    }
}

// 使用
Thread thread = new Thread(new MyRunnable());
thread.start();

3. 实现Callable接口

java
public class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "任务执行结果";
    }
}

// 使用
FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
Thread thread = new Thread(futureTask);
thread.start();
String result = futureTask.get(); // 获取结果
public class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "任务执行结果";
    }
}

// 使用
FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
Thread thread = new Thread(futureTask);
thread.start();
String result = futureTask.get(); // 获取结果

4. 使用线程池

java
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(() -> {
    System.out.println("线程池执行任务");
});
executor.shutdown();
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(() -> {
    System.out.println("线程池执行任务");
});
executor.shutdown();

线程同步

synchronized关键字

同步方法

java
public synchronized void synchronizedMethod() {
    // 同步代码
}
public synchronized void synchronizedMethod() {
    // 同步代码
}

同步代码块

java
public void method() {
    synchronized (this) {
        // 同步代码
    }
}
public void method() {
    synchronized (this) {
        // 同步代码
    }
}

静态同步方法

java
public static synchronized void staticSynchronizedMethod() {
    // 同步代码
}
public static synchronized void staticSynchronizedMethod() {
    // 同步代码
}

Lock接口

java
private final ReentrantLock lock = new ReentrantLock();

public void method() {
    lock.lock();
    try {
        // 临界区代码
    } finally {
        lock.unlock();
    }
}
private final ReentrantLock lock = new ReentrantLock();

public void method() {
    lock.lock();
    try {
        // 临界区代码
    } finally {
        lock.unlock();
    }
}

volatile关键字

java
private volatile boolean flag = false;

public void setFlag() {
    flag = true; // 保证可见性
}
private volatile boolean flag = false;

public void setFlag() {
    flag = true; // 保证可见性
}

线程通信

wait()、notify()、notifyAll()

java
public class ProducerConsumer {
    private final Object lock = new Object();
    private boolean hasData = false;
    
    public void produce() {
        synchronized (lock) {
            while (hasData) {
                try {
                    lock.wait(); // 等待消费
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            // 生产数据
            hasData = true;
            lock.notifyAll(); // 通知消费者
        }
    }
    
    public void consume() {
        synchronized (lock) {
            while (!hasData) {
                try {
                    lock.wait(); // 等待生产
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            // 消费数据
            hasData = false;
            lock.notifyAll(); // 通知生产者
        }
    }
}
public class ProducerConsumer {
    private final Object lock = new Object();
    private boolean hasData = false;
    
    public void produce() {
        synchronized (lock) {
            while (hasData) {
                try {
                    lock.wait(); // 等待消费
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            // 生产数据
            hasData = true;
            lock.notifyAll(); // 通知消费者
        }
    }
    
    public void consume() {
        synchronized (lock) {
            while (!hasData) {
                try {
                    lock.wait(); // 等待生产
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            // 消费数据
            hasData = false;
            lock.notifyAll(); // 通知生产者
        }
    }
}

Condition接口

java
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();

public void awaitMethod() throws InterruptedException {
    lock.lock();
    try {
        condition.await(); // 等待
    } finally {
        lock.unlock();
    }
}

public void signalMethod() {
    lock.lock();
    try {
        condition.signal(); // 唤醒
    } finally {
        lock.unlock();
    }
}
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();

public void awaitMethod() throws InterruptedException {
    lock.lock();
    try {
        condition.await(); // 等待
    } finally {
        lock.unlock();
    }
}

public void signalMethod() {
    lock.lock();
    try {
        condition.signal(); // 唤醒
    } finally {
        lock.unlock();
    }
}

并发工具类

CountDownLatch

java
CountDownLatch latch = new CountDownLatch(3);

// 工作线程
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        // 执行任务
        latch.countDown(); // 计数减1
    }).start();
}

// 主线程等待
latch.await(); // 等待计数归零
CountDownLatch latch = new CountDownLatch(3);

// 工作线程
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        // 执行任务
        latch.countDown(); // 计数减1
    }).start();
}

// 主线程等待
latch.await(); // 等待计数归零

CyclicBarrier

java
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("所有线程都到达屏障点");
});

for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        try {
            // 执行任务
            barrier.await(); // 等待其他线程
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();
}
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("所有线程都到达屏障点");
});

for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        try {
            // 执行任务
            barrier.await(); // 等待其他线程
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();
}

Semaphore

java
Semaphore semaphore = new Semaphore(2); // 允许2个线程同时访问

public void accessResource() {
    try {
        semaphore.acquire(); // 获取许可
        // 访问资源
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        semaphore.release(); // 释放许可
    }
}
Semaphore semaphore = new Semaphore(2); // 允许2个线程同时访问

public void accessResource() {
    try {
        semaphore.acquire(); // 获取许可
        // 访问资源
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        semaphore.release(); // 释放许可
    }
}

线程池

线程池类型

java
// 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);

// 缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();

// 单线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);
// 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);

// 缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();

// 单线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);

自定义线程池

java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5,                      // 核心线程数
    10,                     // 最大线程数
    60L,                    // 空闲线程存活时间
    TimeUnit.SECONDS,       // 时间单位
    new LinkedBlockingQueue<>(100), // 工作队列
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5,                      // 核心线程数
    10,                     // 最大线程数
    60L,                    // 空闲线程存活时间
    TimeUnit.SECONDS,       // 时间单位
    new LinkedBlockingQueue<>(100), // 工作队列
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

并发集合

ConcurrentHashMap

java
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.compute("key", (k, v) -> v == null ? 1 : v + 1);
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.compute("key", (k, v) -> v == null ? 1 : v + 1);

CopyOnWriteArrayList

java
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item1");
list.add("item2");
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item1");
list.add("item2");

BlockingQueue

java
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// 生产者
queue.put("item"); // 阻塞式添加

// 消费者
String item = queue.take(); // 阻塞式获取
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// 生产者
queue.put("item"); // 阻塞式添加

// 消费者
String item = queue.take(); // 阻塞式获取

最佳实践

  1. 优先使用线程池而不是直接创建线程
  2. 合理设置线程池参数,避免资源浪费或不足
  3. 使用并发集合替代同步集合
  4. 避免死锁:按固定顺序获取锁
  5. 正确处理中断:检查中断状态并适当响应
  6. 使用ThreadLocal避免线程间数据共享问题
  7. 选择合适的同步机制:synchronized vs Lock vs 原子类

常见问题

死锁

java
// 避免死锁的方法:按固定顺序获取锁
public void transfer(Account from, Account to, int amount) {
    Account firstLock = from.getId() < to.getId() ? from : to;
    Account secondLock = from.getId() < to.getId() ? to : from;
    
    synchronized (firstLock) {
        synchronized (secondLock) {
            from.withdraw(amount);
            to.deposit(amount);
        }
    }
}
// 避免死锁的方法:按固定顺序获取锁
public void transfer(Account from, Account to, int amount) {
    Account firstLock = from.getId() < to.getId() ? from : to;
    Account secondLock = from.getId() < to.getId() ? to : from;
    
    synchronized (firstLock) {
        synchronized (secondLock) {
            from.withdraw(amount);
            to.deposit(amount);
        }
    }
}

内存可见性

使用volatile、synchronized或Lock确保内存可见性。

原子性

使用原子类(AtomicInteger、AtomicReference等)保证操作的原子性。

总结

Java多线程编程是一个复杂但重要的主题。掌握线程的创建、同步、通信和并发工具类的使用,对于开发高性能的Java应用程序至关重要。在实际开发中,应该根据具体场景选择合适的并发策略和工具。