news 2026/6/10 1:05:18

DorisStreamLoader工具类

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
DorisStreamLoader工具类
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Spring Boot parent POM (required): the core of unified version management; pinning the stable 2.7.18 release is recommended -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>demo-project</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-project</name>
    <description>SpringBoot Project</description>

    <!-- Target JDK version: 1.8 -->
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <!-- All dependencies required by the code (each one carries an explicit version) -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.7.18</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat</groupId>
            <artifactId>tomcat-util</artifactId>
            <version>9.0.80</version>
        </dependency>
    </dependencies>

    <!-- Spring Boot packaging plugin (required): builds the executable jar -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.7.18</version>
            </plugin>
        </plugins>
    </build>
</project>
package com.example.demo.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.tomcat.util.codec.binary.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Spring component that serializes rows to JSON and PUTs them, in batches, to the
 * Doris stream-load URL configured under {@code spring.datasource.doris.*}.
 *
 * <p>Sending is best-effort: a batch that fails is logged and counted as failed,
 * and the remaining batches are still attempted.
 *
 * @author qushen
 * @create 2023/6/22 19:41:08
 */
@Component
public class DorisStreamLoader {

    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoader.class);

    /** Maximum number of rows sent in a single stream-load request. */
    private static final int BATCH_SIZE = 5000;

    /** Index of the "rows loaded" counter in the counters array. */
    private static final int LOADED = 0;

    /** Index of the "rows failed or filtered" counter in the counters array. */
    private static final int FAILED = 1;

    /**
     * User name used for HTTP basic authentication.
     */
    @Value("${spring.datasource.doris.username}")
    private String user;

    /**
     * Password used for HTTP basic authentication.
     */
    @Value("${spring.datasource.doris.password}")
    private String password;

    /**
     * Stream-load URL template; it is passed to {@link String#format} with the table name
     * as the only argument, so it must contain exactly one {@code %s}.
     */
    @Value("${spring.datasource.doris.loadUrl}")
    private String loadUrl;

    private final ObjectMapper objectMapper = new ObjectMapper();

    {
        // Dates are written as "yyyy-MM-dd HH:mm:ss" in the Asia/Shanghai zone; enums as their index.
        objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        objectMapper.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
        objectMapper.configure(SerializationFeature.WRITE_ENUMS_USING_INDEX, true);
    }

    /**
     * Builder for the HTTP client. Redirects are followed for every HTTP method, including PUT.
     * NOTE(review): presumably needed because the stream-load endpoint answers the PUT with a
     * redirect - confirm against the Doris documentation.
     */
    final HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    return true;
                }
            });

    /**
     * Imports rows into a Doris table as JSON, {@value #BATCH_SIZE} rows per request.
     *
     * <p>Every column name is lower-cased, and a {@code sjcrsj} column holding the current time
     * is written to every row, replacing any {@code sjcrsj} value supplied by the caller.
     * A summary (total / loaded / failed) is logged when all batches have been attempted.
     *
     * @param table        table name substituted into the configured load URL
     * @param datas        rows to import, one map (column name to value) per row;
     *                     {@code null} or empty sends nothing
     * @param defaultValue currently unused by this method
     * @throws Exception if a batch cannot be serialized to JSON or the HTTP client cannot be closed;
     *                   failures of individual stream-load requests are logged, not thrown
     */
    public void loadJson(String table, List<Map<String, Object>> datas, Map<String, String> defaultValue) throws Exception {
        if (datas == null || datas.isEmpty()) {
            log.info("数据总数:{},成功数:{},失败数:{}", 0, 0, 0);
            return;
        }
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut httpPut = buildRequest(String.format(loadUrl, table));

            ArrayNode batch = objectMapper.createArrayNode();
            int[] counters = new int[]{0, 0};
            for (Map<String, Object> row : datas) {
                ObjectNode node = objectMapper.createObjectNode();
                for (Map.Entry<String, Object> column : row.entrySet()) {
                    // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
                    node.putPOJO(column.getKey().toLowerCase(Locale.ROOT), column.getValue());
                }
                node.putPOJO("sjcrsj", new Date());
                batch.add(node);
                if (batch.size() == BATCH_SIZE) {
                    flushBatch(client, httpPut, batch, counters);
                }
            }
            if (!batch.isEmpty()) {
                flushBatch(client, httpPut, batch, counters);
            }
            log.info("数据总数:{},成功数:{},失败数:{}", datas.size(), counters[LOADED], counters[FAILED]);
        }
    }

    /** Creates the PUT request carrying the authentication and stream-load headers. */
    private HttpPut buildRequest(String fullLoadUrl) {
        HttpPut httpPut = new HttpPut(fullLoadUrl);
        httpPut.removeHeaders(HttpHeaders.CONTENT_LENGTH);
        httpPut.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
        httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
        httpPut.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(user, password));
        httpPut.setHeader("Content-Type", "application/json;charset=UTF-8");
        // Stream-load properties are passed as request headers.
        httpPut.setHeader("column_separator", ",");
        httpPut.setHeader("format", "json");
        httpPut.setHeader("strip_outer_array", "true");
        httpPut.setHeader("line_delimiter", "\\x02");
        httpPut.setHeader("two_phase_commit", "false");
        httpPut.setHeader("strict_mode", "true");
        return httpPut;
    }

    /** Serializes the accumulated batch, sends it, then empties it for reuse. */
    private void flushBatch(CloseableHttpClient client, HttpPut httpPut, ArrayNode batch, int[] counters) throws IOException {
        String payload = objectMapper.writeValueAsString(batch);
        sendData(client, httpPut, payload, batch.size(), counters);
        batch.removeAll();
    }

    /**
     * Sends one batch under a fresh random label and updates the counters.
     *
     * <p>On success the loaded / filtered row counts reported in the response are added to the
     * counters. On any failure (non-200 status, unparseable body, missing or "Fail" status,
     * I/O error) the error is logged and the whole batch is counted as failed; nothing is thrown.
     *
     * @param client    HTTP client used to execute the request
     * @param httpPut   prepared request; its entity and {@code label} header are overwritten
     * @param data      JSON payload
     * @param batchRows number of rows contained in {@code data}
     * @param se        counters: index 0 = loaded rows, index 1 = failed or filtered rows
     */
    private void sendData(CloseableHttpClient client, HttpPut httpPut, String data, int batchRows, int[] se) {
        httpPut.setEntity(new StringEntity(data, StandardCharsets.UTF_8));
        httpPut.setHeader("label", UUID.randomUUID().toString());
        try (CloseableHttpResponse response = client.execute(httpPut)) {
            String loadResult = "";
            if (response.getEntity() != null) {
                // Decode as UTF-8 when the response does not declare a charset.
                loadResult = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
            }
            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200) {
                throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
            }
            JsonNode result = objectMapper.readTree(loadResult);
            String status = result == null ? "" : result.path("Status").asText();
            if (status.isEmpty()) {
                throw new IOException("Stream load response has no Status field: " + loadResult);
            }
            if ("Fail".equals(status)) {
                throw new IOException("导入doris失败:" + loadResult);
            }
            if ("Label Already Exists".equals(status)) {
                log.warn("Doris stream load label already exists: {}", loadResult);
            }
            se[LOADED] += result.path("NumberLoadedRows").asInt();
            se[FAILED] += result.path("NumberFilteredRows").asInt();
        } catch (IOException | RuntimeException e) {
            // Best-effort: keep going with the next batch, but make the failure visible in the summary.
            log.error("写入doris失败", e);
            se[FAILED] += batchRows;
        }
    }

    /**
     * Builds the value of an HTTP basic {@code Authorization} header.
     *
     * @param username user name
     * @param password password
     * @return {@code "Basic "} followed by the Base64 encoding of {@code username:password}
     */
    private static String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        // Base64 output is pure ASCII; name the charset instead of relying on the platform default.
        return "Basic " + new String(encoded, StandardCharsets.US_ASCII);
    }

    /**
     * Prints the basic-auth header for the credentials given on the command line.
     *
     * @param args {@code args[0]} = user name, {@code args[1]} = password
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: DorisStreamLoader <username> <password>");
            return;
        }
        System.out.println(basicAuthHeader(args[0], args[1]));
    }

    /**
     * Manual smoke test: loads three sample rows into table {@code jyz_dd}.
     * Connection settings come from the environment variables {@code DORIS_LOAD_URL},
     * {@code DORIS_USER} and {@code DORIS_PASSWORD} so that no credentials live in source code.
     *
     * @param args unused
     * @throws Exception if {@code DORIS_PASSWORD} is not set or {@link #loadJson} fails
     */
    public static void main1(String[] args) throws Exception {
        String envUrl = System.getenv("DORIS_LOAD_URL");
        String envUser = System.getenv("DORIS_USER");
        String envPassword = System.getenv("DORIS_PASSWORD");
        if (envPassword == null) {
            throw new IllegalStateException("Environment variable DORIS_PASSWORD is not set");
        }

        DorisStreamLoader dorisStreamLoader = new DorisStreamLoader();
        dorisStreamLoader.loadUrl = envUrl != null ? envUrl : "http://192.168.21.117:8030/api/sywsdb/%s/_stream_load";
        dorisStreamLoader.user = envUser != null ? envUser : "root";
        dorisStreamLoader.password = envPassword;

        List<Map<String, Object>> maps = new ArrayList<>();

        Map<String, Object> map = new HashMap<>();
        map.put("clgj", "");
        map.put("jyzid", "4433e13b-1bbb-11ee-ae7e-0242c0a80002");
        map.put("jyzdqhdm", "652801");
        map.put("jyzsl", 44.00);
        map.put("lx", "0");
        map.put("jyzqqhdm", "652823");
        map.put("jyzh", "6550351129");
        map.put("dlx", "02");
        map.put("dqhdm", "652823");
        map.put("ddsl", 44.00);
        map.put("jyzlx", "1");
        map.put("sm", "");
        map.put("id", "4e5d72f1-a4fa-480c-85e9-40e19b517d0e");
        map.put("jgryxm", "xxxxxx");
        map.put("dmc", "xxxxxx");
        map.put("sjcrsj", new Date(1691940454000L));
        map.put("jgryid", "78e7a042-6908-45f8-999a-422ea4689b47");
        map.put("rq", new Date(1691940454000L));

        Map<String, Object> map1 = new HashMap<>();
        map1.put("clgj", "");
        map1.put("jyzid", "44335435-1bbb-11ee-ae7e-0242c0a80002");
        map1.put("jyzdqhdm", "653121");
        map1.put("jyzsl", 1.00);
        map1.put("lx", "0");
        map1.put("jyzqqhdm", "653121");
        map1.put("jyzh", "6550348683");
        map1.put("dlx", "02");
        map1.put("dqhdm", "653121");
        map1.put("ddsl", 1.00);
        map1.put("jyzlx", "1");
        map1.put("sm", "");
        map1.put("id", "c2bce8d2-43f7-4c3c-9166-e7bba7ac1d2b");
        map1.put("jgryxm", "xxxxxx");
        map1.put("dmc", "xxxxxx");
        map1.put("sjcrsj", new Date(1691940453000L));
        map1.put("jgryid", "6059858a-483b-4e2b-a98f-27f423c3d886");
        map1.put("rq", new Date(1688212304000L));

        Map<String, Object> map2 = new HashMap<>();
        map2.put("clgj", "");
        map2.put("jyzid", "44340156-1bbb-11ee-ae7e-0242c0a80002");
        map2.put("jyzdqhdm", "652926");
        map2.put("jyzsl", 10000.00);
        map2.put("lx", "0");
        map2.put("jyzqqhdm", "652926");
        map2.put("jyzh", "6550351562");
        map2.put("dlx", "02");
        map2.put("dqhdm", "652926");
        map2.put("ddsl", 10000.00);
        map2.put("jyzlx", "1");
        map2.put("sm", "");
        map2.put("id", "6cd3f741-b8c8-4343-a510-78a6be3b7adc");
        map2.put("jgryxm", "xxxxxx");
        map2.put("dmc", "xxxxxx");
        map2.put("sjcrsj", new Date(1691940454000L));
        map2.put("jgryid", "a56ffc0b-09d0-4e34-82e2-8457ccb0625b");
        map2.put("rq", new Date(1688439693000L));

        maps.add(map);
        maps.add(map1);
        maps.add(map2);
        dorisStreamLoader.loadJson("jyz_dd", maps, null);
    }
}
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/9 19:53:32

深度测评!本科生毕业论文必备的8个AI论文网站

深度测评！本科生毕业论文必备的8个AI论文网站 2026年学术写作工具测评：为何需要一份精准的AI论文网站榜单 随着人工智能技术在学术领域的广泛应用，越来越多的本科生开始依赖AI工具辅助论文写作。然而，面对市场上种类繁多的平台…

作者头像 李华
网站建设 2026/6/9 17:42:09

C++ 是一门广泛应用于高性能计算、游戏开发、嵌入式系统和底层系统编程的语言

C++ 是一门广泛应用于高性能计算、游戏开发、嵌入式系统和底层系统编程的语言。其核心优势在于对内存的精细控制和接近硬件的操作能力。以下是围绕你提供的“核心学习路径”进行的详细解析与实战示例。1. 基础语法 变量与数据类型 int a = 10; double b = 3.14; char c = 'A'; bool fl…

作者头像 李华
网站建设 2026/6/9 17:45:47

如何通过单北斗形变监测提升水库的安全性?

单北斗形变监测技术在水库安全管理中展现出显著作用。通过高精度实时监测，能够及时掌握水库及周边的形变情况，确保可以在隐患发生的初期及时响应。这项技术的核心在于单北斗GNSS系统，具备稳定性和准确性，支持多点同步监测。针对复…

作者头像 李华
网站建设 2026/6/9 17:45:26

人工智能应用-机器视觉:车牌识别(5)

字符识别 一、分割识别方法 传统方法一般采用“先切割，再识别”的策略，即先对车牌图像进行字符分割，然后将每个字符输入一个分类器进行识别。例如，在下图 3.2.12 中，首先将车牌图片分成“渝”“A”“J”“I”“2”“2…

作者头像 李华
网站建设 2026/6/9 17:42:45

人工智能应用-机器视觉:车牌识别(6)

一、端到端序列识别方法 更先进的方法利用循环神经网络（RNN）的序列建模能力，不需要对字符进行逐一切割，而是对车牌图像中的字符串做整体识别。如下图 24.14所示：首先利用卷积神经网络（CNN）对输入…

作者头像 李华
网站建设 2026/6/9 17:45:22

ZYNQ MPSOC VCU介绍

关注、星标公众号，精彩内容每日送达 来源：网络素材 1 什么是VCU? VCU 的全称是 Video Codec Unit，即视频编解码单元， Zynq UltraScale+ MPSoC 系列产品分为三种类型，分别是 CG 型器件、 EG 型器件和 EV 型器件…

作者头像 李华