並行処理(Concurrent API、Fork/Join、ExecutorService)
並行処理(Concurrent API、Fork/Join、ExecutorService)
Section titled “並行処理(Concurrent API、Fork/Join、ExecutorService)”Javaの並行処理機能について、java.util.concurrentパッケージを中心に詳しく解説します。
なぜ並行処理が必要なのか
Section titled “なぜ並行処理が必要なのか”現代のアプリケーションが直面する課題
Section titled “現代のアプリケーションが直面する課題”現代のアプリケーションは、以下のような課題に直面しています:
1. パフォーマンスの要求
// シーケンシャル処理: 1000件のデータを順次処理public void processSequentially(List<Data> dataList) { long start = System.currentTimeMillis();
for (Data data : dataList) { processData(data); // 各処理に100msかかると仮定 }
long duration = System.currentTimeMillis() - start; System.out.println("Sequential: " + duration + "ms"); // 1000件 × 100ms = 100秒}
// 並行処理: 10スレッドで並列処理public void processConcurrently(List<Data> dataList) { long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(10); List<Future<?>> futures = new ArrayList<>();
for (Data data : dataList) { futures.add(executor.submit(() -> processData(data))); }
// すべてのタスクの完了を待機 for (Future<?> future : futures) { try { future.get(); } catch (Exception e) { e.printStackTrace(); } }
executor.shutdown(); long duration = System.currentTimeMillis() - start; System.out.println("Concurrent: " + duration + "ms"); // 1000件 ÷ 10スレッド × 100ms ≈ 10秒(約10倍高速化)}2. リソースの有効活用
// 問題: CPUがアイドル状態のまま、I/O待機で時間を無駄にしているpublic void inefficientProcessing() { // データベースからデータを取得(I/O待機: 100ms) List<User> users = userRepository.findAll();
// CPU処理(10ms) for (User user : users) { processUser(user); }
// 外部API呼び出し(I/O待機: 200ms) sendNotification(users);
// 合計: 310ms(CPUは10msしか使用していない)}
// 解決: I/O待機中に他の処理を実行public void efficientProcessing() { ExecutorService executor = Executors.newFixedThreadPool(5);
// データ取得と処理を並行実行 CompletableFuture<List<User>> usersFuture = CompletableFuture.supplyAsync(() -> userRepository.findAll(), executor);
CompletableFuture<Void> processFuture = usersFuture.thenAcceptAsync(users -> { for (User user : users) { processUser(user); } }, executor);
CompletableFuture<Void> notifyFuture = usersFuture.thenAcceptAsync(users -> { sendNotification(users); }, executor);
// すべての処理の完了を待機 CompletableFuture.allOf(processFuture, notifyFuture).join();
executor.shutdown(); // I/O待機中にCPU処理を実行できる(効率的)}3. レスポンス性の向上
// 問題: 1つのリクエストが他のリクエストをブロックpublic class BlockingServer { public void handleRequest(Request request) { // 重い処理(5秒かかる) processHeavyTask(request);
// この間、他のリクエストは待機 }}
// 解決: 非同期処理でレスポンス性を向上public class NonBlockingServer { private ExecutorService executor = Executors.newFixedThreadPool(10);
public CompletableFuture<Response> handleRequest(Request request) { // 非同期で処理を開始 return CompletableFuture.supplyAsync(() -> { return processHeavyTask(request); }, executor);
// すぐにレスポンスを返せる(他のリクエストも処理可能) }}並行処理の本質的な価値
Section titled “並行処理の本質的な価値”1. スループットの向上
複数のタスクを同時に実行することで、全体の処理時間を短縮できます。
2. リソース利用率の向上
I/O待機中にCPUを他のタスクに使用できます。
3. レスポンス性の向上
ユーザーインターフェースがブロックされず、応答性が向上します。
4. スケーラビリティ
マルチコアプロセッサを有効活用できます。
スレッドの基本
Section titled “スレッドの基本”スレッドとは
Section titled “スレッドとは”スレッドは、プログラム内の独立した実行単位です。1つのプロセス内で複数のスレッドが同時に実行できます。
なぜスレッドが必要か:
// シングルスレッド: 1つのタスクが完了するまで次のタスクを実行できないpublic void singleThreaded() { downloadFile("file1.txt"); // 5秒 processFile("file1.txt"); // 2秒 downloadFile("file2.txt"); // 5秒 processFile("file2.txt"); // 2秒 // 合計: 14秒}
// マルチスレッド: 複数のタスクを並行実行public void multiThreaded() { ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> { downloadFile("file1.txt"); processFile("file1.txt"); }, executor);
CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> { downloadFile("file2.txt"); processFile("file2.txt"); }, executor);
CompletableFuture.allOf(task1, task2).join(); executor.shutdown(); // 合計: 約7秒(並行実行により約半分の時間)}スレッドの作成
Section titled “スレッドの作成”// 方法1: Threadクラスを継承class MyThread extends Thread { @Override public void run() { System.out.println("Thread running: " + Thread.currentThread().getName()); }}
MyThread thread = new MyThread();thread.start();
// 方法2: Runnableインターフェースを実装Runnable runnable = () -> { System.out.println("Thread running: " + Thread.currentThread().getName());};
Thread thread2 = new Thread(runnable);thread2.start();
// 方法3: ラムダ式を使用Thread thread3 = new Thread(() -> { System.out.println("Thread running: " + Thread.currentThread().getName());});thread3.start();ExecutorService
Section titled “ExecutorService”ExecutorServiceは、スレッドプールを管理し、タスクの実行を効率化します。
基本的な使い方
Section titled “基本的な使い方”// 固定サイズのスレッドプールを作成ExecutorService executor = Executors.newFixedThreadPool(10);
// タスクを実行executor.submit(() -> { System.out.println("Task executed: " + Thread.currentThread().getName());});
// 複数のタスクを実行for (int i = 0; i < 100; i++) { final int taskId = i; executor.submit(() -> { System.out.println("Task " + taskId + " executed"); });}
// スレッドプールをシャットダウンexecutor.shutdown();try { if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { executor.shutdownNow(); }} catch (InterruptedException e) { executor.shutdownNow();}スレッドプールの種類
Section titled “スレッドプールの種類”// 1. 固定サイズのスレッドプールExecutorService fixedPool = Executors.newFixedThreadPool(10);
// 2. キャッシュされたスレッドプール(必要に応じてスレッドを作成)ExecutorService cachedPool = Executors.newCachedThreadPool();
// 3. 単一スレッドのエグゼキューター(順次実行)ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 4. スケジュール付きエグゼキューターScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
// 遅延実行scheduledPool.schedule(() -> { System.out.println("Delayed task");}, 5, TimeUnit.SECONDS);
// 定期実行scheduledPool.scheduleAtFixedRate(() -> { System.out.println("Periodic task");}, 0, 1, TimeUnit.SECONDS);FutureとCallable
Section titled “FutureとCallable”ExecutorService executor = Executors.newFixedThreadPool(10);
// Callable: 値を返すタスクCallable<Integer> task = () -> { Thread.sleep(1000); return 42;};
// Future: 非同期処理の結果を取得Future<Integer> future = executor.submit(task);
try { // 結果を取得(ブロッキング) Integer result = future.get(); System.out.println("Result: " + result);
// タイムアウト付きで結果を取得 Integer result2 = future.get(5, TimeUnit.SECONDS);} catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace();}
// 複数のFutureを処理List<Callable<Integer>> tasks = Arrays.asList( () -> 1, () -> 2, () -> 3);
List<Future<Integer>> futures = executor.invokeAll(tasks);for (Future<Integer> f : futures) { System.out.println(f.get());}CompletableFuture
Section titled “CompletableFuture”CompletableFutureは、非同期処理をより柔軟に扱えるクラスです。
// 非同期処理の作成CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "Hello";});
// 結果の処理future.thenApply(s -> s + " World") .thenAccept(System.out::println);
// エラーハンドリングCompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("Error"); } return "Success";});
future2.handle((result, error) -> { if (error != null) { System.out.println("Error: " + error.getMessage()); return "Default"; } return result;});
// 複数のFutureを結合CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1.thenCombine(future2, (a, b) -> a + " " + b);System.out.println(combined.get()); // "Hello World"
// すべてのFutureが完了するまで待機CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);allOf.thenRun(() -> System.out.println("All tasks completed"));synchronized
Section titled “synchronized”public class Counter { private int count = 0;
// synchronizedメソッド public synchronized void increment() { count++; }
// synchronizedブロック public void increment2() { synchronized (this) { count++; } }
public synchronized int getCount() { return count; }}ReentrantLock
Section titled “ReentrantLock”import java.util.concurrent.locks.ReentrantLock;
public class Counter { private int count = 0; private ReentrantLock lock = new ReentrantLock();
public void increment() { lock.lock(); try { count++; } finally { lock.unlock(); } }
// tryLock: ロックを取得できるか試行 public boolean tryIncrement() { if (lock.tryLock()) { try { count++; return true; } finally { lock.unlock(); } } return false; }}CountDownLatch
Section titled “CountDownLatch”CountDownLatch latch = new CountDownLatch(3);
// 3つのタスクを実行for (int i = 0; i < 3; i++) { executor.submit(() -> { try { // タスクの処理 Thread.sleep(1000); } finally { latch.countDown(); // カウントダウン } });}
// すべてのタスクが完了するまで待機latch.await();System.out.println("All tasks completed");CyclicBarrier
Section titled “CyclicBarrier”CyclicBarrier barrier = new CyclicBarrier(3, () -> { System.out.println("All threads reached the barrier");});
for (int i = 0; i < 3; i++) { executor.submit(() -> { try { // 処理 Thread.sleep(1000); barrier.await(); // バリアに到達 // バリア通過後の処理 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } });}Semaphore
Section titled “Semaphore”Semaphore semaphore = new Semaphore(3); // 3つの許可
for (int i = 0; i < 10; i++) { executor.submit(() -> { try { semaphore.acquire(); // 許可を取得 // リソースを使用 Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { semaphore.release(); // 許可を解放 } });}並行コレクション
Section titled “並行コレクション”ConcurrentHashMap
Section titled “ConcurrentHashMap”ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// スレッドセーフな操作map.put("Alice", 25);map.put("Bob", 30);
// アトミック操作map.computeIfAbsent("Charlie", k -> 20);map.computeIfPresent("Alice", (k, v) -> v + 1);
// 並行処理map.forEach(2, (key, value) -> { System.out.println(key + ": " + value);});BlockingQueue
Section titled “BlockingQueue”BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// プロデューサーexecutor.submit(() -> { try { queue.put("Item 1"); queue.put("Item 2"); queue.put("Item 3"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }});
// コンシューマーexecutor.submit(() -> { try { while (true) { String item = queue.take(); // ブロッキング System.out.println("Consumed: " + item); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); }});Fork/Joinフレームワーク
Section titled “Fork/Joinフレームワーク”大きなタスクを小さなタスクに分割して並列処理するフレームワークです。
import java.util.concurrent.RecursiveTask;import java.util.concurrent.ForkJoinPool;
public class SumTask extends RecursiveTask<Long> { private static final int THRESHOLD = 1000; private int[] array; private int start; private int end;
public SumTask(int[] array, int start, int end) { this.array = array; this.start = start; this.end = end; }
@Override protected Long compute() { int length = end - start;
if (length <= THRESHOLD) { // しきい値以下なら直接計算 long sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } else { // タスクを分割 int mid = start + length / 2; SumTask left = new SumTask(array, start, mid); SumTask right = new SumTask(array, mid, end);
left.fork(); // 左のタスクを非同期実行 long rightResult = right.compute(); // 右のタスクを実行 long leftResult = left.join(); // 左のタスクの結果を取得
return leftResult + rightResult; } }}
// 使用例int[] array = new int[10000];for (int i = 0; i < array.length; i++) { array[i] = i;}
ForkJoinPool pool = new ForkJoinPool();SumTask task = new SumTask(array, 0, array.length);Long result = pool.invoke(task);System.out.println("Sum: " + result);並行処理によるデータ処理
Section titled “並行処理によるデータ処理”public class DataProcessor { private ExecutorService executor = Executors.newFixedThreadPool(10);
public CompletableFuture<List<ProcessedData>> processData(List<RawData> rawDataList) { List<CompletableFuture<ProcessedData>> futures = rawDataList.stream() .map(data -> CompletableFuture.supplyAsync(() -> process(data), executor)) .collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }
private ProcessedData process(RawData data) { // データ処理 return new ProcessedData(data); }}並行処理のポイント:
- ExecutorService: スレッドプールの管理
- Future/Callable: 非同期処理の結果取得
- CompletableFuture: 柔軟な非同期処理
- 同期機構: synchronized、ReentrantLock、CountDownLatch、CyclicBarrier、Semaphore
- 並行コレクション: ConcurrentHashMap、BlockingQueue
- Fork/Join: 大きなタスクの並列処理
適切な並行処理の実装により、パフォーマンスを向上させられます。