Spring Bootでの並列処理の実装方法を記述していきます。SpringBootでは非同期処理を簡単に使える仕組みが用意されており、基本的には並列処理を行いたいメソッドに@Asyncをつけ、アプリケーション全体の設定で@EnableAsyncをつければ良いみたいです。今回は試しに10秒間停止するだけの並列メソッドを作成してみます。
メイン処理
Controller
並列処理の呼び出しを行います。thenAcceptAsyncメソッドでComplerableFutureにラップされた結果を受け取ります。
CompletableFuture<String> parallelProcess = service.parallelAct();
// parallelProcess が完全に終了した際に実行される処理
parallelProcess.thenAcceptAsync(parallelProcesssResult -> {
log.info("finished");
});
Service
開始ログ出力、10秒停止、終了ログ出力を行っています。
@Slf4j
@Service
public class Service {
@Async
public CompletableFuture<String> parallelAct() throws InterruptedException {
log.info("Async start");
Thread.sleep(10000);
log.info("Async start");
return CompletableFuture.completedFuture("");
}
}
設定処理
Application.java
アプリケーションの設定を行っています。@EnableAsyncを設定し、並列処理を許可し、getAsyncExecutorで並列処理の詳細設定を行います。
@EnableAsync
当アノテーションを追加して、非同期処理を有効化します。
setCorePoolSize
並列処理プールサイズを設定します。未設定の場合のデフォルト値は1。
setQueueCapacity
タスクのキューサイズを設定します。キューに貯められたタスクをアイドルのスレッドが処理していきます。未設定の場合のデフォルト値はInteger.MAX_VALUE。
setMaxPoolSize
並列処理のプール最大数を設定します。未設定の場合のデフォルト値はInteger.MAX_VALUE。
setThreadNamePrefix
非同期処理用のスレッド名を設定します。
setKeepAliveSeconds
CorePoolSize以上でアイドルのスレッドの生存時間を指定します。デフォルトは60秒です。
@SpringBootApplication
@EnableAsync
public class Application extends AsyncConfigurerSupport {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
/**
* 並列処理の設定
*/
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); //プール数
executor.setQueueCapacity(10); // キュー最大数
executor.setMaxPoolSize(15); // 並列処理最大数
executor.setThreadNamePrefix("parallelAct-"); // 非同期処理用のスレッド名を設定
executor.initialize();
return executor;
}
}
プールサイズの上限に達した場合
要求数がMaxPoolSize + QueueCapacity以上になった場合にRejectedExecutionExceptionという例外がスローされます。エラー時に任意の処理を実行したい場合はRejectedExecutionHandlerインターフェースを実装したクラスを下記のようにThreadPoolTaskExecutorへ渡します。明示しない場合のデフォルトはThreadPoolExecutor#AbortPolicyとなります。
プールサイズエラー時の処理
parallelProcess.setRejectedExecutionHandler((r,e) -> {
// 実行したい任意の処理を実装
throw new RejectedExecutionException("Task:" + r.toString() + " rejected from " + e.toString());
});
実行結果
2020/08/06 14:28:08 INFO [jp.co.test.application.service] [parallelAct-1] - asynctest-start
2020/08/06 14:28:09 INFO [jp.co.test.application.service] [parallelAct-2] - asynctest-start
2020/08/06 14:28:10 INFO [jp.co.test.application.service] [parallelAct-3] - asynctest-start
2020/08/06 14:28:10 INFO [jp.co.test.application.service] [parallelAct-4] - asynctest-start
2020/08/06 14:28:10 INFO [jp.co.test.application.service] [parallelAct-5] - asynctest-start
2020/08/06 14:28:10 INFO [jp.co.test.application.service] [parallelAct-6] - asynctest-start
2020/08/06 14:28:11 INFO [jp.co.test.application.service] [parallelAct-7] - asynctest-start
2020/08/06 14:28:11 INFO [jp.co.test.application.service] [parallelAct-8] - asynctest-start
2020/08/06 14:28:12 INFO [jp.co.test.application.service] [parallelAct-9] - asynctest-start
2020/08/06 14:28:12 INFO [jp.co.test.application.service] [parallelAct-10] - asynctest-start
2020/08/06 14:28:13 INFO [jp.co.test.application.service] [parallelAct-11] - asynctest-start
2020/08/06 14:28:13 INFO [jp.co.test.application.service] [parallelAct-12] - asynctest-start
2020/08/06 14:28:14 INFO [jp.co.test.application.service] [parallelAct-13] - asynctest-start
2020/08/06 14:28:15 INFO [jp.co.test.application.service] [parallelAct-14] - asynctest-start
2020/08/06 14:28:15 INFO [jp.co.test.application.service] [parallelAct-15] - asynctest-start
2020/08/06 14:28:18 INFO [jp.co.test.application.service] [parallelAct-1] - asynctest-end
2020/08/06 14:28:18 INFO [jp.co.test.application.service] [parallelAct-1] - asynctest-start
2020/08/06 14:28:19 INFO [jp.co.test.application.service] [parallelAct-2] - asynctest-end
2020/08/06 14:28:20 INFO [jp.co.test.application.service] [parallelAct-3] - asynctest-end
2020/08/06 14:28:20 INFO [jp.co.test.application.service] [parallelAct-4] - asynctest-end
2020/08/06 14:28:20 INFO [jp.co.test.application.service] [parallelAct-5] - asynctest-end
2020/08/06 14:28:20 INFO [jp.co.test.application.service] [parallelAct-6] - asynctest-end
2020/08/06 14:28:21 INFO [jp.co.test.application.service] [parallelAct-7] - asynctest-end
2020/08/06 14:28:21 INFO [jp.co.test.application.service] [parallelAct-8] - asynctest-end
2020/08/06 14:28:22 INFO [jp.co.test.application.service] [parallelAct-9] - asynctest-end
2020/08/06 14:28:22 INFO [jp.co.test.application.service] [parallelAct-10] - asynctest-end
2020/08/06 14:28:23 INFO [jp.co.test.application.service] [parallelAct-11] - asynctest-end
2020/08/06 14:28:23 INFO [jp.co.test.application.service] [parallelAct-12] - asynctest-end
2020/08/06 14:28:24 INFO [jp.co.test.application.service] [parallelAct-13] - asynctest-end
2020/08/06 14:28:25 INFO [jp.co.test.application.service] [parallelAct-14] - asynctest-end
2020/08/06 14:28:25 INFO [jp.co.test.application.service] [parallelAct-15] - asynctest-end