博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot整合SpringKafka实现生产者史上最简代码实现
阅读量:7286 次
发布时间:2019-06-30

本文共 4261 字,大约阅读时间需要 14 分钟。

该项目是使用的技术:SpringBoot  + SpringKafka + Maven

先看pom.xml文件中引入的依赖:

1 
2
4
4.0.0
5 6
com.xuebusi.producer
7
producer
8
0.0.1-SNAPSHOT
9
jar
10 11
springkafkaproducer
12
Demo project for Spring Boot
13 14
15
org.springframework.boot
16
spring-boot-starter-parent
17
1.5.8.RELEASE
18
19
20 21
22
UTF-8
23
UTF-8
24
1.8
25
26 27
28
29
org.springframework.boot
30
spring-boot-starter-freemarker
31
32
33
org.springframework.kafka
34
spring-kafka
35
1.0.6.RELEASE
36
37
38
org.springframework.boot
39
spring-boot-starter-web
40
41 42
43
org.springframework.boot
44
spring-boot-starter-test
45
test
46
47
48 49
50
51
52
org.springframework.boot
53
spring-boot-maven-plugin
54
55
56
57 58 59

注意:这里我使用的spring-kafka(它包装了apache的kafka-client)的依赖包版本是 1.0.6.RELEASE, 是因为我Linux服务器上部署的kafka服务器的版本是kafka_2.10-0.9.0.1,使用的kafka的时候要注意,kafka客户端(kafka-client)的版本要和kafka服务器的版本一一对应,否则,消息发送会失败。

Spring官方网站上给出了SpringKafka和kafka-client版本(它的版本号要和kafka服务器的版本保持一致)的对应关系:

https://projects.spring.io/spring-kafka/

 

下面是生产者的配置文件,既然使用的是SpringBoot,配置文件就是 application.yml

server:  port: 8081spring:  kafka:    producer:      bootstrap-servers: 192.168.71.11:9092,192.168.71.12:9092,192.168.71.13:9092

在上面的配置中,我们给生产者分配的端口号是8081,服务器有3台,分别对应3个ip地址和端口。

该配置只是配置了kafka服务器的ip地址,这里并没有对生产者做过多的配置。想了解关于kafka生产者相关的更多配置的话,可以自行查阅相关资料进行学习。

 

下面是kafka生产者的核心代码,实现了消息的发送逻辑:

package com.xuebusi.producer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture;import java.util.UUID;/** * 生产者 * 使用@EnableScheduling注解开启定时任务 */@Component@EnableSchedulingpublic class KafkaProducer {    @Autowired    private KafkaTemplate kafkaTemplate;    /**     * 定时任务     */    @Scheduled(cron = "00/1 * * * * ?")    public void send(){        String message = UUID.randomUUID().toString();        ListenableFuture future = kafkaTemplate.send("app_log", message);        future.addCallback(o -> System.out.println("send-消息发送成功:" + message), throwable -> System.out.println("消息发送失败:" + message));    }}

在上面的代码中,负责发送消息的角色就是SpringKafka提供的 KafkaTemplate对象,使用它的 send()方法就可以把自己的消息发送到kafka服务器。send()方法有多个重载的方法,大家可以根据自己的需要来使用不同的send()方法。

这里我们为了方便发送消息进行测试,使用了Spring的定时任务,在类上使用 @EnableScheduling 注解开启定时任务,在方法上使用@Scheduled注解并指定表达式来定义定时规则,这里我们每秒就会向kafka服务器发送一条消息。

当然,你可能不会用到Spring的定时任务,你可以把@EnableScheduling  和 @Scheduled注解去掉。你也可以通过使用Spring的@Controller和@RequestMapping 注解将你的发送消息的方法定义成一个接口。

你可以定义多个不同的方法,每个使用到了 KafkaTemplate 对象的方法都会是一个生产者。

 

下面就启动SpringBoot项目测试:

package com.xuebusi;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class SpringkafkaproducerApplication {    public static void main(String[] args) {        SpringApplication.run(SpringkafkaproducerApplication.class, args);    }}

使用鼠标右键运行main方法即可启动SpringBoot项目,在控制台你会看到成功连接到kafka服务器,并每隔1秒就会发送一条消息到kafka服务器。

 

转载地址:http://xopjm.baihongyu.com/

你可能感兴趣的文章
centos7 安装iftop
查看>>
CISCO之BGP配置
查看>>
python ConfigParser 模块
查看>>
如何通过Word 2010发布文章到博客
查看>>
JVM监控和查看
查看>>
$.ajax与$.post,$.get的区别
查看>>
Java开发者易犯错误Top10
查看>>
Xcode快捷键整理(陆续添加中)
查看>>
分布式系统的事务处理
查看>>
VC 双缓存技术+滚动条
查看>>
strtol详解
查看>>
mysql部分参数注解
查看>>
Powershell常用命令总结
查看>>
HAProxy+Keepalived实现Web服务器负载均衡
查看>>
apache动静态编译
查看>>
导出到Excal表格
查看>>
nginx Rewrite 规则
查看>>
周珍:浅析百度调整的几大猜想
查看>>
微软异想天开!居然想让电脑厂商为它生产VR眼镜
查看>>
Linux Mint和LMDE将开发新版
查看>>