轻松学会Kafka数据库查询技巧 (如何查看kafka里的数据库)

Kafka是一个分布式消息传输系统,具有高可用性、扩展性和容错性,非常适合用于日志和流式处理。其作为消息队列,可以互联不同应用程序之间,实现应用程序之间的解耦,提高信息传输的效率和可靠性。在Kafka中,数据存储在一组组使用相同主题的分区中。对于Kafka的使用者来说,数据查询是一个常见的需求,本篇文章将介绍Kafka数据库查询的技巧。

一、了解Kafka数据库查询基本操作

在Kafka中,常见的数据库操作包括查询、插入、更新和删除等。相比其他数据库系统,Kafka的查询操作较为简单,主要是通过控制台工具kafka-console-consumer来实现。其基本命令格式如下:

./kafka-console-consumer.sh –bootstrap-server [broker地址] –topic [主题]

其中,[broker地址]表示Kafka集群的地址,[主题]表示需要查询的主题名称。如果需要查询某个特定的分区,可以使用–partition选项。

二、使用查询选项

控制台工具kafka-console-consumer支持多项查询选项,可以更加精确的查询所需数据。其中,一些常用的选项如下:

1. –from-beginning: 从最开始的数据开始读取,不仅可以查询当前最新数据,还能查询历史数据。

2. –skip-message-on-error: 在读出一个无法解析的数据时,跳过这条数据继续读取。

3. –max-messages: 查询指定数量的数据。

4. –timeout-ms: 查询指定时间内的数据,单位为毫秒。

5. –property: 其中一些扩展选项,例如–property print.key=true或者print.value=true,可以控制工具输出数据的键或值。

三、高级查询技巧

除了基本操作和查询选项外,Kafka也支持更加高级的查询技巧,以满足更复杂的需求。

1. 使用正则表达式: 在过滤数据时,可以使用正则表达式来精确匹配所需数据。例如,要查询主题_testtopic中包含数字的消息,可以使用以下命令:

./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic _testtopic –from-beginning –property print.key=true –property key.separator=”,” | grep “[0-9]”

2. 查询主题中的最新数据: 在某些情况下,我们只需要查询数据中的最新记录,可以使用以下命令:

./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic _testtopic –from-beginning –max-messages 1

3. 查询某个特定时间段的数据: 对于大型的日志数据库来说,查询整个数据集可能很花时间,如果我们只需要查询某个特定时间段,可以使用以下命令:

./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic _testtopic –from-beginning –property print.timestamp=true –property timestamp.format=yyyy-MM-dd\ HH:mm:ss.SSS –timeout-ms 10000 | awk -v start=”2023-01-01 00:00:00″ -v end=”2023-01-02 00:00:00″ ‘$NF >= start && $NF

在以上命令中,我们先将时间戳按字符串格式输出,并使用awk过滤所需的数据。

四、

Kafka作为一个高功率的分布式消息传输系统,具有高可用性、扩展性和容错性,在日志和流式处理中被广泛使用。本篇文章介绍了对于Kafka用户来说基本的Kafka数据库查询操作,以及进阶的操作技巧,可以让用户更加灵活的查询所需数据。

相关问题拓展阅读:

OpenTelemetry、Spring Cloud Sleuth、Kafka、Jager实现分布式跟踪

分布式跟踪可让您深入了解特定服务纤核在分布式软件系统中作为整体的一部分是如何执行的。它跟踪和记录从起点到目的地的请求以及它们经过的系统。

在本文中,我们将使用 OpenTelemetry、Spring Cloud Sleuth、Kafka 和 Jaeger 在三个 Spring Boot 微服务 中实现分布式跟踪。

我们先来看看分布式追踪中的一些基本术语。

跨度:表示系统内的单个工作单元。跨度可以相互嵌套以模拟工作的分解。例如,一个跨度可能正在调用一个 REST 端点,然后另一个子跨度可能是该端点调用另一个,等等在不同的服务中。

Trace:所有共享相同根跨度的跨度,或者更简单地说,将所有跨度创建为原始请求的直接结果。跨度的层次结构(每个跨度在根跨度旁边都有自己的父跨度)可用于形成有向无环图,显示请求在通过各种组件时的路径。

OpenTelemetry ,也简称为 OTel,是一个供应商中立的开源 Observability 框架,用于检测、生成、收集和导出遥测数据,例如 跟踪 、 指标 和 日志 。作为 云原生 计算基金会 (CNCF) 的孵化项目,OTel 旨在提供与供应商无关的统一库和 API 集——主要用于收集数据并将其传输到某处。OTel 正在成为生成和管理遥测数据的世界标准,并被广泛采用。

Sleuth 是一个由 Spring Cloud 团队管理和维护的项目,旨在将分布式跟踪功能集成到 Spring Boot 应用程序中。它作为一个典型Spring Starter的 . 以下是一些开箱即用的 Sleuth 工具:

Sleuth 添加了一个拦截器,以确保在请求中传递所有跟踪信息。每次调用时,都会毁丛掘创建一个新的 Span。它在收到响应后关闭。

Sleuth 能够跟踪您的请求和消息,以便您可以将该通信与相应的日志条目相关联。您还可以将跟踪信息导出到外部系统郑瞎以可视化延迟。

Jaeger 最初由 Uber 的团队构建,然后于 2023 年开源。它于 2023 年被接受为云原生孵化项目,并于 2023 年毕业。作为 CNCF 的一部分,Jaeger 是云原生 架构 中公认的项目。它的源代码主要是用 Go 编写的。Jaeger 的架构包括:

与 Jaeger 类似,Zipkin 在其架构中也提供了相同的组件集。尽管 Zipkin 是一个较老的项目,但 Jaeger 具有更现代和可扩展的设计。对于此示例,我们选择 Jaeger 作为后端。

让我们设计三个 Spring Boot 微服务:

这三个微服务旨在:

这是为了观察 OpenTelemetry 如何结合 Spring Cloud Sleuth 处理代码的自动检测以及生成和传输跟踪数据。上面的虚线捕获了微服务导出的跟踪数据的路径,通过OTLP(OpenTelemetry Protocol)传输到OpenTelemetry Collector,收集器依次处理并将跟踪数据导出到后端Jaeger进行存储和查询。

使用 monorepo,我们的项目结构如下:

第 1 步:添加 POM 依赖项

这是使用 OTel 和 Spring Cloud Sleuth 实现分布式跟踪的关键。我们的目标是不必手动检测我们的代码,因此我们依靠这些依赖项来完成它们设计的工作——自动检测我们的代码,除了跟踪实现、将遥测数据导出到 OTel 收集器等。

第 2 步:OpenTelemetry 配置

OpenTelemetry 收集器端点

对于每个微服务,我们需要在其中添加以下配置application.yml(请参阅下面部分中的示例片段)。spring.sleuth.otel.exporter.otlp.endpoint主要是配置OTel Collector端点。它告诉导出器,在我们的例子中是 Sleuth,通过 OTLP 将跟踪数据发送到指定的收集器端点 URL 来自otel-collector图像的 docker-compose 服务。

跟踪数据概率抽样

spring.sleuth.otel.config.trace-id-ratio-based属性定义了跟踪数据的采样概率。它根据提供给采样器的分数对一部分迹线进行采样。概率抽样允许 OpenTelemetry 跟踪用户通过使用随机抽样技术降低跨度收集成本。如果该比率小于 1.0,则某些迹线将不会被导出。对于此示例,我们将采样配置为 1.0、100%。

有关其他 OTel Spring Cloud Sleuth 属性,请参阅常见应用程序属性。

OpenTelemetry 配置文件

我们需要项目根目录下的 OTel 配置文件otel-config.yaml。内容如下。此配置文件定义了 OTel 接收器、处理器和导出器的行为。正如我们所看到的,我们定义了我们的接收器来监听 gRPC 和 HTTP,处理器使用批处理和导出器作为 jaeger 和日志记录。

第 3 步:docker-compose 将所有内容串在一起

让我们看看我们需要启动哪些 docker 容器来运行这三个微服务并观察它们的分布式跟踪,前三个微服务在上面的部分中进行了解释。

运行docker-compose up -d以调出所有九个容器:

第 4 步:追踪数据在行动

快乐之路

现在,让我们启动customer-service-bff流程的入口点,以创建新客户。

启动 Jaeger UI,

除了 Trace Timeline 视图(上面的屏幕截图),Jaeger 还提供了一个图形视图(Trace Graph在右上角的下拉菜单中选择):

三个微服务在 docker 中的日志输出显示相同的跟踪 id,以红色突出显示,并根据其应用程序名称显示不同的跨度 id(应用程序名称及其对应的跨度 id 以匹配的颜色突出显示)。在 的情况下customer-service,相同的 span id 从 REST API 请求传递到 Kafka 发布者请求。

customer-service让我们在 docker 中暂停我们的PostgreSQL 数据库,然后重复从customer-service-bff. 500 internal server error正如预期的那样,我们得到了。检查 Jaeger,我们看到以下跟踪,异常堆栈跟踪抱怨SocketTimeoutException,再次如预期的那样。

识别长期运行的跨度

Jaeger UI 允许我们搜索超过指定更大持续时间的跟踪。例如,我们可以搜索所有耗时超过 1000 毫秒的跟踪。然后,我们可以深入研究长期运行的跟踪以调查其根本原因。

在这个故事中,我们从 OpenTelemetry、Spring Cloud Sleuth 和 Jaeger 的角度解压了分布式跟踪,验证了 REST API 调用和 Kafka pub/sub 中分布式跟踪的自动检测。我希望这个故事能让你更好地理解这些跟踪框架和工具,尤其是 OpenTelemetry,以及它如何从根本上改变我们在 分布式系统 中进行可观察性的方式。

如何查看kafka里的数据库的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于如何查看kafka里的数据库,轻松学会Kafka数据库查询技巧,OpenTelemetry、Spring Cloud Sleuth、Kafka、Jager实现分布式跟踪的信息别忘了在本站进行查找喔。


数据运维技术 » 轻松学会Kafka数据库查询技巧 (如何查看kafka里的数据库)