1. 一个简单的自定义 Source 示例(每秒产生一条随机数据)
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.CachingTokenFilter;
import java.util.Random;
public class MySelfSourceTest01 {

    /**
     * Demo job: a custom source emits one "random : &lt;n&gt;" line per second;
     * the stream is mapped to (line, n) pairs, keyed on the full line,
     * collected into 5-second tumbling windows, and every element in each
     * window is printed and re-emitted downstream.
     */
    public static void main(String[] args) {
        // Silence verbose framework logging so the window output is readable.
        Logger.getLogger("org").setLevel(Level.OFF);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.addSource(new SourceFunction<String>() {
            // FIX: the original looped on `while (true)` with an empty cancel(),
            // so the job could never be stopped cleanly. A volatile flag lets
            // cancel() terminate the emit loop, per the SourceFunction contract.
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                Random random = new Random();
                // Emit one random record per second until cancelled.
                while (running) {
                    int nextInt = random.nextInt(100);
                    ctx.collect("random : " + nextInt);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                // "random : 42" -> ("random : 42", 42); the key (f0) is the
                // whole line, so each distinct value forms its own key group.
                String[] sps = value.split(":");
                return new Tuple2<>(value, Integer.parseInt(sps[1].trim()));
            }
        }).keyBy(0).timeWindow(Time.seconds(5));
        SingleOutputStreamOperator<String> apply = window.apply(new WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
                // Log and forward every element buffered in this window.
                input.forEach(x -> {
                    System.out.println("apply function -> " + x.f0);
                    out.collect(x.f0);
                });
            }
        });
        apply.print();
        try {
            env.execute("myself_source_test01");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package code.book.stream.customsinkandsource.jdbc.java;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class StudentSourceFromMysql extends RichSourceFunction<Student> {
    PreparedStatement ps;
    private Connection connection;
    // FIX: set to false by cancel() so an in-flight result-set scan stops
    // early; the original cancel() was a no-op.
    private volatile boolean running = true;

    /**
     * 一、open() establishes the JDBC connection once, so invoke/run does not
     * have to open and release a connection per call.
     *
     * @param parameters Flink runtime configuration (passed through to super)
     * @throws Exception if the driver cannot load or the connection fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String driver = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://qingcheng11:3306/flinktest";
        // NOTE(review): hard-coded credentials — move to configuration or a
        // secrets store before using outside a tutorial.
        String username = "root";
        String password = "qingcheng";
        // 1. Load the driver class.
        Class.forName(driver);
        // 2. Open the connection.
        connection = DriverManager.getConnection(url, username, password);
        // 3. Prepare the query reused by run().
        String sql = "select stuid,stuname,stuaddr,stusex from Student;";
        ps = connection.prepareStatement(sql);
    }

    /**
     * 二、run() is invoked once by the DataStream to produce records: it
     * executes the prepared query and emits one Student per row.
     *
     * FIX vs. original: the ResultSet is now closed (it was leaked), and
     * exceptions propagate to Flink instead of being swallowed by
     * printStackTrace(), so a failed read actually fails the job.
     */
    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        // 4. Execute the query and emit each row; stop early if cancelled.
        try (ResultSet resultSet = ps.executeQuery()) {
            while (running && resultSet.next()) {
                Student student = new Student(
                        resultSet.getInt("stuid"),
                        resultSet.getString("stuname").trim(),
                        resultSet.getString("stuaddr").trim(),
                        resultSet.getString("stusex").trim());
                sourceContext.collect(student);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    /**
     * 三、close() releases JDBC resources after the job finishes.
     *
     * FIX vs. original: close in reverse order of creation (statement before
     * connection), and use try/finally so a failure closing the statement
     * cannot leak the connection.
     */
    @Override
    public void close() throws Exception {
        // 5. Release resources.
        super.close();
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
2. 自定义 Source 的测试程序(从 MySQL 读取并打印)
package code.book.stream.customsinkandsource.jdbc.java;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StudentSourceFromMysqlTest {

    /**
     * Wires the custom MySQL source into a streaming job, prints every
     * Student it emits, and submits the pipeline for execution.
     */
    public static void main(String[] args) throws Exception {
        // Build the streaming environment, attach the custom source, and
        // route its output straight to stdout.
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(new StudentSourceFromMysql()).print();
        // Nothing runs until execute() submits the job graph.
        environment.execute();
    }
}
https://blog.csdn.net/liguohuaBigdata/article/details/78588902
https://blog.csdn.net/whr_yy/article/details/79869741