This post walks through a small Flink pipeline that reads comma-separated transaction records from Kafka and writes them into MongoDB. FlinkDao.java, a serializable POJO for one record:

    import java.io.Serializable;

    public class FlinkDao implements Serializable {

        private String id;
        private String startMoney;
        private String startTime;
        private String endMoney;
        private String endTime;
        private String total;

        public FlinkDao(String id, String startMoney, String startTime,
                        String endMoney, String endTime, String total) {
            this.id = id;
            this.startMoney = startMoney;
            this.startTime = startTime;
            this.endMoney = endMoney;
            this.endTime = endTime;
            this.total = total;
        }

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getStartMoney() { return startMoney; }
        public void setStartMoney(String startMoney) { this.startMoney = startMoney; }
        public String getStartTime() { return startTime; }
        public void setStartTime(String startTime) { this.startTime = startTime; }
        public String getEndMoney() { return endMoney; }
        public void setEndMoney(String endMoney) { this.endMoney = endMoney; }
        public String getEndTime() { return endTime; }
        public void setEndTime(String endTime) { this.endTime = endTime; }
        public String getTotal() { return total; }
        public void setTotal(String total) { this.total = total; }

        @Override
        public String toString() {
            return "FlinkDao{" +
                    "id='" + id + '\'' +
                    ", startMoney='" + startMoney + '\'' +
                    ", startTime='" + startTime + '\'' +
                    ", endMoney='" + endMoney + '\'' +
                    ", endTime='" + endTime + '\'' +
                    '", total='" + total + '\'' +
                    '}';
        }
    }
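The POJO is not actually wired into the job below (the flatMap variant that would build it stays commented out), but a short sketch with made-up sample values shows how one comma-separated record maps onto it:

    public class FlinkDaoExample {
        public static void main(String[] args) {
            // Hypothetical sample record in the comma-separated order the job expects:
            // id, startMoney, startTime, endMoney, endTime, total
            String line = "1,100.0,2019-01-01 10:00:00,80.0,2019-01-01 10:05:00,20.0";
            String[] f = line.split(",");
            FlinkDao dao = new FlinkDao(f[0], f[1], f[2], f[3], f[4], f[5]);
            System.out.println(dao); // prints via the toString() above
        }
    }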
MongoDBUtil.java, which builds an authenticated MongoDB client:

    import com.mongodb.MongoClient;
    import com.mongodb.MongoCredential;
    import com.mongodb.ServerAddress;

    import java.util.ArrayList;
    import java.util.List;

    public class MongoDBUtil {

        public static MongoClient getConnect() {
            ServerAddress serverAddress = new ServerAddress("localhost", 27017);
            List<MongoCredential> credential = new ArrayList<>();
            // The three arguments of MongoCredential.createScramSha1Credential() are
            // the user name, the database name, and the password.
            MongoCredential mongoCredential1 =
                    MongoCredential.createScramSha1Credential("root", "flink", "root".toCharArray());
            credential.add(mongoCredential1);
            // Obtain a MongoDB connection that authenticates with the credential.
            MongoClient mongoClient = new MongoClient(serverAddress, credential);
            return mongoClient;
        }
    }
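Before wiring the sink into a job, a minimal connectivity check can rule out connection and credential problems; a sketch, assuming the localhost:27017 instance and the root/flink credentials hard-coded above:

    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoDatabase;
    import org.bson.Document;

    public class MongoDBUtilCheck {
        public static void main(String[] args) {
            MongoClient client = MongoDBUtil.getConnect();
            try {
                MongoDatabase db = client.getDatabase("flink");
                // "ping" is a standard MongoDB server command; an exception here
                // means the server is unreachable or the credentials are wrong.
                Document result = db.runCommand(new Document("ping", 1));
                System.out.println("MongoDB reachable: " + result.toJson());
            } finally {
                client.close();
            }
        }
    }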
MongoDBSink.java, a RichSinkFunction that inserts each Tuple6 record into the flink collection. The connection is created once in open() and reused for every record, then released in close():

    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoDatabase;
    import org.apache.flink.api.java.tuple.Tuple6;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.bson.Document;

    import java.util.ArrayList;
    import java.util.List;

    public class MongoDBSink
            extends RichSinkFunction<Tuple6<String, String, String, String, String, String>> {

        private static final long serialVersionUID = 1L;

        // MongoClient is not serializable, so it is created in open(), not at construction.
        private transient MongoClient mongoClient;

        @Override
        public void open(Configuration parms) throws Exception {
            super.open(parms);
            mongoClient = MongoDBUtil.getConnect();
        }

        @Override
        public void invoke(Tuple6<String, String, String, String, String, String> value) {
            try {
                if (mongoClient != null) {
                    MongoDatabase db = mongoClient.getDatabase("flink");
                    MongoCollection<Document> collection = db.getCollection("flink");
                    List<Document> list = new ArrayList<>();
                    Document doc = new Document();
                    doc.append("id", value.f0);
                    doc.append("startMoney", value.f1);
                    doc.append("startTime", value.f2);
                    doc.append("endMoney", value.f3);
                    doc.append("endTime", value.f4);
                    doc.append("total", value.f5);
                    list.add(doc);
                    System.out.println("Insert Starting");
                    collection.insertMany(list);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void close() throws Exception {
            if (mongoClient != null) {
                mongoClient.close();
            }
        }
    }
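The sink can be smoke-tested with a single hand-written record before Kafka is involved at all; a minimal sketch, assuming the same local Flink and MongoDB setup:

    import org.apache.flink.api.java.tuple.Tuple6;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MongoDBSinkSmokeTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // One in-memory record instead of a live Kafka topic; the values are made up.
            env.fromElements(Tuple6.of("1", "100.0", "2019-01-01 10:00:00",
                                       "80.0", "2019-01-01 10:05:00", "20.0"))
               .addSink(new MongoDBSink());
            env.execute("MongoDBSink smoke test");
        }
    }

If a document shows up in the flink.flink collection afterwards, the sink and the credentials are working.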
FlinkTest.java, the job itself: consume from Kafka, split each line on commas into a Tuple6, and sink to MongoDB:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple6;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    import java.util.Properties;

    public class FlinkTest {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint every 5 seconds.
            env.enableCheckpointing(5000);

            // Kafka and ZooKeeper hosts and ports.
            Properties properties = new Properties();
            properties.setProperty("zookeeper.connect", "192.168.203.128:2181");
            properties.setProperty("bootstrap.servers", "192.168.203.128:9092");
            properties.setProperty("group.id", "test");
            //properties.setProperty("enable.auto.commit", "true");

            // The source files are collected from /home/fkf/Documents.
            FlinkKafkaConsumer010<String> consumer =
                    new FlinkKafkaConsumer010<>("fkfTopic", new SimpleStringSchema(), properties);
            // Pick a start position for consumption:
            consumer.setStartFromLatest();
            //consumer.setStartFromEarliest();     // start from the earliest record
            //consumer.setStartFromTimestamp(0);   // start from an epoch timestamp (milliseconds)
            //consumer.setStartFromGroupOffsets(); // default: resume from the committed offsets

            // Turn the Kafka records into a Flink DataStream.
            // (Alternatively, a FlatMapFunction could parse each line into a FlinkDao here.)
            DataStreamSource<String> stream = env.addSource(consumer);
            DataStream<Tuple6<String, String, String, String, String, String>> map =
                    stream.map(new MapFunction<String, Tuple6<String, String, String, String, String, String>>() {
                        @Override
                        public Tuple6<String, String, String, String, String, String> map(String s) throws Exception {
                            String[] split = s.split(",");
                            return Tuple6.of(split[0], split[1], split[2], split[3], split[4], split[5]);
                        }
                    });
            map.addSink(new MongoDBSink());
            stream.print().setParallelism(1);
            env.execute("WordCount from kafka data");
        }
    }
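One fragile point worth noting: the MapFunction indexes split[0] through split[5] unconditionally, so a single malformed Kafka record will fail the whole job. A minimal defensive alternative (a sketch, not part of the original job) is a FlatMapFunction that drops lines without exactly six fields:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple6;
    import org.apache.flink.util.Collector;

    // Skips records that do not have exactly six comma-separated fields
    // instead of throwing and failing the job.
    public class SafeParse
            implements FlatMapFunction<String, Tuple6<String, String, String, String, String, String>> {
        @Override
        public void flatMap(String s,
                            Collector<Tuple6<String, String, String, String, String, String>> out) {
            String[] f = s.split(",");
            if (f.length == 6) {
                out.collect(Tuple6.of(f[0], f[1], f[2], f[3], f[4], f[5]));
            }
        }
    }

It would plug in as stream.flatMap(new SafeParse()).addSink(new MongoDBSink()) in place of the map above.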
Versions used: kafka_2.11-0.10.1.0, zookeeper-3.3.6, apache-flume-1.9.0, Flink 1.6.

pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.flink</groupId>
        <artifactId>app</artifactId>
        <version>1.0-SNAPSHOT</version>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-filesystem_2.11</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.51</version>
            </dependency>
            <dependency>
                <groupId>org.mongodb</groupId>
                <artifactId>mongo-java-driver</artifactId>
                <version>3.10.1</version>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>copy-dependencies</id>
                            <phase>test</phase>
                            <goals>
                                <goal>copy-dependencies</goal>
                            </goals>
                            <configuration>
                                <outputDirectory>target/classes/lib</outputDirectory>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <addClasspath>true</addClasspath>
                                <mainClass>com.tonytaotao.flink.FlinkKafka</mainClass>
                                <classpathPrefix>lib/</classpathPrefix>
                            </manifest>
                            <manifestEntries>
                                <Class-Path>.</Class-Path>
                            </manifestEntries>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
Reposted from: http://yhivz.baihongyu.com/