导入本地数据

          PALO

          导入本地数据

          Stream Load 用于将本地文件导入到 Doris 中。

          不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。

          该方式中涉及 HOST:PORT 应为 HTTP 协议端口。

          • 公有云用户必须使用 Compute Node(BE)的 HTTP 协议端口,默认为 8040。
          • 私有化部署用户可以使用 Leader Node(FE)的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 Compute Node 所在机器。

          本文文档我们以 curl 命令为例演示如何进行数据导入。

          文档最后,我们给出一个使用 Java 导入数据的代码示例。

          导入数据

          Stream Load 的请求体如下:

          PUT /api/{db}/{table}/_stream_load
          1. 创建一张表

            通过 CREATE TABLE 命令创建一张表用于存储待导入的数据。具体的导入方式请查阅 CREATE TABLE 命令手册。示例如下:

            CREATE TABLE IF NOT EXISTS load_test
            (
                id INT,
                name VARCHAR(128)
            )
            DISTRIBUTED BY HASH(id) BUCKETS 8;
          2. 导入数据

            执行以下 curl 命令导入本地文件:

             curl -u user:passwd -H "label:example_label_1" -T /path/to/local/your_file.txt http://host:port/api/example_db/load_test/_stream_load
            • user:passwd 为在 Doris 中创建的用户。初始用户为 admin,密码为创建 Doris 集群时设置的密码。
            • host:port 为 Compute Node 的 HTTP 协议端口,默认是 8040,可以在智能云 Doris 集群详情页面查看。
            • label: 可以在 Header 中指定 Label 唯一标识这个导入任务。

            关于 Stream Load 命令的更多高级操作,请参阅 Stream Load 命令文档。

          3. 等待导入结果

            Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:

            {
                "TxnId": 1003,
                "Label": "example_label_1",
                "Status": "Success",
                "Message": "OK",
                "NumberTotalRows": 1000000,
                "NumberLoadedRows": 1000000,
                "NumberFilteredRows": 1,
                "NumberUnselectedRows": 0,
                "LoadBytes": 40888898,
                "LoadTimeMs": 2144,
                "BeginTxnTimeMs": 1,
                "StreamLoadPutTimeMs": 2,
                "ReadDataTimeMs": 325,
                "WriteDataTimeMs": 1933,
                "CommitAndPublishTimeMs": 106,
                "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
            }
            • Status 字段状态为 Success 即表示导入成功。
            • 其他字段的详细介绍,请参阅 Stream Load 命令文档。

          使用建议

          • Stream Load 只能导入本地文件。
          • 建议一个导入请求的数据量控制在 1 GB 以内。如果有大量本地文件,可以分批并发提交。

          Java 代码示例

          这里通过一个简单的 JAVA 示例来执行 Stream Load:

          (感谢 hf200012 提供示例)

          package demo.doris;
          
          import org.apache.commons.codec.binary.Base64;
          import org.apache.http.HttpHeaders;
          import org.apache.http.client.methods.CloseableHttpResponse;
          import org.apache.http.client.methods.HttpPut;
          import org.apache.http.entity.FileEntity;
          import org.apache.http.impl.client.CloseableHttpClient;
          import org.apache.http.impl.client.DefaultRedirectStrategy;
          import org.apache.http.impl.client.HttpClientBuilder;
          import org.apache.http.impl.client.HttpClients;
          import org.apache.http.util.EntityUtils;
          
          import java.io.File;
          import java.io.IOException;
          import java.nio.charset.StandardCharsets;
          
          /*
          这是一个 Doris Stream Load 示例,需要依赖
          <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpclient</artifactId>
          </dependency>
           */
          public class DorisStreamLoader {
              // 1. 对于公有云公户,这里填写 Compute Node 地址以及 HTTP 协议访问端口(8040)。
              // 2. 对于开源用户,可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。
              private final static String HOST = "your_host";
              private final static int PORT = 8040;
              private final static String DATABASE = "db1";   // 要导入的数据库
              private final static String TABLE = "tbl1";     // 要导入的表
              private final static String USER = "root";      // Doris 用户名
              private final static String PASSWD = "";        // Doris 密码
              private final static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径
          
              private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
                      HOST, PORT, DATABASE, TABLE);
          
              private final static HttpClientBuilder httpClientBuilder = HttpClients
                      .custom()
                      .setRedirectStrategy(new DefaultRedirectStrategy() {
                          @Override
                          protected boolean isRedirectable(String method) {
                              // 如果连接目标是 FE,则需要处理 307 redirect。
                              return true;
                          }
                      });
          
              public void load(File file) throws Exception {
                  try (CloseableHttpClient client = httpClientBuilder.build()) {
                      HttpPut put = new HttpPut(loadUrl);
                      put.setHeader(HttpHeaders.EXPECT, "100-continue");
                      put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
          
                      // 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
                      put.setHeader("label","label1");
                      put.setHeader("column_separator",",");
          
                      // 设置导入文件。
                      // 这里也可以使用 StringEntity 来传输任意数据。
                      FileEntity entity = new FileEntity(file);
                      put.setEntity(entity);
          
                      try (CloseableHttpResponse response = client.execute(put)) {
                          String loadResult = "";
                          if (response.getEntity() != null) {
                              loadResult = EntityUtils.toString(response.getEntity());
                          }
          
                          final int statusCode = response.getStatusLine().getStatusCode();
                          if (statusCode != 200) {
                              throw new IOException(
                                      String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                          }
          
                          System.out.println("Get load result: " + loadResult);
                      }
                  }
              }
          
              private String basicAuthHeader(String username, String password) {
                  final String tobeEncode = username + ":" + password;
                  byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
                  return "Basic " + new String(encoded);
              }
          
              public static void main(String[] args) throws Exception{
                  DorisStreamLoader loader = new DorisStreamLoader();
                  File file = new File(LOAD_FILE_NAME);
                  loader.load(file);
              }
          }
          上一篇
          导入总览
          下一篇
          导入BOS中的数据