1、 业务数据库的导入
先在 mysql 创建数据库 jrxd
之后将准备好的大数据sql导入mysql
传统通过navicat导入速度过慢时,可以使用mysql命令行进行导入,即先将sql上传至虚拟机
之后再使用mysql客户端中的source命令
source /opt/modules/jrxd-new.sql;
导入成功后再在navicat中查看
2、生成 hive 建表语句(需要准备hive集群)
采集数据:
在hive中创建一个数据库: create database finance;在hive创建对应obs层数据库可以,由于数据表过多,我们可以使用脚本:
思路:mysql 中的任意一个表,都在元数据中有表名,字段名,字段类型等信息,如果我们能够通过 python 获取到每一个表的元数据,就可以生成它的建表语句,并将语句生成 hive 的语法。
在mysql数据库,查询jrxd下的所有表名,复制到/root/tables.txt文件中 select table_name from information_schema.`TABLES` where table_schema = 'jrxd'可以得到数据(获得源数据所有表名,为下面创建所有对应表做准备)
channel_info com_manager_info dict_citys dict_product dict_provinces drawal_address drawal_apply drawal_companys loan_apply loan_apply_credit_report loan_apply_salary loan_credit repay_plan repay_plan_item repay_plan_item_his user_det user_md5 user_ocrlog user_quota users得到数据后在/home下创建tables.txt文件将表名存入,供后面脚本使用。
py脚本运行时须使用mysql连接依赖pymysql
由于本人用的集群内置py为python2 第一步:安装pip2服务 wget https://bootstrap.pypa.io/pip/2.7/get-pip.py python get-pip.py 第二步:安装pymysql pip2 install pymysql 补充一个知识:当你不知道一个命令在哪个软件中时 yum search all +命令的名字 举例:假如我们想使用ifconfig 查看ip yum search all ifconfig 查询到这个命令在 net-tools.x86_64 : Basic networking tools yum install -y net-tools.x86_64 若使用python3直接运行 pip install pymysqlpy脚本(用于查询数据库数据并生成建表语句,适用python2)(AutoCreateHiveSql.py)
# 给定一个数据库的名字和表的名字,自动生成hive的建表语句 import sys import pymysql def getDBData(dbName, tableName): # 查询mysql的元数据,根据数据库的名字和表的名字查询改表对应的字段和类型 # 连接数据库(指定charset避免中文乱码) conn = pymysql.connect( host='hadoop11', user='root', password='123456', database='information_schema', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor # 返回字典格式,更易读 ) cursor = conn.cursor() sql = """ SELECT column_name, data_type FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = %s AND table_name = %s ORDER BY ordinal_position # 按字段顺序排序 """ # 执行sql语句 cursor.execute(sql, (dbName, tableName)) result=cursor.fetchall() # 统一将字典的key转为小写,兼容大小写场景 result_lower = [] for item in result: lower_item = {k.lower(): v for k, v in item.items()} result_lower.append(lower_item) cursor.close() conn.close() return result_lower # 完善MySQL到Hive的类型映射(覆盖更多场景,符合生产规范) MYSQL_TO_HIVE_TYPE_MAPPING = { # 整数类型 'tinyint': 'TINYINT', 'smallint': 'SMALLINT', 'int': 'INT', 'integer': 'INT', 'bigint': 'BIGINT', 'bit': 'BOOLEAN', # 浮点/定点类型 'float': 'FLOAT', 'double': 'DOUBLE', 'decimal': 'DECIMAL', 'numeric': 'DECIMAL', # 字符串类型 'varchar': 'STRING', 'char': 'STRING', 'text': 'STRING', 'tinytext': 'STRING', 'mediumtext': 'STRING', 'longtext': 'STRING', 'enum': 'STRING', 'set': 'STRING', # 日期时间类型 'date': 'DATE', 'datetime': 'STRING', 'timestamp': 'STRING', 'time': 'STRING', 'year': 'INT', # 二进制类型 'binary': 'BINARY', 'varbinary': 'BINARY', 'blob': 'BINARY', 'tinyblob': 'BINARY', 'mediumblob': 'BINARY', 'longblob': 'BINARY', # 其他类型 'boolean': 'BOOLEAN', 'bool': 'BOOLEAN' } def generateHiveCreateSql(dbName, tableName,field_info): hive_columns = [] for tuple in field_info: column_name = tuple['column_name'] data_type = tuple['data_type'] # 根据mysql的字段类型获取hive对应的字段类型 hive_type=MYSQL_TO_HIVE_TYPE_MAPPING.get(data_type, "string") # 修复:去掉 f-string,兼容 Python2 hive_columns.append("%s %s" % (column_name, hive_type)) columns_str = ','.join(hive_columns) # 修复:多行字符串格式化兼容 Python2 sql = """ CREATE TABLE IF NOT EXISTS ods_jrxd_%s ( %s ) COMMENT '%s.%s' ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; """ % (tableName, columns_str, dbName, tableName) return sql if __name__ == '__main__': # 校验外部参数 if len(sys.argv) != 3: print("请传入数据库的名字和表的名字") sys.exit(1) dbName = sys.argv[1] tableName = sys.argv[2] print(dbName, tableName) field_info=getDBData(dbName, tableName) create_hive_table_sql=generateHiveCreateSql(dbName, tableName, field_info) print(create_hive_table_sql) with open("./hive_create_table.sql","a") as f: f.write(create_hive_table_sql)先单独测试该 py 是否可以使用:
python AutoCreateHiveSql.py jrxd channel_info若没问题,则使用shell 脚本批量生成:(datax_autosql_finance.sh)
#!/bin/bash while read x1 do python AutoCreateHiveSql.py jrxd $x1 done < /root/tables.txt执行:
./datax_autosql_finance.sh即可生成全部表
3、批量生成 datax 的 json 文件
编写python代码
第一步:能够通过python读取mysql的数据库的表
import pymysql def getDBData(dbName,tableName): db_connection = pymysql.connect( host="bigdata01", port=3306, user="root", password="123456", database='information_schema' ) cursor = db_connection.cursor() sql = f"select column_name ,data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = '{dbName}' and table_name = '{tableName}' order by ordinal_position" cursor.execute(sql) result = cursor.fetchall() cursor.close() db_connection.close() return result if __name__ == '__main__': result = getDBData("spark_project","oms_order") print(result)第二步:使用json.dump将文件保存起来
第三步:拼接json文件(完整代码)(AutoCreateJson.py)
# -*- coding: utf-8 -*- import json import sys import os import pymysql def getDBData(dbName, tableName): # 查询mysql的元数据,根据数据库的名字和表的名字查询改表对应的字段和类型 # 连接数据库(指定charset避免中文乱码) conn = pymysql.connect( host='hadoop11', user='root', password='123456', database='information_schema', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor # 返回字典格式,更易读 ) cursor = conn.cursor() sql = """ SELECT column_name, data_type FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = %s AND table_name = %s ORDER BY ordinal_position # 按字段顺序排序 """ # 执行sql语句 cursor.execute(sql, (dbName, tableName)) result = cursor.fetchall() cursor.close() conn.close() return result def getAllCloumnsName(result): cloumnName = map(lambda x:x['COLUMN_NAME'],result) list1 = list(cloumnName) return ",".join(list1) def getAllClumnsNameAndType(result): # 获取列的名字和类型 mappings = { 'bigint': 'bigint', 'varchar': 'string', 'int': 'int', 'datetime': 'string', 'text': 'string', 'decimal': 'string', 'date': 'string', 'timestamp': 'string', 'varbinary': 'Bytes', 'double': 'double', 'time': 'Date' } list2 = [] for x in result: item = { "name": x['COLUMN_NAME'], "type": mappings.get(x['DATA_TYPE'], 'string') } list2.append(item) return list2 if __name__ == '__main__': # 校验外部参数 if len(sys.argv) != 3: print("请传入数据库的名字和表的名字") sys.exit(1) dbName = sys.argv[1] tableName = sys.argv[2] result = getDBData(dbName, tableName) print(result) cloumn = getAllCloumnsName(result) print(cloumn) columnAndType = getAllClumnsNameAndType(result) # 拼接json 数据 —— 全部改为 Python2 兼容格式 query_sql = "select %s from %s" % (cloumn, tableName) jdbc_url = "jdbc:mysql://hadoop11:3306/%s" % dbName hdfs_path = "/user/hive/warehouse/finance.db/ods_jrxd_%s" % tableName file_name = tableName output_file = "./datax_json/%s.json" % tableName # 创建目录(Python2 兼容) if not os.path.exists("./datax_json"): os.makedirs("./datax_json") # 拼接 JSON 数据结构 jsonData = { "job": { "setting": { "speed": { "channel": 3 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "123456", "connection": [ { "querySql": [ query_sql ], "jdbcUrl": [ jdbc_url ] } ] } }, "writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://hadoop11:8020", "fileType": "text", "path": hdfs_path, "fileName": file_name, "writeMode": "append", "column": columnAndType, "fieldDelimiter": "," } } } ] } } # 写入文件 with open(output_file, "w") as f: json.dump(jsonData, f) print("JSON 文件生成成功:%s" % output_file)运行脚本
python AutoCreateJson.py jrxd channel_info批量生成全部json(create_datax_json.sh)
#/bin/bash while read x1 do python AutoCreateJson.py jrxd $x1 done < /root/tables.txt执行
chmod 777 create_datax_json.sh 将AutoCreateJson.py 也放入到/home/scripts 创建一个文件夹 mkdir -p /home/scripts/datax_json 继续执行 脚本 ./create_datax_json.sh4、将mysql中全部数据迁移到hive
创建批量运行json脚本(run_all_datax.sh)
#!/bin/bash # 定义 JSON 文件所在目录 JSON_DIR="/home/datax_json" # 定义 datax.py 的路径(请根据你实际路径修改!) DATAX_PY="/opt/installs/datax/bin/datax.py" echo "========================================" echo "开始执行目录下所有 DataX 任务:$JSON_DIR" echo "========================================" # 循环遍历所有 .json 文件 for json_file in "$JSON_DIR"/*.json; do # 如果目录里没有 json 文件,跳过 [ -e "$json_file" ] || continue echo "" echo "====> 正在执行:$json_file" # 执行 DataX 命令 python "$DATAX_PY" "$json_file" done echo "========================================" echo "所有任务执行完成!" echo "========================================"执行脚本
chmod 777 run_all_datax.sh 之前已经测试过channel_info 这个json文件了,为了防止数据插入重复,将该json文件删除 执行脚本: ./run_all_datax.sh操作时须先搭建hive集群:
本文中集群为:hadoop11,hadoop12,hadoop13(集群机器名/ip)脚本中出现的hadoop11注意替换为自己机器的ip
且脚本文件全在/home路径下编写