一、首先我们先测试一下,普通的阻塞调用的效果(调用10次):
调用者端串行调用;
被调用者端每个请求睡眠1秒;
这样的结果就是:调用者端每一次请求由于等不到响应,都会阻塞等待(Http请求是同步阻塞的),最终耗时略大于10秒;
1、调用者端serviceImpl方法:
@Override
public void testRest() {
String targetMethod = "最终返回结果为:%s"; //模拟目标对象.方法()
StopWatch stopWatch = new StopWatch("testRest");
stopWatch.start();
for (int i = 1; i <= 10; i++) {
ResponseEntity<BaseResponse> resp = restTemplate.getForEntity("http://localhost:9000/resp/" + String.valueOf(i), BaseResponse.class);
if (resp.getStatusCode().is2xxSuccessful()){
String result = String.format(targetMethod, resp.getBody().getData());
log.info(result);
}
}
stopWatch.stop();
log.info("总耗时:{}秒", stopWatch.getTotalTimeSeconds());
}
2、被调用者端Controller方法:
@RestController
public class ClientBController {
@GetMapping("/resp/{origin}")
public BaseResponse<String> test(@PathVariable("origin") String origin){
try {
TimeUnit.SECONDS.sleep(1); //模拟1秒处理时间
} catch (InterruptedException e) {
e.printStackTrace();
}
return BaseResponse.success("TEST-REST-"+origin);
}
}
3、最终调用后打印结果:
2023-05-09 11:11:06.406 INFO ...ReqServiceImpl : 最终返回结果为:TEST-REST-1 2023-05-09 11:11:07.422 INFO ...ReqServiceImpl : 最终返回结果为:TEST-REST-2 2023-05-09 11:11:08.442 INFO ...ReqServiceImpl : 最终返回结果为:TEST-REST-3 2023-05-09 11:11:09.463 INFO ...ReqServiceImpl : 最终返回结果为:TEST-REST-4 2023-05-09 11:11:10.483 INFO ...ReqServiceImpl : 最终返回结果为:TEST-REST-5 2023-05-09 11:11:11.500 INFO ...ReqServiceImpl : 最终返回结果为:TEST-REST-6 2023-05-09 11:11:12.519 INFO ...ReqServiceImpl : 最终返回结果为:TEST-REST-7 2023-05-09 11:11:13.543 INFO ...ReqServiceImpl : 最终返回结果为:TEST-REST-8 2023-05-09 11:11:14.548 INFO ...ReqServiceImpl : 最终返回结果为:TEST-REST-9 2023-05-09 11:11:15.564 INFO ...ReqServiceImpl : 最终返回结果为:TEST-REST-10 2023-05-09 11:11:15.564 INFO ...ReqServiceImpl : 总耗时:10.3514294秒
同步阻塞调用效果不理想!
二、借助中间件Kafka将同步阻塞调用优化为异步非阻塞调用
1、调用者端Kafka的配置:
application.yml:
spring:
kafka:
bootstrap-servers: ${KAFKA_SERVERS:10.206.98.2:9092}
producer:
retries: 0
batch-size: 16384
buffer-memory: 33444432
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
# value-deserializer: org.springframework.kafka.support.serializer.JsonSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
consumer:
enable-auto-commit: true
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 1000
KafkaConfig类:
// 使用10个分区
@Configuration
public class KafkaConfig {
@Bean
public NewTopic requestTopic(){
return new NewTopic(KafkaConstant.KAFKA_REQUEST_TOPIC, 10, (short)1);
}
}
2、调用者端serviceImpl方法:
ConcurrentHashMap<String, JiguiquanCallback<String>> pendingRequest = new ConcurrentHashMap<>();
public void testNonBlock() throws InterruptedException {
String targetMethod = "最终返回结果为:%s"; //模拟目标对象.方法()
StopWatch stopWatch = new StopWatch("testNonBlock");
stopWatch.start();
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 1; i <= 10; i++) {
String reqId = UUID.randomUUID().toString();
JiguiquanCallback<String> callback = new JiguiquanCallback<String>() {
@Override
public void onSuccess(String msg) {
String result = String.format(targetMethod, msg);
log.info(result);
countDownLatch.countDown();
}
@Override
public void onError(Throwable e) {
log.error("请求失败{}", e.getMessage());
}
};
pendingRequest.put(reqId, callback);
JiguiquanKafkaMsg<String> reqMsg = new JiguiquanKafkaMsg<>();
reqMsg.getHeaders().put("reqId", reqId);
reqMsg.setData(String.valueOf(i));
kafkaTemplate.send(KafkaConstant.KAFKA_REQUEST_TOPIC, String.valueOf(i), JSONUtil.toJsonStr(reqMsg)).addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
log.error("请求发送失败:{},移除回调", ex.getMessage());
pendingRequest.remove(reqId);
}
@Override
public void onSuccess(Object result) {
// log.info("请求发送成功:{},等待回调", result.toString());
}
});
}
countDownLatch.await();
stopWatch.stop();
log.info("总耗时:{}秒", stopWatch.getTotalTimeSeconds());
}
@KafkaListener(topics = KafkaConstant.KAFKA_RESPONSE_TOPIC, groupId = "clientA")
public void consumer(ConsumerRecord<String, String> record) {
JiguiquanKafkaMsg msg = JSONUtil.toBean(record.value(), JiguiquanKafkaMsg.class);
String reqId = String.valueOf(msg.getHeaders().get("reqId"));
String value = String.valueOf(msg.getData());
JiguiquanCallback<String> callback = pendingRequest.get(reqId);
callback.onSuccess(value);
}
3、其中我定义的回调类:
public interface JiguiquanCallback<T> {
JiguiquanCallback<Void> EMPTY = new JiguiquanCallback<Void>() {
@Override
public void onSuccess(Void msg) {
}
@Override
public void onError(Throwable e) {
}
};
void onSuccess(T msg);
void onError(Throwable e);
}
4、简单Kafka消息包装类:
@Data
public class JiguiquanKafkaMsg<T> {
private Map<String, String> headers = new HashMap<>();
private T data;
}
5、被调用者的Kafka配置:
application.yml:
spring:
kafka:
bootstrap-servers: ${KAFKA_SERVERS:10.206.98.2:9092}
producer:
retries: 0
batch-size: 16384
buffer-memory: 33444432
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
# value-deserializer: org.springframework.kafka.support.serializer.JsonSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
consumer:
enable-auto-commit: true
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 1000
KafkaConfig类:
// 使用10个分区
@Configuration
public class KafkaConfig {
@Bean
public NewTopic requestTopic(){
return new NewTopic(KafkaConstant.KAFKA_REQUEST_TOPIC, 10, (short)1);
}
}
6、被调用者端的Kafka消费处理逻辑:
@Component
@Slf4j
public class RequestConsumer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = KafkaConstant.KAFKA_REQUEST_TOPIC, groupId = "clinetB", concurrency = "10")
public void consumer(ConsumerRecord<String, String> record) {
log.info("收到数据:{}", record);
try {
TimeUnit.SECONDS.sleep(1); //模拟1秒处理时间
} catch (InterruptedException e) {
e.printStackTrace();
}
JiguiquanKafkaMsg msg = JSONUtil.toBean(record.value(), JiguiquanKafkaMsg.class);
String reqId = String.valueOf(msg.getHeaders().get("reqId"));
String value = String.valueOf(msg.getData());
JiguiquanKafkaMsg respMsg = new JiguiquanKafkaMsg();
respMsg.getHeaders().put("reqId", reqId);
respMsg.setData("TEST-NONBLOCK" + value);
kafkaTemplate.send(KafkaConstant.KAFKA_RESPONSE_TOPIC, JSONUtil.toJsonStr(respMsg));
}
}
7、最终调用后打印结果:
2023-05-09 11:24:14.157 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK2 2023-05-09 11:24:14.158 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK3 2023-05-09 11:24:14.158 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK10 2023-05-09 11:24:14.158 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK8 2023-05-09 11:24:14.159 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK5 2023-05-09 11:24:14.159 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK1 2023-05-09 11:24:15.042 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK7 2023-05-09 11:24:15.042 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK4 2023-05-09 11:24:15.043 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK6 2023-05-09 11:24:16.055 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK9 2023-05-09 11:24:16.056 - [nio-8000-exec-1] c.j.www.service.impl.ReqServiceImpl : 总耗时:3.6761809秒
8、分析:为什么不是1秒?
虽然效率提升了很多,但是我其实期望的执行时间为1秒,但是总耗时却是3秒?
那是因为,有一些消息被投递到了同一个kafka分区,而消费者在消费同一个分区的数据时,是顺序的,所以执行时间就变长了;
通过关差ClinetB的消费记录就可以知道:

可以看到,partition=3的分区,被投放了3条消息!
我们的优化思路就是,让kafka的投递,尽可能地均匀,10条数据均匀投递到10个分区中;
三、自定义kafka分区策略,让消息投递更均匀,将执行时间优化至1秒
1、复习一下Kafka生产者的分区策略:
kafka使用的默认的分区器是:org.apache.kafka.clients.producer.internals.DefaultPartitioner
这个分区器由传递的参数决定具体的分区策略:
-
当不指定分区key时 —— 黏性分区策略(2.4.0之前为轮询);
-
指定分区key时 —— hash(key)%分区数;
-
指定分区号 —— 全部投递到指定分区号上;
可见,都不能保证完全均匀地分散到每一个分区上,我们要想实现绝对均匀,要么降低kafka版本,要么自定义分区器;
2、自定义分区器(最简单的实现,如果要绝对轮询,可以自己计数,这里不是重点)
public class CustomizePartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
return Integer.valueOf(String.valueOf(key)) % 10;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
3、将自定义的分区器给Kafka安排上(ClientA):
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {
@Bean
public NewTopic requestTopic(){
return new NewTopic(KafkaConstant.KAFKA_REQUEST_TOPIC, 10, (short)1);
}
@Bean
public ProducerFactory<Object, Object> kafkaProducerFactory(KafkaProperties kafkaProperties) {
Map<String, Object> properties = kafkaProperties.buildProducerProperties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class);
return new DefaultKafkaProducerFactory<>(properties);
}
}
4、现在我们再来测试下效果:
2023-05-09 13:47:30.241 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK3 2023-05-09 13:47:30.241 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK5 2023-05-09 13:47:30.241 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK8 2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK6 2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK2 2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK1 2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK9 2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK4 2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK7 2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK10 2023-05-09 13:47:30.281 - [nio-8000-exec-2] c.j.www.service.impl.ReqServiceImpl : 总耗时:1.0996998秒
通过ClientB的消费记录,可以看到,10条消息均匀地投递到了10个分区之上:

四、如果前端Http调用ClinetA,等待返回结果怎么办?
我们从上面的代码中可以知道,我们自定义的回调类是没有返回值的,那是因为在多线程的知识点中,普通的runnable是没有返回值的;
如果需要返回值,我们得需要使用Future,而JDK8之后,为我们提供了更加好用的CompleteFuture,我们可以使用这个类,来等待返回结果,返回给前端!
究其根本,是前端调用后端的过程,还没有办法显示异步非阻塞调用;(不是说真没有,而是大部分公司的前端没有这种优化的概念)
1、我们创建一个回调管理类ResponseHandler:
public class ResponseHandler {
private static ConcurrentHashMap<String, CompletableFuture<String>> mapping = new ConcurrentHashMap<>();
public static void addCallback(String requestID, CompletableFuture<String> cf){
mapping.putIfAbsent(requestID, cf);
}
public static CompletableFuture<String> getCallBack(String requestID){
return mapping.get(requestID);
}
public static void runCallback(String requestID, String msg){
CompletableFuture<String> cf = mapping.get(requestID);
if (Objects.isNull(cf)){
return; //可能并不是本集群节点发出的请求
}
cf.complete(msg);
removeCallback(requestID);
}
public static void removeCallback(String requestID) {
mapping.remove(requestID);
}
}
2、controller中定义的接口:
@GetMapping("/test/{id}")
public BaseResponse<String> test(@PathVariable("id") String id){
try {
return BaseResponse.success(reqService.test(id));
} catch (Exception e) {
throw ZidanApiException.create("500", "请求失败2", "request failed");
}
}
3、具体的serviceImpl方法实现:
public String test(String id) throws ExecutionException, InterruptedException {
String reqId = UUID.randomUUID().toString();
CompletableFuture<String> cf = new CompletableFuture<>();
ResponseHandler.addCallback(reqId, cf);
JiguiquanKafkaMsg<String> reqMsg = new JiguiquanKafkaMsg<>();
reqMsg.getHeaders().put("reqId", reqId);
reqMsg.setData(id);
kafkaTemplate.send(KafkaConstant.KAFKA_REQUEST_TOPIC, JSONUtil.toJsonStr(reqMsg)).addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
log.error("请求发送失败:{},移除回调", ex.getMessage());
ResponseHandler.removeCallback(reqId);
}
@Override
public void onSuccess(Object result) {
// log.info("请求发送成功:{},等待回调", result.toString());
}
});
if (Objects.isNull(ResponseHandler.getCallBack(reqId))){
throw ZidanApiException.create("500", "请求失败1", "request failed");
} else {
return cf.get();
}
}
4、调用端等待response的消费者代码:
@Slf4j
public class ResponseConsumer {
@KafkaListener(topics = KafkaConstant.KAFKA_RESPONSE_TOPIC, groupId = "clientA")
public void consumer(ConsumerRecord<String, String> record) {
JiguiquanKafkaMsg msg = JSONUtil.toBean(record.value(), JiguiquanKafkaMsg.class);
String reqId = String.valueOf(msg.getHeaders().get("reqId"));
String value = String.valueOf(msg.getData());
ResponseHandler.runCallback(reqId, value); //value值将被cf.get()获取到
}
}
5、测试调用效果:

除了使用CompleteFuture.get()阻塞等待外,还可以通过将httpResponse传递到我们自己定义的callback的onSuccess()方法中,主动写出,这样就不需要借助CompleteFuture的阻塞了;这点在我们处理netty网络框架实施的需要自己主动写出的逻辑时,很实用;
如mqtt连接时,我们通过将ctx传递给callback的onSuccess()方法,就可以很方便地在获得验证结果返回后,给mqtt client端以响应!



