Java OkHttp框架源码超详细解析
AllenC6 人气:0一、自己的理解的OkHttp
我理解的http的本质就是基于socket连接,把要传输的数据按照http协议的格式去封装后,传输在网络中,以此来实现的网络通信。
而OkHttp协议就是帮助我们,把我们把要传输的数据请求,按照http协议的格式,传输在Socket上,当然还有很多优化管理这些请求和连接的方法,例如:对于这些请求的管理:最多同时进行64个请求,同域名的最多同时进行5个请求。还有Socket连接池的管理。
二、OkHttp的使用方法
1.创建一个client,构建一个request
OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() .url("https://www.baidu.com/") .build();
2.同步请求
Response response = client.newCall(request).execute();
3.异步请求
client.newCall(request).enqueue(new Callback() { @Override public void onFailure(@NotNull Call call, @NotNull IOException e) { //todo handle request failed } @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { //todo handle Response } });
三、基本对象介绍
1.OkHttpClient
一个请求的配置类,采用了建造者模式,方便用户配置一些请求参数,如配置callTimeout
,cookie
,interceptor
等等。
open class OkHttpClient internal constructor( builder: Builder ) : Cloneable, Call.Factory, WebSocket.Factory { constructor() : this(Builder()) class Builder constructor() { //调度器 internal var dispatcher: Dispatcher = Dispatcher() //连接池 internal var connectionPool: ConnectionPool = ConnectionPool() //整体流程拦截器 internal val interceptors: MutableList<Interceptor> = mutableListOf() //网络流程拦截器 internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() //流程监听器 internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() //连接失败时是否重连 internal var retryOnConnectionFailure = true //服务器认证设置 internal var authenticator: Authenticator = Authenticator.NONE //是否重定向 internal var followRedirects = true //是否从HTTP重定向到HTTPS internal var followSslRedirects = true //cookie设置 internal var cookieJar: CookieJar = CookieJar.NO_COOKIES //缓存设置 internal var cache: Cache? = null //DNS设置 internal var dns: Dns = Dns.SYSTEM //代理设置 internal var proxy: Proxy? = null //代理选择器设置 internal var proxySelector: ProxySelector? = null //代理服务器认证设置 internal var proxyAuthenticator: Authenticator = Authenticator.NONE //socket配置 internal var socketFactory: SocketFactory = SocketFactory.getDefault() //https socket配置 internal var sslSocketFactoryOrNull: SSLSocketFactory? = null internal var x509TrustManagerOrNull: X509TrustManager? = null internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS //协议 internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS //域名校验 internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT internal var certificateChainCleaner: CertificateChainCleaner? = null //请求超时 internal var callTimeout = 0 //连接超时 internal var connectTimeout = 10_000 //读取超时 internal var readTimeout = 10_000 //写入超时 internal var writeTimeout = 10_000 internal var pingInterval = 0 internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE internal var routeDatabase: RouteDatabase? = null ···省略代码···
2.request
同样是请求参数的配置类,也同样采用了建造者模式,但相比于OkHttpClient
,Request
就十分简单了,只有四个参数,分别是请求URL、请求方法、请求头、请求体。
class Request internal constructor( @get:JvmName("url") val url: HttpUrl, @get:JvmName("method") val method: String, @get:JvmName("headers") val headers: Headers, @get:JvmName("body") val body: RequestBody?, internal val tags: Map<Class<*>, Any> ) { open class Builder { //请求的URL internal var url: HttpUrl? = null //请求方法,如:GET、POST.. internal var method: String //请求头 internal var headers: Headers.Builder //请求体 internal var body: RequestBody? = null ···省略代码···
3.Call
请求调用接口,表示这个请求已经准备好可以执行,也可以取消,只能执行一次。
interface Call : Cloneable { /** 返回发起此调用的原始请求 */ fun request(): Request /** * 同步请求,立即执行。 * * 抛出两种异常: * 1. 请求失败抛出IOException; * 2. 如果在执行过一回的前提下再次执行抛出IllegalStateException;*/ @Throws(IOException::class) fun execute(): Response /** * 异步请求,将请求安排在将来的某个时间点执行。 * 如果在执行过一回的前提下再次执行抛出IllegalStateException */ fun enqueue(responseCallback: Callback) /** 取消请求。已经完成的请求不能被取消 */ fun cancel() /** 是否已被执行 */ fun isExecuted(): Boolean /** 是否被取消 */ fun isCanceled(): Boolean /** 一个完整Call请求流程的超时时间配置,默认选自[OkHttpClient.Builder.callTimeout] */ fun timeout(): Timeout /** 克隆这个call,创建一个新的相同的Call */ public override fun clone(): Call /** 利用工厂模式来让 OkHttpClient 来创建 Call对象 */ fun interface Factory { fun newCall(request: Request): Call } }
4.RealCall
OkHttpClient.kt override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
RealCall
是Call接口
的具体实现类,是应用端与网络层的连接桥,展示应用端原始的请求与连接数据,以及网络层返回的response
及其它数据流。 通过使用方法也可知,创建RealCall
对象后,就要调用同步或异步请求方法,所以它里面还包含同步请求 execute()
与异步请求 enqueue()
方法。(后面具体展开分析)
5.AsyncCall
异步请求调用,是RealCall
的一个内部类,就是一个Runnable
,被dispatcher调度器中的线程池所执行。
inner class AsyncCall( //用户传入的响应回调方法 private val responseCallback: Callback ) : Runnable { //同一个域名的请求次数,volatile + AtomicInteger 保证在多线程下及时可见性与原子性 @Volatile var callsPerHost = AtomicInteger(0) private set fun reuseCallsPerHostFrom(other: AsyncCall) { this.callsPerHost = other.callsPerHost } ···省略代码··· fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock() var success = false try { //调用线程池执行 executorService.execute(this) success = true } catch (e: RejectedExecutionException) { val ioException = InterruptedIOException("executor rejected") ioException.initCause(e) noMoreExchanges(ioException) //请求失败,调用 Callback.onFailure() 方法 responseCallback.onFailure(this@RealCall, ioException) } finally { if (!success) { //请求失败,调用调度器finish方法 client.dispatcher.finished(this) // This call is no longer running! } } } override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false timeout.enter() try { //请求成功,获取到服务器返回的response val response = getResponseWithInterceptorChain() signalledCallback = true //调用 Callback.onResponse() 方法,将 response 传递出去 responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e) } else { //请求失败,调用 Callback.onFailure() 方法 responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { //请求出现异常,调用cancel方法来取消请求 cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") canceledException.addSuppressed(t) //请求失败,调用 Callback.onFailure() 方法 responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { //请求结束,调用调度器finish方法 client.dispatcher.finished(this) } } } }
6.Dispatcher
调度器,用来调度Call
对象,同时包含线程池与异步请求队列,用来存放与执行AsyncCall
对象。
class Dispatcher constructor() { @get:Synchronized @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { //创建一个缓存线程池,来处理请求调用,这个线程池的核心线程数是0,等待队列的长度也是0,意味着 //线程池会直接创建新的线程去处理请求 executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! } /** 已准备好的异步请求队列 */ @get:Synchronized private val readyAsyncCalls = ArrayDeque<AsyncCall>() /** 正在运行的异步请求队列, 包含取消但是还未finish的AsyncCall */ private val runningAsyncCalls = ArrayDeque<AsyncCall>() /** 正在运行的同步请求队列, 包含取消但是还未finish的RealCall */ private val runningSyncCalls = ArrayDeque<RealCall>() ···省略代码··· }
四、流程分析
1.同步请求
client.newCall(request).execute();
newCall
方法就是创建一个RealCall
对象,然后执行其execute()
方法。
RealCall.kt override fun execute(): Response { //CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常 check(executed.compareAndSet(false, true)) { "Already Executed" } //请求超时开始计时 timeout.enter() //开启请求监听 callStart() try { //调用调度器中的 executed() 方法,调度器只是将 call 加入到了runningSyncCalls队列中 client.dispatcher.executed(this) //调用getResponseWithInterceptorChain 方法拿到 response return getResponseWithInterceptorChain() } finally { //执行完毕,调度器将该 call 从 runningSyncCalls队列中移除 client.dispatcher.finished(this) } } Dispatcher.kt @Synchronized internal fun executed(call: RealCall) { runningSyncCalls.add(call) }
调用调度器executed
方法,就是将当前的RealCall
对象加入到runningSyncCalls
队列中,然后调用getResponseWithInterceptorChain
方法拿到response
。
2.异步请求
RealCall.kt override fun enqueue(responseCallback: Callback) { //CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常 check(executed.compareAndSet(false, true)) { "Already Executed" } //开启请求监听 callStart() //新建一个AsyncCall对象,通过调度器enqueue方法加入到readyAsyncCalls队列中 client.dispatcher.enqueue(AsyncCall(responseCallback)) }
然后调用调度器的enqueue
方法
Dispatcher.kt internal fun enqueue(call: AsyncCall) { //加锁,保证线程安全 synchronized(this) { //将该请求调用加入到 readyAsyncCalls 队列中 readyAsyncCalls.add(call) // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.call.forWebSocket) { //通过域名来查找有没有相同域名的请求,有则复用。 val existingCall = findExistingCallWithHost(call.host) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } //执行请求 promoteAndExecute() } private fun promoteAndExecute(): Boolean { this.assertThreadDoesntHoldLock() val executableCalls = mutableListOf<AsyncCall>() //判断是否有请求正在执行 val isRunning: Boolean //加锁,保证线程安全 synchronized(this) { //遍历 readyAsyncCalls 队列 val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() //runningAsyncCalls 的数量不能大于最大并发请求数 64 if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. //同域名最大请求数5,同一个域名最多允许5条线程同时执行请求 if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity. //从 readyAsyncCalls 队列中移除,并加入到 executableCalls 及 runningAsyncCalls 队列中 i.remove() asyncCall.callsPerHost.incrementAndGet() executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } //通过运行队列中的请求数量来判断是否有请求正在执行 isRunning = runningCallsCount() > 0 } //遍历可执行队列,调用线程池来执行AsyncCall for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) } return isRunning }
调度器的enqueue
方法就是将AsyncCall
加入到readyAsyncCalls
队列中,然后调用promoteAndExecute
方法来执行请求,promoteAndExecute
方法做的其实就是遍历readyAsyncCalls
队列,然后将符合条件的请求用线程池执行,也就是会执行AsyncCall.run()
方法。
AsyncCall 方法的具体代码看上面的这边就不在此展示了,简单来说就是调用getResponseWithInterceptorChain
方法拿到response
,然后通过Callback.onResponse
方法传递出去。反之,如果请求失败,捕获了异常,就通过Callback.onFailure
将异常信息传递出去。 最终,请求结束,调用调度器finish
方法。
Dispatcher.kt /** 异步请求调用结束方法 */ internal fun finished(call: AsyncCall) { call.callsPerHost.decrementAndGet() finished(runningAsyncCalls, call) } /** 同步请求调用结束方法 */ internal fun finished(call: RealCall) { finished(runningSyncCalls, call) } private fun <T> finished(calls: Deque<T>, call: T) { val idleCallback: Runnable? synchronized(this) { //将当前请求调用从 正在运行队列 中移除 if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!") idleCallback = this.idleCallback } //继续执行剩余请求,将call从readyAsyncCalls中取出加入到runningAsyncCalls,然后执行 val isRunning = promoteAndExecute() if (!isRunning && idleCallback != null) { //如果执行完了所有请求,处于闲置状态,调用闲置回调方法 idleCallback.run() } }
请求结束,异步请求,把当前同域名的计数减一,然后后面和同步一样,都是把请求从正在执行的队列中移除,然后继续执行剩余请求。
3.获取Response
接着就是看看getResponseWithInterceptorChain
方法是如何拿到response
的。
internal fun getResponseWithInterceptorChain(): Response { //拦截器列表 val interceptors = mutableListOf<Interceptor>() interceptors += client.interceptors interceptors += RetryAndFollowUpInterceptor(client) interceptors += BridgeInterceptor(client.cookieJar) interceptors += CacheInterceptor(client.cache) interceptors += ConnectInterceptor if (!forWebSocket) { interceptors += client.networkInterceptors } interceptors += CallServerInterceptor(forWebSocket) //构建拦截器责任链 val chain = RealInterceptorChain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalRequest, connectTimeoutMillis = client.connectTimeoutMillis, readTimeoutMillis = client.readTimeoutMillis, writeTimeoutMillis = client.writeTimeoutMillis ) //如果call请求完成,那就意味着交互完成了,没有更多的东西来交换了 var calledNoMoreExchanges = false try { //执行拦截器责任链来获取 response val response = chain.proceed(originalRequest) //如果被取消,关闭响应,抛出异常 if (isCanceled()) { response.closeQuietly() throw IOException("Canceled") } return response } catch (e: IOException) { calledNoMoreExchanges = true throw noMoreExchanges(e) as Throwable } finally { if (!calledNoMoreExchanges) { noMoreExchanges(null) } } }
简单概括一下:这里采用了责任链设计模式,通过拦截器构建了以RealInterceptorChain
责任链,然后执行proceed
方法来得到response
。
那么,这又涉及拦截器是什么?拦截器责任链又是什么?
五、Interceptor
只声明了一个拦截器方法,在子类中具体实现,还包含一个Chain
接口,核心方法是proceed(request)
处理请求来获取response
。
fun interface Interceptor { /** 拦截方法 */ @Throws(IOException::class) fun intercept(chain: Chain): Response interface Chain { /** 原始请求数据 */ fun request(): Request /** 核心方法,处理请求,获取response */ @Throws(IOException::class) fun proceed(request: Request): Response fun connection(): Connection? fun call(): Call fun connectTimeoutMillis(): Int fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain fun readTimeoutMillis(): Int fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain fun writeTimeoutMillis(): Int fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain } }
六、RealInterceptorChain
拦截器链就是实现Interceptor.Chain
接口,重点就是复写的proceed
方法。
class RealInterceptorChain( internal val call: RealCall, private val interceptors: List<Interceptor>, private val index: Int, internal val exchange: Exchange?, internal val request: Request, internal val connectTimeoutMillis: Int, internal val readTimeoutMillis: Int, internal val writeTimeoutMillis: Int ) : Interceptor.Chain { ···省略代码··· private var calls: Int = 0 override fun call(): Call = call override fun request(): Request = request @Throws(IOException::class) override fun proceed(request: Request): Response { check(index < interceptors.size) calls++ if (exchange != null) { check(exchange.finder.sameHostAndPort(request.url)) { "network interceptor ${interceptors[index - 1]} must retain the same host and port" } check(calls == 1) { "network interceptor ${interceptors[index - 1]} must call proceed() exactly once" } } //index+1, 复制创建新的责任链,也就意味着调用责任链中的下一个处理者,也就是下一个拦截器 val next = copy(index = index + 1, request = request) //取出当前拦截器 val interceptor = interceptors[index] //执行当前拦截器的拦截方法 @Suppress("USELESS_ELVIS") val response = interceptor.intercept(next) ?: throw NullPointerException( "interceptor $interceptor returned null") if (exchange != null) { check(index + 1 >= interceptors.size || next.calls == 1) { "network interceptor $interceptor must call proceed() exactly once" } } check(response.body != null) { "interceptor $interceptor returned a response with no body" } return response } }
链式调用,最终会向下执行拦截器列表中的每个拦截器,然后向上返回Response
。
七、拦截器
各类拦截器的总结,按顺序:
client.interceptors
:这是由开发者设置的,会在所有的拦截器处理之前进行最早的拦截处理,可用于添加一些公共参数,如自定义header
、自定义log
等等。RetryAndFollowUpInterceptor
:这里会对连接做一些初始化工作,以及请求失败的重试工作,重定向的后续请求工作。跟他的名字一样,就是做重试工作还有一些连接跟踪工作。BridgeInterceptor
:是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,以及将网络请求返回回来的响应转换为用户可用的响应。CacheInterceptor
:这里主要是缓存的相关处理,会根据用户在OkHttpClient
里定义的缓存配置,然后结合请求新建一个缓存策略,由它来判断是使用网络还是缓存来构建response
。ConnectInterceptor
:这里主要就是负责建立连接,会建立TCP连接
或者TLS连接
。Client.networkInterceptors
:这里也是开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。CallServerInterceptor
:这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,将请求头与请求体发送给服务器,以及解析服务器返回的response
。
接下来我们按顺序,从上往下,对这些拦截器进行一一解读。
1.client.interceptors
这是用户自己定义的拦截器,称为应用拦截器,会保存在OkHttpClient
的interceptors: List<Interceptor>
列表中。 他是拦截器责任链中的第一个拦截器,也就是说会第一个执行拦截方法,我们可以通过它来添加自定义Header信息
,如:
class HeaderInterceptor implements Interceptor { @Override public Response intercept(Chain chain) throws IOException { Request request = chain.request().newBuilder() .addHeader("device-android", "xxxxxxxxxxx") .addHeader("country-code", "ZH") .build(); return chain.proceed(request); } } //然后在 OkHttpClient 中加入 OkHttpClient client = new OkHttpClient.Builder() .connectTimeout(60, TimeUnit.SECONDS) .readTimeout(15, TimeUnit.SECONDS) .writeTimeout(15, TimeUnit.SECONDS) .cookieJar(new MyCookieJar()) .addInterceptor(new HeaderInterceptor())//添加自定义Header拦截器 .build();
2.RetryAndFollowUpInterceptor
第二个拦截器,从它的名字也可知道,它负责请求失败的重试工作与重定向的后续请求工作,同时它会对连接做一些初始化工作。
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain var request = chain.request val call = realChain.call var followUpCount = 0 var priorResponse: Response? = null var newExchangeFinder = true var recoveredFailures = listOf<IOException>() while (true) { //这里会新建一个ExchangeFinder,ConnectInterceptor会使用到 call.enterNetworkInterceptorExchange(request, newExchangeFinder) var response: Response var closeActiveExchange = true try { if (call.isCanceled()) { throw IOException("Canceled") } try { response = realChain.proceed(request) newExchangeFinder = true } catch (e: RouteException) { //尝试通过路由连接失败。该请求将不会被发送。 if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) { throw e.firstConnectException.withSuppressed(recoveredFailures) } else { recoveredFailures += e.firstConnectException } newExchangeFinder = false continue } catch (e: IOException) { //尝试与服务器通信失败。该请求可能已发送。 if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) { throw e.withSuppressed(recoveredFailures) } else { recoveredFailures += e } newExchangeFinder = false continue } // Attach the prior response if it exists. Such responses never have a body. //尝试关联上一个response,注意:body是为null if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() } val exchange = call.interceptorScopedExchange //会根据 responseCode 来判断,构建一个新的request并返回来重试或者重定向 val followUp = followUpRequest(response, exchange) if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response } //如果请求体是一次性的,不需要再次重试 val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response } response.body?.closeQuietly() //最大重试次数,不同的浏览器是不同的,比如:Chrome为21,Safari则是16 if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") } request = followUp priorResponse = response } finally { call.exitNetworkInterceptorExchange(closeActiveExchange) } } } /** 判断是否要进行重连,false->不尝试重连;true->尝试重连。*/ private fun recover( e: IOException, call: RealCall, userRequest: Request, requestSendStarted: Boolean ): Boolean { //客户端禁止重试 if (!client.retryOnConnectionFailure) return false //不能再次发送该请求体 if (requestSendStarted && requestIsOneShot(e, userRequest)) return false //发生的异常是致命的,无法恢复,如:ProtocolException if (!isRecoverable(e, requestSendStarted)) return false //没有更多的路由来尝试重连 if (!call.retryAfterFailure()) return false // 对于失败恢复,使用带有新连接的相同路由选择器 return true } ···省略代码···
3.BridgeInterceptor
从它的名字可以看出,他的定位是客户端与服务器之间的沟通桥梁,负责将用户构建的请求转换为服务器需要的请求,比如:添加Content-Type
,添加Cookie
,添加User-Agent
等等。再将服务器返回的response
做一些处理转换为客户端需要的response
。比如:移除响应头中的Content-Encoding
、Content-Length
等等。
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { //获取原始请求数据 val userRequest = chain.request() val requestBuilder = userRequest.newBuilder() //重新构建请求头,请求体信息 val body = userRequest.body val contentType = body.contentType() requestBuilder.header("Content-Type", contentType.toString()) requestBuilder.header("Content-Length", contentLength.toString()) requestBuilder.header("Transfer-Encoding", "chunked") requestBuilder.header("Host", userRequest.url.toHostHeader()) requestBuilder.header("Connection", "Keep-Alive") ···省略代码··· //添加cookie val cookies = cookieJar.loadForRequest(userRequest.url) if (cookies.isNotEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)) } //添加user-agent if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", userAgent) } //重新构建一个Request,然后执行下一个拦截器来处理该请求 val networkResponse = chain.proceed(requestBuilder.build()) cookieJar.receiveHeaders(userRequest.url, networkResponse.headers) //创建一个新的responseBuilder,目的是将原始请求数据构建到response中 val responseBuilder = networkResponse.newBuilder() .request(userRequest) if (transparentGzip && "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) && networkResponse.promisesBody()) { val responseBody = networkResponse.body if (responseBody != null) { val gzipSource = GzipSource(responseBody.source()) val strippedHeaders = networkResponse.headers.newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build() //修改response header信息,移除Content-Encoding,Content-Length信息 responseBuilder.headers(strippedHeaders) val contentType = networkResponse.header("Content-Type") //修改response body信息 responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer())) } } return responseBuilder.build() ···省略代码···
4.CacheInterceptor
用户可以通过OkHttpClient.cache
来配置缓存,缓存拦截器通过CacheStrategy
来判断是使用网络还是缓存来构建response
。
class CacheInterceptor(internal val cache: Cache?) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val call = chain.call() //通过request从OkHttpClient.cache中获取缓存 val cacheCandidate = cache?.get(chain.request()) val now = System.currentTimeMillis() //创建一个缓存策略,用来确定怎么使用缓存 val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() //为空表示不使用网络,反之,则表示使用网络 val networkRequest = strategy.networkRequest //为空表示不使用缓存,反之,则表示使用缓存 val cacheResponse = strategy.cacheResponse //追踪网络与缓存的使用情况 cache?.trackResponse(strategy) val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE //有缓存但不适用,关闭它 if (cacheCandidate != null && cacheResponse == null) { cacheCandidate.body?.closeQuietly() } //如果网络被禁止,但是缓存又是空的,构建一个code为504的response,并返回 if (networkRequest == null && cacheResponse == null) { return Response.Builder() .request(chain.request()) .protocol(Protocol.HTTP_1_1) .code(HTTP_GATEWAY_TIMEOUT) .message("Unsatisfiable Request (only-if-cached)") .body(EMPTY_RESPONSE) .sentRequestAtMillis(-1L) .receivedResponseAtMillis(System.currentTimeMillis()) .build().also { listener.satisfactionFailure(call, it) } } //如果我们禁用了网络不使用网络,且有缓存,直接根据缓存内容构建并返回response if (networkRequest == null) { return cacheResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .build().also { listener.cacheHit(call, it) } } //为缓存添加监听 if (cacheResponse != null) { listener.cacheConditionalHit(call, cacheResponse) } else if (cache != null) { listener.cacheMiss(call) } var networkResponse: Response? = null try { //责任链往下处理,从服务器返回response 赋值给 networkResponse networkResponse = chain.proceed(networkRequest) } finally { //捕获I/O或其他异常,请求失败,networkResponse为空,且有缓存的时候,不暴露缓存内容。 if (networkResponse == null && cacheCandidate != null) { cacheCandidate.body?.closeQuietly() } } //如果有缓存 if (cacheResponse != null) { //且网络返回response code为304的时候,使用缓存内容新构建一个Response返回。 if (networkResponse?.code == HTTP_NOT_MODIFIED) { val response = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers, networkResponse.headers)) .sentRequestAtMillis(networkResponse.sentRequestAtMillis) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() networkResponse.body!!.close() // Update the cache after combining headers but before stripping the // Content-Encoding header (as performed by initContentStream()). cache!!.trackConditionalCacheHit() cache.update(cacheResponse, response) return response.also { listener.cacheHit(call, it) } } else { //否则关闭缓存响应体 cacheResponse.body?.closeQuietly() } } //构建网络请求的response val response = networkResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() //如果cache不为null,即用户在OkHttpClient中配置了缓存,则将上一步新构建的网络请求response存到cache中 if (cache != null) { //根据response的code,header以及CacheControl.noStore来判断是否可以缓存 if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) { // 将该response存入缓存 val cacheRequest = cache.put(response) return cacheWritingResponse(cacheRequest, response).also { if (cacheResponse != null) { listener.cacheMiss(call) } } } //根据请求方法来判断缓存是否有效,只对Get请求进行缓存,其它方法的请求则移除 if (HttpMethod.invalidatesCache(networkRequest.method)) { try { //缓存无效,将该请求缓存从client缓存配置中移除 cache.remove(networkRequest) } catch (_: IOException) { // The cache cannot be written. } } } return response } ···省略代码···
5.ConnectInterceptor
负责实现与服务器真正建立起连接,
object ConnectInterceptor : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain //初始化一个exchange对象 val exchange = realChain.call.initExchange(chain) //根据这个exchange对象来复制创建一个新的连接责任链 val connectedChain = realChain.copy(exchange = exchange) //执行该连接责任链 return connectedChain.proceed(realChain.request) } }
一扫下来,代码十分简单,拦截方法里就只有三步。
- 初始化一个
exchange
对象。 - 然后根据这个
exchange
对象来复制创建一个新的连接责任链。 - 执行该连接责任链。
那这个exchange
对象又是什么呢?
RealCall.kt internal fun initExchange(chain: RealInterceptorChain): Exchange { ...省略代码... //这里的exchangeFinder就是在RetryAndFollowUpInterceptor中创建的 val exchangeFinder = this.exchangeFinder!! //返回一个ExchangeCodec(是个编码器,为request编码以及为response解码) val codec = exchangeFinder.find(client, chain) //根据exchangeFinder与codec新构建一个Exchange对象,并返回 val result = Exchange(this, eventListener, exchangeFinder, codec) ...省略代码... return result }
具体看看ExchangeFinder.find()
这一步,
ExchangeFinder.kt fun find( client: OkHttpClient, chain: RealInterceptorChain ): ExchangeCodec { try { //查找合格可用的连接,返回一个 RealConnection 对象 val resultConnection = findHealthyConnection( connectTimeout = chain.connectTimeoutMillis, readTimeout = chain.readTimeoutMillis, writeTimeout = chain.writeTimeoutMillis, pingIntervalMillis = client.pingIntervalMillis, connectionRetryEnabled = client.retryOnConnectionFailure, doExtensiveHealthChecks = chain.request.method != "GET" ) //根据连接,创建并返回一个请求响应编码器:Http1ExchangeCodec 或者 Http2ExchangeCodec,分别对应Http1协议与Http2协议 return resultConnection.newCodec(client, chain) } catch (e: RouteException) { trackFailure(e.lastConnectException) throw e } catch (e: IOException) { trackFailure(e) throw RouteException(e) } }
继续往下看findHealthyConnection
方法
ExchangeFinder.kt private fun findHealthyConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean ): RealConnection { while (true) { //重点:查找连接 val candidate = findConnection( connectTimeout = connectTimeout, readTimeout = readTimeout, writeTimeout = writeTimeout, pingIntervalMillis = pingIntervalMillis, connectionRetryEnabled = connectionRetryEnabled ) //检查该连接是否合格可用,合格则直接返回该连接 if (candidate.isHealthy(doExtensiveHealthChecks)) { return candidate } //如果该连接不合格,标记为不可用,从连接池中移除 candidate.noNewExchanges() ...省略代码... } }
所以核心方法就是findConnection
,我们继续深入看看该方法:
private fun findConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean ): RealConnection { if (call.isCanceled()) throw IOException("Canceled") //第一次,尝试重连 call 中的 connection,不需要去重新获取连接 val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! if (callConnection != null) { var toClose: Socket? = null synchronized(callConnection) { if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } } //如果 call 中的 connection 还没有释放,就重用它。 if (call.connection != null) { check(toClose == null) return callConnection } //如果 call 中的 connection 已经被释放,关闭Socket. toClose?.closeQuietly() eventListener.connectionReleased(call, callConnection) } //需要一个新的连接,所以重置一些状态 refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0 //第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用 if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } //连接池中是空的,准备下次尝试连接的路由 val routes: List<Route>? val route: Route ...省略代码... //第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用 if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } route = localRouteSelection.next() } //第四次,手动创建一个新连接 val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null } call.client.routeDatabase.connected(newConnection.route()) //第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。 //这一步主要是为了校验一下,比如已经有了一条连接了,就可以直接复用,而不用使用手动创建的新连接。 if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { val result = call.connection!! nextRouteToTry = route newConnection.socket().closeQuietly() eventListener.connectionAcquired(call, result) return result } synchronized(newConnection) { //将手动创建的新连接放入连接池 connectionPool.put(newConnection) call.acquireConnectionNoEvents(newConnection) } eventListener.connectionAcquired(call, newConnection) return newConnection }
在代码中可以看出,一共做了5次尝试去得到连接:
- 第一次,尝试重连 call 中的 connection,不需要去重新获取连接。
- 第二次,尝试从连接池中获取一个连接,不带路由,不带多路复用。
- 第三次,再次尝试从连接池中获取一个连接,带路由,不带多路复用。
- 第四次,手动创建一个新连接。
- 第五次,再次尝试从连接池中获取一个连接,带路由,带多路复用。
这一步就是为了建立连接。
6.client.networkInterceptors
该拦截器称为网络拦截器,与client.interceptors
一样也是由用户自己定义的,同样是以列表的形式存在OkHttpClient
中。
那这两个拦截器有什么不同呢?
其实他两的不同都是由于他们所处的位置不同所导致的,应用拦截器处于第一个位置,所以无论如何它都会被执行,而且只会执行一次。而网络拦截器处于倒数第二的位置,它不一定会被执行,而且可能会被执行多次,比如:在RetryAndFollowUpInterceptor
失败或者CacheInterceptor
直接返回缓存的情况下,我们的网络拦截器是不会被执行的。
7.CallServerInterceptor
到了这里,客户端与服务器已经建立好了连接,接着就是将请求头与请求体发送给服务器,以及解析服务器返回的response
了。
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.exchange!! val request = realChain.request val requestBody = request.body var invokeStartEvent = true var responseBuilder: Response.Builder? = null try { //写入请求头 exchange.writeRequestHeaders(request) //如果不是GET请求,并且请求体不为空 if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { //当请求头为"Expect: 100-continue"时,在发送请求体之前需要等待服务器返回"HTTP/1.1 100 Continue" 的response,如果没有等到该response,就不发送请求体。 //POST请求,先发送请求头,在获取到100继续状态后继续发送请求体 if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) { //刷新请求,即发送请求头 exchange.flushRequest() //解析响应头 responseBuilder = exchange.readResponseHeaders(expectContinue = true) exchange.responseHeadersStart() invokeStartEvent = false } //写入请求体 if (responseBuilder == null) { if (requestBody.isDuplex()) { //如果请求体是双公体,就先发送请求头,稍后在发送请求体 exchange.flushRequest() val bufferedRequestBody = exchange.createRequestBody(request, true).buffer() //写入请求体 requestBody.writeTo(bufferedRequestBody) } else { //如果获取到了"Expect: 100-continue"响应,写入请求体 val bufferedRequestBody = exchange.createRequestBody(request, false).buffer() requestBody.writeTo(bufferedRequestBody) bufferedRequestBody.close() } ···省略代码··· //请求结束,发送请求体 exchange.finishRequest() ···省略代码··· try { if (responseBuilder == null) { //读取响应头 responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! ···省略代码··· //构建一个response var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code ···省略代码··· return response ···省略代码···
简单概括一下:写入发送请求头,然后根据条件是否写入发送请求体,请求结束。解析服务器返回的请求头,然后构建一个新的response
,并返回。 这里CallServerInterceptor
是拦截器责任链中最后一个拦截器了,所以他不会再调用chain.proceed()
方法往下执行,而是将这个构建的response
往上传递给责任链中的每个拦截器。
总结一下流程:
加载全部内容