使用rabbitmq实现数据异步处理:从数据库中获取数据。 (从rabbitmq取数据库)

使用RabbitMQ实现数据异步处理:从数据库中获取数据

随着互联网的普及和信息化的飞速发展,各行各业都离不开大数据和高效数据处理。而数据异步处理作为一种高效的数据处理方式,已被广泛应用于各种业务场景中。RabbitMQ作为业界的消息队列系统,被广泛应用于分布式系统中,能够快速,可靠地处理异步消息。本文将介绍如何使用RabbitMQ实现数据异步处理:从数据库中获取数据。

1. RabbitMQ架构

RabbitMQ是一个由AMQP协议实现的开源消息中间件,其架构主要由以下部分组成:

– Producer:消息生产者,用于发送消息到RabbitMQ队列。

– Consumer:消息消费者,用于从RabbitMQ队列中取出消息进行处理。

– Exchange:消息交换机,用于接收来自生产者的消息并根据消息的路由键把消息发送给指定队列。

– Queue:消息队列,用于存储消息,其和Exchange配合使用,生产者发布的消息首先要发送到Exchange,Exchange再根据路由键将消息发送到不同的队列中。

– Routing Key:路由键,由生产者定义,用于指定消息被放到哪个队列中。

2. 数据库中获取数据

在使用RabbitMQ进行数据异步处理的过程中,需要从数据库中取出数据。可以使用任何一种数据库,例如MySQL、Oracle、MSSQL等。通常情况下,使用ORM(对象关系映射)框架将数据对象映射成Java对象,从而方便地对数据进行操作。

以Spring Boot框架为例,以下是使用JPA获取数据库中数据的示例:

“`java

@Service

public class UserService {

@Autowired

private UserRepository userRepository;

public List getAllUsers() {

return userRepository.findAll();

}

}

“`

上述示例中,将JPA的UserRepository注入到UserService中,通过调用findAll()方法获取数据库中所有用户信息。

3. RabbitMQ实现数据异步处理

需要在应用中引入RabbitMQ的依赖:

“`xml

org.springframework.boot

spring-boot-starter-amqp

“`

然后在application.properties中配置RabbitMQ:

“`properties

spring.rabbitmq.host=localhost

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

“`

以上配置将RabbitMQ的地址,端口号,用户名及密码等信息配置到了应用中。

接下来,需要定义Exchange和Queue,并将队列绑定到交换机中:

“`java

@Configuration

public class RabbitConfig {

public static final String QUEUE_NAME = “user_queue”;

public static final String EXCHANGE_NAME = “user_exchange”;

public static final String ROUTING_KEY = “user_routing_key”;

@Bean

public Queue queue() {

return new Queue(QUEUE_NAME);

}

@Bean

public DirectExchange exchange() {

return new DirectExchange(EXCHANGE_NAME);

}

@Bean

public Binding binding(Queue queue, DirectExchange exchange) {

return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);

}

}

“`

以上代码中,定义了一个名为user_queue的队列,一个名为user_exchange的交换机,并将user_queue绑定到了user_exchange中,绑定的路由键为user_routing_key。

在UserService中,通过调用getAllUsers()方法获取数据库中所有用户信息并发送到RabbitMQ队列中:

“`java

@Service

public class UserService {

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private Queue queue;

public void sendAllUsersToQueue() {

List users = userRepository.findAll();

for(User user : users) {

rabbitTemplate.convertAndSend(queue.getName(), user);

}

}

}

“`

以上代码中,使用了Spring Boot的RabbitTemplate发送消息到队列中。其中,queue.getName()获取了上述定义的队列名。

最后需要在应用中编写消息消费者,处理从队列中取出的消息:

“`java

@Component

public class UserConsumer {

@RabbitListener(queues = RabbitConfig.QUEUE_NAME)

public void receiveMessage(User user) {

// 处理消息

}

}

“`

以上代码定义了一个UserConsumer类,使用@RabbitListener注解指定了需要监听的队列名,当队列中有消息到达时,会自动调用receiveMessage方法进行处理。

相关问题拓展阅读:

厉害!一文了解消息中间件-RabbitMQ

RabbitMQ是2023年发布,是一个在AMQP(高级消息队列协议)基础上完成的,简称MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法,由Erlang(专门针对于大数据高并发的语言)语言开发,可复用的企业消肆态哗息系统,是当前最主流的消息中间件之一,具有可靠性、灵活的路由、消息集群简单、队列高可用、多种协议的支持、管理界面、跟踪机制以及插件机制。

1.消息 就是数据,增删改查的数据。例如在员工管理系统中增删改查闭磨的数据

2.队列 指的是一端进数据一端出数据,例如C#中(Queue数据结构)

1.消息队列指:一端进消息,一端出消息

2.RabbitMQ就是实现了消息队列概念的一个组件,以面向对象的思想去理解,消息队列就是类,而RabbitMQ就是实例,当然不仅仅只有RabbitMQ,例如ActiveMQ,RocketMQ,Kafka,包括Redis也可以实现消息队列。

1.在常见的单体架构中,主要流程是用户UI操作发起Http请求>服务器处理>然后由服务器直接和数据库交互,最后同步反馈用户结果

2.在微服务架构中,UI与微服务通信,主要是通过Http或者gRPC同步通信

问题分析

在上述2种情况下,我们发现在UI请求时都是同步操作 ,第2种架构虽然将整体服务按业务拆分成不同的微服务并且对应各自的数据库,但是在用户与微服务通信时,存在的问题依然没有解决,例如数据库的承载能力只能处理10w个请求,如果遇到高并况下,UI发起50w请求,那数据库是远远承载不了的,从而导致如下问题。

1.高并发请求导致系统性能下降响应慢,同时数据库承载风险加大

2.扩展性不强UI操作的交互对业务的依赖较大,导致用户体验下降

3.瞬时流量涌入巨大的话,服务器可能直接挂了

解决方案

RabbitMQ的优势

RabbitMQ的不足裂行

1.ConnectionFactory 为Connection的制造工厂。

2.Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。

3.Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

4.Exchange(交换机) 我们通常认为生产者将消息投递到Queue中,实际上实际的情况是,生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中(或者丢弃),而在RabbitMQ中的Exchange一共有4种策略,分别为:fanout(扇形)、direct(直连)、topic(主题)、headers(头部)

1.下载RabbitMQ

2.运行环境erlang

3.安装完成之后,加载RabbitMQ管理插件

4.安装成功访问RabbitMQ管理后台

1.分别创建考勤服务,请假服务,计算薪酬服务,邮件服务,短信服务消费者角色

2.创建员工管理网站用于模拟前端调用,主要充当生产者角色

3.在员工管理网站和每一个模拟微服务中通过nuget引入RabbitMQ.Client

4.在员工管理网站中创建模拟添加考勤的控制器并加入生产者代码

5.在考勤微服务中创建接口,并在接口中加入消费者代码

fanout类型的Exchange路由规则非常简单,工作方式类似于多播一对多,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

业务实例

当我们有员工需要请假,在员工管理系统提交请假,但是由于公司规定普通员工请假,需要发送短信到他的主管领导,针对此业务场景我们需要调用请假服务的同时去发送短信,这时需要两个消费者(请假服务,短信服务)来消费同一条消息,其实本质就是往RabbitMQ写入一个能被多个消费者接收的消息,所以可以使用 扇形交换机,一个生产者,多个消费者.

生产者模拟使用调用控制器来实现

消费者实现IHostedService 接口创建一个监听主机

直接交换器,工作方式类似于单播一对一,Exchange会将消息发送完全匹配ROUTING_KEY的Queue,缺陷是无法实现多生产者对一个消费者

当我们员工管理系统需要计算薪资并将结果以发送短信的方式告诉员工,这个时候我们就不太适合用“扇形交换机”了,因为换做是你,你也不想你的工资全公司都知道吧?这个时候就需要定制了一对一的场景了,那就在生产消息时使用直连交换机根据routingKey发送指定的消费者.

生产者模拟使用调用控制器来实现

消费者实现IHostedService 接口创建一个监听主机

Exchange绑定队列需要制定Key; Key 可以有自己的规则;Key可以有占位符;

或者# ,

匹配一个单词、#匹配多个单词,在Direct基础上加上模糊匹配;多生产者一个消费者,可以多对对,也可以多对1, 真实项目当中,使用主题交换机。可以满足所有场景

1.生产者定义Exchange,然后不同的routingKey绑定

3.消费者routingKey的模糊匹配,生产者发送消息时routingKey定义以s.开头, * 号只能匹配的routingKey为一级,例如(s.A)或(s.B)的发送的消息,# 能够匹配的routingKey为一级及多级以上 ,例如 (s.A)或者(s.A.QWE.IOP)

在月底的时候我们需要把员工存在异常考勤信息,薪资结算信息,请假信息分别以邮件的形式发送给我们的员工查阅,我们知道这是一个典型的多个生产者,一个消费者场景,异常考勤信息,薪资结算信息,请假信息分别需要生产消息发送到RabbitMQ,然后供我们员工消费

分别模拟3个生产者:异常考勤信息,薪资结算信息,请假信息

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。

1.不需要依赖Key

2.更多的时候,像这种Key Value 的键值,可能会存储在数据库中,那么我们就可以定义一个动态规则来拼装这个Key value ,从而达到消息灵活转发到不同的队列中去

我们根据上面的业务和代码简单实现了由生产者到消费者的一个业务流程,我们可以总结出知道,整个消息的收发过程包含有三个角色,生产者(员工管理网站)、RabbitMQ(Broker)、消费者(微服务),在理想状态下,按照这样实现,整个流程以及系统的稳定性,可能不会发生太大的问题,但是真正在实际应用中我们要去思考可能存在的问题,主要从三个大的方面去分析,然后发散。

1.生产端

2.存储端

3.消费端

我们在给RabbitMQ发送消息时,如何去保证消息一定到达呢,我们可以使用RabbitMQ提供了2种生产端的消息确认机制

我们生产端给RabbitMQ发送消息成功后,如果RabbitMQ宕机了,会导致RabbitMQ中消息丢失,如何解决消息丢失问题,针对RabbitMQ消息丢失,我们可以在生产者中使用

1.持久化消息

2.集群

当生产者写入消息到RabbitMQ后,消费服务接收消息期间,服务器宕机,导致消息丢失了,这个时候我们就应该使用RabbitMQ的消费端消息确认机制

1.自动确认

2.手动确认

消费者收到消息。消费者发送确认消息给rabbitmq期间。执行业务逻辑失败了,但是消息已经确认被消费了,我们应该在我们的消费者接收消息回调执行业务逻辑后面,执行使用手动确认消息机制,保证消息不被丢失

从rabbitmq取数据库的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于从rabbitmq取数据库,使用rabbitmq实现数据异步处理:从数据库中获取数据。,厉害!一文了解消息中间件-RabbitMQ的信息别忘了在本站进行查找喔。


数据运维技术 » 使用rabbitmq实现数据异步处理:从数据库中获取数据。 (从rabbitmq取数据库)