Kafka流式数据管道:实现高效的数据库生成 (kafka生成数据库)

在现代企业中,数据是生命力。对数据进行收集、存储和管理对企业的成长至关重要。然而,为了保存大量的数据,企业需要使用高效的数据存储技术。MySQL和PostgreSQL等传统数据库仍然是许多企业的存储选择,它们在数据保存和查询方面具有很高的性能。不过,这些数据库并不是万能的,它们无法有效地处理流式数据。对于高度变化的数据流,利用传统数据库进行处理需要大量的计算资源,这在大型企业中是无法承受的。为了解决这个问题,Kafka流式数据管道被推出并广泛使用于各大企业。

Kafka是由LinkedIn开发的分布式流式数据平台,旨在为企业提供大规模、高效的数据收集、传输和存储能力。与传统数据库不同的是,使用Kafka的流式数据处理方式具有以下三个优点:

1.灵活的数据处理

许多传统数据库需要严格的数据结构和模式,这使得将数据存储和移动到数据库中变得困难。而Kafka与传统数据库不同,它提供了灵活的数据处理方式。在Kafka中,数据不需要预定义的模式,不需要推断数据类型。这使得企业更加容易处理未知结构的数据,并使得系统可以轻松地适应新的数据流。

2.高性能的数据批处理

与传统数据库不同,Kafka利用流数据处理方式,提供大型分布式消息队列,支持高并发的数据传输和功能复杂的数据处理。当企业需要针对海量数据进行批量处理时,Kafka的流数据处理在性能和效率上表现出色。Kafka还可以利用持久化存储,使得数据不会因任何失败情况丢失。

3.实时的数据处理

对于实时的数据流,传统数据库的处理方式通常是相对较慢的,处理速度无法与数据产生的速度相匹配。但是,Kafka的流处理机制使得企业能够在数据到达时立即进行处理,并快速地响应变化。这提高了企业对数据的密集实时监视和管理能力,对准确性和数据驱动的决策产生积极的影响。

Kafka的流式数据管道支持多种数据源,如数据生成器、传感器、网站浏览器、设备传感器和消息队列等。通过将数据从这些源中收集和处理,可以让系统产生更多的价值。此外,使用KafkaProcessors,可以实现异步和同步的流数据处理,实现数据的实时或定期处理。这使得企业可以根据实际情况来调整数据处理范围和速度,并实现更高效和灵活的数据协作。

在实际应用中,使用Kafka的流式数据管道可以从多个方面帮助企业。通过流数据处理机制,企业可以更好地监视和管理其数据。使用高性能的Kafka技术,企业可以更好地应对大数据量的处理需求。此外,使用流式数据管道实现的数据库生成,可以更好地支持企业内部的数据协作和数据共享。

对于需要高效处理流式数据的企业,使用Kafka的流式数据管道可以是一个非常有价值的解决方案。它提供了实时、高效、灵活和可扩展的数据处理方式,可以帮助企业提高其数据管理水平,进而提高整体业务竞争力。

相关问题拓展阅读:

c+Kafka实现mysql与redis数据同步

前言

上篇文章简单介绍c概念,本文结合常见的缓存业务去讲解c使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更操作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过c结合Kafka来实现mysql与redis之间的数据同步。

架构设计

通过上述结构设计图可以很清晰的知道用到的组件:MySQL、C、Kafka、ZooKeeper、Redis。

Kafka&Zookeeper搭建

首先在

官网

下载Kafka:

下载后解压文件夹,可以看到以下几个文件:

Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。

通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:

C搭建

c搭建具体可以参考上文,这里只讲解具体的参数配置:

找到/conf目录下的c.properties配置文件:

然后配置instance,找到/conf/example/instance.properties配置文件:

经过上述配置后,就可以启动c了。

测试

环境搭建完成后,就可以编写代码进行测试。

1、引入pom依赖

2、封装Redis工具类

在application.yml文件增加以下配置:

封装一个操作Redis的工具类:

3、创建MQ消费者进行同步

创建一个CBean对象进行接收:

最后就可以创建一个消费者CConsumer进行消费:

测试Mysql与Redis同步

mysql对应的表结构如下:

启动项目后,新增一条数据:

可以在控制台看到以下输出:

如果更新呢?试一下Update语句:

同样可以在控制台看到以下输出:

经过测试完全么有问题。

总结

既然c这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:c只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等; 尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用C+Kafka的方式来实现的。

你了解那些Kafka的核心概念吗

Kafka中的数据称为message,就类似于record和row。Message是以batches的形式写入Kafka,batch就是一组数据,他们被写入同一个topic和partition。 Message被写入topic,topic又被分成了partition。每个partition可以在不同的server上。

分批次写入消息是为了提高效率。

topic:主题,一个主题代表了一类消息,就像数据库中的表一样。

Partition:分区,一个主题有若干个分区,同一个主题的分区可以不分布在同一个机器上,单一主题中的分区有序,但是无法保证所有的分区有序。

Producer用来创造消息。默认情况下,producer不care往哪个partition中写,一个topic中message会被均匀的分配到partition中。通过message key,partitioner会生成这个key的hash并把message写到特定的partition中。

Consumer读取数据。一个consumer会subscribe到一个或多个topic下,并以message被produce的顺序读取。通过跟踪message offset,consumer记录哪些消息已经被消费过。每个message有一个独立的offset,对于每个partition,通过存储最后消费消息的offset在zookeeper或kafka中,consumer可以停止重启是不失去上次读取的位置。

Consumer组成了consumer group,group保证了每个partition只有一个成员进行消费。如果一个consumer失败,group中的consumer会rebalance partition。

一个kafka的server称为一个broker。一个partition在cluster中被归在一个broker下,这个broker被称为partition的leader。一个partition可以被assign到多个broker下,这样partition就会被复制。

Replica:副本,分为leader和follower,leader对外提供服务。

为什么要用kafka:多个生产者,多个消费者,磁盘存储,可拓展性高,高性能。

把partition从一个consumer分配到另一个consumer称为rebalance。Rebalance保证了consumer group的高可用和高拓展性。在rebalance过程中,consumer不消费消息。

offset:在partition中给message连续的id,用来识别每条消息。

Zookeeper的作用:在集群不同节点间建立coordination。同时,如果哪个节点失败,我们还可以通过zookeeper从之前committed offset中恢复因为zookeeper周期性的commit offset。如果kafka的cluster有什么更改,zookeeper会通知所有node这一更改比如增删broker或topic。

ISR:In-Sync Replicas, 是和leader同步的复制的分区,这些followers和leader有着相同的message。

QueueFullException:当producer以broker无法接受的速度发送消息是会出现,解决方案是增加broker的数量。

Retention Period: retention period 可以帮助保持所有published的消息并不在乎消息是否被消费。这些记录可以通过retention period的配置进行销毁来腾出一些空间。

多分区多副本的好处:kafka通过给topic指定多个分区分布在多个broker上,并发能力较好(负载均衡)。partition可以指定replica数,增加了消息存储的安全性,提高了容灾能力,不过也增加了存储空间。

Kafka(四)集群之kafka

在章节二(

)中,我们部署了单机的kafka,现在我们部署一套集群模式的kafka。

这里我准备了三台虚拟机:

192.168.184.134

192.168.184.135

192.168.184.136

每台机器部署一个zk和kafka。

上一章节中zk集群已经部署完毕。

在章节二中,134这台机器已经有kafka存在了,我们在另外两台机器上安装kafka:

在上面的文件中有几个关键点,我们一一进行配置,我会对配置中的说明翻译:

以下这两个listeners,advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners。

在内网中我们使用listenners就可以了,在docker等容器或云中使用advertised。

下面这个是日志路径的配置

下面这个是个重点的东西,topic在磁盘上会分为多个partitions存储,相比单一文件存储,增加了并行性,在后续文章中会详细去讲解:

日志的保存时间:

以下是zookeeper的配置:

这里我们直接设置后台启动,三个节点都是如此:

这里面有个小坑,还记得之前我们搭建的单机环境吗?那时候默认的日志文件夹在/tmp/kafka-logs下面,生成了很多内容,导致我们134这个节点无法启动成功,报错如下:

解决这个问题只需要把/tmp/kafka-logs文件删除就好了。

看到日志出现这一句表明启动成功了:

下面我们验证下是否搭建成功了,首先使用kafkatool工机具连接看下:

我们在134节点创建一个topic:

查看topic列表:

在kafkatool中查看:

创建生产者:

创建消费者:

生成者发送消息:

消费者接收消息:

到此为止,kafka的集群搭建已经完成了。在后面的文章我们会去学习如何在springboot中集成kafka。

关于kafka生成数据库的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。


数据运维技术 » Kafka流式数据管道:实现高效的数据库生成 (kafka生成数据库)