亲宝软件园·资讯

展开

springboot内置tomcat NIO处理

CRUD的W 人气:1

前言

springboot内置的tomcat目前默认是基于NIO来实现的,本文介绍下tomcat接受请求的一些组件及组件之间的关联

tomcat组件

本文只介绍NIO中tomcat的组件

我们直接看NIO的核心类NioEndpoint的startInternal方法

Acceptor组件

public void startInternal() throws Exception {
        if (!running) {
            running = true;
            paused = false;
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());
            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }
            initializeConnectionLatch();
            // Start poller threads
            // 核心代码1
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }
			// 核心代码2
            startAcceptorThreads();
        }
    }

看核心代码1的位置构造了一个Poller数组,Poller是一个实现了Runnable的类,并且启动了该线程类,

getPollerThreadCount()方法返回了2和当前物理机CPU内核数的最小值,即创建的数组最大值为2

接下来看核心代码2startAcceptorThreads()

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

创建了多个Acceptor类,Acceptor也是实现了Runnable的线程类,创建个数默认是1

然后我们看下Acceptor启动后做了什么,我们直接看run方法

  public void run() {
        ......
                U socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // 核心代码1
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    ......
                }
                // Successful accept, reset the error delay
                errorDelay = 0;
                // Configure the socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    // 核心代码2
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
               ......
    }

核心代码1很明显是一个阻塞模型,即接受客户端连接的,当没有客户端连接时会处于阻塞,这里可以看到默认情况下tomcat在nio模式下只有一个Acceptor线程类来接受连接

然后看核心代码2

 protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
            // 核心代码
            getPoller0().register(channel);
        } catch (Throwable t) {
           ......
        }
        return true;
    }

我们看核心代码getPoller0().register(channel)

public void register(final NioChannel socket) {
        socket.setPoller(this);
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
        else r.reset(socket,ka,OP_REGISTER);
        // 核心代码
        addEvent(r);
    }

看addEvent

  private void addEvent(PollerEvent event) {
        events.offer(event);
        if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
    }

events的定义

private final SynchronizedQueue<PollerEvent> events =
                new SynchronizedQueue<>();

这里可以看到封装了一个PollerEvent 并且扔到了一个队列里面,然后当前类就结束了

由此可得Acceptor的作用就是接受客户端连接,并且把连接封装起来扔到了一个队列中

Poller

我们前面已经创建并且启动了多个Poller线程类,默认的数量是小于等于2的。

然后我们看下Poller类做了什么,同样我们看run方法

  @Override
    public void run() {
        // Loop until destroy() is called
        while (true) {
            boolean hasEvents = false;
            try {
                if (!close) {
                	// 核心代码1
                    hasEvents = events();
                    
            .......
            
            Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                    iterator.remove();
                } else {
                    iterator.remove();
                    // 核心代码2
                    processKey(sk, attachment);
                }
            }//while
            //process timeouts
            timeout(keyCount,hasEvents);
        }//while
        getStopLatch().countDown();
    }

先看核心代码1 hasEvents = events()

 public boolean events() {
        boolean result = false;
        PollerEvent pe = null;
        for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
            result = true;
            try {
            	// 核心代码
                pe.run();
                pe.reset();
                if (running && !paused) {
                    eventCache.push(pe);
                }
            } catch ( Throwable x ) {
                log.error("",x);
            }
        }
        return result;
    }

核心代码run

@Override
    public void run() {
        if (interestOps == OP_REGISTER) {
            try {
            	// 核心代码,注册到selector轮训器
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            ......
        }
    }

可以看出大概的意思就是从刚才我们放进去的队列events里面取数据放到了eventCache里面,eventCache的定义SynchronizedStack eventCache,当取到数据后返回true,这个时候就会进入核心代码2处的processKey(sk, attachment),也就是开始处理请求了

protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
        try {
            if ( close ) {
                cancelledKey(sk);
            } else if ( sk.isValid() && attachment != null ) {
                if (sk.isReadable() || sk.isWritable() ) {
                    if ( attachment.getSendfileData() != null ) {
                        processSendfile(sk,attachment, false);
                    } else {
                        unreg(sk, attachment, sk.readyOps());
                        boolean closeSocket = false;
                        // Read goes before write
                        if (sk.isReadable()) {
                        	// 核心代码
                            if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                closeSocket = true;
                            }
                        }
                        if (!closeSocket && sk.isWritable()) {
                            if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                closeSocket = true;
                            }
                        }
                      ......
    }

这里就可以看到我们的NIO的模型了,也就是多路复用的模型,轮询来判断key的状态,当key是可读或者可写时执行processSocket

  public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            // 核心代码
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

这里就是核心代码了,可以看到getExecutor()方法,获取线程池,这个线程池是在初始化tomcat时提前初始化好的,默认情况下核心线程是10,最大线程是200。线程池的配置可以根据我们自己配置来设置大小。

这里拿到线程池然后包装了一个SocketProcessorBase线程类扔到线程池里面取执行

从这里可以看到Poller的功能就是从前面的队列里面获取连接然后包装成SocketProcessorBase之后扔到线程池里面去执行,SocketProcessorBase才是最终真正处理请求的

总结

根据上面的分析我们已经可以看到tomcat的执行流程了,这里盗用网上的一张比较好的图

在这里插入图片描述

大致流程为

1、创建一个Acceptor线程来接收用户连接,接收到之后扔到events queue队列里面,默认情况下只有一个线程来接收

2、创建Poller线程,数量小于等于2,Poller对象是NIO的核心,在Poller中,维护了一个Selector对象;当Poller从队列中取出socket后,注册到该Selector中;然后通过遍历Selector,找出其中可读的socket,然后扔到线程池中处理相应请求,这就是典型的NIO多路复用模型

3、扔到线程池中的SocketProcessorBase处理请求

相较于BIO模型的tomcat,NIO的优势分析

1、BIO中的流程应该是接收到请求之后直接把请求扔给线程池去做处理,在这个情况下一个连接即需要一个线程来处理,线程既需要读取数据还需要处理请求,线程占用时间长,很容易达到最大线程

2、NIO的流程的不同点在于Poller类采用了多路复用模型,即Poller类只有检查到可读或者可写的连接时才把当前连接扔给线程池来处理,这样的好处是大大节省了连接还不能读写时的处理时间(如读取请求数据),也就是说NIO“读取socket并交给Worker中的线程”这个过程是非阻塞的,当socket在等待下一个请求或等待释放时,并不会占用工作线程,因此Tomcat可以同时处理的socket数目远大于最大线程数,并发性能大大提高。

以上就是我对于tomcat中nio处理模型的一些理解。希望能给大家一个参考,也希望大家多多支持

加载全部内容

相关教程
猜你喜欢
用户评论