利用中间件(Kafka)实现高性能异步非阻塞调用,替代Http阻塞调用

一、首先我们先测试一下,普通的阻塞调用的效果(调用10次):

  • 调用者端串行调用;

  • 被调用者端每个请求睡眠1秒;

这样的结果就是:调用者端每一次请求由于等不到响应,都会阻塞等待(Http请求是同步阻塞的),最终耗时略大于10秒;

1、调用者端serviceImpl方法:

@Override
public void testRest() {
    String targetMethod = "最终返回结果为:%s";  //模拟目标对象.方法()
    StopWatch stopWatch = new StopWatch("testRest");
    stopWatch.start();
    for (int i = 1; i <= 10; i++) {
        ResponseEntity<BaseResponse> resp = restTemplate.getForEntity("http://localhost:9000/resp/" + String.valueOf(i), BaseResponse.class);
        if (resp.getStatusCode().is2xxSuccessful()){
            String result = String.format(targetMethod, resp.getBody().getData());
            log.info(result);
        }
    }
    stopWatch.stop();
    log.info("总耗时:{}秒", stopWatch.getTotalTimeSeconds());
}

2、被调用者端Controller方法:

@RestController
public class ClientBController {
    @GetMapping("/resp/{origin}")
    public BaseResponse<String> test(@PathVariable("origin") String origin){
        try {
            TimeUnit.SECONDS.sleep(1); //模拟1秒处理时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return BaseResponse.success("TEST-REST-"+origin);

            }
}

3、最终调用后打印结果:

2023-05-09 11:11:06.406  INFO ...ReqServiceImpl      : 最终返回结果为:TEST-REST-1
2023-05-09 11:11:07.422  INFO ...ReqServiceImpl      : 最终返回结果为:TEST-REST-2
2023-05-09 11:11:08.442  INFO ...ReqServiceImpl      : 最终返回结果为:TEST-REST-3
2023-05-09 11:11:09.463  INFO ...ReqServiceImpl      : 最终返回结果为:TEST-REST-4
2023-05-09 11:11:10.483  INFO ...ReqServiceImpl      : 最终返回结果为:TEST-REST-5
2023-05-09 11:11:11.500  INFO ...ReqServiceImpl      : 最终返回结果为:TEST-REST-6
2023-05-09 11:11:12.519  INFO ...ReqServiceImpl      : 最终返回结果为:TEST-REST-7
2023-05-09 11:11:13.543  INFO ...ReqServiceImpl      : 最终返回结果为:TEST-REST-8
2023-05-09 11:11:14.548  INFO ...ReqServiceImpl      : 最终返回结果为:TEST-REST-9
2023-05-09 11:11:15.564  INFO ...ReqServiceImpl      : 最终返回结果为:TEST-REST-10
2023-05-09 11:11:15.564  INFO ...ReqServiceImpl      : 总耗时:10.3514294秒

同步阻塞调用效果不理想!


二、借助中间件Kafka将同步阻塞调用优化为异步非阻塞调用

1、调用者端Kafka的配置:

application.yml:

spring:
  kafka:
    bootstrap-servers: ${KAFKA_SERVERS:10.206.98.2:9092}
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33444432
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer
      #  value-deserializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all

    consumer:
      enable-auto-commit: true
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1000

KafkaConfig类:

// 使用10个分区
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic requestTopic(){
        return new NewTopic(KafkaConstant.KAFKA_REQUEST_TOPIC, 10, (short)1);
    }
}

2、调用者端serviceImpl方法:

ConcurrentHashMap<String, JiguiquanCallback<String>> pendingRequest = new ConcurrentHashMap<>();

public void testNonBlock() throws InterruptedException {
    String targetMethod = "最终返回结果为:%s";  //模拟目标对象.方法()
    StopWatch stopWatch = new StopWatch("testNonBlock");
    stopWatch.start();
    CountDownLatch countDownLatch = new CountDownLatch(10);
    for (int i = 1; i <= 10; i++) {
        String reqId = UUID.randomUUID().toString();
        JiguiquanCallback<String> callback = new JiguiquanCallback<String>() {
            @Override
            public void onSuccess(String msg) {
                String result = String.format(targetMethod, msg);
                log.info(result);
                countDownLatch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                log.error("请求失败{}", e.getMessage());
            }
        };
        pendingRequest.put(reqId, callback);

        JiguiquanKafkaMsg<String> reqMsg = new JiguiquanKafkaMsg<>();
        reqMsg.getHeaders().put("reqId", reqId);
        reqMsg.setData(String.valueOf(i));
        kafkaTemplate.send(KafkaConstant.KAFKA_REQUEST_TOPIC, String.valueOf(i), JSONUtil.toJsonStr(reqMsg)).addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("请求发送失败:{},移除回调", ex.getMessage());
                pendingRequest.remove(reqId);
            }
            @Override
            public void onSuccess(Object result) {
//                    log.info("请求发送成功:{},等待回调", result.toString());
            }
        });
    }
    countDownLatch.await();
    stopWatch.stop();
    log.info("总耗时:{}秒", stopWatch.getTotalTimeSeconds());
}



@KafkaListener(topics = KafkaConstant.KAFKA_RESPONSE_TOPIC, groupId = "clientA")
public void consumer(ConsumerRecord<String, String> record) {
    JiguiquanKafkaMsg msg = JSONUtil.toBean(record.value(), JiguiquanKafkaMsg.class);
    String reqId = String.valueOf(msg.getHeaders().get("reqId"));
    String value = String.valueOf(msg.getData());
    JiguiquanCallback<String> callback = pendingRequest.get(reqId);
    callback.onSuccess(value);
}

3、其中我定义的回调类:

public interface JiguiquanCallback<T> {
    JiguiquanCallback<Void> EMPTY = new JiguiquanCallback<Void>() {
        @Override
        public void onSuccess(Void msg) {

        }

        @Override
        public void onError(Throwable e) {

        }
    };

    void onSuccess(T msg);

    void onError(Throwable e);
}

4、简单Kafka消息包装类:

@Data
public class JiguiquanKafkaMsg<T> {
    private Map<String, String> headers = new HashMap<>();
    private T data;
}

5、被调用者的Kafka配置:

application.yml:

spring:
  kafka:
    bootstrap-servers: ${KAFKA_SERVERS:10.206.98.2:9092}
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33444432
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer
      #  value-deserializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all

    consumer:
      enable-auto-commit: true
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1000

KafkaConfig类:

// 使用10个分区
@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic requestTopic(){
        return new NewTopic(KafkaConstant.KAFKA_REQUEST_TOPIC, 10, (short)1);
    }
}

6、被调用者端的Kafka消费处理逻辑:

@Component
@Slf4j
public class RequestConsumer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = KafkaConstant.KAFKA_REQUEST_TOPIC, groupId = "clinetB", concurrency = "10")
    public void consumer(ConsumerRecord<String, String> record) {
        log.info("收到数据:{}", record);
        try {
            TimeUnit.SECONDS.sleep(1); //模拟1秒处理时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        JiguiquanKafkaMsg msg = JSONUtil.toBean(record.value(), JiguiquanKafkaMsg.class);
        String reqId = String.valueOf(msg.getHeaders().get("reqId"));
        String value = String.valueOf(msg.getData());
        JiguiquanKafkaMsg respMsg = new JiguiquanKafkaMsg();
        respMsg.getHeaders().put("reqId", reqId);
        respMsg.setData("TEST-NONBLOCK" + value);
        kafkaTemplate.send(KafkaConstant.KAFKA_RESPONSE_TOPIC, JSONUtil.toJsonStr(respMsg));
    }
}

7、最终调用后打印结果:

2023-05-09 11:24:14.157  - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK2
2023-05-09 11:24:14.158  - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK3
2023-05-09 11:24:14.158  - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK10
2023-05-09 11:24:14.158  - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK8
2023-05-09 11:24:14.159  - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK5
2023-05-09 11:24:14.159  - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK1
2023-05-09 11:24:15.042  - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK7
2023-05-09 11:24:15.042  - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK4
2023-05-09 11:24:15.043  - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK6
2023-05-09 11:24:16.055  - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK9
2023-05-09 11:24:16.056  - [nio-8000-exec-1] c.j.www.service.impl.ReqServiceImpl : 总耗时:3.6761809秒

8、分析:为什么不是1秒?

虽然效率提升了很多,但是我其实期望的执行时间为1秒,但是总耗时却是3秒?

那是因为,有一些消息被投递到了同一个kafka分区,而消费者在消费同一个分区的数据时,是顺序的,所以执行时间就变长了;

通过关差ClinetB的消费记录就可以知道:

1683610643109026.png

可以看到,partition=3的分区,被投放了3条消息!

我们的优化思路就是,让kafka的投递,尽可能地均匀,10条数据均匀投递到10个分区中;


三、自定义kafka分区策略,让消息投递更均匀,将执行时间优化至1秒

1、复习一下Kafka生产者的分区策略:

kafka使用的默认的分区器是:org.apache.kafka.clients.producer.internals.DefaultPartitioner

这个分区器由传递的参数决定具体的分区策略:

  • 当不指定分区key时 —— 黏性分区策略(2.4.0之前为轮询);

  • 指定分区key时 —— hash(key)%分区数;

  • 指定分区号 —— 全部投递到指定分区号上;

    可见,都不能保证完全均匀地分散到每一个分区上,我们要想实现绝对均匀,要么降低kafka版本,要么自定义分区器;

2、自定义分区器(最简单的实现,如果要绝对轮询,可以自己计数,这里不是重点)

public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return Integer.valueOf(String.valueOf(key)) % 10;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

3、将自定义的分区器给Kafka安排上(ClientA):

@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {

    @Bean
    public NewTopic requestTopic(){
        return new NewTopic(KafkaConstant.KAFKA_REQUEST_TOPIC, 10, (short)1);
    }

    @Bean
    public ProducerFactory<Object, Object> kafkaProducerFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> properties = kafkaProperties.buildProducerProperties();
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomizePartitioner.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}

4、现在我们再来测试下效果:

2023-05-09 13:47:30.241 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:TEST-NONBLOCK3
2023-05-09 13:47:30.241 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK5
2023-05-09 13:47:30.241 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK8
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK6
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK2
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK1
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK9
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK4
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK7
2023-05-09 13:47:30.281 - [ntainer#0-0-C-1] ReqServiceImpl : 最终返回结果为:最终返回结果为:TEST-NONBLOCK10
2023-05-09 13:47:30.281 - [nio-8000-exec-2] c.j.www.service.impl.ReqServiceImpl  : 总耗时:1.0996998秒

通过ClientB的消费记录,可以看到,10条消息均匀地投递到了10个分区之上:

1683611477902831.png


四、如果前端Http调用ClinetA,等待返回结果怎么办?

我们从上面的代码中可以知道,我们自定义的回调类是没有返回值的,那是因为在多线程的知识点中,普通的runnable是没有返回值的;

如果需要返回值,我们得需要使用Future,而JDK8之后,为我们提供了更加好用的CompleteFuture,我们可以使用这个类,来等待返回结果,返回给前端!

究其根本,是前端调用后端的过程,还没有办法显示异步非阻塞调用;(不是说真没有,而是大部分公司的前端没有这种优化的概念)

1、我们创建一个回调管理类ResponseHandler:

public class ResponseHandler {
    private static ConcurrentHashMap<String, CompletableFuture<String>> mapping = new ConcurrentHashMap<>();

    public static void addCallback(String requestID, CompletableFuture<String> cf){
        mapping.putIfAbsent(requestID, cf);
    }

    public static CompletableFuture<String> getCallBack(String requestID){
        return mapping.get(requestID);
    }

    public static void runCallback(String requestID, String msg){
        CompletableFuture<String> cf = mapping.get(requestID);
        if (Objects.isNull(cf)){
            return; //可能并不是本集群节点发出的请求
        }
        cf.complete(msg);
        removeCallback(requestID);
    }

    public static void removeCallback(String requestID) {
        mapping.remove(requestID);
    }
}

2、controller中定义的接口:

@GetMapping("/test/{id}")
public BaseResponse<String> test(@PathVariable("id") String id){
    try {
        return BaseResponse.success(reqService.test(id));
    } catch (Exception e) {
        throw ZidanApiException.create("500", "请求失败2", "request failed");
    }
}

3、具体的serviceImpl方法实现:

public String test(String id) throws ExecutionException, InterruptedException {
    String reqId = UUID.randomUUID().toString();
    CompletableFuture<String> cf = new CompletableFuture<>();
    ResponseHandler.addCallback(reqId, cf);

    JiguiquanKafkaMsg<String> reqMsg = new JiguiquanKafkaMsg<>();
    reqMsg.getHeaders().put("reqId", reqId);
    reqMsg.setData(id);
    kafkaTemplate.send(KafkaConstant.KAFKA_REQUEST_TOPIC, JSONUtil.toJsonStr(reqMsg)).addCallback(new ListenableFutureCallback() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("请求发送失败:{},移除回调", ex.getMessage());
            ResponseHandler.removeCallback(reqId);
        }
        @Override
        public void onSuccess(Object result) {
//                log.info("请求发送成功:{},等待回调", result.toString());
        }
    });

    if (Objects.isNull(ResponseHandler.getCallBack(reqId))){
        throw ZidanApiException.create("500", "请求失败1", "request failed");
    } else {
        return cf.get();
    }
}

4、调用端等待response的消费者代码:

@Slf4j
public class ResponseConsumer {

    @KafkaListener(topics = KafkaConstant.KAFKA_RESPONSE_TOPIC, groupId = "clientA")
    public void consumer(ConsumerRecord<String, String> record) {
        JiguiquanKafkaMsg msg = JSONUtil.toBean(record.value(), JiguiquanKafkaMsg.class);
        String reqId = String.valueOf(msg.getHeaders().get("reqId"));
        String value = String.valueOf(msg.getData());
        ResponseHandler.runCallback(reqId, value);  //value值将被cf.get()获取到
    }
}

5、测试调用效果:

image.png

除了使用CompleteFuture.get()阻塞等待外,还可以通过将httpResponse传递到我们自己定义的callback的onSuccess()方法中,主动写出,这样就不需要借助CompleteFuture的阻塞了;这点在我们处理netty网络框架实施的需要自己主动写出的逻辑时,很实用;

如mqtt连接时,我们通过将ctx传递给callback的onSuccess()方法,就可以很方便地在获得验证结果返回后,给mqtt client端以响应!

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐