大数据毕业设计及源码实战:从零构建可扩展的实时数据处理系统
摘要:许多学生在完成大数据毕业设计时面临技术栈选型混乱、缺乏真实业务场景、代码结构松散等问题,导致项目难以部署或演示。本文基于真实数据流场景,选用 Kafka + Flink + Doris 技术栈,完整实现一个端到端的实时日志分析系统,并提供结构清晰、注释完备的开源级源码。读者将掌握高内聚低耦合的模块设计、状态管理与 Exactly-Once 语义保障,显著提升系统可维护性与演示说服力。
一、毕设常见痛点:为什么“跑通”≠“能演示”
环境配置复杂
本地 Windows + IDEA 能跑,一到 CentOS 7 集群就缺包、缺 so 文件,甚至 Flink 版本与 Hadoop 冲突,现场答辩直接翻车。缺乏真实数据
用Random.nextLong()造数据,指标曲线过于平滑,老师一句“这数据是假的吧?”就让你语塞。代码不可复现
GitHub 丢一个 500 MB 的data.zip,路径全写死,README 只有一句“先跑 main”。评审老师打不开,直接扣分。模块耦合严重
采集、计算、可视化全写在一个main方法里,改一行日志格式,整个 Jar 重编,调试 10 分钟编译 5 分钟,心态炸裂。
二、技术选型:Spark vs Flink、HBase vs Doris 的权衡
| 维度 | Spark Structured Streaming | Flink |
|---|---|---|
| 延迟 | 秒级微批 | 毫秒级真流 |
| Exactly-Once | 支持,但需手动幂等 | 原生两阶段提交 |
| 状态 TTL API | 无内置,需自己删 | StateTtlConfig一行代码 |
| 本地调试 | 重,需整包 Spark | 轻,一个flink-clients即可 |
结论:毕设场景要“实时 + 可演示”,Flink 更稳。
| 维度 | HBase | Doris |
|---|---|---|
| 聚合查询 | 前缀扫描快,复杂 SQL 慢 | MPP 引擎,秒级 GROUP BY |
| 运维成本 | 需 HDFS + ZooKeeper | 只有 FE + BE,一键启停 |
| 字段扩列 | 需预分区,易热点 | 动态 Schema Change |
结论:Doris 一行CREATE TABLE就能跑通 OLAP,答辩现场写 SQL 给老师看,比解释 RegionServer 直观多了。
三、系统架构与数据流
- 日志文件 → FileBeat → Kafka Topic
ods_log - Flink 消费
ods_log,做 ETL & 窗口聚合 → Sink 到 Doris 表dws_log_agg - Doris 通过 JDBC 对接 Superset,3 分钟拖拽出 PV/UV 折线图
四、核心模块实现细节
4.1 数据采集:FileBeat 侧车模式
- 采用
container.inputs把业务容器内日志实时捞出,避免给业务系统加 SDK。 - 每条日志追加
pod_name、node_ip字段,方便后续定位热点 Pod。
4.2 流处理主 Job:Flink 1.17
以下代码为LogEtlJob核心片段,已删非关键代码,可直接复制到 IDE 跑通。
public class LogEtlJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // 5s 一次 CK env.setStateBackend(new HashMapStateBackend()); env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-cp"); // 1. 读 Kafka KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("kafka:9092") .setTopics("ods_log") .setGroupId("flink-etl") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); SingleOutputStreamOperator<LogEvent> stream = env .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source") .map(new JsonToLogEventFunc()) // 解析 JSON .filter(Objects::nonNull); // 2. 10s 滚动窗口统计 PV/UV SingleOutputStreamOperator<Metric> agg = stream .keyBy(LogEvent::getProductId) .windowAll(TumblingEventTimeWindows.of(Time.seconds(10))) .aggregate(new PvUvAggFunc(), new WindowAllFunction<Metric, Metric, TimeWindow>() { @Override public void apply(TimeWindow window, Iterable<Metric> values, Collector<Metric> out) { Metric m = values.iterator().next(); m.setWindowStart(window.getStart()); m.setWindowEnd(window.getEnd()); out.collect(m); } }); // 3. Sink 到 Doris:幂等写入 DorisSink<Metric> sink = DorisSink.<Metric>builder() .setDorisOptions(DorisOptions.builder() .setFenodes("doris:8030") .setTableIdentifier("log_analyze.dws_log_agg") .build()) .setDorisExecutionOptions(DorisExecutionOptions.builder() .setBatchSize(1000) .setMaxRetries(3) .setStreamLoadProp(getStreamLoadProps()) // labelPrefix=jobId .build()) .setSerializer(new MetricDorisSerializer()) .build(); agg.sinkTo(sink).name("doris-sink"); env.execute("LogEtlJob"); } private static Properties getStreamLoadProps() { Properties p = new Properties(); p.put("format", "json"); p.put("strip_outer_array", "true"); p.put("label_prefix", "flink_" + System.currentTimeMillis()); return p; } }关键注释:
label_prefix保证同一批次重跑不会重复导入,实现 Doris 端的幂等。- 状态后端用
HashMapStateBackend,本地调试方便;上生产换RocksDBStateBackend并开启增量 CK。 - 窗口函数里把窗口起止时间写进
Metric,前端可直接拿来做折线 X 轴。
4.3 状态 TTL 配置
StateTtlConfig ttl = StateTtlConfig .newBuilder(Time.hours(24)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .cleanupFullSnapshot() .build();在PvUvAggFunc里对MapStateDescriptor追加.enableTimeToLive(ttl),防止 UV 的Set无限膨胀。
4.4 结果存储:Doris 表设计
CREATE TABLE dws_log_agg ( product_id BIGINT, pv BIGINT SUM, uv BIGINT SUM, window_start DATETIME, window_end DATETIME ) AGGREGATE KEY(product_id, window_start) DISTRIBUTED BY HASH(product_id) BUCKETS 10;- 用
AGGREGATE KEY模型,后续同批次重导自动覆盖,天然幂等。 - 分区可按天创建,历史数据通过
ALTER TABLE DROP PARTITION快速清理,节省 SSD。
五、性能测试与安全考量
吞吐量
3 台 16 U 32 G 虚拟机,Kafka 单分区 1.5 kB 日志,Flink 并发度 6,可稳吃 25 万条/秒,CPU 65%。延迟
端到端:日志产生 → Doris 可查,P99 2.3 s,满足“秒级实时”答辩口径。敏感字段脱敏
在JsonToLogEventFunc里加正则:uid=(\d{6})\d{4}替换为uid=$1****,Doris 端即使被导出也不泄露完整用户 ID。
六、生产环境避坑指南
依赖冲突
Flink 1.17 自带 Jackson 2.15,若业务 Jar 里引了 2.12,运行时出现NoSuchMethodError。解决:<scope>provided</scope>把冲突包全部剔除,或者使用flink-shaded-jackson。Checkpoint 失败
现象:Kafka 一次性拉 50 M 数据,网络抖动 10 s,CK 超时。调优:execution.checkpointing.timeout=20 min- 并发度调小,降低单次屏障数据量。
本地调试与集群差异
Windows 路径分隔符、时区、hostname 大小写都会导致IllegalStateException。建议:Docker-Compose 一键拉起 ZooKeeper、Kafka、Doris,Mac/Win 统一跑容器,答辩电脑也一样镜像,确保“所见即所得”。Doris Stream Load 返回
Label Already Exists
因为作业失败自动重试,但 label 没换。把jobId + subtaskIndex + checkpointId拼进 label,保证全局唯一。
七、源码结构与快速上手
仓库目录:
bigdata-graduation-project ├─ deploy/ # docker-compose & k8s yaml ├─ log-etl/ # Flink 主工程 │ ├─ src/main/scala # 若需 Scala 扩展 │ └─ src/main/java ├─ doris-ddl/ # 建表语句 ├─ superset-dashboard/ # 导出的图表 JSON └─ README.md # 一行命令跑通一键启动:
git clone https://github.com/yourname/realtime-log-etl.git cd deploy && docker-compose up -d # 访问 http://localhost:8081 查看 Flink WebUI # 访问 http://localhost:8088 查看 Superset八、延伸思考:从毕设到生产还差几步?
多租户
在 Kafka 加tenant_id头,Flink 侧根据tenant_id动态建 Doris 表,写数据时route到对应表,实现 SaaS 级隔离。监控告警
把 Flink 指标打入 Prometheus,配置 Grafana 面板:CK 失败次数、Doris Stream Load 延迟,夜间短信告警,真正“睡后收入”。性能调优
- 使用
MiniBatch攒微批,提高聚合吞吐。 - Doris 开
light_schema_change=true,在线加列,不影响写入。
- 使用
源码已开源,README 里给出演示地址。建议你先原封不动跑一遍,再把业务日志换成自己学院的 Nginx 日志,改几个指标,就能在答辩现场秀出“我们校园网的真实流量”。如果还想进一步压测,试着把并发度从 6 提到 20,观察背压曲线,调优的过程本身就是最好的面试谈资。祝你毕设高分,也欢迎提 PR 一起把项目做成真正的生产级模板。