OkHttp源码研究以及流量统计可行性分析

业务中需要对流量根据域名来做一下流量细分,由于主要的通信流程都是通过OkHttp来完成的,而且OkHttp是开源的,因此研究一下OkHttp源码的实现结构,评估下改造增加流量统计的可行性

Posted by chchaooo on October 1, 2019

Okhttp源码研究

private void testOkhttp() {
    OkHttpClient client = new OkHttpClient.Builder().build();
    Request request = new Request.Builder().url("https://blog.csdn.net/omyrobin/article/details/81773804").build();
    Call call = client.newCall(request);
    // 同步请求
    Response response = call.execute();
    Log.v("cc", response.toString());

    // 异步请求
    call.enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            Log.v("cc", e.getMessage());
        }
        @Override
        public void onResponse(Call call, Response response) throws IOException {
            Log.v("cc", response.toString());
        }
    });
}

Demo中第一行代码是构造了一个OkHttpClient对象 上面是一个最基本的okhttp的Demo。后面我们将将跟踪这个最简单的demo的源码执行流程,来理解okhttp的内部运行机制。 首先从源码中OkHttpClient.java的注释来观察一下OkHttpClient

// code in OkHttpClient.java
Factory for {@linkplain Call calls}, which can be used to send HTTP requests and read their responses.

OkHttp performs best when you create a single {@code OkHttpClient} instance and reuse it 
  for all of your HTTP calls. This is because each client holds its own connection pool 
  and thread pools. Reusing connections and threads reduces latency and saves memory. 
  Conversely, creating a client for each request wastes resources on idle pools.

OkHttpClient这个类有点总管的意思,负责发送请求,读取回复信息。 且官方推荐尽量重用该对象,因为每个OkHttpClient都有自己的连接池和线程池。 如果每个请求创建一个Client,那么重复的连接池和线程池非常浪费。

其内部变量:

// code in OkHttpClient.java
final Dispatcher dispatcher;
final @Nullable Proxy proxy; // 代理
final List<Protocol> protocols; // 协议
final List<ConnectionSpec> connectionSpecs;
final List<Interceptor> interceptors; 
final List<Interceptor> networkInterceptors;
final EventListener.Factory eventListenerFactory;
final ProxySelector proxySelector;
final CookieJar cookieJar;
final @Nullable Cache cache;
final @Nullable InternalCache internalCache;
final SocketFactory socketFactory;
final @Nullable SSLSocketFactory sslSocketFactory;
final @Nullable CertificateChainCleaner certificateChainCleaner;
final HostnameVerifier hostnameVerifier;
final CertificatePinner certificatePinner;
final Authenticator proxyAuthenticator;
final Authenticator authenticator;
final ConnectionPool connectionPool; // 连接池
final Dns dns; // dns
final boolean followSslRedirects;
final boolean followRedirects;
final boolean retryOnConnectionFailure; // 自动重试
final int connectTimeout; // 连接超时时间
final int readTimeout; // 读取超时时间
final int writeTimeout; // 写入超时时间
final int pingInterval; 

OkHttpClient中有很多与网络请求相关的属性,这里着重说明一下Dispatcher任务调度器,Dispatcher中有三个队列:

  • 准备执行的请求队列 readyAsyncCalls
  • 正在运行的请求队列 runningAsyncCalls;
  • 一个正在运行的同步请求队列 runningSyncCalls
// code in Dispatcher.java
 /** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
/** 这个线程池没有核心线程,线程数量没有限制,空闲60s就会回收*/
public synchronized ExecutorService executorService() {
   if (executorService == null) {
     executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
   }
   return executorService;
}

这里有两个小点: (1)这里的线程池没有核心线程,只要空闲60s,就会被回收 (2)这里的队列是双端队列,两端都可以插入和删除。因此既能够实现FIFO,也可以实现LIFO

接下来Demo中构造了一个request对象

// code in Demo
Request request = new Request.Builder().url("https://blog.csdn.net/omyrobin/article/details/81773804").build();

request对象中包含以下几个属性

// code in Request
final HttpUrl url;
final String method;
final Headers headers;
final @Nullable RequestBody body;
final Object tag;

当前的demo中,method会在Builder()方法中默认设置为“GET”,同时初始化好headers, 然后

// code in Demo
Call call = client.newCall(request);

通过OkHttpClient获取一个Call对象,这个Call对象是什么?

// code in Call.java
/** ( method in OkHttpClient )
 * Prepares the {@code request} to be executed at some point in the future.
 */
@Override public Call newCall(Request request) {
   return RealCall.newRealCall(this, request, false /* for web socket */);
}
// code in RealCall.java
/**
 * method in RealCall
 **/
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
   // Safely publish the Call instance to the EventListener.
   RealCall call = new RealCall(client, originalRequest, forWebSocket);
   call.eventListener = client.eventListenerFactory().create(call);
   return call;
}

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
   this.client = client;
   this.originalRequest = originalRequest;
   this.forWebSocket = forWebSocket;
   this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

可以看到在 RealCall 的构造方法中创建了一个RetryAndFollowUpInterceptor,用于处理请求错误和重定向等,这是 Okhttp 框架的精髓 interceptor chain 中的一环,默认情况下也是第一个拦截器,除非调用 OkHttpClient.Builder#addInterceptor(Interceptor) 来添加全局的拦截器。关于拦截器链的顺序参见 RealCall#getResponseWithInterceptorChain() 方法。

接下来是

// code in Demo
/**
  * call接口中的注释
  * Schedules the request to be executed at some point in the future.
  **/
call.enqueue(new Callback() {
    ...
/**
 * RealCall中的实际实现
 */
@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

可以看到,一个 Call 只能执行一次,否则会抛异常,这里创建了一个 AsyncCall 并将Callback传入,接着再交给任务分发器 Dispatcher 来进一步处理。

从 Dispatcher#enqueue()方法的策略可以看出,对于请求的入队做了一些限制,若正在执行的请求数量小于最大值(默认64),并且此请求所属主机的正在执行任务小于最大值(默认5),就加入正在运行的队列并通过线程池来执行该任务,否则加入准备执行队列中。

小结

现在小结一下

private void testOkhttp() {
    OkHttpClient client = new OkHttpClient.Builder().build();
    Request request = new Request.Builder().url("https://blog.csdn.net/omyrobin/article/details/81773804").build();
    Call call = client.newCall(request);
    call.enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
            Log.v("cc", e.getMessage());
        }
        @Override
        public void onResponse(Call call, Response response) throws IOException {
            Log.v("cc", response.toString());
        }
    });
}

前面的源码体现了以下几个流程: 创建了一个OkHttpClient对象,这个对象里面包含连接池和线程池。它负责总管请求事物。 然后将url信息放到Request对象中,并据此创建一个Call对象(实际的对象类型为RealCall)。 每个Call表示一个请求,该对象只能被执行一次。 向Call对象中添加callback,并启动请求过程。

从理解的角度看,OkHttp所需要管理的是通信请求的过程,抽象出一个Call来作为通信过程的管理,出于复用目的,抽象出一个Client来管理线程和连接池。Call对象要达到复用目的,初始过程需要Client来管理。这些都非常合乎逻辑。 请求有同步和异步之分,OkHttp的同步请求的代码是:Response response = call.execute(); 而异步请求的代码是call.enqueue(new Callback(){..}

// code in RealCall.java
@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  eventListener.callStart(this);
  try {
    client.dispatcher().executed(this);
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } catch (IOException e) {
    eventListener.callFailed(this, e);
    throw e;
  } finally {
    client.dispatcher().finished(this);
  }
}

// code in Dispatcher()
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}
// code in RealCall.java
@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

// code in Dispatcher()
synchronized void enqueue(AsyncCall call) {
  //正在执行的任务数量小于最大值(64),并且此任务所属主机的正在执行任务小于最大值(5)
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    runningAsyncCalls.add(call);
    executorService().execute(call);
  } else {
    readyAsyncCalls.add(call);
  }
}

无论是同步还是异步请求,其过程基本是一样的,将当前的RealCall对象放到Dispatcher的同步或者异步队列中。 然后同步线程直接调用了Response result = getResponseWithInterceptorChain(); 而异步线程则是另起一个线程去执行了AsyncCall中的execute()方法,其内部最终也调用了Response response = getResponseWithInterceptorChain()方法


final class AsyncCall extends NamedRunnable {
   ...
   @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
   }
}

/**
 * Runnable implementation which always sets its thread name.
 * 它的作用有2个:
 *  ① 采用模板方法的设计模式,让子类将具体的操作放在 execute()方法中;
 *  ② 给线程指定一个名字,比如传入模块名称,方便监控线程的活动状态;
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

由于同步请求的代码看起来逻辑更清晰一些,后续看同步请求的流程来理解整体的流程。

拦截器链

Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  // 如果没有手动添加拦截器,这里的client.interceptors()为空,
  // 所以默认情况下retryAndFollowUpInterceptor是第一个拦截器
  interceptors.addAll(client.interceptors());
  interceptors.add(retryAndFollowUpInterceptor);
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));

  Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
      originalRequest, this, eventListener, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  return chain.proceed(originalRequest);
}

可以看到这里有相当多的拦截器。以下需要关注的是 ① 各个拦截器各自的作用 ② 拦截器间的前后顺序

接下来看proceed方法

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next)
    return response;
}

这里首先关注到这个index是在前面构造函数中传过来的,值为0。传给next的值递增过。该RealInterceptorChain传给下一个interceptor之后,会继续在该interceptor内调用其proceed函数(index值发送了变化)。

// RetryAndFollowUpInterceptor
@Override public Response intercept(Chain chain) throws IOException {
  ... // pre process
  response = realChain.proceed(request, streamAllocation, null, null);
  ... // after process
}

所以上面就通过interceptror.intercept(next)和realChain.proceed(request, streamAllocation, null, null)互相调用,实现了在拦截链自顶向下的逐步活动。 由于各个拦截器的作用不同,调用下一个拦截器的代码在intercept中的位置不同。众多拦截器各自的预处理和后续处理以及调用,整体构成了完整的通信过程

拦截器

retryAndFollowUpInterceptor 这个拦截器就如同它的名字retry and followUp,主要负责错误处理和重定向等问题,比如路由错误、IO异常等。

BridgeInterceptor#intercept() 在这个拦截器中,添加了必要请求头信息,gzip处理等。这个拦截器处理请求信息、cookie、gzip等。

CacheInterceptor

@Override public Response intercept(Chain chain) throws IOException {
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();

  CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  Request networkRequest = strategy.networkRequest;
  Response cacheResponse = strategy.cacheResponse;

  if (cache != null) {
    cache.trackResponse(strategy);
  }

  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
  }

  // If we're forbidden from using the network and the cache is insufficient, fail.
  if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }

  // If we don't need the network, we're done.
  if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }
  ...
}

优先先从cache中获取,如果cache中没有,同时无法访问网络,这返回失败; 这个拦截器主要工作是做做缓存处理,如果有有缓存并且缓存可用,那就使用缓存,否则进行调用下一个拦截器 ConnectionInterceptor 进行网络请求,并将响应内容缓存。 这个拦截器说明,拦截器链不一定会调用到底,CacheInterceptor如果发现cache命中了,就会直接返回内容。

ConnectInterceptor 连接拦截器负责与目标建立连接

/** Opens a connection to the target server and proceeds to the next interceptor. */
public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

ConnectionInterceptor中建立好与目标的连接即可,然后将连接传给CallServerInterceptor来执行最终实际的网络请求

CallServerInterceptor

/**
 * code in CallServerInterceptor
 * 最终的实际网络请求部分
 */
...
@Override public Response intercept(Chain chain) throws IOException {
  ...
  Response response = responseBuilder
    .request(request)
    .handshake(streamAllocation.connection().handshake())
    .sentRequestAtMillis(sentRequestMillis)
    .receivedResponseAtMillis(System.currentTimeMillis())
    .build();
  ...

这个拦截器比较特别,它没有继续调用realChain.proceed方法将拦截器链继续往下调用,因为在OkHttp官方实现中,这个已经是最后一个拦截器,没有继续往下调的必要了。

从上述的拦截链往回看,

  • 当实际的CallServerInterceptor中的网络response生成之后,调用栈回到ConnectionInterceptor之后,没有任何后续操作(ConnectionInterceptor只需要在请求实际发生之前建立好连接即可)。
  • 调用栈继续往回,走到CacheInterceptor时,拿到response之后,还有一段将response更新到cache中的逻辑(下一次则可以直接从cache中命中)。
  • 调用栈继续往上走会到达BridgeInterceptor,我们回头看一下它的注释 ```java /**
    • Bridges from application code to network code. First it builds a network request from a user
    • request. Then it proceeds to call the network. Finally it builds a user response from the network
    • response. */ ``` 桥接拦截器是一个application code到network code之间的转换器。application code与network code之前的差别主要包括一下几点:cookie、网络传输中信息是否压缩。如果使用了zip,那么实际的请求完成之后,也需要对返回的response进行相应的zip解压缩。
  • 调用栈再继续往上走,RetryAndFollowUpInterceptor中则是判断是否需要进行重定向等操作,如果需要,则设置合适的request到streamAllocation中,下一个while循环中,将会继续执行该重定向请求。

总结

一个通用完整的通信SDK需要考虑到复用、缓存、流量压缩等各个方面。拦截器从本质上去理解,就是一个个操作切面,这一个个切面将拦截器前后的逻辑完全划分开了。 整个通信过程被划分成了一个个简单的多的子区间。从这个意义上,拦截器方式是一种非常有效的复杂流程处理方式,应用场景很广泛。

基于OkHttp做流量统计

如果我们要基于OkHttp来做流量统计,那该做些什么?

  • 方案1:CallServerInterceptor负责了实际通信发送和接收过程中,而且所有实际的网络流量最终实际都会走到这个拦截器,所以可以改造CallServerInterceptor拦截器逻辑,统计其中所有发送的request的大小和收到的response的大小。 此外可以根据request的域名和response中的域名来进行流量分类,区分不同业务类型的流量。 –> 即使获取到Request和Response对象,但是无法转化成实际的流量大小。经过测试,Response中header + body的大小与stream中实际的流量字节数量并不相同。

  • 方案2

    • 找到OkHttp中实际保存有Socket的地方,将该socket进行代理Hook。–> RealConnection
    • 这个Hook方式的核心点是使用SocketWrapper替换掉原本代码中的Socket,在外部看来,他们二者有完全相同的对外接口和对外功能。 在实际实现过程中,使用一个SocketWrapper包装一下Socket,Socket作为SocketWrapper的成员变量,SocketWrapper实现与Socket完全相同的对外接口,其中大部分接口就是调用Socket的实现。 其中少量接口根据需要修改,我们这里要做流量统计,那就修改原本的获取输入输出流的接口,改为返回一个自定义的带有流量统计功能的输入输出流(自定义输入输出流基于原有输入输出流工作)。 –> 这样可以获取到流量的大小
    • 在Socket建立连接的过程中,保存好这个socket所连接的对象 –> 这样可以获取到流量关联的域名
    • 既然是sdk,还需要设计一个良好的对外暴露流量统计数据的接口