CompletableFuture 用法全解

吴书松
吴书松
发布于 2026-01-26 / 1 阅读
0
0

CompletableFuture 用法全解

1. 基础创建方法

1.1 创建已完成的 Future

java

// 创建一个已完成的CompletableFuture
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Hello World");
System.out.println(completedFuture.get()); // 输出: Hello World

1.2 创建异步任务

java

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// 无返回值的异步任务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(1000);
        System.out.println("异步任务执行完成");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

// 有返回值的异步任务
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "任务结果";
});

System.out.println(future2.get()); // 输出: 任务结果

2. 回调方法

2.1 thenApply - 转换结果

java

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World")
    .thenApply(String::toUpperCase);

System.out.println(future.get()); // 输出: HELLO WORLD

2.2 thenAccept - 消费结果

java

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(result -> {
        System.out.println("消费结果: " + result);
    });

future.get(); // 输出: 消费结果: Hello

2.3 thenRun - 不依赖结果执行操作

java

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenRun(() -> {
        System.out.println("前一个阶段已完成,但不关心结果");
    });

future.get();

2.4 whenComplete - 完成时处理(含异常)

java

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("随机异常");
    }
    return "成功";
}).whenComplete((result, ex) -> {
    if (ex != null) {
        System.out.println("发生异常: " + ex.getMessage());
    } else {
        System.out.println("正常完成: " + result);
    }
});

3. 异常处理

3.1 exceptionally - 异常恢复

java

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("出错了!");
}).exceptionally(ex -> {
    System.out.println("异常恢复: " + ex.getMessage());
    return "默认值";
});

System.out.println(future.get()); // 输出: 默认值

3.2 handle - 同时处理结果和异常

java

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        return "成功";
    } else {
        throw new RuntimeException("失败");
    }
}).handle((result, ex) -> {
    if (ex != null) {
        return "异常处理: " + ex.getMessage();
    }
    return "结果: " + result;
});

System.out.println(future.get());

4. 组合多个 Future

4.1 thenCompose - 串联两个 Future

java

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

System.out.println(future.get()); // 输出: Hello World

4.2 thenCombine - 合并两个独立 Future

java

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println(combined.get()); // 输出: Hello World

4.3 allOf - 等待所有完成

java

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return "任务1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleep(2000);
    return "任务2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    sleep(1500);
    return "任务3";
});

CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
all.thenRun(() -> {
    try {
        System.out.println("所有任务完成: " + future1.get() + ", " + future2.get() + ", " + future3.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}).get();

4.4 anyOf - 任意一个完成

java

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleep(2000);
    return "慢任务";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleep(500);
    return "快任务";
});

CompletableFuture<Object> any = CompletableFuture.anyOf(future1, future2);
System.out.println("第一个完成: " + any.get()); // 输出: 快任务

5. 异步回调

5.1 使用 Async 后缀方法

java

// 异步执行回调
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
    .thenApplyAsync(s -> {
        sleep(1000);
        return s + " World";
    })
    .thenApplyAsync(String::toUpperCase);

System.out.println(future.get()); // 输出: HELLO WORLD

6. 超时控制

6.1 使用 completeOnTimeout

java

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    sleep(3000); // 模拟耗时操作
    return "结果";
}).completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);

System.out.println(future.get()); // 2秒后输出: 超时默认值

7. 完整示例

java

import java.util.concurrent.*;
import java.util.function.Supplier;

public class CompletableFutureDemo {
    
    public static void main(String[] args) throws Exception {
        demo1();
        demo2();
        demo3();
        demo4();
    }
    
    // 示例1: 基本使用
    static void demo1() throws Exception {
        System.out.println("\n=== 示例1: 基本使用 ===");
        
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开始执行任务...");
            sleep(1000);
            return "任务完成";
        });
        
        future.thenAccept(result -> {
            System.out.println("收到结果: " + result);
        });
        
        future.get();
    }
    
    // 示例2: 异常处理
    static void demo2() throws Exception {
        System.out.println("\n=== 示例2: 异常处理 ===");
        
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            int num = 10 / 0; // 故意制造异常
            return num;
        }).exceptionally(ex -> {
            System.out.println("捕获异常: " + ex.getMessage());
            return 0; // 返回默认值
        });
        
        System.out.println("结果: " + future.get());
    }
    
    // 示例3: 组合多个Future
    static void demo3() throws Exception {
        System.out.println("\n=== 示例3: 组合多个Future ===");
        
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
            sleep(800);
            return "用户数据";
        });
        
        CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> {
            sleep(500);
            return "订单数据";
        });
        
        userFuture.thenCombine(orderFuture, (user, order) -> {
            return "组合结果: " + user + " + " + order;
        }).thenAccept(System.out::println);
        
        sleep(1000);
    }
    
    // 示例4: 并行处理
    static void demo4() throws Exception {
        System.out.println("\n=== 示例4: 并行处理 ===");
        
        long start = System.currentTimeMillis();
        
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "任务1完成";
        });
        
        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            sleep(1200);
            return "任务2完成";
        });
        
        CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
            sleep(800);
            return "任务3完成";
        });
        
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
        allTasks.thenRun(() -> {
            try {
                System.out.println(task1.get());
                System.out.println(task2.get());
                System.out.println(task3.get());
                long end = System.currentTimeMillis();
                System.out.println("总耗时: " + (end - start) + "ms");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        
        allTasks.get();
    }
    
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

8. 线程池配置

java

import java.util.concurrent.*;

public class ThreadPoolDemo {
    
    public static void main(String[] args) throws Exception {
        // 创建自定义线程池
        ExecutorService customExecutor = Executors.newFixedThreadPool(3);
        
        // 使用自定义线程池执行异步任务
        CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("线程: " + Thread.currentThread().getName());
                return "任务1";
            }, customExecutor);
        
        CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("线程: " + Thread.currentThread().getName());
                return "任务2";
            }, customExecutor);
        
        // 等待完成
        CompletableFuture.allOf(future1, future2).join();
        
        // 关闭线程池
        customExecutor.shutdown();
    }
}

主要特点总结:

  1. 异步编程:非阻塞式异步编程模型

  2. 链式调用:支持函数式编程风格

  3. 组合能力:强大的多Future组合能力

  4. 异常处理:完善的异常处理机制

  5. 线程池支持:可配置自定义线程池

  6. 超时控制:支持超时处理

CompletableFuture 是 Java 8 引入的强大异步编程工具,合理使用可以大幅提升程序性能和响应速度。


评论