I. First, a baseline: plain blocking calls (invoked 10 times)
The caller issues the calls serially;
The callee sleeps 1 second per request;
As a result, each request on the caller side blocks waiting for its response (HTTP calls are synchronous and blocking), so the total time comes to a little over 10 seconds.
1. Caller-side serviceImpl method:
@Override
public void testRest() {
    String targetMethod = "Final result: %s"; // simulates a target.method() call
    StopWatch stopWatch = new StopWatch("testRest");
    stopWatch.start();
    for (int i = 1; i <= 10; i++) {
        ResponseEntity<BaseResponse> resp = restTemplate.getForEntity(
                "http://localhost:9000/resp/" + i, BaseResponse.class);
        if (resp.getStatusCode().is2xxSuccessful()) {
            String result = String.format(targetMethod, resp.getBody().getData());
            log.info(result);
        }
    }
    stopWatch.stop();
    log.info("Total time: {} seconds", stopWatch.getTotalTimeSeconds());
}
2. Callee-side Controller method:
@RestController
public class ClientBController {

    @GetMapping("/resp/{origin}")
    public BaseResponse<String> test(@PathVariable("origin") String origin) {
        try {
            TimeUnit.SECONDS.sleep(1); // simulate 1 second of processing time
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return BaseResponse.success("TEST-REST-" + origin);
    }
}
3. Output after the run:
2023-05-09 11:11:06.406 INFO ...ReqServiceImpl : Final result: TEST-REST-1
2023-05-09 11:11:07.422 INFO ...ReqServiceImpl : Final result: TEST-REST-2
2023-05-09 11:11:08.442 INFO ...ReqServiceImpl : Final result: TEST-REST-3
2023-05-09 11:11:09.463 INFO ...ReqServiceImpl : Final result: TEST-REST-4
2023-05-09 11:11:10.483 INFO ...ReqServiceImpl : Final result: TEST-REST-5
2023-05-09 11:11:11.500 INFO ...ReqServiceImpl : Final result: TEST-REST-6
2023-05-09 11:11:12.519 INFO ...ReqServiceImpl : Final result: TEST-REST-7
2023-05-09 11:11:13.543 INFO ...ReqServiceImpl : Final result: TEST-REST-8
2023-05-09 11:11:14.548 INFO ...ReqServiceImpl : Final result: TEST-REST-9
2023-05-09 11:11:15.564 INFO ...ReqServiceImpl : Final result: TEST-REST-10
2023-05-09 11:11:15.564 INFO ...ReqServiceImpl : Total time: 10.3514294 seconds
The synchronous blocking approach is clearly unsatisfactory!
II. Using Kafka as middleware to turn the synchronous blocking call into an asynchronous non-blocking one
1. Kafka configuration on the caller side:
application.yml:
spring:
  kafka:
    bootstrap-servers: ${KAFKA_SERVERS:10.206.98.2:9092}
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
    consumer:
      enable-auto-commit: true
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1000
KafkaConfig class:
// Use 10 partitions
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic requestTopic() {
        return new NewTopic(KafkaConstant.KAFKA_REQUEST_TOPIC, 10, (short) 1);
    }
}
2. Caller-side serviceImpl method:
private final ConcurrentHashMap<String, JiguiquanCallback<String>> pendingRequest = new ConcurrentHashMap<>();

public void testNonBlock() throws InterruptedException {
    String targetMethod = "Final result: %s"; // simulates a target.method() call
    StopWatch stopWatch = new StopWatch("testNonBlock");
    stopWatch.start();
    CountDownLatch countDownLatch = new CountDownLatch(10);
    for (int i = 1; i <= 10; i++) {
        String reqId = UUID.randomUUID().toString();
        JiguiquanCallback<String> callback = new JiguiquanCallback<String>() {
            @Override
            public void onSuccess(String msg) {
                String result = String.format(targetMethod, msg);
                log.info(result);
                countDownLatch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                log.error("Request failed: {}", e.getMessage());
            }
        };
        pendingRequest.put(reqId, callback);

        JiguiquanKafkaMsg<String> reqMsg = new JiguiquanKafkaMsg<>();
        reqMsg.getHeaders().put("reqId", reqId);
        reqMsg.setData(String.valueOf(i));
        kafkaTemplate.send(KafkaConstant.KAFKA_REQUEST_TOPIC, String.valueOf(i), JSONUtil.toJsonStr(reqMsg))
                .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                    @Override
                    public void onFailure(Throwable ex) {
                        log.error("Failed to send request: {}, removing callback", ex.getMessage());
                        pendingRequest.remove(reqId);
                    }

                    @Override
                    public void onSuccess(SendResult<String, String> result) {
                        // log.info("Request sent successfully: {}, awaiting callback", result.toString());
                    }
                });
    }
    countDownLatch.await();
    stopWatch.stop();
    log.info("Total time: {} seconds", stopWatch.getTotalTimeSeconds());
}

@KafkaListener(topics = KafkaConstant.KAFKA_RESPONSE_TOPIC, groupId = "clientA")
public void consumer(ConsumerRecord<String, String> record) {
    JiguiquanKafkaMsg msg = JSONUtil.toBean(record.value(), JiguiquanKafkaMsg.class);
    String reqId = String.valueOf(msg.getHeaders().get("reqId"));
    String value = String.valueOf(msg.getData());
    // remove() rather than get(), so completed requests don't pile up in the map
    JiguiquanCallback<String> callback = pendingRequest.remove(reqId);
    if (callback != null) { // the response may belong to a request issued by another node
        callback.onSuccess(value);
    }
}
3. The callback interface I defined:
public interface JiguiquanCallback<T> {

    JiguiquanCallback<Void> EMPTY = new JiguiquanCallback<Void>() {
        @Override
        public void onSuccess(Void msg) { }

        @Override
        public void onError(Throwable e) { }
    };

    void onSuccess(T msg);

    void onError(Throwable e);
}
4. A simple Kafka message wrapper class:
@Data
public class JiguiquanKafkaMsg<T> {
    private Map<String, String> headers = new HashMap<>();
    private T data;
}
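For illustration (my own example, not from the original), a request wrapped this way and serialized with JSONUtil.toJsonStr should look roughly like this on the wire (the UUID is a placeholder):

{
  "headers": {
    "reqId": "0f8fad5b-d9cb-469f-a165-70867728950e"
  },
  "data": "1"
}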
5. Kafka configuration on the callee side:
The application.yml and the KafkaConfig class are identical to the caller side's (see step 1 of this section).
6. Callee-side Kafka consumption logic:
@Component
@Slf4j
public class RequestConsumer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = KafkaConstant.KAFKA_REQUEST_TOPIC, groupId = "clientB", concurrency = "10")
    public void consumer(ConsumerRecord<String, String> record) {
        log.info("Received data: {}", record);
        try {
            TimeUnit.SECONDS.sleep(1); // simulate 1 second of processing time
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        JiguiquanKafkaMsg msg = JSONUtil.toBean(record.value(), JiguiquanKafkaMsg.class);
        String reqId = String.valueOf(msg.getHeaders().get("reqId"));
        String value = String.valueOf(msg.getData());

        JiguiquanKafkaMsg respMsg = new JiguiquanKafkaMsg();
        respMsg.getHeaders().put("reqId", reqId);
        respMsg.setData("TEST-NONBLOCK" + value);
        kafkaTemplate.send(KafkaConstant.KAFKA_RESPONSE_TOPIC, JSONUtil.toJsonStr(respMsg));
    }
}
7. Output after the run:
2023-05-09 11:24:14.157 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK2
2023-05-09 11:24:14.158 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK3
2023-05-09 11:24:14.158 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK10
2023-05-09 11:24:14.158 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK8
2023-05-09 11:24:14.159 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK5
2023-05-09 11:24:14.159 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK1
2023-05-09 11:24:15.042 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK7
2023-05-09 11:24:15.042 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK4
2023-05-09 11:24:15.043 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK6
2023-05-09 11:24:16.055 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK9
2023-05-09 11:24:16.056 - [nio-8000-exec-1] c.j.www.service.impl.ReqServiceImpl : Total time: 3.6761809 seconds
8. Analysis: why not 1 second?
Efficiency improved a lot, but I had expected the run to take about 1 second; why did the total come to over 3 seconds?
That's because some messages were delivered to the same Kafka partition, and messages within a single partition are consumed sequentially; even with concurrency = 10, the partition that received the most messages becomes the bottleneck, stretching the total time.
This becomes obvious when you inspect ClientB's consumption records:
As you can see, partition 3 received 3 messages!
The optimization, then, is to make Kafka's delivery as even as possible: spread the 10 messages evenly across the 10 partitions.
III. Customize the Kafka partitioning strategy for even delivery and cut the run time to about 1 second
1. A quick review of Kafka producer partitioning strategies:
The default partitioner Kafka uses is org.apache.kafka.clients.producer.internals.DefaultPartitioner.
Which strategy it applies depends on what you pass with the record:
- No key specified: sticky partitioning (round-robin before 2.4.0);
- Key specified: hash(key) % number of partitions;
- Partition number specified: everything goes to that partition.
None of these guarantees a perfectly even spread across every partition, so to get truly uniform delivery we would either have to downgrade Kafka or write a custom partitioner; the sketch below shows why keyed hashing is uneven.
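As a quick check (my own sketch, not from the original article), you can reproduce the keyed mapping the DefaultPartitioner applies, i.e. murmur2(key) % numPartitions, for the keys "1" through "10" that our test sends; several keys generally collide on the same partition, which is exactly the uneven spread observed above:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

// Sketch: compute hash(key) % numPartitions the way the DefaultPartitioner
// does for keyed records, using Kafka's own murmur2 implementation.
public class PartitionSpread {
    public static void main(String[] args) {
        int numPartitions = 10;
        for (int i = 1; i <= 10; i++) {
            byte[] keyBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.printf("key=%d -> partition=%d%n", i, partition);
        }
    }
}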
2. A custom partitioner (the simplest possible implementation; if you want strict round-robin you can keep a counter yourself, as sketched after this block, but that's not the point here):
public class CustomizePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        return Integer.parseInt(String.valueOf(key)) % 10;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> map) { }
}
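For reference, a strictly round-robin variant could look like the sketch below (my addition), which keeps an AtomicInteger counter and ignores the key entirely; Kafka itself also ships org.apache.kafka.clients.producer.RoundRobinPartitioner, which does essentially this:

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Sketch: strict round-robin over all partitions of the topic,
// independent of the record key.
public class RoundRobinCountingPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // getAndIncrement may eventually overflow to negative; mask it positive
        return (counter.getAndIncrement() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}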
3. Wire the custom partitioner into Kafka (ClientA):
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {

    @Bean
    public NewTopic requestTopic() {
        return new NewTopic(KafkaConstant.KAFKA_REQUEST_TOPIC, 10, (short) 1);
    }

    @Bean
    public ProducerFactory<Object, Object> kafkaProducerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> properties = kafkaProperties.buildProducerProperties();
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}
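As an aside (my own note), the same thing can usually be done in configuration alone, since Spring Boot forwards arbitrary entries under spring.kafka.producer.properties straight to the Kafka producer; the package name below is a placeholder:

spring:
  kafka:
    producer:
      properties:
        # Bracket notation keeps the dotted key intact; the package is hypothetical
        "[partitioner.class]": com.jiguiquan.kafka.CustomizePartitioner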
4. Now let's test again:
2023-05-09 13:47:30.241 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK3
2023-05-09 13:47:30.241 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK5
2023-05-09 13:47:30.241 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK8
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK6
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK2
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK1
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK9
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK4
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK7
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : Final result: TEST-NONBLOCK10
2023-05-09 13:47:30.281 - [nio-8000-exec-2] c.j.www.service.impl.ReqServiceImpl : Total time: 1.0996998 seconds
From ClientB's consumption records we can see that the 10 messages were spread evenly across the 10 partitions:
IV. What if a frontend HTTP call to ClientA needs to wait for the result?
From the code above you can see that our custom callback has no return value; just as in basic multithreading, a plain Runnable returns nothing.
If we need a return value we have to use a Future, and since JDK 8 there is the far more convenient CompletableFuture; we can use it to wait for the result and hand it back to the frontend.
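The mechanism the next steps rely on is simple: one thread blocks on get() while another thread completes the future. A minimal sketch (my own, not from the original):

import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> cf = new CompletableFuture<>();

        // Some other thread (for us: the Kafka response listener) completes it
        new Thread(() -> cf.complete("hello")).start();

        // The caller blocks here until complete() is called
        System.out.println(cf.get());
    }
}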
At root, this is because the frontend-to-backend call itself is not yet made asynchronous and non-blocking (not that it's truly impossible, but most teams' frontends don't design for this kind of optimization).
1. We create a callback manager class, ResponseHandler:
public class ResponseHandler {

    private static ConcurrentHashMap<String, CompletableFuture<String>> mapping = new ConcurrentHashMap<>();

    public static void addCallback(String requestID, CompletableFuture<String> cf) {
        mapping.putIfAbsent(requestID, cf);
    }

    public static CompletableFuture<String> getCallBack(String requestID) {
        return mapping.get(requestID);
    }

    public static void runCallback(String requestID, String msg) {
        CompletableFuture<String> cf = mapping.get(requestID);
        if (Objects.isNull(cf)) {
            return; // the request may have been issued by another node in the cluster
        }
        cf.complete(msg);
        removeCallback(requestID);
    }

    public static void removeCallback(String requestID) {
        mapping.remove(requestID);
    }
}
2. The endpoint defined in the controller:
@GetMapping("/test/{id}") public BaseResponse<String> test(@PathVariable("id") String id){ try { return BaseResponse.success(reqService.test(id)); } catch (Exception e) { throw ZidanApiException.create("500", "请求失败2", "request failed"); } }
3. The concrete serviceImpl implementation:
public String test(String id) throws ExecutionException, InterruptedException {
    String reqId = UUID.randomUUID().toString();
    CompletableFuture<String> cf = new CompletableFuture<>();
    ResponseHandler.addCallback(reqId, cf);

    JiguiquanKafkaMsg<String> reqMsg = new JiguiquanKafkaMsg<>();
    reqMsg.getHeaders().put("reqId", reqId);
    reqMsg.setData(id);
    kafkaTemplate.send(KafkaConstant.KAFKA_REQUEST_TOPIC, JSONUtil.toJsonStr(reqMsg))
            .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("Failed to send request: {}, removing callback", ex.getMessage());
                    ResponseHandler.removeCallback(reqId);
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    // log.info("Request sent successfully: {}, awaiting callback", result.toString());
                }
            });

    if (Objects.isNull(ResponseHandler.getCallBack(reqId))) {
        throw ZidanApiException.create("500", "request failed 1", "request failed");
    } else {
        return cf.get(); // blocks until the response listener completes the future
    }
}
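One caveat worth noting (my addition, not in the original flow): a bare cf.get() waits forever if the response message is lost. A bounded wait is safer; a sketch of a drop-in replacement for the final return statement, with an arbitrary 5-second limit:

// Requires java.util.concurrent.TimeUnit and java.util.concurrent.TimeoutException
try {
    return cf.get(5, TimeUnit.SECONDS); // wait at most 5 seconds (value is arbitrary)
} catch (TimeoutException e) {
    ResponseHandler.removeCallback(reqId); // don't leak the pending future
    throw ZidanApiException.create("504", "request timed out", "request timeout");
}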
4. The caller-side consumer that waits for the response:
@Component
@Slf4j
public class ResponseConsumer {

    @KafkaListener(topics = KafkaConstant.KAFKA_RESPONSE_TOPIC, groupId = "clientA")
    public void consumer(ConsumerRecord<String, String> record) {
        JiguiquanKafkaMsg msg = JSONUtil.toBean(record.value(), JiguiquanKafkaMsg.class);
        String reqId = String.valueOf(msg.getHeaders().get("reqId"));
        String value = String.valueOf(msg.getData());
        ResponseHandler.runCallback(reqId, value); // this value is what cf.get() returns
    }
}
5. Test the call:
Besides blocking with CompletableFuture.get(), we could also pass the httpResponse into our own callback's onSuccess() method and write the response out ourselves, removing the need for CompletableFuture's blocking entirely; this is very practical when working with a network framework like Netty, where we have to write responses out explicitly anyway.
For example, during an MQTT connect, by passing the ctx into the callback's onSuccess() we can conveniently respond to the MQTT client as soon as the authentication result comes back!
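A minimal sketch of that idea (my own; the class name is hypothetical, and it assumes an outbound encoder in the Netty pipeline turns the String into the wire format, e.g. an MQTT CONNACK):

import io.netty.channel.ChannelHandlerContext;

// Sketch: a callback that holds Netty's ChannelHandlerContext so the Kafka
// response listener can push the result straight back to the client,
// with no blocking get() anywhere.
public class NettyWriteBackCallback implements JiguiquanCallback<String> {

    private final ChannelHandlerContext ctx;

    public NettyWriteBackCallback(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    @Override
    public void onSuccess(String msg) {
        ctx.writeAndFlush(msg); // write the result out on the original connection
    }

    @Override
    public void onError(Throwable e) {
        ctx.close(); // fail the connection on error
    }
}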