Hands-On Project, Learning Flink from 0 to 1 (26): Collecting Kafka Data with Flink and Storing It in MongoDB
Published: 2019-03-11


Entity class:

import java.io.Serializable;

public class FlinkDao implements Serializable {

    private String id;
    private String startMoney;
    private String startTime;
    private String endMoney;
    private String endTime;
    private String total;

    public FlinkDao(String id, String startMoney, String startTime, String endMoney, String endTime, String total) {
        this.id = id;
        this.startMoney = startMoney;
        this.startTime = startTime;
        this.endMoney = endMoney;
        this.endTime = endTime;
        this.total = total;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getStartMoney() { return startMoney; }
    public void setStartMoney(String startMoney) { this.startMoney = startMoney; }

    public String getStartTime() { return startTime; }
    public void setStartTime(String startTime) { this.startTime = startTime; }

    public String getEndMoney() { return endMoney; }
    public void setEndMoney(String endMoney) { this.endMoney = endMoney; }

    public String getEndTime() { return endTime; }
    public void setEndTime(String endTime) { this.endTime = endTime; }

    public String getTotal() { return total; }
    public void setTotal(String total) { this.total = total; }

    @Override
    public String toString() {
        return "FlinkDao{" +
                "id='" + id + '\'' +
                ", startMoney='" + startMoney + '\'' +
                ", startTime='" + startTime + '\'' +
                ", endMoney='" + endMoney + '\'' +
                ", endTime='" + endTime + '\'' +
                ", total='" + total + '\'' +
                '}';
    }
}
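The job expects each Kafka record to be a single line of six comma-separated fields. A minimal sketch of how this entity maps onto such a record (the sample values are made up; the commented-out flatMap in FlinkTest below is the only place the class is actually referenced):

public class FlinkDaoDemo {
    public static void main(String[] args) {
        // one record = id, startMoney, startTime, endMoney, endTime, total (sample values are made up)
        String line = "1001,100.00,2019-03-11 10:00:00,80.00,2019-03-11 10:05:00,20.00";
        String[] f = line.split(",");
        FlinkDao dao = new FlinkDao(f[0], f[1], f[2], f[3], f[4], f[5]);
        System.out.println(dao);
    }
}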

MongoDB utility class:

import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;

import java.util.ArrayList;
import java.util.List;

public class MongoDBUtil {

    public static MongoClient getConnect() {
        ServerAddress serverAddress = new ServerAddress("localhost", 27017);

        // the three arguments of MongoCredential.createScramSha1Credential() are
        // user name, authentication database and password
        List<MongoCredential> credential = new ArrayList<>();
        MongoCredential mongoCredential1 = MongoCredential.createScramSha1Credential("root", "flink", "root".toCharArray());
        credential.add(mongoCredential1);

        // obtain the MongoDB connection with the credentials above
        MongoClient mongoClient = new MongoClient(serverAddress, credential);
        return mongoClient;
    }
}
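If you prefer a single connection string over a ServerAddress plus credential list, the same client can be built from a URI (a sketch, assuming the same localhost:27017 instance and root/root credentials authenticated against the flink database):

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;

public class MongoDBUriDemo {
    public static void main(String[] args) {
        // user, password, host, port and authentication database packed into one URI
        MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb://root:root@localhost:27017/?authSource=flink"));
        System.out.println(mongoClient.listDatabaseNames().first());
        mongoClient.close();
    }
}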

The MongoDBSink sink function:

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;

public class MongoDBSink extends RichSinkFunction<Tuple6<String, String, String, String, String, String>> {

    private static final long serialVersionUID = 1L;

    MongoClient mongoClient = null;

    @Override
    public void invoke(Tuple6<String, String, String, String, String, String> value) {
        try {
            if (mongoClient == null) {
                // reconnect if open() has not run yet or the connection was lost
                mongoClient = MongoDBUtil.getConnect();
            }
            MongoDatabase db = mongoClient.getDatabase("flink");
            MongoCollection<Document> collection = db.getCollection("flink");

            List<Document> list = new ArrayList<>();
            Document doc = new Document();
            doc.append("id", value.f0);
            doc.append("startMoney", value.f1);
            doc.append("startTime", value.f2);
            doc.append("endMoney", value.f3);
            doc.append("endTime", value.f4);
            doc.append("total", value.f5);
            list.add(doc);

            System.out.println("Insert Starting");
            collection.insertMany(list);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void open(Configuration parms) throws Exception {
        super.open(parms);
        mongoClient = MongoDBUtil.getConnect();
    }

    @Override
    public void close() throws Exception {
        if (mongoClient != null) {
            mongoClient.close();
        }
    }
}
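To check from outside Flink that records are actually landing, a small read-back sketch (it reuses MongoDBUtil and the flink database / flink collection hard-coded above):

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class MongoDBCheck {
    public static void main(String[] args) {
        MongoClient client = MongoDBUtil.getConnect();
        MongoCollection<Document> collection = client.getDatabase("flink").getCollection("flink");
        System.out.println("documents in flink.flink: " + collection.countDocuments());
        // print a few documents to verify the field mapping
        for (Document doc : collection.find().limit(5)) {
            System.out.println(doc.toJson());
        }
        client.close();
    }
}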

The FlinkTest job entry point:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

public class FlinkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint interval in milliseconds
        env.enableCheckpointing(5000);

        // Kafka and ZooKeeper addresses and the consumer group
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "192.168.203.128:2181");
        properties.setProperty("bootstrap.servers", "192.168.203.128:9092");
        properties.setProperty("group.id", "test");
        //properties.setProperty("enable.auto.commit", "true");

        // the source files being collected live in /home/fkf/Documents
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("fkfTopic", new SimpleStringSchema(), properties);

        // choose where consumption starts
        consumer.setStartFromLatest();
        //consumer.setStartFromEarliest();     // start from the earliest record
        //consumer.setStartFromLatest();       // start from the latest record
        //consumer.setStartFromTimestamp(0);   // start from the given epoch timestamp (milliseconds)
        //consumer.setStartFromGroupOffsets(); // default: resume from the group's committed offsets

        // alternative: turn each line into a FlinkDao inside a flatMap
        // SingleOutputStreamOperator<String> stream = env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
        //     @Override
        //     public void flatMap(String s, Collector<String> collector) {
        //         //String[] split = s.split(",");
        //         //FlinkDao flinkDao = new FlinkDao(split[0], split[1], split[2], split[3], split[4], split[5]);
        //         collector.collect(s);
        //     }
        // });

        // convert each Kafka record (a comma-separated line) into a Tuple6
        DataStreamSource<String> stream = env.addSource(consumer);
        DataStream<Tuple6<String, String, String, String, String, String>> map =
                stream.map(new MapFunction<String, Tuple6<String, String, String, String, String, String>>() {
                    @Override
                    public Tuple6<String, String, String, String, String, String> map(String s) throws Exception {
                        String[] split = s.split(",");
                        return Tuple6.of(split[0], split[1], split[2], split[3], split[4], split[5]);
                    }
                });

        map.addSink(new MongoDBSink());
        stream.print().setParallelism(1);

        env.execute("WordCount from kafka data");
    }
}
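To push some test records into the topic, a minimal producer sketch (it assumes the kafka-clients 0.10.x jar pulled in by the Kafka connector, plus the broker address and fkfTopic used above; the record values are made up):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.203.128:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // one record = six comma-separated fields, matching the map() in FlinkTest
        producer.send(new ProducerRecord<>("fkfTopic", "1001,100.00,2019-03-11 10:00:00,80.00,2019-03-11 10:05:00,20.00"));
        producer.flush();
        producer.close();
    }
}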

POM file

Component versions used in this setup:

kafka_2.11-0.10.1.0
zookeeper-3.3.6
apache-flume-1.9.0
flink 1.6

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.flink</groupId>
    <artifactId>app</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.10.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>test</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>target/classes/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>com.tonytaotao.flink.FlinkKafka</mainClass>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                        <manifestEntries>
                            <Class-Path>.</Class-Path>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

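With this build section, mvn clean package compiles against Java 8, copies the dependencies into target/classes/lib, and writes a manifest whose main class is com.tonytaotao.flink.FlinkKafka (carried over from the original project); if your entry point is the FlinkTest class above, change the mainClass entry accordingly.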
 

Reposted from: http://yhivz.baihongyu.com/
