分享

Kafka源码分析Producer的网络层——JAVA NIO封装


问题导读:

1.Java NIO有哪些组件?
2.Unix环境有哪几种网络IO模型?
3.Selector和epoll在事件的注册上面有哪些区别?







在上一篇我们分析了Metadata的更新机制,其中涉及到一个问题,就是Sender如何跟服务器通信,也就是网络层。同很多Java项目一样,Kafka client的网络层也是用的JavaNIO,然后在上面做了一层封装。

下面首先看一下,在Sender和服务器之间的部分:

20160923115714181.jpg

可以看到,Kafka client基于Java NIO封装了一个网络层,这个网络层最上层的接口是KakfaClient。其层次关系如下:

20160923121553048.jpg

在本篇中,先详细对最底层的Java NIO进行讲述。

NIO的4大组件Buffer与Channel
Channel: 在通常的Java网络编程中,我们知道有一对Socket/ServerSocket对象,每1个socket对象表示一个connection,ServerSocket用于服务器监听新的连接。

在NIO中,与之相对应的一对是SocketChannel/ServerSocketChannel。

下图展示了SocketChannel/ServerSocketChannel的类继承层次

20160923131810163.jpg

[mw_shl_code=java,true]public interface Channel extends Closeable {
    public boolean isOpen();
    public void close() throws IOException;
}

public interface ReadableByteChannel extends Channel {
    public int read(ByteBuffer dst) throws IOException;
}

public interface WritableByteChannel extends Channel {
    public int write(ByteBuffer src) throws IOException;
}[/mw_shl_code]

从代码可以看出,一个Channel最基本的操作就是read/write,并且其传进去的必须是ByteBuffer类型,而不是普通的内存buffer。

Buffer:在NIO中,也有1套围绕Buffer的类继承层次,在此就不详述了。只需知道Buffer就是用来封装channel发送/接收的数据。

Selector
Selector的主要目的是网络事件的 loop 循环,通过调用selector.poll,不断轮询每个Channel上读写事件

SelectionKey
SelectionKey用来记录一个Channel上的事件集合,每个Channel对应一个SelectionKey。

SelectionKey也是Selector和Channel之间的关联,通过SelectionKey可以取到对应的Selector和Channel。

关于这4大组件的协作、配合,下面来详细讲述。

4种网络IO模型

epoll与IOCP
在《Unix环境高级编程》中介绍了以下4种IO模型(实际不止4种,但常用的就这4种):

阻塞IO: read/write的时候,阻塞调用

非阻塞IO: read/write,没有数据,立马返回,轮询

IO复用:read/write一次都只能监听一个socket,但对于服务器来讲,有成千上完个socket连接,如何用一个函数,可以监听所有的socket上面的读写事件呢?这就是IO复用模型,对应Linux上面,就是select/poll/epoll3种技术。

异步IO:linux上没有,windows上对应的是IOCP。

Reactor模式 vs. Preactor模式
相信很多人都听说过网络IO的2种设计模式,关于这2种模式的具体阐述,可以自行google之。

在此处,只想对这2种模式做一个“最通俗的解释“:

Reactor模式:主动模式,所谓主动,是指应用程序不断去轮询,问操作系统,IO是否就绪。Linux下的select/poll/epooll就属于主动模式,需要应用程序中有个循环,一直去poll。

在这种模式下,实际的IO操作还是应用程序做的。

Proactor模式:被动模式,你把read/write全部交给操作系统,实际的IO操作由操作系统完成,完成之后,再callback你的应用程序。Windows下的IOCP就属于这种模式,再比如C++ Boost中的Asio库,就是典型的Proactor模式。

epoll的编程模型--3个阶段
在Linux平台上,Java NIO就是基于epoll来实现的。所有基于epoll的框架,都有3个阶段:

注册事件(connect,accept,read,write), 轮询IO是否就绪,执行实际IO操作。

下面的代码展示了在linux下,用C语言epoll编程的基本框架

[mw_shl_code=java,true]//阶段1: 调用epoll_ctl(xx) 注册事件

for( ; ; )
    {
        nfds = epoll_wait(epfd,events,20,500);     //阶段2:轮询所有的socket

        for(i=0;i<nfds;++i)  //处理轮询结果
        {
            if(events.data.fd==listenfd) //accept事件就绪
            {
                connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //阶段3:执行实际的IO操作,accept
                ev.data.fd=connfd;
                ev.events=EPOLLIN|EPOLLET;
                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //回到阶段1:重新注册
            }
            else if( events.events&EPOLLIN )  //读就绪
            {
                n = read(sockfd, line, MAXLINE)) < 0    //阶段3:执行实际的io操作
                ev.data.ptr = md;     
                ev.events=EPOLLOUT|EPOLLET;
                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //回到阶段1:重新注册事件
            }
            else if(events.events&EPOLLOUT) //写就绪
            {
                struct myepoll_data* md = (myepoll_data*)events.data.ptr;   
                sockfd = md->fd;
                send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );        //阶段3: 执行实际的io操作
                ev.data.fd=sockfd;
                ev.events=EPOLLIN|EPOLLET;
                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //回到阶段1,重新注册事件
            }
            else
            {
                //其他的处理
            }
        }
    }[/mw_shl_code]
同样, NIO中的Selector同样有以下3个阶段,下面把Selector和epoll的使用做个对比:

20160923135649562.jpg


可以看到,2者只是写法不同,同样的, 都有这3个阶段。

下面的表格展示了connect, accept, read, write 这4种事件,分别在这3个阶段对应的函数:

20160923141539570.jpg


下面看一下Kafka client中Selector的核心实现:

[mw_shl_code=java,true]    @Override
    public void poll(long timeout) throws IOException {
        。。。
        clear(); //清空各种状态
        if (hasStagedReceives())
            timeout = 0;
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);  //轮询
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0) {
            Set<SelectionKey> keys = this.nioSelector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                KafkaChannel channel = channel(key);

                // register all per-connection metrics at once
                sensors.maybeRegisterConnectionMetrics(channel.id());
                lruConnections.put(channel.id(), currentTimeNanos);

                try {
                    if (key.isConnectable()) {  //有连接事件
                        channel.finishConnect();
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    }

                    if (channel.isConnected() && !channel.ready())
                        channel.prepare(); //这个只有需要安全检查的SSL需求,普通的不加密的channel,prepare()为空实现

                    if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { //读就绪
                        NetworkReceive networkReceive;
                        while ((networkReceive = channel.read()) != null)
                            addToStagedReceives(channel, networkReceive); //实际的读动作
                    }

                    if (channel.ready() && key.isWritable()) {  //写就绪
                        Send send = channel.write(); //实际的写动作
                        if (send != null) {
                            this.completedSends.add(send);
                            this.sensors.recordBytesSent(channel.id(), send.size());
                        }
                    }

                    /* cancel any defunct sockets */
                    if (!key.isValid()) {
                        close(channel);
                        this.disconnected.add(channel.id());
                    }
                } catch (Exception e) {
                    String desc = channel.socketDescription();
                    if (e instanceof IOException)
                        log.debug("Connection with {} disconnected", desc, e);
                    else
                        log.warn("Unexpected error from {}; closing connection", desc, e);
                    close(channel);
                    this.disconnected.add(channel.id());
                }
            }
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        maybeCloseOldestConnection();
    }[/mw_shl_code]

epoll和selector在注册上的差别
从代码可以看出, Selector和epoll在代码结构上基本一样,但在事件的注册上面有区别:

epoll: 每次read/write之后,都要调用epoll_ctl重新注册

Selector: 注册一次,一直有效,一直会有事件产生,因此需要取消注册。下面来详细分析一下:

connect事件的注册

[mw_shl_code=java,true]//Selector
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.channels.containsKey(id))
            throw new IllegalStateException("There is already a connection for id " + id);

        SocketChannel socketChannel = SocketChannel.open();
        。。。
        try {
            socketChannel.connect(address);
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);  //构造channel的时候,注册connect事件
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        key.attach(channel);
        this.channels.put(id, channel);
    }[/mw_shl_code]

connect事件的取消
[mw_shl_code=java,true]//在上面的poll函数中,connect事件就绪,也就是指connect连接完成,连接简历
if (key.isConnectable()) {  //有连接事件
       channel.finishConnect();
                        ...
     }

//PlainTransportLayer
public void finishConnect() throws IOException {
        socketChannel.finishConnect();  //调用channel的finishConnect()
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); //取消connect事件,新加read事件组册
    }[/mw_shl_code]
read事件的注册
从上面也可以看出,read事件的注册和connect事件的取消,是同时进行的

read事件的取消
因为read是要一直监听远程,是否有新数据到来,所以不会取消,一直监听

write事件的注册
[mw_shl_code=java,true]//Selector
    public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

//KafkaChannel
    public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);  //每调用一次Send,注册一次Write事件
    }[/mw_shl_code]
Write事件的取消
[mw_shl_code=java,true]//上面的poll函数里面
                    if (channel.ready() && key.isWritable()) { //write事件就绪
                        Send send = channel.write(); //在这个write里面,取消了write事件
                        if (send != null) {
                            this.completedSends.add(send);
                            this.sensors.recordBytesSent(channel.id(), send.size());
                        }
                    }


    private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);  //取消write事件

        return send.completed();
    }  [/mw_shl_code]

总结一下:

(1)“事件就绪“这个概念,对于不同事件类型,还是有点歧义的

read事件就绪:这个最好理解,就是远程有新数据到来,需要去read

write事件就绪:这个指什么呢? 其实指本地的socket缓冲区有没有满。没有满的话,应该就是一直就绪的,可写

connect事件就绪: 指connect连接完成

accept事件就绪:有新的连接进来,调用accept处理

(2)不同类型事件,处理方式是不一样的:

connect事件:注册1次,成功之后,就取消了。有且仅有1次

read事件:注册之后不取消,一直监听

write事件: 每调用一次send,注册1次,send成功,取消注册


上一篇:Kafka源码分析Producer读取Metadata的数据结构及Metadata2种更新机制介绍


来源:csdn
作者:li_101357


已有(2)人评论

跳转到指定楼层
sdtm1016 发表于 2016-10-24 09:43:35
真的是很不错
回复

使用道具 举报

hlmcm 发表于 2016-12-14 10:18:23
都是大牛,膜拜啊
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条