订阅Kafka日志

          PALO

          订阅Kafka日志

          用户可以通过提交例行导入作业,直接订阅Kafka中的消息数据,以近实时的方式进行数据同步。

          Doris 自身能够保证不丢不重的订阅 Kafka 中的消息,即 Exactly-Once 消费语义。

          准备工作

          请提前构建Kafka服务,可以选择开通百度消息服务(BMS)或自建Kafka服务。

          开通百度消息服务

          百度消息服务(BMS)基于 Kafka 在百度智能云提供托管服务,请先按照以下流程开通服务。

          1. 请根据 BMS 快速入门 文档开通消息服务
          2. 下载证书压缩包 kafka-key.zip 并解压,解压后将得到以下文件

            • ca.pem
            • client.key
            • client.keystore.jks
            • client.pem
            • client.properties
            • client.truststore.jks
          3. 上传证书文件到 HTTP 服务器。

            因为后续 Doris 需要从某个 HTTP 服务器上下载这些整数以供访问 Kafka。因此我们需要先将这些证书上传到 HTTP 服务器。这个 HTTP 服务器必须要能够被 Doris 的 Leader Node 节点所访问。

            如果您没有合适的 HTTP 服务器,可以参照以下方式借助百度对象存储(BOS)来完成:

            1. 根据 开始使用创建Bucket 文档开通BOS服务并创建一个 Bucket。注意,Bucket所在地域必须和 Doris 集群所在地域相同
            2. 将以下三个文件上传到 Bucket

              • ca.pem
              • client.key
              • client.pem
            3. 在 BOS Bucket 文件列表页面,点击文件右侧的 文件信息,可以获取 HTTP 访问连接。请将 连接有效时间 设为 -1,即永久。

              注:请不要使用带有 cdn 加速的 http 下载地址。这个地址某些情况无法被 Doris 访问。

          自建 Kafka 服务

          如果使用自建 Kafka 服务,请确保 Kafka 服务和 Doris 集群在同一个 VPC 内,并且相互之间的网络能够互通。

          订阅 Kafka 消息

          订阅 Kafka 消息使用了 Doris 中的例行导入(Routine Load)功能。

          用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息。

          请注意以下使用限制:

          1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
          2. 支持的消息格式如下:

            • csv 文本格式。每一个 message 为一行,且行尾不包含换行符。
            • Json 格式,详见 导入 Json 格式数据
          3. 仅支持 Kafka 0.10.0.0(含) 以上版本。

          访问 SSL 认证的 Kafka 集群

          例行导入功能支持无认证的 Kafka 集群,以及通过 SSL 认证的 Kafka 集群。

          访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过 CREAE FILE 命令上传到 Plao 中,并且 catalog 名称为 kafkaCREATE FILE 命令的具体帮助可以参见 CREATE FILE 命令手册。这里给出示例:

          • 上传文件

            CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
            CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
            CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");

          上传完成后,可以通过 SHOW FILES 命令查看已上传的文件。

          创建例行导入作业

          创建例行导入任务的具体命令,请参阅 ROUTINE LOAD 命令手册。这里给出示例:

          1. 访问无认证的 Kafka 集群

            CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
            COLUMNS TERMINATED BY ","
            PROPERTIES
            (
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "property.group.id" = "xxx",
                "property.client.id" = "xxx",
                "property.kafka_default_offsets" = "OFFSET_BEGINNING"
            );
            • max_batch_interval/max_batch_rows/max_batch_size 用于控制一个子任务的运行周期。一个子任务的运行周期由最长运行时间、最多消费行数和最大消费数据量共同决定。
          2. 访问 SSL 认证的 Kafka 集群

            CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
            COLUMNS TERMINATED BY ",",
            PROPERTIES
            (
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
            )
            FROM KAFKA
            (
               "kafka_broker_list"= "broker1:9091,broker2:9091",
               "kafka_topic" = "my_topic",
               "property.security.protocol" = "ssl",
               "property.ssl.ca.location" = "FILE:ca.pem",
               "property.ssl.certificate.location" = "FILE:client.pem",
               "property.ssl.key.location" = "FILE:client.key",
               "property.ssl.key.password" = "abcdefg"
            );
            • 对于百度消息服务,property.ssl.key.password 属性可以在 client.properties 文件中获取。

          查看导入作业状态

          查看作业状态的具体命令和示例请参阅 SHOW ROUTINE LOAD 命令文档。

          查看某个作业的任务运行状态的具体命令和示例请参阅 SHOW ROUTINE LOAD TASK 命令文档。

          只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。

          修改作业属性

          用户可以修改已经创建的作业的部分属性。具体说明请参阅 ALTER ROUTINE LOAD 命令手册。

          作业控制

          用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和重启。

          具体命令请参阅 STOP ROUTINE LOADPAUSE ROUTINE LOADRESUME ROUTINE LOAD 命令文档。

          更多帮助

          关于 ROUTINE LOAD 的更多详细语法和最佳实践,请参阅 ROUTINE LOAD 命令手册。

          上一篇
          导入BOS中的数据
          下一篇
          使用JDBC同步数据