立即注册 登录
About云-梭伦科技 返回首页

pig2的个人空间 https://www.aboutyun.com/?61 [收藏] [复制] [分享] [RSS]

日志

Flink 自定义一个简单 Source 及 MySQL Source 实例

已有 1667 次阅读2018-11-21 17:50 |系统分类:大数据

1. 一个简单的 Source
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.CachingTokenFilter;
 
import java.util.Random;
 
public class MySelfSourceTest01 {
    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                Random random = new Random();
                // 循环可以不停的读取静态数据
                while (true) {
                    int nextInt = random.nextInt(100);
                    ctx.collect("random : " + nextInt);
                    Thread.sleep(1000);
                }
            }
 
            @Override
            public void cancel() {
 
            }
        });
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] sps = value.split(":");
                return new Tuple2<>(value, Integer.parseInt(sps[1].trim()));
            }
        }).keyBy(0).timeWindow(Time.seconds(5));
 
        SingleOutputStreamOperator<String> apply = window.apply(new WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
                input.forEach(x -> {
                    System.out.println("apply function -> " + x.f0);
                    out.collect(x.f0);
                });
            }
        });
 
        apply.print();
 
        try {
            env.execute("myself_source_test01");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2. MySQL 自定义 Source

1. 自定义 Source
package code.book.stream.customsinkandsource.jdbc.java;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


public class StudentSourceFromMysql extends RichSourceFunction<Student> {
    PreparedStatement ps;
    private Connection connection;

    /**
     * 一、open()方法中建立连接,这样不用每次invoke的时候都要建立连接和释放连接。
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String driver = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://qingcheng11:3306/flinktest";
        String username = "root";
        String password = "qingcheng";
        //1.加载驱动
        Class.forName(driver);
        //2.创建连接
        connection = DriverManager.getConnection(url, username, password);
        //3.获得执行语句
        String sql = "select stuid,stuname,stuaddr,stusex from Student;";
        ps = connection.prepareStatement(sql);
    }

    /**
     * 二、DataStream调用一次run()方法用来获取数据
     */
    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        try {
            //4.执行查询,封装数据
            ResultSet resultSet = ps.executeQuery();
            while (resultSet.next()) {
                Student student = new Student(
                resultSet.getInt("stuid"),
                resultSet.getString("stuname").trim(),
                resultSet.getString("stuaddr").trim(), 
                resultSet.getString("stusex").trim());
                sourceContext.collect(student);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cancel() {

    }

    /**
     * 三、 程序执行完毕就可以进行,关闭连接和释放资源的动作了
     */
    @Override
    public void close() throws Exception {
        //5.关闭连接和释放资源
        super.close();
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }
}
2. Source 测试程序
package code.book.stream.customsinkandsource.jdbc.java;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StudentSourceFromMysqlTest {
    public static void main(String[] args) throws Exception {
        //1.创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.从自定义source中读取数据
        DataStream<Student> students=env.addSource(new StudentSourceFromMysql());

        //3.显示结果
        students.print();

        //4.触发流执行
        env.execute();
    }
3. Source 测试效果
能够正确查询出 MySQL 中的数据。





https://blog.csdn.net/liguohuaBigdata/article/details/78588902 
https://blog.csdn.net/whr_yy/article/details/79869741 


路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条