1. 基础创建方法
1.1 创建已完成的 Future
java
// 创建一个已完成的CompletableFuture
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Hello World");
System.out.println(completedFuture.get()); // 输出: Hello World1.2 创建异步任务
java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
// 无返回值的异步任务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
System.out.println("异步任务执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 有返回值的异步任务
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务结果";
});
System.out.println(future2.get()); // 输出: 任务结果2. 回调方法
2.1 thenApply - 转换结果
java
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);
System.out.println(future.get()); // 输出: HELLO WORLD2.2 thenAccept - 消费结果
java
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(result -> {
System.out.println("消费结果: " + result);
});
future.get(); // 输出: 消费结果: Hello2.3 thenRun - 不依赖结果执行操作
java
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> {
System.out.println("前一个阶段已完成,但不关心结果");
});
future.get();2.4 whenComplete - 完成时处理(含异常)
java
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "成功";
}).whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("发生异常: " + ex.getMessage());
} else {
System.out.println("正常完成: " + result);
}
});3. 异常处理
3.1 exceptionally - 异常恢复
java
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("出错了!");
}).exceptionally(ex -> {
System.out.println("异常恢复: " + ex.getMessage());
return "默认值";
});
System.out.println(future.get()); // 输出: 默认值3.2 handle - 同时处理结果和异常
java
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
return "成功";
} else {
throw new RuntimeException("失败");
}
}).handle((result, ex) -> {
if (ex != null) {
return "异常处理: " + ex.getMessage();
}
return "结果: " + result;
});
System.out.println(future.get());4. 组合多个 Future
4.1 thenCompose - 串联两个 Future
java
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
System.out.println(future.get()); // 输出: Hello World4.2 thenCombine - 合并两个独立 Future
java
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
System.out.println(combined.get()); // 输出: Hello World4.3 allOf - 等待所有完成
java
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "任务1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "任务2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "任务3";
});
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
all.thenRun(() -> {
try {
System.out.println("所有任务完成: " + future1.get() + ", " + future2.get() + ", " + future3.get());
} catch (Exception e) {
e.printStackTrace();
}
}).get();4.4 anyOf - 任意一个完成
java
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "慢任务";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "快任务";
});
CompletableFuture<Object> any = CompletableFuture.anyOf(future1, future2);
System.out.println("第一个完成: " + any.get()); // 输出: 快任务5. 异步回调
5.1 使用 Async 后缀方法
java
// 异步执行回调
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApplyAsync(s -> {
sleep(1000);
return s + " World";
})
.thenApplyAsync(String::toUpperCase);
System.out.println(future.get()); // 输出: HELLO WORLD6. 超时控制
6.1 使用 completeOnTimeout
java
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(3000); // 模拟耗时操作
return "结果";
}).completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);
System.out.println(future.get()); // 2秒后输出: 超时默认值7. 完整示例
java
import java.util.concurrent.*;
import java.util.function.Supplier;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
demo1();
demo2();
demo3();
demo4();
}
// 示例1: 基本使用
static void demo1() throws Exception {
System.out.println("\n=== 示例1: 基本使用 ===");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行任务...");
sleep(1000);
return "任务完成";
});
future.thenAccept(result -> {
System.out.println("收到结果: " + result);
});
future.get();
}
// 示例2: 异常处理
static void demo2() throws Exception {
System.out.println("\n=== 示例2: 异常处理 ===");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int num = 10 / 0; // 故意制造异常
return num;
}).exceptionally(ex -> {
System.out.println("捕获异常: " + ex.getMessage());
return 0; // 返回默认值
});
System.out.println("结果: " + future.get());
}
// 示例3: 组合多个Future
static void demo3() throws Exception {
System.out.println("\n=== 示例3: 组合多个Future ===");
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
sleep(800);
return "用户数据";
});
CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "订单数据";
});
userFuture.thenCombine(orderFuture, (user, order) -> {
return "组合结果: " + user + " + " + order;
}).thenAccept(System.out::println);
sleep(1000);
}
// 示例4: 并行处理
static void demo4() throws Exception {
System.out.println("\n=== 示例4: 并行处理 ===");
long start = System.currentTimeMillis();
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "任务1完成";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
sleep(1200);
return "任务2完成";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
sleep(800);
return "任务3完成";
});
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
allTasks.thenRun(() -> {
try {
System.out.println(task1.get());
System.out.println(task2.get());
System.out.println(task3.get());
long end = System.currentTimeMillis();
System.out.println("总耗时: " + (end - start) + "ms");
} catch (Exception e) {
e.printStackTrace();
}
});
allTasks.get();
}
static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}8. 线程池配置
java
import java.util.concurrent.*;
public class ThreadPoolDemo {
public static void main(String[] args) throws Exception {
// 创建自定义线程池
ExecutorService customExecutor = Executors.newFixedThreadPool(3);
// 使用自定义线程池执行异步任务
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> {
System.out.println("线程: " + Thread.currentThread().getName());
return "任务1";
}, customExecutor);
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> {
System.out.println("线程: " + Thread.currentThread().getName());
return "任务2";
}, customExecutor);
// 等待完成
CompletableFuture.allOf(future1, future2).join();
// 关闭线程池
customExecutor.shutdown();
}
}主要特点总结:
异步编程:非阻塞式异步编程模型
链式调用:支持函数式编程风格
组合能力:强大的多Future组合能力
异常处理:完善的异常处理机制
线程池支持:可配置自定义线程池
超时控制:支持超时处理
CompletableFuture 是 Java 8 引入的强大异步编程工具,合理使用可以大幅提升程序性能和响应速度。