SpringCloudStream-Kafka的学习
包括以下内容:
kafka的分组消费,并发消费,消费过滤,partition分区的配置,死信队列,异常处理,消费重试
/**
* 消费者组(Consumer Group):同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。
* 消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
* 要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。
* <p>
* 对于消费队列的消费者,会有两种消费模式:集群消费(Clustering)和广播消费(Broadcasting)。
* <p>
* 集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息,同一个分组下的多个实例会只有一个消费者消费,容灾
* 广播消费(Broadcasting):广播消费模式下,不相同 Consumer Group 的每个 Consumer 实例都接收全量的消息,
* 同一个topic下的不同分组下的实例,都会消费,可以解耦,场景如用户注册成功后发同一个topic,不同的分组消费者执行不同的逻辑
* <p>
* kafka的并发消费
* 需要先给topic设置多分区 bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 10 --topic stream-demo
* 然后修改配置 concurrency: 2 # 每个 Consumer 消费线程数的初始大小,默认为 1
* 假设我们创建10个分区
* 则两个消费者 即两个线程分别分到5个partition 各自消费
* <p>
* 参考:http://www.iocoder.cn/Spring-Cloud/Kafka/
*/
public interface MySource {
String myOutput = "myOutput"; // 管道名称为"myOutput"
String input = "input"; // 接收管道名称为"input"
String input2 = "input2"; // 接收管道名称为"input"
@Output(myOutput)
MessageChannel myOutput();
@Input(input)
SubscribableChannel receive();
@Input(input2)
SubscribableChannel receive2();
}
//绑定自定义通道
@EnableBinding(MySource.class)
public class MySendService {
@Autowired
private MySource source;
public void sendMsg(String msg) {
source.myOutput().send(MessageBuilder.withPayload(msg).setHeader("flag","cq").build());
}
}
@Slf4j
@Component
@EnableBinding(value = MySource.class)
public class RecevieInputService {
// condition 消费过滤
@StreamListener(value = MySource.input, condition = "headers['flag'] == 'cq'")
public void onReceiver(byte[] msg) {
System.out.println("消费者1收到消息:" + Thread.currentThread().getId() + " " + new String(msg));
int value = 0;
try {
Thread.sleep(3000);
value = Integer.parseInt(new String(msg));
value = 10 / value;
System.out.println("value:" + value);
} catch (Exception e) {
throw new RuntimeException("FAIL");
}
}
@StreamListener(MySource.input2)
public void onReceiver2(byte[] msg) {
System.out.println("消费者2收到消息:" + new String(msg));
}
/**
* 局部的异常处理:通过订阅指定错误 Channel
* 在全局和局部异常处理都定义的情况下,错误消息仅会被符合条件的局部错误异常处理。
* 如果没有符合条件的,错误消息才会被全局异常处理。
* 如果异常处理方法成功,没有重新抛出异常,会认定为该消息被消费成功,就不会发到死信队列。
*
* @param errorMessage
*/
@ServiceActivator(inputChannel = "stream-demo.group-1.errors")
public void handleError(ErrorMessage errorMessage) {
log.error("[handleError][payload:{}]", errorMessage.getPayload().getMessage());
log.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
log.error("[handleError][headers:{}]", errorMessage.getHeaders());
}
/**
* 全局的异常处理:通过订阅全局错误 Channel
*
* @param errorMessage
*/
@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
public void globalHandleError(ErrorMessage errorMessage) {
log.error("[globalHandleError][payload:{}]", errorMessage.getPayload().getMessage());
log.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
log.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
}
}
@Controller
@RequestMapping("/kafkamy")
public class MyProducerController {
@Autowired
private MySendService sendService;
@RequestMapping("/send")
public String send(@RequestParam("msg") String msg) {
sendService.sendMsg(msg);
return "ok";
}
}
spring:
cloud:
stream:
kafka:
binder:
brokers: 127.0.0.1:9092 # kafka地址
zk-nodes: 127.0.0.1:2181 # zk地址 springboot2.0之后 可省略
auto-create-topics: true
# Kafka Binding 配置项,对应 KafkaBindingProperties 类
bindings:
input:
# Kafka Consumer 配置项,对应 KafkaConsumerProperties 类
consumer:
enable-dlq: true # 是否开启死信队列,默认为 false 关闭
dlq-name: error.stream-demo.group-1 # 死信队列名,默认为 `errors.{topicName}.{consumerGroup}`
default-binder: kafka #如果还有rabbitmq的话需要制定一个默认的
bindings:
output: #默认output
destination: stream-demo #消息发往的目的地
content-type: text/plain #消息发送格式
myOutput: #自定义output
destination: stream-demo #消息发往的目的地
content-type: text/plain #消息发送格式
input: #接收
destination: stream-demo
group: group-1
consumer:
max-attempts: 3 # 重试次数,默认为 3 次。
back-off-initial-interval: 3000 # 重试间隔的初始值,单位毫秒,默认为 1000
back-off-multiplier: 2.0 # 重试间隔的递乘系数,默认为 2.0
back-off-max-interval: 10000 # 重试间隔的最大值,单位毫秒,默认为 10000
enable-dlq: true # 是否开启死信队列,默认为 false 关闭
concurrency: 2 # 每个 Consumer 消费线程数的初始大小,默认为 1
input2: #接收
destination: stream-demo
group: group-2