简单易懂的Linux Kafka下载指南 (linux kafka 下载)

Apache Kafka是目前应用非常广泛的分布式消息系统,它主要由Scala语言编写,提供了高性能、高吞吐量、低延迟的消息处理能力,广泛应用于日志处理、数据分析、实时计算等场景。对于Linux用户来说,下载和启动Kafka并不是一件难事,本文将介绍Linux Kafka下载指南,以便用户快速下载和使用Kafka。

一、下载Kafka

在开始下载Kafka之前,需要先确认下载的版本是否适用于当前操作系统的版本。Kafka官网提供了多个版本的下载链接,用户需要根据自己的需求选择合适的版本进行下载。本文以Kafka 2.8.0版本为例,演示下载的操作。

在终端中执行以下命令,下载Kafka二进制包:

“`

wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz

“`

等待下载完成后,使用以下命令解压文件:

“`

tar -xzf kafka_2.13-2.8.0.tgz

“`

二、启动Kafka

解压Kafka之后,进入Kafka目录,执行以下命令启动Kafka:

“`

cd kafka_2.13-2.8.0

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

“`

执行以上命令后,Kafka将会启动,并在终端输出相关的日志信息。此时,用户就可以通过Kafka提供的API对消息进行发送和消费。

三、常见问题解决

在使用Kafka过程中,常常会遇到一些问题,下面将介绍一些解决方法:

1. Kafka启动后无法连接

有时候,Kafka启动后可能会遇到无法连接的问题,这时候需要查看Kafka的日志信息。

打开终端,进入Kafka目录,执行以下命令:

“`

cd kafka_2.13-2.8.0

tl -f logs/server.log

“`

执行以上命令后,会实时输出Kafka的日志信息,这时候可以根据输出的日志信息,快速定位问题,并进行解决。

2. Kafka启动时提示端口被占用

在启动Kafka时,如果出现“Address already in use”的错误,说明Kafka所占用的端口已经被占用。这是因为在启动Kafka之前,可能已经有其他进程占用了Kafka所需的端口。

解决方法如下:

执行以下命令,查看当前所有进程所占用的端口:

“`

netstat -tlnp

“`

找到所占用的端口,并终止该进程:

“`

kill -9 进程ID

“`

然后再次启动Kafka即可。

3. Kafka启动后,无法发送和接收消息

在使用Kafka时,有时候可能会遇到无法发送和接收消息的问题,这时候需要检查Kafka的配置文件。

打开终端,进入Kafka目录,执行以下命令:

“`

cd kafka_2.13-2.8.0

vi config/server.properties

“`

进入该文件后,查找以下参数:

“`

listeners=PLNTEXT://:9092

“`

该参数是Kafka的监听地址和端口号。确认该参数是否正确,并根据需要进行修改。

四、结论

通过本文的介绍,相信读者已经了解了Linux Kafka下载指南。下载和启动Kafka非常简单,只需几个命令即可完成。但在使用Kafka时,还需要注意Kafka的配置和日志信息,以便快速解决问题。希望本文对于Linux用户下载和使用Kafka有所帮助。

相关问题拓展阅读:

深入理解kafka(五)日志存储

5.1文件目录布局

根目录下有以下5个checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, meta.properties, recovery-point-offset-checkpoint, replication-offset-checkpoint

分区目录下有以下目录: 0000.index(偏移量为64位长整形,长度固定为20位), 0000.log, 0000.timeindex.

还有可能包含.deleted .cleaned .swap等临时文件, 以及可能的.snapshot .txnindex leader-epoch-checkpoint

5.2日志格式演变

5.2.1 v0版本

kafka0.10.0之前

RECORD_OVERHEAD包括offset(8B)和message size(4B)

RECORD包括:

crc32(4B):crc32校验值

magic(1B):消息版本号0

attributes(1B):消息属性。低3位表示压缩类型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(0.9.x引入)

key length(4B):表示消息的key的长度。-1代表null

key: 可选

value length(4B):实际消息体的长度。-1代表null

value: 消息体。可以为空,如墓碑消息

5.2.2 v1版本

kafka0.10.0-0.11.0

比v0多了timestamp(8B)字段,表示消息的时间戳

attributes的第4位也被利用起来,0表示timestamp的类型为CreateTime,1表示timestamp的类型为LogAppendTime

timestamp类型由broker端参数log.message.timestamp.type来配置,默认为CreateTime,即采用生产者创建的时间戳

5.2.3 消息压缩

保证端到端的压缩,服务端配置compression.type,默认为”producer”,表示保留生产者使用的压缩方式,还可以配置为”gzip”,”snappy”,”lz4″

多条消息压缩至value字段,以提高压缩率

5.2.4 变长字段

变长整形(Varints):每一个字节都有一个位于更高位的m位(most significant bit),除了最后一个字节为1,其余都为0,字节倒序排列

为了使编码更加高效,Varints使用ZigZag编码:sint32对应 (n>31) sint64对应 (n>63)

5.2.5 v2版本

Record Batch

first offset:

length:

partition leader epoch:

magic:固定为2

attributes:两个字节。低3位表示压缩格式,第4位表示时间戳类型,第5位表示事务(0-非事务1-事务),第6位控制消息(0-非控制1控制)

first timestamp:

max timestamp:

producer id:

producer epoch:

first sequence:

records count:

v2版本的消息去掉了crc字段,另外增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和headers信息,并且弃用了attributes

Record

length:

attributes:弃用,但仍占据1B

timestamp delta:

offset delta:

headers:

5.3日志索引

稀疏索引(sparse index):每当写入一定量(broker端参数log.index.interval.bytes指定,默认为4096B),偏移量索引文件和时间索引文件分别对应一个索引项

日志段切分策略:

1.大小超过broker端参数log.segment.bytes配置的值,默认为(1GB)

2.当前日志段消息的更大时间戳与当前系统的时间戳差值大于log.roll.ms或者log.roll.hours,ms优先级高,默认log.roll.hours=168(7天)

3.索引文件或者时间戳索引文件的大小大于log.index.size.max.bytes配置的值,默认为(10MB)

4.偏移量差值(offset-baseOffset)>Integer.MAX_VALUE

5.3.1 偏移量索引

每个索引项占用8个字节,分为两个部分:1.relativeOffset相对偏移量(4B) 2.position物理地址(4B)

使用kafka-dump-log.sh脚本来解析.index文件(包括.timeindex、.snapshot、.txnindex等文件),如下:

bin/kafka-dump-log.sh –files /tmp/kafka-logs/topicId-0/00……00.index

如果broker端参数log.index.size.max.bytes不是8的倍数,内部会自动转换为8的倍数

5.3.2 时间戳索引

每个索引项占用12个字节,分为两个部分:1.timestamp当前日志分段的更大时间戳(12B) 2.relativeOffset时间戳对应的相对偏移量(4B)

如果broker端参数log.index.size.max.bytes不是12的倍数,内部会自动转换为12的倍数

5.4日志清理

日志清理策略可以控制到主题级别

5.4.1 日志删除

broker端参数log.cleanup.policy设置为delete(默认为delete)

检测周期broker端参数log.retention.check.interval.ms=300000(默认5分钟)

1.基于时间

broker端参数log.retention.hours,log.retention.minutes,log.retention.ms,优先级ms>minutes>hours

删除时先增加.delete后缀,延迟删除根据file.delete.delay.ms(默认60000)配置

2.基于日志大小

日志总大小为broker端参数log.retention.bytes(默认为-1,表示无穷大)

日志段大小为broker端参数log.segment.bytes(默认为,1GB)

3.基于日志起始偏移量

DeleteRecordRequest请求

1.KafkaAdminClient的deleteRecord()

2.kafka-delete-record.sh脚本

5.4.2 日志压缩

broker端参数log.cleanup.policy设置为compact,且log.cleaner.enable设置为true(默认为true)

5.5磁盘存储

相关测试:一个由6块7200r/min的RAID-5阵列组成的磁盘簇的线性写入600MB/s,随机写入100KB/s,随机内存写入400MB/s,线性内存3.6GB/s

5.5.1 页缓存

Linux操作系统的vm.dirty_background_ratio参数用来指定脏页数量达到系统的百分比之后就触发pdflush/flush/kdmflush,一般小于10,不建议为0

vm.dirty_ratio表示脏页百分比之后刷盘,但是阻塞新IO请求

kafka同样提供同步刷盘及间断性强制刷盘(fsync)功能,可以通过log.flush.interval.messages、log.flush.interval.ms等参数来控制

kafka不建议使用swap分区,vm.swappiness参数上限为100,下限为0,建议设置为1

5.5.2 磁盘I/O流程

一般磁盘IO的场景有以下4种:

1.用户调用标准C库进行IO操作,数据流为:应用程序Buffer->C库标准IOBuffer->文件系统也缓存->通过具体文件系统到磁盘

2.用户调用文件IO,数据流为:应用程序Buffer->文件系统也缓存->通过具体文件系统到磁盘

3.用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘

4.用户使用类似dd工具,并使用direct参数,绕过系统cache与文件系统直接读写磁盘

Linux系统中IO调度策略有4种:

1.NOOP:no operation

2.CFQ

3.DEADLINE

4.ANTICIPATORY

5.5.3 零拷贝

指数据直接从磁盘文件复制到网卡设备中,不需要经应用程序

对linux而言依赖于底层的sendfile()

对java而言,FileChannal.transferTo()的底层实现就是sendfile()

linux怎么从mq里面读取报文信息

在Linux中,可以通过以下方式从消息队列MQ中读取报文信息:

1. 使用自带的PN命令。如果使用的消息队列系统自带有消息查询命令,可以直接使用该命令查询消息队列中的报文。如RabbitMQ有rabbitmqctl list_queues等命令。

2. 使用消息队列的API。大多数消息队列系统都提供了客户端API,可以通过编写程序使用 API 读取消息队列中的消息。例如:

– RabbitMQ提供AMQP客户端API,可以使用Polyglot AMQP, librabbitmq等库调用API读取消息。

– Kafka提供Kafka客户端API,可以使用kafka-python, librdkafka等库调用API读取主题中的消息。

– ActiveMQ提供JMS API,可以使用JMS客户端如NMS读取消息。

使用消息队列的API是主流的读取MQ报文的方式。需要选择消息队列对应的客户端API,编写读取消息的程序。

3. 使用消息队列提供的管理工具。一些消息队列系统提供了并模图形化的管理控制台或工具,可以通过该工具查询和读取消息队列中的报文信息。如:

– RabbitMQ提供了RabbitMQ Management插件,可以通过Web UI查询消息队列信息。

– Kafka提供了Confluent Control Center等工具可以管理主题和消费消息。

– ActiveMQ提乎肆供了ActiveMQ Console可以管理消息和订阅者。

使用管理工具可以更直观简便地读取MQ中的报文信息。

4. 解析消息队列的数据存储。一些消息队列系统使用数据库等方式存储消息数据,通过解析其数据存储也可以读取报文信息。但这种方式较复杂,需要深入研究消息队列的内部实现,一般不推荐。

所以,读取Linux下MQ的报文信息,推荐的方式主要是:

1) 使用消息队列自带的命令行工具(如果有)

2) 调用消息队列提供的API,编写程序读取消息

3) 使用消息队列的管理控制台或图形化工具查询消息绝顷缓

这几种方式可以比较方便和标准地读取MQ中的报文信息。选择具体的方式需要根据使用的消息队列系统来决定。

linux kafka 下载的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于linux kafka 下载,简单易懂的Linux Kafka下载指南,深入理解kafka(五)日志存储,linux怎么从mq里面读取报文信息的信息别忘了在本站进行查找喔。


数据运维技术 » 简单易懂的Linux Kafka下载指南 (linux kafka 下载)