1. Flink 统计文本单词数量
package com.ycl; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class WordCountDemo { public static void main(String[] args) throws Exception { //1.创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //2.读取数据,从文件中读取 DataSource<String> lineDS = env.readTextFile("input/word.txt"); //3.切分,转换(word,1),匿名类 //Alt+Enter跳出如下方法的全部内容。 FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { //3.1 按照空格切分单词 String[] words = value.split(" "); //3.2将单词转换为(word,1) ,点击.var 会补全所有的行。 for (String word : words) { Tuple2<String, Integer> wordTuple2 = Tuple2.of(word, 1); //3.3 使用 Collector 向下游发送数据; out.collect(wordTuple2); } } }); //4.按照 word分组 UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroupBy = wordAndOne.groupBy(0); //5.各分组内聚合 1是位置,表示第二个元素; AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroupBy.sum(1); //6.输出 sum.print(); } }输出结果如下;