有这样一个需求在Spring Boot中利用多线程技术实现数据的批量处理并反馈批量处理的结果到前端RESTFul接口上,想要实现这个操作,我们可以考虑通过使用ExecutorService来管理线程池,以便处理批量数据,具体操作如下所示。
使用线程池进行批量处理
使用ExecutorService来管理线程池,代码实现如下所示。
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@Service
public class BatchProcessingService {
private final Executor executor;
public BatchProcessingService() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(100);
taskExecutor.initialize();
this.executor = taskExecutor;
}
public CompletableFuture<String> processBatchData(List<String> dataBatch) {
return CompletableFuture.supplyAsync(() -> {
// 处理批量数据的逻辑
for (String data : dataBatch) {
// 处理每个数据项
processData(data);
}
return "Batch processing completed";
}, executor);
}
private void processData(String data) {
// 模拟处理数据
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
在上述代码中我们通过ExecutorService来管理线程池,并且在线程池中模拟了批量数据操作,并且模拟了数据耗时操作。
创建RESTful接口以触发批量处理
既然要模拟反馈结果给前端,那么就需要创建一个RESTful的接口类用来触发批量操作,如下所示。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/api/batch")
public class BatchProcessingController {
@Autowired
private BatchProcessingService batchProcessingService;
@GetMapping("/process")
public CompletableFuture<String> processBatch(@RequestParam List<String> data) {
List<String> dataBatch = Arrays.asList(data.toArray(new String[0]));
return batchProcessingService.processBatchData(dataBatch);
}
}
对于结果的反馈,我们用到了CompletableFuture<String>该对象表示异步计算的结果。RESTful接口在接收到请求后,会立即返回这个 CompletableFuture,而不是等待批量处理完成。客户端可以通过这种方式异步地获取处理结果。
前端获取处理状态
如果前端需要持续跟踪处理状态,可以设计一个状态查询接口。例如,可以将每次处理的任务ID返回给前端,前端通过任务ID查询处理进度,如下所示。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@RestController
@RequestMapping("/api/batch")
public class BatchProcessingController {
private ConcurrentHashMap<String, CompletableFuture<String>> taskMap = new ConcurrentHashMap<>();
@Autowired
private BatchProcessingService batchProcessingService;
@GetMapping("/process")
public String processBatch(@RequestParam List<String> data) {
String taskId = UUID.randomUUID().toString();
List<String> dataBatch = Arrays.asList(data.toArray(new String[0]));
CompletableFuture<String> future = batchProcessingService.processBatchData(dataBatch);
taskMap.put(taskId, future);
return taskId;
}
@GetMapping("/status")
public String getStatus(@RequestParam String taskId) {
CompletableFuture<String> future = taskMap.get(taskId);
if (future == null) {
return "Task ID not found";
}
if (future.isDone()) {
return future.join(); // 返回处理结果
} else {
return "Processing";
}
}
}
总结
通过上述代码,Spring Boot应用可以在后台使用多线程技术进行批量数据处理,并能够通过RESTful接口将处理结果反馈给前端。这样既提高了应用的并发处理能力,又确保了前端能够实时了解处理进度和结果。