1、自定义异步线程池
/**
 * Spring Boot entry point that registers a dedicated thread pool with Spring MVC's
 * asynchronous request support via {@code AsyncSupportConfigurer#setTaskExecutor}.
 *
 * <p>NOTE(review): {@code WebMvcConfigurerAdapter} is deprecated in newer Spring versions;
 * confirm the target Spring version before migrating to {@code WebMvcConfigurer} directly.
 */
@SpringBootApplication
public class AsyncConfigExample {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(AsyncConfigExample.class, args);
    }

    /**
     * Exposes an MVC configurer whose only customization is the async task executor.
     *
     * @return the configurer bean
     */
    @Bean
    WebMvcConfigurer configurer() {
        return new WebMvcConfigurerAdapter() {
            @Override
            public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
                configurer.setTaskExecutor(newMvcAsyncExecutor());
            }
        };
    }

    /**
     * Builds and initializes the executor handed to the MVC async support:
     * 10 core threads, up to 100 threads, a queue of 50, and a 120 second keep-alive
     * that also applies to core threads.
     */
    private static ThreadPoolTaskExecutor newMvcAsyncExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setQueueCapacity(50);
        pool.setCorePoolSize(10);
        pool.setMaxPoolSize(100);
        pool.setKeepAliveSeconds(120);
        pool.setAllowCoreThreadTimeOut(true);
        // The executor is not a Spring bean here, so it has to be initialized by hand.
        pool.initialize();
        return pool;
    }
}
2、使用异步注解@Async
/**
 * Demonstrates how exceptions behave in {@code @Async} methods depending on the return type.
 */
@Component
public class AsyncExceptionDemo {
    private static final Logger log = LoggerFactory.getLogger(AsyncExceptionDemo.class);

    /**
     * Simplest asynchronous call: no arguments, {@code void} return.
     */
    @Async
    public void asyncInvokeSimplest() {
        log.info("asyncSimplest");
    }

    /**
     * Asynchronous call that takes an argument and always fails.
     *
     * <p>Because the return type is {@code void}, the thrown exception cannot reach the caller;
     * it is handed to the configured {@code AsyncUncaughtExceptionHandler} instead.
     *
     * @param s value that is logged and used as the exception message
     * @throws IllegalArgumentException always, to demonstrate the uncaught-exception path
     */
    @Async
    public void asyncInvokeWithException(String s) {
        log.info("asyncInvokeWithParameter, parementer={}", s);
        throw new IllegalArgumentException(s);
    }

    /**
     * Asynchronous call that returns a {@link Future}.
     *
     * <p>Exceptions from a {@code Future}-returning method are not passed to the
     * {@code AsyncUncaughtExceptionHandler}; they must be handled inside the method
     * (as done here) or by the caller when it invokes {@code Future.get()}.
     *
     * @param i value that is logged
     * @return a completed future holding {@code "error"} if the sleep was interrupted,
     *         otherwise {@code "error-IllegalArgumentException"} because this demo always
     *         throws after sleeping
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        log.info("asyncInvokeReturnFuture, parementer={}", i);
        Future<String> future;
        try {
            Thread.sleep(1000 * 1);
            // Deliberately overwritten below: the demo always takes the failure path.
            future = new AsyncResult<String>("success:" + i);
            throw new IllegalArgumentException("a");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the pool thread can still observe the interruption.
            Thread.currentThread().interrupt();
            future = new AsyncResult<String>("error");
        } catch (IllegalArgumentException e) {
            future = new AsyncResult<String>("error-IllegalArgumentException");
        }
        return future;
    }
}
3、实现AsyncConfigurer接口,对异步线程池进行更加细粒度的控制
a) 创建自己的线程池
b) 对void方法抛出的异常处理的类AsyncUncaughtExceptionHandler
/**
 * Customizes the {@code @Async} infrastructure by implementing {@code AsyncConfigurer}:
 * supplies the executor and the handler for exceptions thrown by {@code void} async methods.
 */
@Service
public class MyAsyncConfigurer implements AsyncConfigurer {
    private static final Logger log = LoggerFactory.getLogger(MyAsyncConfigurer.class);

    /**
     * Builds the executor used for {@code @Async} methods: a single thread named
     * {@code MyAsync-*} that, on shutdown, waits up to 15 minutes for queued tasks.
     *
     * @return the initialized executor
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(1);
        threadPool.setMaxPoolSize(1);
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        threadPool.setAwaitTerminationSeconds(60 * 15);
        threadPool.setThreadNamePrefix("MyAsync-");
        // Created outside the container's bean lifecycle here, so initialize explicitly.
        threadPool.initialize();
        return threadPool;
    }

    /**
     * @return the handler invoked for exceptions escaping {@code void} async methods
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncExceptionHandler();
    }

    /**
     * Logs uncaught async exceptions together with the failing method and its arguments.
     *
     * @author hry
     */
    class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
        /**
         * Logs the failure at ERROR level, including the stack trace.
         *
         * @param throwable the exception thrown by the async method
         * @param method    the async method that failed
         * @param obj       the arguments the method was invoked with
         */
        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
            // Pass the throwable as the last argument so the logger records the stack trace;
            // logging only getMessage() would drop the cause.
            log.error("Exception message - {}", throwable.getMessage(), throwable);
            log.error("Method name - {}", method.getName());
            for (Object param : obj) {
                log.error("Parameter value - {}", param);
            }
        }
    }
}
/**
 * Spring Boot entry point with {@code @EnableAsync}, which switches on processing of
 * {@code @Async} methods.
 */
@EnableAsync
@SpringBootApplication
public class AsyncApplicationWithAsyncConfigurer {

    private static final Logger log =
            LoggerFactory.getLogger(AsyncApplicationWithAsyncConfigurer.class);

    /**
     * Logs a start-up message and then launches the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        log.info("Start AsyncApplication.. ");
        final Class<?> primarySource = AsyncApplicationWithAsyncConfigurer.class;
        SpringApplication.run(primarySource, args);
    }
}
4、默认定时任务配置
springboot默认定时任务:
1)注解@EnableScheduling 开启定时任务
2)注解@Scheduled(cron = "") 配置定时任务(默认单线程方式,多个定时任务同时执行时,一个定时任务完成后另一个定时任务才开始执行)
3)SchedulingConfigurer 实现SchedulingConfigurer接口,重写configureTasks方法,可以配置默认定时任务的线程池
4)注解@EnableAsync 开启异步执行
5)@Async("scheduler") 异步的方式执行定时任务,不指定定时任务线程池的话,默认生成100个线程
/**
 * Declares the {@code "scheduler"} bean: a five-thread {@code ThreadPoolTaskScheduler}
 * whose threads are named with the prefix {@code test}.
 *
 * @return the initialized scheduler
 */
@Bean("scheduler")
public TaskScheduler scheduler() {
    final ThreadPoolTaskScheduler pool = new ThreadPoolTaskScheduler();
    pool.setPoolSize(5);
    pool.setThreadNamePrefix("test");
    pool.initialize();
    return pool;
}
6)AsyncConfigurer 可以通过实现AsyncConfigurer接口自定义异步线程池
@Scheduled实现多线程并发定时任务 方案一
1:通过ScheduledConfig配置类实现SchedulingConfigurer接口,并重写configureTasks方法(在其中调用setScheduler方法指定线程池)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
 * Makes {@code @Scheduled} tasks run on a multi-threaded pool by registering a
 * scheduled executor with the {@code ScheduledTaskRegistrar}.
 */
@Configuration
public class ScheduledConfig implements SchedulingConfigurer {

    /** Number of threads available to scheduled tasks. */
    private static final int SCHEDULER_POOL_SIZE = 10;

    /**
     * Creates the scheduled thread pool; {@code shutdown} is registered as the bean's
     * destroy method.
     *
     * @return a scheduled thread pool with {@value #SCHEDULER_POOL_SIZE} threads
     */
    @Bean(destroyMethod = "shutdown")
    public Executor setTaskExecutors() {
        return Executors.newScheduledThreadPool(SCHEDULER_POOL_SIZE);
    }

    /**
     * Hands the pool from {@link #setTaskExecutors()} to the registrar as its scheduler.
     *
     * @param scheduledTaskRegistrar the registrar to configure
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        final Executor schedulerPool = setTaskExecutors();
        scheduledTaskRegistrar.setScheduler(schedulerPool);
    }
}
2:创建Bean
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
 * Declares the {@code TaskScheduler} as a bean so that the container manages the pool.
 */
@Configuration
public class TaskSchedulerConfig {

    /**
     * Creates a {@code ThreadPoolTaskScheduler} with a pool size of 10.
     *
     * @return the scheduler bean
     */
    @Bean
    public TaskScheduler taskScheduler() {
        final ThreadPoolTaskScheduler pooledScheduler = new ThreadPoolTaskScheduler();
        pooledScheduler.setPoolSize(10);
        return pooledScheduler;
    }
}
/**
* 原生(Spring)异步任务线程池装配类
*/
@Slf4j
@Configuration
public class NativeAsyncTaskExecutePool implements AsyncConfigurer {
//注入配置类
@Autowired
TaskThreadPoolConfig config;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(config.getCorePoolSize());
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//队列容量
executor.setQueueCapacity(config.getQueueCapacity());
//活跃时间
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//线程名字前缀
executor.setThreadNamePrefix("MyExecutor-");
// setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* 异步任务中异常处理
* @return
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {
log.error("=========================="+arg0.getMessage()+"=======================", arg0);
log.error("exception method:"+arg1.getName());
}
};
}
}