Android框架之OkHttp3源码解析
handsome黄 人气:0OkHttp流程图
OkHttp基本使用
gradle依赖
implementation 'com.squareup.okhttp3:okhttp:3.11.0'
implementation 'com.squareup.okio:okio:1.15.0'
/** *这里拿get请求来 * 异步的get请求 */ public void okhttpAsyn() { //设置超时的时间 OkHttpClient.Builder builder = new OkHttpClient.Builder() .connectTimeout(15, TimeUnit.SECONDS) .writeTimeout(20, TimeUnit.SECONDS) .readTimeout(20, TimeUnit.SECONDS) ; OkHttpClient okHttpClient = builder.build(); Request request = new Request.Builder() .get() //设置请求模式 .url("https://www.baidu.com/") .build(); Call call = okHttpClient.newCall(request); call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { Log.d("MainActivity", "-----------onFailure-----------"); } @Override public void onResponse(Call call, Response response) throws IOException { Log.d("MainActivity", "----onResponse----" + response.body().toString()); runOnUiThread(new Runnable() { @Override public void run() { Toast.makeText(MainActivity.this, "请求成功", Toast.LENGTH_LONG).show(); } }); } }); }
OkHttp源码分析
从OkHttp的基本使用中,我们看到,通过okHttpClient.newCall()方法,拿到这个call对象,我们看看newCall是怎么走的
/** * Prepares the {@code request} to be executed at some point in the future. */ @Override public Call newCall(Request request) { return RealCall.newRealCall(this, request, false /* for web socket */); } static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { // Safely publish the Call instance to the EventListener. RealCall call = new RealCall(client, originalRequest, forWebSocket); call.eventListener = client.eventListenerFactory().create(call); return call; }
从这里的源码知道,okHttpClient.newCall()实际上返回的是RealCall对象,而call.enqueue()实际上调用的是RealCall中的enqueue()方法,我们看看enqueue()方法怎么走。
/**
 * Schedules this call for asynchronous execution. The callback is wrapped in an
 * {@code AsyncCall} and handed to the client's dispatcher.
 *
 * @param responseCallback callback wrapped by the {@code AsyncCall}
 * @throws IllegalStateException if this call was already executed
 */
@Override
public void enqueue(Callback responseCallback) {
  // A call may only be started once; the flag is checked and set under the call's own lock.
  synchronized (this) {
    if (executed) {
      throw new IllegalStateException("Already Executed");
    }
    executed = true;
  }

  captureCallStackTrace();
  eventListener.callStart(this);

  client.dispatcher()
      .enqueue(new AsyncCall(responseCallback));
}
可以看到client.dispatcher().enqueue(new AsyncCall(responseCallback));这句代码,也就是说,最终所有的请求都是由dispatcher来完成的,我们看看dispatcher。
/*
 * Copyright (C) 2013 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package okhttp3;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import okhttp3.RealCall.AsyncCall;
import okhttp3.internal.Util;

/**
 * Policy on when async requests are executed.
 *
 * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
 * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
 * of calls concurrently.
 */
public final class Dispatcher {
  /** Maximum number of requests allowed to run concurrently. */
  private int maxRequests = 64;

  /** Maximum number of requests allowed to run concurrently against a single host. */
  private int maxRequestsPerHost = 5;

  private @Nullable Runnable idleCallback;

  /** Thread pool that runs the calls; created on first use when none was supplied. */
  private @Nullable ExecutorService executorService;

  /** Asynchronous calls waiting for a free slot. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Asynchronous calls that have been handed to the executor. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Synchronous calls that are currently running. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  /** Creates a dispatcher that runs calls on the supplied executor. */
  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  /** Creates a dispatcher whose executor is built lazily by {@link #executorService()}. */
  public Dispatcher() {
  }

  /**
   * Returns the executor used to run calls, creating a default thread pool on first use when
   * none was supplied to the constructor.
   */
  public synchronized ExecutorService executorService() {
    if (executorService != null) {
      return executorService;
    }
    executorService = new ThreadPoolExecutor(
        0,
        Integer.MAX_VALUE,
        60,
        TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        Util.threadFactory("OkHttp Dispatcher", false));
    return executorService;
  }

  /**
   * Sets the maximum number of requests to execute concurrently.
   *
   * <p>If more than {@code maxRequests} requests are in flight when this is invoked, those requests
   * will remain in flight.
   *
   * @throws IllegalArgumentException if {@code maxRequests} is less than 1
   */
  public synchronized void setMaxRequests(int maxRequests) {
    if (maxRequests < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequests);
    }
    this.maxRequests = maxRequests;
    // A new limit may leave room for waiting calls.
    promoteCalls();
  }

  /** Returns the maximum number of requests to execute concurrently. */
  public synchronized int getMaxRequests() {
    return maxRequests;
  }

  /**
   * Sets the maximum number of requests for each host to execute concurrently.
   *
   * <p>If more than {@code maxRequestsPerHost} requests are in flight when this is invoked, those
   * requests will remain in flight.
   *
   * <p>WebSocket connections to hosts <b>do not</b> count against this limit.
   *
   * @throws IllegalArgumentException if {@code maxRequestsPerHost} is less than 1
   */
  public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
    if (maxRequestsPerHost < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
    }
    this.maxRequestsPerHost = maxRequestsPerHost;
    // A new limit may leave room for waiting calls.
    promoteCalls();
  }

  /** Returns the maximum number of requests for each host to execute concurrently. */
  public synchronized int getMaxRequestsPerHost() {
    return maxRequestsPerHost;
  }

  /**
   * Set a callback to be invoked each time the dispatcher becomes idle (when the number of running
   * calls returns to zero).
   *
   * <p>Note: The time at which a {@linkplain Call call} is considered idle is different depending
   * on whether it was run {@linkplain Call#enqueue(Callback) asynchronously} or
   * {@linkplain Call#execute() synchronously}. Asynchronous calls become idle after the
   * {@link Callback#onResponse onResponse} or {@link Callback#onFailure onFailure} callback has
   * returned. Synchronous calls become idle once {@link Call#execute() execute()} returns. This
   * means that if you are doing synchronous calls the network layer will not truly be idle until
   * every returned {@link Response} has been closed.
   */
  public synchronized void setIdleCallback(@Nullable Runnable idleCallback) {
    this.idleCallback = idleCallback;
  }

  /**
   * Runs {@code call} on the executor right away when both the global and the per-host limits
   * allow it; otherwise parks it in the ready queue.
   */
  synchronized void enqueue(AsyncCall call) {
    boolean limitReached = runningAsyncCalls.size() >= maxRequests
        || runningCallsForHost(call) >= maxRequestsPerHost;
    if (limitReached) {
      readyAsyncCalls.add(call);
      return;
    }
    runningAsyncCalls.add(call);
    executorService().execute(call);
  }

  /**
   * Cancel all calls currently enqueued or executing. Includes calls executed both {@linkplain
   * Call#execute() synchronously} and {@linkplain Call#enqueue asynchronously}.
   */
  public synchronized void cancelAll() {
    for (AsyncCall waiting : readyAsyncCalls) {
      waiting.get().cancel();
    }

    for (AsyncCall running : runningAsyncCalls) {
      running.get().cancel();
    }

    for (RealCall running : runningSyncCalls) {
      running.cancel();
    }
  }

  /**
   * Moves waiting calls from the ready queue to the running queue and the executor, stopping
   * once the global limit is reached and skipping calls whose host is at its limit.
   */
  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) {
      return; // Already running max capacity.
    }
    if (readyAsyncCalls.isEmpty()) {
      return; // No ready calls to promote.
    }

    Iterator<AsyncCall> waiting = readyAsyncCalls.iterator();
    while (waiting.hasNext()) {
      AsyncCall candidate = waiting.next();

      if (runningCallsForHost(candidate) < maxRequestsPerHost) {
        waiting.remove();
        runningAsyncCalls.add(candidate);
        executorService().execute(candidate);
      }

      if (runningAsyncCalls.size() >= maxRequests) {
        return; // Reached max capacity.
      }
    }
  }

  // ---------------- remaining members omitted from this excerpt ----------------
}
我们来找到这段代码
/**
 * Runs {@code call} on the executor right away when both the global and the per-host limits
 * allow it; otherwise parks it in the ready queue.
 */
synchronized void enqueue(AsyncCall call) {
  boolean limitReached = runningAsyncCalls.size() >= maxRequests
      || runningCallsForHost(call) >= maxRequestsPerHost;
  if (limitReached) {
    readyAsyncCalls.add(call);
    return;
  }
  runningAsyncCalls.add(call);
  executorService().execute(call);
}
当正在运行的异步请求队列中的数量小于maxRequests(默认64),并且同一主机正在运行的请求数小于maxRequestsPerHost(默认5)时,则把请求加入到runningAsyncCalls中并在线程池中执行,否则就加入到readyAsyncCalls中进行缓存等待。而runningAsyncCalls这个请求队列存放的就是AsyncCall对象,而这个AsyncCall就是RealCall的内部类,也就是说executorService().execute(call);实际上走的是RealCall的内部类AsyncCall中的execute()方法。
@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { eventListener.callFailed(RealCall.this, e); responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }
这部分的代码,相信很多人都能够看得明白,无非就是一些成功、失败的回调。这段代码中最重要的是Response response = getResponseWithInterceptorChain();和client.dispatcher().finished(this);我们先来看看client.dispatcher().finished(this);这句代码是怎么执行的。
/** Used by {@code AsyncCall#run} to signal completion. */ void finished(AsyncCall call) { finished(runningAsyncCalls, call, true); } /** Used by {@code Call#execute} to signal completion. */ void finished(RealCall call) { finished(runningSyncCalls, call, false); } private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } } private void promoteCalls() { if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }
由于client.dispatcher().finished(this);这句代码是放到finally中执行的,所以无论什么情况,都会执行上面的promoteCalls()方法,而从promoteCalls()方法中可以看出通过遍历来获取到下一个请求从而执行下一个网络请求。
回过头来,我们看看这一句代码Response response = getResponseWithInterceptorChain(); 通过getResponseWithInterceptorChain();来获取到response,然后回调返回。很明显getResponseWithInterceptorChain()这句代码里面进行了网络请求。我们看看是怎么执行的。
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); return chain.proceed(originalRequest); } }
从上面代码可以知道,缓存、网络请求等都封装成了拦截器的形式。拦截器主要用来观察、修改,甚至短路发出的请求以及返回的响应。最后return chain.proceed,而chain是通过new RealInterceptorChain来获取到的,我们来看看RealInterceptorChain对象,然后找到proceed()方法。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); calls++; // If we already have a stream, confirm that the incoming request will use it. if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must retain the same host and port"); } // If we already have a stream, confirm that this is the only call to chain.proceed(). if (this.httpCodec != null && calls > 1) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must call proceed() exactly once"); } // 调用下一个拦截器 RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, writeTimeout); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); //调用拦截器中的intercept()方法 // Confirm that the next interceptor made its required call to chain.proceed(). if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { throw new IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); } // Confirm that the intercepted response isn't null. if (response == null) { throw new NullPointerException("interceptor " + interceptor + " returned null"); } if (response.body() == null) { throw new IllegalStateException( "interceptor " + interceptor + " returned a response with no body"); } return response; }
从上面的代码可以看出来,chain.proceed主要是将集合中的拦截器按顺序取出来,然后依次调用每一个拦截器中的intercept()方法,最终获取到response结果并返回。
我们看看CacheInterceptor这个类,找到intercept()方法。
/**
 * Serves the request from the cache when the cache strategy allows it, otherwise forwards the
 * request down the chain.
 *
 * <p>Part of the method body is omitted in this excerpt; the final {@code return response;}
 * refers to a variable assigned in the omitted code.
 *
 * @param chain the chain supplying the request and the next interceptor
 * @return a synthetic 504 response, the cached response, or the response built by the omitted code
 * @throws IOException if proceeding down the chain fails
 */
@Override public Response intercept(Chain chain) throws IOException {
  // Look up a stored response for this request when a cache is configured.
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();

  // Create the CacheStrategy.Factory and compute the caching strategy for this request.
  CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  // The request to send over the network; null means the network is not used.
  Request networkRequest = strategy.networkRequest;
  // The cached response to use; null means no cached response is used.
  Response cacheResponse = strategy.cacheResponse;

  if (cache != null) {
    // Record whether this request is served by the network or by the cache.
    cache.trackResponse(strategy);
  }

  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
  }

  // Neither the network nor a cached response may be used: answer with a 504 response.
  if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }

  // The network is not needed and the cached response is usable: return it directly.
  if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }

  // Perform the network request by proceeding down the chain.
  Response networkResponse = null;
  try {
    networkResponse = chain.proceed(networkRequest);
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      closeQuietly(cacheCandidate.body());
    }
  }

  // --------- remaining code omitted from this excerpt ---------
  return response;
}
上面我做了很多注释,基本的流程是有缓存就取缓存里面的,没有缓存就请求网络。我们来看看网络请求的类CallServerInterceptor
/*
 * Copyright (C) 2016 Square, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package okhttp3.internal.http;

import java.io.IOException;
import java.net.ProtocolException;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.internal.Util;
import okhttp3.internal.connection.RealConnection;
import okhttp3.internal.connection.StreamAllocation;
import okio.Buffer;
import okio.BufferedSink;
import okio.ForwardingSink;
import okio.Okio;
import okio.Sink;

/** This is the last interceptor in the chain. It makes a network call to the server. */
public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  /**
   * Writes the request headers (and body, when one is permitted and present) through the codec,
   * reads the response headers, and returns the response with its body attached.
   *
   * @throws ProtocolException if a 204 or 205 response reports a non-zero content length
   * @throws IOException if writing the request or reading the response fails
   */
  @Override
  public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain chainImpl = (RealInterceptorChain) chain;
    HttpCodec codec = chainImpl.httpStream();
    StreamAllocation allocation = chainImpl.streamAllocation();
    RealConnection realConnection = (RealConnection) chainImpl.connection();
    Request request = chainImpl.request();

    long sentRequestMillis = System.currentTimeMillis();

    chainImpl.eventListener().requestHeadersStart(chainImpl.call());
    codec.writeRequestHeaders(request);
    chainImpl.eventListener().requestHeadersEnd(chainImpl.call(), request);

    Response.Builder builder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        codec.flushRequest();
        chainImpl.eventListener().responseHeadersStart(chainImpl.call());
        builder = codec.readResponseHeaders(true);
      }

      if (builder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        chainImpl.eventListener().requestBodyStart(chainImpl.call());
        long bodyLength = request.body().contentLength();
        // CountingSink is not part of this excerpt.
        CountingSink countingSink =
            new CountingSink(codec.createRequestBody(request, bodyLength));
        BufferedSink bufferedBody = Okio.buffer(countingSink);

        request.body().writeTo(bufferedBody);
        bufferedBody.close();
        chainImpl.eventListener()
            .requestBodyEnd(chainImpl.call(), countingSink.successfulCount);
      } else if (!realConnection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        allocation.noNewStreams();
      }
    }

    codec.finishRequest();

    if (builder == null) {
      chainImpl.eventListener().responseHeadersStart(chainImpl.call());
      builder = codec.readResponseHeaders(false);
    }

    Response response = buildResponse(builder, request, allocation, sentRequestMillis);

    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      builder = codec.readResponseHeaders(false);
      response = buildResponse(builder, request, allocation, sentRequestMillis);
      code = response.code();
    }

    chainImpl.eventListener()
        .responseHeadersEnd(chainImpl.call(), response);

    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(codec.openResponseBody(response))
          .build();
    }

    // Either side asking for "Connection: close" stops new streams on this allocation.
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      allocation.noNewStreams();
    }

    boolean noContentStatus = code == 204 || code == 205;
    if (noContentStatus && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

  /**
   * Completes {@code builder} with the request, the connection's handshake and the sent/received
   * timestamps, then builds the response.
   */
  private static Response buildResponse(Response.Builder builder, Request request,
      StreamAllocation allocation, long sentRequestMillis) throws IOException {
    return builder
        .request(request)
        .handshake(allocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }
}
到这里,基本上okhttp的整个流程就出来了,当然,这里只是一个整体的大概流程,如果要抠得很细,那就不是一篇文章能够说明白的了。现在回过头来再看一眼流程图,是不是感觉特别明朗了。
加载全部内容