Skip to content

並行処理(Concurrent API、Fork/Join、ExecutorService)

並行処理(Concurrent API、Fork/Join、ExecutorService)

Section titled “並行処理(Concurrent API、Fork/Join、ExecutorService)”

Javaの並行処理機能について、java.util.concurrentパッケージを中心に詳しく解説します。

現代のアプリケーションが直面する課題

Section titled “現代のアプリケーションが直面する課題”

現代のアプリケーションは、以下のような課題に直面しています:

1. パフォーマンスの要求

// シーケンシャル処理: 1000件のデータを順次処理
public void processSequentially(List<Data> dataList) {
long start = System.currentTimeMillis();
for (Data data : dataList) {
processData(data); // 各処理に100msかかると仮定
}
long duration = System.currentTimeMillis() - start;
System.out.println("Sequential: " + duration + "ms");
// 1000件 × 100ms = 100秒
}
// 並行処理: 10スレッドで並列処理
public void processConcurrently(List<Data> dataList) {
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(10);
List<Future<?>> futures = new ArrayList<>();
for (Data data : dataList) {
futures.add(executor.submit(() -> processData(data)));
}
// すべてのタスクの完了を待機
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
executor.shutdown();
long duration = System.currentTimeMillis() - start;
System.out.println("Concurrent: " + duration + "ms");
// 1000件 ÷ 10スレッド × 100ms ≈ 10秒(約10倍高速化)
}

2. リソースの有効活用

// 問題: CPUがアイドル状態のまま、I/O待機で時間を無駄にしている
public void inefficientProcessing() {
// データベースからデータを取得(I/O待機: 100ms)
List<User> users = userRepository.findAll();
// CPU処理(10ms)
for (User user : users) {
processUser(user);
}
// 外部API呼び出し(I/O待機: 200ms)
sendNotification(users);
// 合計: 310ms(CPUは10msしか使用していない)
}
// 解決: I/O待機中に他の処理を実行
public void efficientProcessing() {
ExecutorService executor = Executors.newFixedThreadPool(5);
// データ取得と処理を並行実行
CompletableFuture<List<User>> usersFuture =
CompletableFuture.supplyAsync(() -> userRepository.findAll(), executor);
CompletableFuture<Void> processFuture = usersFuture.thenAcceptAsync(users -> {
for (User user : users) {
processUser(user);
}
}, executor);
CompletableFuture<Void> notifyFuture = usersFuture.thenAcceptAsync(users -> {
sendNotification(users);
}, executor);
// すべての処理の完了を待機
CompletableFuture.allOf(processFuture, notifyFuture).join();
executor.shutdown();
// I/O待機中にCPU処理を実行できる(効率的)
}

3. レスポンス性の向上

// 問題: 1つのリクエストが他のリクエストをブロック
public class BlockingServer {
public void handleRequest(Request request) {
// 重い処理(5秒かかる)
processHeavyTask(request);
// この間、他のリクエストは待機
}
}
// 解決: 非同期処理でレスポンス性を向上
public class NonBlockingServer {
private ExecutorService executor = Executors.newFixedThreadPool(10);
public CompletableFuture<Response> handleRequest(Request request) {
// 非同期で処理を開始
return CompletableFuture.supplyAsync(() -> {
return processHeavyTask(request);
}, executor);
// すぐにレスポンスを返せる(他のリクエストも処理可能)
}
}

1. スループットの向上

複数のタスクを同時に実行することで、全体の処理時間を短縮できます。

2. リソース利用率の向上

I/O待機中にCPUを他のタスクに使用できます。

3. レスポンス性の向上

ユーザーインターフェースがブロックされず、応答性が向上します。

4. スケーラビリティ

マルチコアプロセッサを有効活用できます。

スレッドは、プログラム内の独立した実行単位です。1つのプロセス内で複数のスレッドが同時に実行できます。

なぜスレッドが必要か:

// シングルスレッド: 1つのタスクが完了するまで次のタスクを実行できない
public void singleThreaded() {
downloadFile("file1.txt"); // 5秒
processFile("file1.txt"); // 2秒
downloadFile("file2.txt"); // 5秒
processFile("file2.txt"); // 2秒
// 合計: 14秒
}
// マルチスレッド: 複数のタスクを並行実行
public void multiThreaded() {
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
downloadFile("file1.txt");
processFile("file1.txt");
}, executor);
CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
downloadFile("file2.txt");
processFile("file2.txt");
}, executor);
CompletableFuture.allOf(task1, task2).join();
executor.shutdown();
// 合計: 約7秒(並行実行により約半分の時間)
}
// 方法1: Threadクラスを継承
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread running: " + Thread.currentThread().getName());
}
}
MyThread thread = new MyThread();
thread.start();
// 方法2: Runnableインターフェースを実装
Runnable runnable = () -> {
System.out.println("Thread running: " + Thread.currentThread().getName());
};
Thread thread2 = new Thread(runnable);
thread2.start();
// 方法3: ラムダ式を使用
Thread thread3 = new Thread(() -> {
System.out.println("Thread running: " + Thread.currentThread().getName());
});
thread3.start();

ExecutorServiceは、スレッドプールを管理し、タスクの実行を効率化します。

// 固定サイズのスレッドプールを作成
ExecutorService executor = Executors.newFixedThreadPool(10);
// タスクを実行
executor.submit(() -> {
System.out.println("Task executed: " + Thread.currentThread().getName());
});
// 複数のタスクを実行
for (int i = 0; i < 100; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " executed");
});
}
// スレッドプールをシャットダウン
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
// 1. 固定サイズのスレッドプール
ExecutorService fixedPool = Executors.newFixedThreadPool(10);
// 2. キャッシュされたスレッドプール(必要に応じてスレッドを作成)
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 3. 単一スレッドのエグゼキューター(順次実行)
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 4. スケジュール付きエグゼキューター
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
// 遅延実行
scheduledPool.schedule(() -> {
System.out.println("Delayed task");
}, 5, TimeUnit.SECONDS);
// 定期実行
scheduledPool.scheduleAtFixedRate(() -> {
System.out.println("Periodic task");
}, 0, 1, TimeUnit.SECONDS);
ExecutorService executor = Executors.newFixedThreadPool(10);
// Callable: 値を返すタスク
Callable<Integer> task = () -> {
Thread.sleep(1000);
return 42;
};
// Future: 非同期処理の結果を取得
Future<Integer> future = executor.submit(task);
try {
// 結果を取得(ブロッキング)
Integer result = future.get();
System.out.println("Result: " + result);
// タイムアウト付きで結果を取得
Integer result2 = future.get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
// 複数のFutureを処理
List<Callable<Integer>> tasks = Arrays.asList(
() -> 1,
() -> 2,
() -> 3
);
List<Future<Integer>> futures = executor.invokeAll(tasks);
for (Future<Integer> f : futures) {
System.out.println(f.get());
}

CompletableFutureは、非同期処理をより柔軟に扱えるクラスです。

// 非同期処理の作成
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
});
// 結果の処理
future.thenApply(s -> s + " World")
.thenAccept(System.out::println);
// エラーハンドリング
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Error");
}
return "Success";
});
future2.handle((result, error) -> {
if (error != null) {
System.out.println("Error: " + error.getMessage());
return "Default";
}
return result;
});
// 複数のFutureを結合
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1.thenCombine(future2, (a, b) -> a + " " + b);
System.out.println(combined.get()); // "Hello World"
// すべてのFutureが完了するまで待機
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
allOf.thenRun(() -> System.out.println("All tasks completed"));
public class Counter {
private int count = 0;
// synchronizedメソッド
public synchronized void increment() {
count++;
}
// synchronizedブロック
public void increment2() {
synchronized (this) {
count++;
}
}
public synchronized int getCount() {
return count;
}
}
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
private int count = 0;
private ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
// tryLock: ロックを取得できるか試行
public boolean tryIncrement() {
if (lock.tryLock()) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
}
CountDownLatch latch = new CountDownLatch(3);
// 3つのタスクを実行
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
try {
// タスクの処理
Thread.sleep(1000);
} finally {
latch.countDown(); // カウントダウン
}
});
}
// すべてのタスクが完了するまで待機
latch.await();
System.out.println("All tasks completed");
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All threads reached the barrier");
});
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
try {
// 処理
Thread.sleep(1000);
barrier.await(); // バリアに到達
// バリア通過後の処理
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
Semaphore semaphore = new Semaphore(3); // 3つの許可
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
try {
semaphore.acquire(); // 許可を取得
// リソースを使用
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 許可を解放
}
});
}
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// スレッドセーフな操作
map.put("Alice", 25);
map.put("Bob", 30);
// アトミック操作
map.computeIfAbsent("Charlie", k -> 20);
map.computeIfPresent("Alice", (k, v) -> v + 1);
// 並行処理
map.forEach(2, (key, value) -> {
System.out.println(key + ": " + value);
});
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// プロデューサー
executor.submit(() -> {
try {
queue.put("Item 1");
queue.put("Item 2");
queue.put("Item 3");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// コンシューマー
executor.submit(() -> {
try {
while (true) {
String item = queue.take(); // ブロッキング
System.out.println("Consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

大きなタスクを小さなタスクに分割して並列処理するフレームワークです。

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000;
private int[] array;
private int start;
private int end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
// しきい値以下なら直接計算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// タスクを分割
int mid = start + length / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 左のタスクを非同期実行
long rightResult = right.compute(); // 右のタスクを実行
long leftResult = left.join(); // 左のタスクの結果を取得
return leftResult + rightResult;
}
}
}
// 使用例
int[] array = new int[10000];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
Long result = pool.invoke(task);
System.out.println("Sum: " + result);
public class DataProcessor {
private ExecutorService executor = Executors.newFixedThreadPool(10);
public CompletableFuture<List<ProcessedData>> processData(List<RawData> rawDataList) {
List<CompletableFuture<ProcessedData>> futures = rawDataList.stream()
.map(data -> CompletableFuture.supplyAsync(() -> process(data), executor))
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
private ProcessedData process(RawData data) {
// データ処理
return new ProcessedData(data);
}
}

並行処理のポイント:

  • ExecutorService: スレッドプールの管理
  • Future/Callable: 非同期処理の結果取得
  • CompletableFuture: 柔軟な非同期処理
  • 同期機構: synchronized、ReentrantLock、CountDownLatch、CyclicBarrier、Semaphore
  • 並行コレクション: ConcurrentHashMap、BlockingQueue
  • Fork/Join: 大きなタスクの並列処理

適切な並行処理の実装により、パフォーマンスを向上させられます。