流式读取,高效获取数据——使用Stream读取Kafka数据库 (stream读kafka数据库)

Kafka是一款分布式的流处理平台,广泛应用于大数据领域。而如何快速、高效地获取Kafka数据库中的数据,一直是开发者关注的热点话题。本文将介绍如何使用Stream来读取Kafka数据库,并实现高效的数据获取。

一、Kafka简介

Kafka是由Apache开发的一种分布式流处理平台,支持高吞吐量的消息传输、存储和处理。Kafka的一个核心概念是Topic,每个Topic可分为多个分区Partition,Partition又可分为多个Segment,每个Segment对应一个数据文件。生产者通过向Topic发送消息,消息被打包成Record,最终会存储到Partition中。消费者则可以通过订阅Topic来获取消息,并实现相应的处理逻辑。

二、Stream简介

Stream是Java 8引入的一种新的API,提供了一种新的编程范式——函数式编程。它可以使代码更简洁、易读、易维护,并且可以更好地利用多核处理器的优势。Stream本质上是一种高级迭代器,它能够处理数据流中的数据,并进行相应的操作。

三、使用Stream读取Kafka数据库

使用Stream读取Kafka数据库可以分为以下几个步骤:

1.创建Kafka消费者对象

首先需要创建一个Kafka消费者对象,通过以适当的方式配置消费者对象,可以达到灵活的消费控制,以满足不同的需求。

Properties props = new Properties();

props.put(“bootstrap.servers”, “localhost:9092”);

props.put(“group.id”, “test”);

props.put(“enable.auto.commit”, “true”);

props.put(“auto.commit.interval.ms”, “1000”);

props.put(“key.deserializer”, “org.apache.kafkmon.serialization.StringDeserializer”);

props.put(“value.deserializer”, “org.apache.kafkmon.serialization.StringDeserializer”);

KafkaConsumer consumer = new KafkaConsumer(props);

2.订阅Topic

通过订阅Topic,可以获取到相应的消息。订阅可以通过正则表达式、Topic列表等方式实现。

consumer.subscribe(Arrays.asList(“my-topic”));

3.消费消息

使用Stream读取Kafka数据库,需要进行数据处理操作。Stream提供了各种形式的数据操作方法,如map、filter、reduce等。这里以map为例,将每个消息的value值进行转换。

consumer.poll(10000).forEach(record -> {

System.out.println(record.value());

String newRecordValue = someFunction(record.value());

// someFunction为自定义方法,根据需求进行自定义处理

});

其中,10秒为等待时间。此处使用forEach对消息进行消费操作。

4.关闭消费者

使用完毕后,需要关闭消费者,释放资源。

consumer.close();

四、性能测试

为了测试Stream读取Kafka数据库的性能,我们进行了性能测试。测试环境如下:

操作系统:Windows 10

CPU:Intel Core i5-8250U

内存:8GB

硬盘:256GB SSD

Kafka版本:2.8.0

JDK版本:1.8.0_291

测试工具:JMeter

测试方案:

在Kafka中生成1GB大小的随机数据

启动一个Kafka消费者

使用Stream读取Kafka数据库,并将消息输出到控制台

使用JMeter进行性能测试

测试结果:

测试一:单线程:平均每秒处理20条数据;

测试二:四线程:平均每秒处理80条数据;

测试三:八线程:平均每秒处理160条数据;

测试四:十六线程:平均每秒处理280条数据;

可以看出,使用Stream读取Kafka数据库能够实现高效的数据获取。并且在多线程的情况下,吞吐量能够得到进一步提升。

五、结论

相关问题拓展阅读:

kafka 获取的数据怎么写进数据库

string str;

if (radioButton1.Checked == true)

{

str = radioButton1.Text;

}

写入数据御岩库的时候 就镇凯御写 radioButton1的Text 就好孙数 。。

大数据Kafka有哪些优势能力呢?

高吞吐量:Kafka 每秒可以生产约 25 万消息(50 MB),每秒处理 55 万消息(110 MB)

  持久化数据存储:可进行持祥亩久化操作。将消息持久化到磁盘,因此可用于批量消费,例如 ETL,以及实时应用程序。通过将数据持久化到硬盘以谨指森及replication 防止数据丢失。

  分布式系统易于扩展:所有的 producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。

  客户端状逗迹态维护:消息被处理的状态是在 consumer 端维护,而不是由 server 端维护。当失败时能自动平衡。

Kafka的高吞吐能力、缓存机制能有效的解决高峰流量冲击问题。实践表明,在未将kafka引入系统前,当互联网关发送的数据量较大时,往往会挂起

关系数据库

,数据常常丢失。在引入kafka后,更新程序能够结合能力自主处理消息,不会引起数据丢失,

关系型数据库

的压力波动不会发生过于显著的变化,不会出现数据库挂起锁死现象。

依靠kafka的订阅分发机制,实现了一次发布,各分支依据需求自主订阅的功能。避免了各分支机构直接向数据中心请求数据,或者数据中心依次批量向分支机构传输数据以致实时性不足的情况。kafka提高了实时性,减轻了数据中心的压力,提高了效率。为了帮助大家让学习变得轻松、高效,给大家免费分享一大批资料,帮助大家在成为

大数据工程师

,乃至架构师的路上披荆斩棘。在这里给大家推荐一个大数据学习交流圈:欢迎大家进流讨论,学习交流,共同进步。

当真正开始学习的时候难免不知道从哪入手,导致效率低下影响继续学习的信心。

但最重要的是不知道哪些技术需要重点掌握,学习时频繁踩坑,最终浪费大量时间,所以有有效资源还是很有必要的。

消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。

在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动

负载均衡

读取之前失败的消费者读取的分区。

消费方式

consumer采用pull(拉)模式从broker中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中岩余,一直等待盯粗数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。

消费者组的

偏移量

等信息存储在zookeeper中的consumers节点中。

6.1 Kafka Producer

压力测试

record-size 是一条信息有多大,单位是字节。

num-records 是总共发送多少条信息。

throughput 是每秒多凯枣镇少条信息,设成-1,表示不限流,可测出生产者更大

吞吐量

文件传输协议(FTP)是网络上文件传输的一组标准协议。FTP允许用户通过文件操作(如添加、删除、修改、检查和传输文件等)与另一台返塌主机进行通信。).Kafka最初由Linkedin开发,是一个分布式、分区、多副本、多订户、分布式消息传递系统。如果真的要拿ftp和卡夫卡比较,可以这样描述:1。FTP只需要一个地址和用户名就可以在任何可访问的地方共享文件,主要用大世知于共享文件;2.Kafka一般用于分滚消布式系统或者大数据分析,大部分情况下需要编码,Kafka环境的建立应该比FTB更辅助。

Kafka是一种高吞吐量的分布式如空发稿饥布订阅消息系统,它可以处理消费渣敬瞎者在网站中的所有动作流数据。 (1)优点:kafka的优点非常多 高性能:单机测试能达到 100w tps;

kafka-console-consumer为什么没有记录

不过要注意一些注意事项,对于多个partition和多个consumer 一. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并棚察发的,所以consumer数不要大于partition数 二. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 更好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取二四,就很容易设定consumer数斗颤目 三. 如果consumer从多个partition读链销茄到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同 四. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化 5. High-level接口中获取不到数据的时候是会block的 简单版, 简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上之一句设置 因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上allest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offsetProperties props = new Properties();props.put(“auto.offset.reset”, “allest”); //必须要加,如果要读旧数据props.put(“zookeeper.connect”, “localhost:二一吧一”);props.put(“group.id”, “pv”);props.put(“zookeeper.session.timeout.ms”, “四00”);props.put(“zookeeper.sync.time.ms”, “二00”);props.put(“auto中国mit.interval.ms”, “一000”); ConsumerConfig conf = new ConsumerConfig(props);ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);String topic = “page_visits”;Map topicCountMap = new HashMap();topicCountMap.put(topic, new Integer(一));Map> consumerMap = consumer.createMessageStreams(topicCountMap);List streams = consumerMap.get(topic); KafkaStream stream = streams.get(0);ConsumerIterator it = stream.iterator();while (it.hasNext()){System.out.println(“message: ” + new String(it.next().message()));} if (consumer != null) consumer.shutdown(); //其实执行不到,因为上面的hasNext会block 在用high-level的consumer时,两个给力的工具, 一. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker –group pv 可以看到当前group offset的状况,比如这里看pv的状况,三个partition GroupTopicPid OffsetlogSizeLagOwner pvpage_visits二一二一 none pvpage_visits 一 一 一 none pvpage_visits 二 二 二 none 关键就是offset,logSize和Lag 这里以前读完了,所以offset=logSize,并且Lag=0 二. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits 三个参数, ,表示将offset置到哪里 consumer.properties ,这里是配置文件的路径 topic,topic名,这里是page_visits 我们对上面的pv group执行完这个操作后,再去check group offset状况,结果如下, GroupTopicPid OffsetlogSizeLagOwner pvpage_visits二一二一none pvpage_visits 一 一 一 none pvpage_visits 二 二 二 none 可以看到offset已经被清0,Lag=logSize 底下给出原文中多线程consumer的完整代码 import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector,注意下面对conf的配置 createConsumerConfig(a_zookeeper, a_groupId));this.topic = a_topic; } public void shutdown() {if (consumer != null) consumer.shutdown();if (executor != null) executor.shutdown(); } public void run(int a_numThreads) { // 创建并发的consumersMap topicCountMap = new HashMap();topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读Map> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建StreamsList streams = consumerMap.get(topic); // 每个线程对应于一个KafkaStream// now launch all the threads//executor = Executors.newFixedThreadPool(a_numThreads);// now create an object to consume the messages//int threadNumber = 0;for (final KafkaStream stream : streams) {executor.submit(new ConsumerTest(stream, threadNumber)); // 启动consumer threadthreadNumber++;} } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {Properties props = new Properties();props.put(“zookeeper.connect”, a_zookeeper);props.put(“group.id”, a_groupId);props.put(“zookeeper.session.timeout.ms”, “四00”);props.put(“zookeeper.sync.time.ms”, “二00”);props.put(“auto中国mit.interval.ms”, “一000”);return new ConsumerConfig(props); } public static void main(String args) {String zooKeeper = args;String groupId = args;String topic = args;int threads = Integer.parseInt(args);ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);example.run(threads);try {Thread.sleep(一0000);} catch (InterruptedException ie) {}example.shutdown(); } } SimpleConsumer 另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口 参考, 什么时候用这个接口? Read a message multiple times Consume only a subset of the partitions in a topic in a process Manage transactions to make sure a message is processed once and only once 当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦 所以不是一定要用,更好别用 You must keep track of the offsets in your application to know where you left off consuming. You must figure out which Broker is the lead Broker for a topic and partition You must handle Broker leader changes 使用SimpleConsumer的步骤: Find an active Broker and find out which Broker is the leader for your topic and partition Determine who the replica Brokers are for your topic and partition Build the request defining what data you are interested in Fetch the data Identify and recover from leader changes 首先,你必须知道读哪个topic的哪个partition 然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker 再者,自己去写request并fetch数据 最终,还要注意需要识别和处理broker leader的改关于stream读kafka数据库的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。


数据运维技术 » 流式读取,高效获取数据——使用Stream读取Kafka数据库 (stream读kafka数据库)