SpringCloudStream-Kafka的学习

包括以下内容:

kafka的分组消费,并发消费,消费过滤,partition分区的配置,死信队列,异常处理,消费重试

/**
 * 消费者组(Consumer Group):同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。
 * 消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
 * 要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。
 * <p>
 * 对于消费队列的消费者,会有两种消费模式:集群消费(Clustering)和广播消费(Broadcasting)。
 * <p>
 * 集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息,同一个分组下的多个实例会只有一个消费者消费,容灾
 * 广播消费(Broadcasting):广播消费模式下,不相同 Consumer Group 的每个 Consumer 实例都接收全量的消息,
 * 同一个topic下的不同分组下的实例,都会消费,可以解耦,场景如用户注册成功后发同一个topic,不同的分组消费者执行不同的逻辑
 * <p>
 * kafka的并发消费
 * 需要先给topic设置多分区  bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 10 --topic stream-demo
 * 然后修改配置 concurrency: 2 # 每个 Consumer 消费线程数的初始大小,默认为 1
 * 假设我们创建10个分区
 * 则两个消费者 即两个线程分别分到5个partition 各自消费
 * <p>
 * 参考:http://www.iocoder.cn/Spring-Cloud/Kafka/
  */
public interface MySource {
    String myOutput = "myOutput";   // 管道名称为"myOutput"
    String input = "input";   // 接收管道名称为"input"
    String input2 = "input2";   // 接收管道名称为"input"

    @Output(myOutput)
    MessageChannel myOutput();

    @Input(input)
    SubscribableChannel receive();

    @Input(input2)
    SubscribableChannel receive2();

}
//绑定自定义通道
@EnableBinding(MySource.class)
public class MySendService {

    @Autowired
    private MySource source;

    public void sendMsg(String msg) {
        source.myOutput().send(MessageBuilder.withPayload(msg).setHeader("flag","cq").build());
    }
}
@Slf4j
@Component
@EnableBinding(value = MySource.class)
public class RecevieInputService {
    // condition 消费过滤
    @StreamListener(value = MySource.input, condition = "headers['flag'] == 'cq'")
    public void onReceiver(byte[] msg) {
        System.out.println("消费者1收到消息:" + Thread.currentThread().getId() + " " + new String(msg));
        int value = 0;
        try {
            Thread.sleep(3000);
            value = Integer.parseInt(new String(msg));
            value = 10 / value;
            System.out.println("value:" + value);
        } catch (Exception e) {
            throw new RuntimeException("FAIL");
        }
    }
  
    @StreamListener(MySource.input2)
    public void onReceiver2(byte[] msg) {
        System.out.println("消费者2收到消息:" + new String(msg));
    }

    /**
     * 局部的异常处理:通过订阅指定错误 Channel
     * 在全局和局部异常处理都定义的情况下,错误消息仅会被符合条件的局部错误异常处理。
     * 如果没有符合条件的,错误消息才会被全局异常处理。
     * 如果异常处理方法成功,没有重新抛出异常,会认定为该消息被消费成功,就不会发到死信队列。
     *
     * @param errorMessage
     */
    @ServiceActivator(inputChannel = "stream-demo.group-1.errors")
    public void handleError(ErrorMessage errorMessage) {
        log.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage());
        log.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
        log.error("[handleError][headers:{}]", errorMessage.getHeaders());
    }

    /**
     * 全局的异常处理:通过订阅全局错误 Channel
     *
     * @param errorMessage
     */
    @StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
    public void globalHandleError(ErrorMessage errorMessage) {
        log.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage());
        log.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
        log.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
    }

}
@Controller
@RequestMapping("/kafkamy")
public class MyProducerController {
    @Autowired
    private MySendService sendService;

    @RequestMapping("/send")
    public String send(@RequestParam("msg") String msg) {
        sendService.sendMsg(msg);
        return "ok";
    }

}
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 127.0.0.1:9092     # kafka地址
          zk-nodes: 127.0.0.1:2181    # zk地址 springboot2.0之后 可省略
          auto-create-topics: true
            # Kafka Binding 配置项,对应 KafkaBindingProperties 类
        bindings:
          input:
          # Kafka Consumer 配置项,对应 KafkaConsumerProperties 类
           consumer:
            enable-dlq: true # 是否开启死信队列,默认为 false 关闭
            dlq-name: error.stream-demo.group-1 # 死信队列名,默认为 `errors.{topicName}.{consumerGroup}`
      default-binder: kafka           #如果还有rabbitmq的话需要制定一个默认的
      bindings:
        output:                       #默认output
          destination: stream-demo    #消息发往的目的地
          content-type: text/plain    #消息发送格式
        myOutput:                     #自定义output
          destination: stream-demo    #消息发往的目的地
          content-type: text/plain    #消息发送格式
        input:                        #接收
          destination: stream-demo
          group: group-1
          consumer:
           max-attempts: 3 # 重试次数,默认为 3 次。
           back-off-initial-interval: 3000 # 重试间隔的初始值,单位毫秒,默认为 1000
           back-off-multiplier: 2.0 # 重试间隔的递乘系数,默认为 2.0
           back-off-max-interval: 10000 # 重试间隔的最大值,单位毫秒,默认为 10000
           enable-dlq: true # 是否开启死信队列,默认为 false 关闭
           concurrency:  2 # 每个 Consumer 消费线程数的初始大小,默认为 1
        input2:                        #接收
          destination: stream-demo
          group: group-2
最后修改:2020 年 10 月 27 日 08 : 26 PM