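The previous article covered how traced runs and its overall architecture. This one walks through the startup flow of a producer, using heapprofd as the example.

The code lives in src/profiling/memory.

Tracing from main, we quickly reach the core of the startup logic, StartCentralHeapprofd.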

int StartCentralHeapprofd() {
  // We set this up before launching any threads, so we do not have to use a
  // std::atomic for g_dump_evt.
  g_dump_evt = new base::EventFd();

  base::UnixTaskRunner task_runner;
  base::Watchdog::GetInstance()->Start();  // crash on exceedingly long tasks
  HeapprofdProducer producer(HeapprofdMode::kCentral, &task_runner,
                             /* exit_when_done= */ false);

  int listening_raw_socket = GetListeningSocket();
  auto listening_socket = base::UnixSocket::Listen(
      base::ScopedFile(listening_raw_socket), &producer.socket_delegate(),
      &task_runner, base::SockFamily::kUnix, base::SockType::kStream);

  struct sigaction action = {};
  action.sa_handler = [](int) { g_dump_evt->Notify(); };
  // Allow to trigger a full dump by sending SIGUSR1 to heapprofd.
  // This will allow manually deciding when to dump on userdebug.
  PERFETTO_CHECK(sigaction(SIGUSR1, &action, nullptr) == 0);
  task_runner.AddFileDescriptorWatch(g_dump_evt->fd(), [&producer] {
    g_dump_evt->Clear();
    producer.DumpAll();
  });
  producer.ConnectWithRetries(GetProducerSocket());
  // TODO(fmayer): Create one producer that manages both heapprofd and Java
  // producers, so we do not have two connections to traced.
  JavaHprofProducer java_producer(&task_runner);
  java_producer.ConnectWithRetries(GetProducerSocket());
  task_runner.Run();
  return 0;
}

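This function first starts a Watchdog, which crashes the process when a task runs for far too long. We won't go into its details here.

It then creates the HeapprofdProducer object and listens on /dev/socket/heapprofd. base::UnixSocket::Listen was covered in the previous article, so we skip it here. Just note that subsequent events on the listening socket are handled by producer.socket_delegate(), which is discussed later.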

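Next it registers a handler for SIGUSR1. The handler itself only calls g_dump_evt->Notify(). The task_runner watches the EventFd's file descriptor, and when it becomes readable the task_runner clears the event and runs producer.DumpAll().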
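
The signal-to-event-loop handoff can be sketched in isolation. The following is a minimal sketch, assuming a POSIX system and using a plain non-blocking pipe in place of base::EventFd; g_pipe, OnSigusr1, InstallDumpSignal and DrainDumpRequests are hypothetical names for illustration, not Perfetto APIs.

```cpp
#include <cassert>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>

// Hypothetical stand-in for g_dump_evt: g_pipe[0] is the end an event loop
// would watch, g_pipe[1] is the end the signal handler writes to.
static int g_pipe[2] = {-1, -1};

// Signal handler: only async-signal-safe work, i.e. write one byte.
extern "C" void OnSigusr1(int) {
  const char byte = 1;
  ssize_t ignored = write(g_pipe[1], &byte, 1);
  (void)ignored;
}

// Creates the non-blocking pipe and installs the SIGUSR1 handler.
bool InstallDumpSignal() {
  if (pipe(g_pipe) != 0)
    return false;
  for (int fd : g_pipe) {
    const int flags = fcntl(fd, F_GETFL);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
      return false;
  }
  struct sigaction action = {};
  action.sa_handler = &OnSigusr1;
  return sigaction(SIGUSR1, &action, nullptr) == 0;
}

// What the event-loop callback would do: clear the pending notifications and
// return how many there were, so the caller can then run the real work.
int DrainDumpRequests() {
  assert(g_pipe[0] >= 0);
  char buf[64];
  int total = 0;
  for (;;) {
    const ssize_t n = read(g_pipe[0], buf, sizeof(buf));
    if (n <= 0)
      break;  // Pipe is empty (EAGAIN) or the read failed.
    total += static_cast<int>(n);
  }
  return total;
}
```

The point of the design is that the handler does nothing but notify; the expensive work runs later on the event-loop thread, outside signal context.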

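It then calls producer.ConnectWithRetries to connect to /dev/socket/traced_producer.

After that it likewise creates a JavaHprofProducer and connects it to /dev/socket/traced_producer as well.

Finally it enters task_runner.Run(), which processes all the asynchronous events.

Look at HeapprofdProducer::ConnectWithRetries: it sets the state, resets the connection backoff, records the socket name, and calls ConnectService.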

void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
  PERFETTO_DCHECK(state_ == kNotStarted);
  state_ = kNotConnected;

  ResetConnectionBackoff();
  producer_sock_name_ = socket_name;
  ConnectService();
}

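Now ConnectService. Note that the "android.heapprofd" argument here is the producer name passed to ProducerIPCClient::Connect, not the data source name; the data source is registered later, in OnConnect, under kHeapprofdDataSource.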

void HeapprofdProducer::ConnectService() {
  SetProducerEndpoint(ProducerIPCClient::Connect(
      producer_sock_name_, this, "android.heapprofd", task_runner_));
}

ProducerIPCClient::Connect simply creates a ProducerIPCClientImpl object. Its constructor mainly creates the ipc::Client instance and then calls BindService.

ipc_channel_ =
    ipc::Client::CreateInstance(std::move(conn_args), task_runner);
ipc_channel_->BindService(producer_port_->GetWeakPtr());

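Start with CreateInstance, which instantiates ClientImpl. Remember the HostImpl used by the service in the previous article? ClientImpl is the matching client-side implementation.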

std::unique_ptr<Client> Client::CreateInstance(ConnArgs conn_args,
                                               base::TaskRunner* task_runner) {
  std::unique_ptr<Client> client(
      new ClientImpl(std::move(conn_args), task_runner));
  return client;
}

Next, the ClientImpl constructor. Here we take the else branch, TryConnect, which calls UnixSocket::Connect. Note that the second argument, this, is the EventListener: ClientImpl itself receives the socket event callbacks.

ClientImpl::ClientImpl(ConnArgs conn_args, base::TaskRunner* task_runner)
    : socket_name_(conn_args.socket_name),
      socket_retry_(conn_args.retry),
      task_runner_(task_runner),
      weak_ptr_factory_(this) {
  if (conn_args.socket_fd) {
    // Create the client using a connected socket. This code path will never hit
    // OnConnect().
    sock_ = base::UnixSocket::AdoptConnected(
        std::move(conn_args.socket_fd), this, task_runner_, kClientSockFamily,
        base::SockType::kStream, base::SockPeerCredMode::kIgnore);
  } else {
    // Connect using the socket name.
    TryConnect();
  }
}

void ClientImpl::TryConnect() {
  PERFETTO_DCHECK(socket_name_);
  sock_ = base::UnixSocket::Connect(
      socket_name_, this, task_runner_, base::GetSockFamily(socket_name_),
      base::SockType::kStream, base::SockPeerCredMode::kIgnore);
}

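UnixSocket::Connect first creates a UnixSocket object and then calls its DoConnect.

First the UnixSocket constructor. adopt_state is passed in as kDisconnected, so it creates a new socket, sets it non-blocking, and adds it to the task_runner_'s watch list. Recall what the service socket did at this point: as mentioned before, the service passes adopt_state as kListening.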

  state_ = State::kDisconnected;
  if (adopt_state == State::kDisconnected) {
    PERFETTO_DCHECK(!adopt_fd);
    sock_raw_ = UnixSocketRaw::CreateMayFail(sock_family, sock_type);
    if (!sock_raw_)
      return;
  } else if (adopt_state == State::kConnected) {
...
  PERFETTO_CHECK(sock_raw_);
  sock_raw_.SetBlocking(false);
  WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
  task_runner_->AddFileDescriptorWatch(sock_raw_.watch_handle(), [weak_ptr] {
    if (weak_ptr)
      weak_ptr->OnEvent();
  });

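The constructor above does not connect to the service. That happens in DoConnect, which also switches state_ from kDisconnected to kConnecting. A long comment is omitted here; it explains why OnEvent has to be posted once to task_runner_ below. OnEvent is the callback invoked directly whenever the socket's state changes.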

void UnixSocket::DoConnect(const std::string& socket_name) {
  PERFETTO_DCHECK(state_ == State::kDisconnected);

  // This is the only thing that can gracefully fail in the ctor.
  if (!sock_raw_)
    return NotifyConnectionState(false);

  if (!sock_raw_.Connect(socket_name))
    return NotifyConnectionState(false);

  // At this point either connect() succeeded or started asynchronously
  // (errno = EINPROGRESS).
  state_ = State::kConnecting;
...
  WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_ptr] {
    if (weak_ptr)
      weak_ptr->OnEvent();
  });
}
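
The kDisconnected to kConnecting to kConnected transition, with the listener notified from a posted task rather than inline, can be modelled without real sockets. This is a minimal sketch under stated assumptions: MiniTaskRunner and MiniSocket are hypothetical stand-ins, the connect step always succeeds, and a raw this pointer is captured where the real code uses a WeakPtr.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded task runner: posted tasks run later, FIFO.
class MiniTaskRunner {
 public:
  void PostTask(std::function<void()> task) {
    tasks_.push_back(std::move(task));
  }
  void RunUntilIdle() {
    while (!tasks_.empty()) {
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      task();
    }
  }

 private:
  std::deque<std::function<void()>> tasks_;
};

enum class State { kDisconnected, kConnecting, kConnected };

// Models only the state transitions, not real sockets.
class MiniSocket {
 public:
  MiniSocket(MiniTaskRunner* runner, std::vector<std::string>* log)
      : runner_(runner), log_(log) {}

  void DoConnect() {
    assert(state_ == State::kDisconnected);
    state_ = State::kConnecting;
    // Post OnEvent() instead of calling it inline, so the listener is always
    // notified from a later task. The sketch assumes the socket outlives it.
    runner_->PostTask([this] { OnEvent(); });
  }

  void OnEvent() {
    if (state_ != State::kConnecting)
      return;
    state_ = State::kConnected;
    log_->push_back("OnConnect(true)");  // Stand-in for the listener call.
  }

  State state() const { return state_; }

 private:
  MiniTaskRunner* runner_;
  std::vector<std::string>* log_;
  State state_ = State::kDisconnected;
};
```

Right after DoConnect() returns, the socket is still kConnecting and nothing has been logged; the notification only appears once the task runner gets to run.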

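That completes socket initialization and connection. Back in the ProducerIPCClientImpl constructor (reached from ProducerIPCClient::Connect), ClientImpl::BindService is called next. Remember what state the socket is in at this point?

It is kConnecting, so is_connected() is false and BindService only appends service_proxy to queued_bindings_ for later processing.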

void ClientImpl::BindService(base::WeakPtr<ServiceProxy> service_proxy) {
  if (!service_proxy)
    return;
  if (!sock_->is_connected()) {
    queued_bindings_.emplace_back(service_proxy);
    return;
  }
  ...
}

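What is this service_proxy? Its type is protos::gen::ProducerPortProxy, which is also auto-generated from the protobuf definitions.

It is the proxy object, created when ProducerIPCClientImpl is constructed, that sends messages to and receives messages from the service. Note that it takes the ProducerIPCClientImpl object as its event listener.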

      producer_port_(
          new protos::gen::ProducerPortProxy(this /* event_listener */)),

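After this, control returns to the task_runner to process asynchronous events. Here is a summary of how the classes relate, to make the rest of the analysis easier to follow.

The outermost is the HeapprofdProducer object, which contains the heap profiler's business logic; that is covered in other chapters.

Next is ProducerIPCClientImpl, which handles the producer's client-side logic. It is also the event listener of protos::gen::ProducerPortProxy.

Then comes ClientImpl, which handles the generic IPC client logic. ProducerPortProxy sends its requests to the service through ClientImpl, and ClientImpl dispatches the replies back to the proxy.

Finally, UnixSocket handles the socket events.

As mentioned, after DoConnect the socket is in the kConnecting state, and the task_runner then calls UnixSocket::OnEvent.

Here is how OnEvent handles that state. In the normal case it runs the callback event_listener_->OnConnect(this, true /* connected */), and event_listener_ is the ClientImpl.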

  if (state_ == State::kConnecting) {
    PERFETTO_DCHECK(sock_raw_);
    int sock_err = EINVAL;
    socklen_t err_len = sizeof(sock_err);
    int res =
        getsockopt(sock_raw_.fd(), SOL_SOCKET, SO_ERROR, &sock_err, &err_len);

    if (res == 0 && sock_err == EINPROGRESS)
      return;  // Not connected yet, just a spurious FD watch wakeup.
    if (res == 0 && sock_err == 0) {
      if (peer_cred_mode_ == SockPeerCredMode::kReadOnConnect)
        ReadPeerCredentialsPosix();
      state_ = State::kConnected;
      return event_listener_->OnConnect(this, true /* connected */);
    }
    PERFETTO_DLOG("Connection error: %s", strerror(sock_err));
    Shutdown(false);
    return event_listener_->OnConnect(this, false /* connected */);
  }

Now ClientImpl::OnConnect, which calls BindService again for every queued binding.

  auto queued_bindings = std::move(queued_bindings_);
  queued_bindings_.clear();
  for (base::WeakPtr<ServiceProxy>& service_proxy : queued_bindings) {
    if (connected) {
      BindService(service_proxy);
    } else if (service_proxy) {
      service_proxy->OnConnect(false /* success */);
    }
  }
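
The "queue while connecting, flush on connect" behaviour of BindService and OnConnect can be sketched on its own. This is a minimal model, assuming service names stand in for the WeakPtr<ServiceProxy> objects and a string vector stands in for frames sent on the socket; MiniClient is a hypothetical class, not part of Perfetto.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

class MiniClient {
 public:
  void BindService(const std::string& service_name) {
    assert(!service_name.empty());
    if (!connected_) {
      queued_bindings_.push_back(service_name);  // Retried from OnConnect().
      return;
    }
    sent_frames_.push_back("bind_service:" + service_name);
  }

  void OnConnect(bool connected) {
    connected_ = connected;
    // Move the queue out first so BindService() can run against a clean one.
    std::vector<std::string> queued = std::move(queued_bindings_);
    queued_bindings_.clear();
    for (const std::string& name : queued) {
      if (connected)
        BindService(name);
      else
        failed_.push_back(name);  // Stand-in for proxy->OnConnect(false).
    }
  }

  const std::vector<std::string>& sent_frames() const { return sent_frames_; }
  const std::vector<std::string>& failed() const { return failed_; }
  std::size_t queued() const { return queued_bindings_.size(); }

 private:
  bool connected_ = false;
  std::vector<std::string> queued_bindings_;
  std::vector<std::string> sent_frames_;
  std::vector<std::string> failed_;
};
```

A binding requested before the socket is connected produces no frame until OnConnect(true) arrives; with OnConnect(false) it is reported as failed instead.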

Now the other part of BindService. It builds a protos::gen::IPCFrame carrying a bind_service message, sends it with SendFrame, and then stores the request in queued_requests_.

  RequestID request_id = ++last_request_id_;
  Frame frame;
  frame.set_request_id(request_id);
  Frame::BindService* req = frame.mutable_msg_bind_service();
  const char* const service_name = service_proxy->GetDescriptor().service_name;
  req->set_service_name(service_name);
  if (!SendFrame(frame)) {
    PERFETTO_DLOG("BindService(%s) failed", service_name);
    return service_proxy->OnConnect(false /* success */);
  }
  QueuedRequest qr;
  qr.type = Frame::kMsgBindServiceFieldNumber;
  qr.request_id = request_id;
  qr.service_proxy = service_proxy;
  queued_requests_.emplace(request_id, std::move(qr));

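SendFrame just serializes and sends the frame, so we skip it. Next is the server side, i.e. traced.

As described in the previous article, when the server receives data, UnixSocket::OnEvent is called, which calls OnDataAvailable on its event listener, HostImpl, which in turn calls OnReceivedFrame.

HostImpl::OnReceivedFrame is simple. For this frame it calls OnBindService, which directly builds a msg_bind_service_reply containing the service id and the method ids.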

void HostImpl::OnBindService(ClientConnection* client, const Frame& req_frame) {
  // Binding a service doesn't do anything major. It just returns back the
  // service id and its method map.
  const Frame::BindService& req = req_frame.msg_bind_service();
  Frame reply_frame;
  reply_frame.set_request_id(req_frame.request_id());
  auto* reply = reply_frame.mutable_msg_bind_service_reply();
  const ExposedService* service = GetServiceByName(req.service_name());
  if (service) {
    reply->set_success(true);
    reply->set_service_id(service->id);
    uint32_t method_id = 1;  // method ids start at index 1.
    for (const auto& desc_method : service->instance->GetDescriptor().methods) {
      Frame::BindServiceReply::MethodInfo* method_info = reply->add_methods();
      method_info->set_name(desc_method.name);
      method_info->set_id(method_id++);
    }
  }
  SendFrame(client, reply_frame);
}

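Now the client side. Recall that the request was stored in queued_requests_ when it was sent. OnFrameReceived first looks up the request matching the reply, then calls OnBindServiceReply with the request and the reply.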

void ClientImpl::OnFrameReceived(const Frame& frame) {
  auto queued_requests_it = queued_requests_.find(frame.request_id());
  if (queued_requests_it == queued_requests_.end()) {
    PERFETTO_DLOG("OnFrameReceived(): got invalid request_id=%" PRIu64,
                  static_cast<uint64_t>(frame.request_id()));
    return;
  }
  QueuedRequest req = std::move(queued_requests_it->second);
  queued_requests_.erase(queued_requests_it);

  if (req.type == Frame::kMsgBindServiceFieldNumber &&
      frame.has_msg_bind_service_reply()) {
    return OnBindServiceReply(std::move(req), frame.msg_bind_service_reply());
  }
  if (req.type == Frame::kMsgInvokeMethodFieldNumber &&
      frame.has_msg_invoke_method_reply()) {
    return OnInvokeMethodReply(std::move(req), frame.msg_invoke_method_reply());
  }
...
}
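
The request/reply correlation through queued_requests_ boils down to a map keyed by a monotonically increasing request id. Below is a minimal sketch of that idea; RequestTable, its Add/Take methods and the proxy_name field are hypothetical simplifications, not the real ClientImpl members.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

using RequestID = std::uint64_t;
enum class RequestType { kBindService, kInvokeMethod };

struct QueuedRequest {
  RequestType type;
  RequestID request_id;
  std::string proxy_name;  // Stand-in for the WeakPtr<ServiceProxy>.
};

class RequestTable {
 public:
  // Called when a frame is sent: allocate an id and remember the request.
  RequestID Add(RequestType type, const std::string& proxy_name) {
    const RequestID request_id = ++last_request_id_;
    queued_requests_.emplace(request_id,
                             QueuedRequest{type, request_id, proxy_name});
    return request_id;
  }

  // Called when a reply arrives: hand back the matching request and forget
  // it. Returns false for an id that was never issued or already answered.
  bool Take(RequestID request_id, QueuedRequest* out) {
    assert(out != nullptr);
    auto it = queued_requests_.find(request_id);
    if (it == queued_requests_.end())
      return false;
    *out = std::move(it->second);
    queued_requests_.erase(it);
    return true;
  }

 private:
  RequestID last_request_id_ = 0;
  std::map<RequestID, QueuedRequest> queued_requests_;
};
```

Because the reply frame echoes the request id, the client needs no other state to know which proxy and which kind of request a reply belongs to.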

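Next, OnBindServiceReply. It mainly initializes service_proxy (the ProducerPortProxy) with the service id and method ids sent back by the server, so that when the client later asks the server to run a method it can first check that the method exists. The proxy is also stored in service_bindings_.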

void ClientImpl::OnBindServiceReply(QueuedRequest req,
                                    const Frame::BindServiceReply& reply) {
  base::WeakPtr<ServiceProxy>& service_proxy = req.service_proxy;
  if (!service_proxy)
    return;
  const char* svc_name = service_proxy->GetDescriptor().service_name;
  if (!reply.success()) {
    PERFETTO_DLOG("BindService(): unknown service_name=\"%s\"", svc_name);
    return service_proxy->OnConnect(false /* success */);
  }

  auto prev_service = service_bindings_.find(reply.service_id());
  if (prev_service != service_bindings_.end() && prev_service->second.get()) {
    PERFETTO_DLOG(
        "BindService(): Trying to bind service \"%s\" but another service "
        "named \"%s\" is already bound with the same ID.",
        svc_name, prev_service->second->GetDescriptor().service_name);
    return service_proxy->OnConnect(false /* success */);
  }

  // Build the method [name] -> [remote_id] map.
  std::map<std::string, MethodID> methods;
  for (const auto& method : reply.methods()) {
    if (method.name().empty() || method.id() <= 0) {
      PERFETTO_DLOG("OnBindServiceReply(): invalid method \"%s\" -> %" PRIu64,
                    method.name().c_str(), static_cast<uint64_t>(method.id()));
      continue;
    }
    methods[method.name()] = method.id();
  }
  service_proxy->InitializeBinding(weak_ptr_factory_.GetWeakPtr(),
                                   reply.service_id(), std::move(methods));
  service_bindings_[reply.service_id()] = service_proxy;
  service_proxy->OnConnect(true /* success */);
}
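
The method-id handshake can be sketched end to end: the host numbers the methods of a service starting at 1, the client turns the reply into a name-to-id map, and the host later maps an id back to a method with a bounds check. The function names and the MethodInfo struct below are hypothetical stand-ins for the generated protobuf types.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct MethodInfo {
  std::string name;
  std::uint32_t id;
};

// Host side: ids are 1-based positions in the service's method list.
std::vector<MethodInfo> BuildReplyMethods(
    const std::vector<std::string>& descriptor_methods) {
  std::vector<MethodInfo> out;
  std::uint32_t method_id = 1;
  for (const std::string& name : descriptor_methods)
    out.push_back(MethodInfo{name, method_id++});
  return out;
}

// Client side: build the [name] -> [remote id] map, skipping invalid entries.
std::map<std::string, std::uint32_t> BuildMethodMap(
    const std::vector<MethodInfo>& reply_methods) {
  std::map<std::string, std::uint32_t> methods;
  for (const MethodInfo& method : reply_methods) {
    if (method.name.empty() || method.id == 0)
      continue;
    methods[method.name] = method.id;
  }
  return methods;
}

// Host side, on a later invocation: 0 and out-of-range ids are rejected.
const std::string* FindMethod(
    const std::vector<std::string>& descriptor_methods,
    std::uint32_t method_id) {
  if (method_id == 0 || method_id > descriptor_methods.size())
    return nullptr;
  return &descriptor_methods[method_id - 1];
}
```

Reserving 0 as "no id" is what lets both sides treat a zero value as invalid without an extra flag.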

Next, service_proxy->OnConnect. ProducerPortProxy inherits from ServiceProxy, so this code is in ServiceProxy.

Remember who the event listener is here? Right, it is ProducerIPCClientImpl.

void ServiceProxy::OnConnect(bool success) {
  if (success) {
    PERFETTO_DCHECK(service_id_);
    return event_listener_->OnConnect();
  }
  return event_listener_->OnDisconnect();
}

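Now ProducerIPCClientImpl::OnConnect. It does two main things: it calls InitializeConnection on the ProducerPortProxy, and it calls GetAsyncCommand, registering a callback for each. GetAsyncCommand is very important: it tells the service that the producer is ready to receive commands, and its callback handles the commands that follow, such as SetupDataSource and StartDataSource. At the end it also sends any Sync() requests that were queued before the connection was up.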

void ProducerIPCClientImpl::OnConnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  connected_ = true;

  ipc::Deferred<protos::gen::InitializeConnectionResponse> on_init;
  on_init.Bind(
      [this](ipc::AsyncResult<protos::gen::InitializeConnectionResponse> resp) {
        OnConnectionInitialized(
            resp.success(),
            resp.success() ? resp->using_shmem_provided_by_producer() : false,
            resp.success() ? resp->direct_smb_patching_supported() : false,
            resp.success() ? resp->use_shmem_emulation() : false);
      });
  protos::gen::InitializeConnectionRequest req;
  req.set_producer_name(name_);
  req.set_shared_memory_size_hint_bytes(
      static_cast<uint32_t>(shared_memory_size_hint_bytes_));
  req.set_shared_memory_page_size_hint_bytes(
      static_cast<uint32_t>(shared_memory_page_size_hint_bytes_));
  ...

  req.set_sdk_version(base::GetVersionString());
  producer_port_->InitializeConnection(req, std::move(on_init), shm_fd);

  // Create the back channel to receive commands from the Service.
  ipc::Deferred<protos::gen::GetAsyncCommandResponse> on_cmd;
  on_cmd.Bind(
      [this](ipc::AsyncResult<protos::gen::GetAsyncCommandResponse> resp) {
        if (!resp)
          return;  // The IPC channel was closed and |resp| was auto-rejected.
        OnServiceRequest(*resp);
      });
  producer_port_->GetAsyncCommand(protos::gen::GetAsyncCommandRequest(),
                                  std::move(on_cmd));

  // If there are pending Sync() requests, send them now.
  for (const auto& pending_sync : pending_sync_reqs_)
    Sync(std::move(pending_sync));
  pending_sync_reqs_.clear();
}

Next, how the generated ProducerPortProxy::InitializeConnection is implemented. The code is in out/soong/.intermediates/external/perfetto/perfetto_protos_perfetto_ipc_ipc_gen/gen/external/perfetto/protos/perfetto/ipc/producer_port.ipc.cc. It calls BeginInvoke, which is implemented in ServiceProxy; as mentioned, ProducerPortProxy inherits from ServiceProxy.

void ProducerPortProxy::InitializeConnection(const InitializeConnectionRequest& request, DeferredInitializeConnectionResponse reply, int fd) {
  BeginInvoke("InitializeConnection", request, ::perfetto::ipc::DeferredBase(std::move(reply)),
              fd);
}

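BeginInvoke first checks whether the service supports the method being invoked, using the method map returned by BindService. This design presumably leaves room for future extension.

It then calls ClientImpl's BeginInvoke and stores the reply callback in pending_callbacks_. For the InitializeConnection call above, that callback ends up invoking ProducerIPCClientImpl::OnConnectionInitialized.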

void ServiceProxy::BeginInvoke(const std::string& method_name,
                               const ProtoMessage& request,
                               DeferredBase reply,
                               int fd) {
...
  auto remote_method_it = remote_method_ids_.find(method_name);
  RequestID request_id = 0;
  const bool drop_reply = !reply.IsBound();
  if (remote_method_it != remote_method_ids_.end()) {
    request_id =
        static_cast<ClientImpl*>(client_.get())
            ->BeginInvoke(service_id_, method_name, remote_method_it->second,
                          request, drop_reply, weak_ptr_factory_.GetWeakPtr(),
                          fd);
  } else {
    PERFETTO_DLOG("Cannot find method \"%s\" on the host", method_name.c_str());
  }
...
  pending_callbacks_.emplace(request_id, std::move(reply));
}

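Next, ClientImpl::BeginInvoke. It does two main things: it sends an IPCFrame carrying an invoke_method message, and, unless drop_reply is set, it stores the request in queued_requests_. The role of queued_requests_ was explained with BindService, so we won't repeat it.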

RequestID ClientImpl::BeginInvoke(ServiceID service_id,
                                  const std::string& method_name,
                                  MethodID remote_method_id,
                                  const ProtoMessage& method_args,
                                  bool drop_reply,
                                  base::WeakPtr<ServiceProxy> service_proxy,
                                  int fd) {
  RequestID request_id = ++last_request_id_;
  Frame frame;
  frame.set_request_id(request_id);
  Frame::InvokeMethod* req = frame.mutable_msg_invoke_method();
...
  if (!SendFrame(frame, fd)) {
    PERFETTO_DLOG("BeginInvoke() failed while sending the frame");
    return 0;
  }
  if (drop_reply)
    return 0;
  QueuedRequest qr;
...
  queued_requests_.emplace(request_id, std::move(qr));
  return request_id;
}

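Now the server side. The path from UnixSocket::OnEvent to HostImpl::OnDataAvailable to HostImpl::OnReceivedFrame was analyzed earlier.

That leads to HostImpl::OnInvokeMethod. It first checks that the service and method named in the request exist, then fetches the method entry. Remember where that was initialized? In the generated producer_port.ipc.cc.

It then builds the callback for the deferred reply, constructs the ClientInfo for the current request and assigns it to the service (the service here is the one registered earlier through ExposeService), and finally calls method.invoker.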

void HostImpl::OnInvokeMethod(ClientConnection* client,
                              const Frame& req_frame) {
  const Frame::InvokeMethod& req = req_frame.msg_invoke_method();
  Frame reply_frame;
  RequestID request_id = req_frame.request_id();
  reply_frame.set_request_id(request_id);
  reply_frame.mutable_msg_invoke_method_reply()->set_success(false);
  auto svc_it = services_.find(req.service_id());
  if (svc_it == services_.end())
    return SendFrame(client, reply_frame);  // |success| == false by default.

  Service* service = svc_it->second.instance.get();
  const ServiceDescriptor& svc = service->GetDescriptor();
  const auto& methods = svc.methods;
  const uint32_t method_id = req.method_id();
  if (method_id == 0 || method_id > methods.size())
    return SendFrame(client, reply_frame);

  const ServiceDescriptor::Method& method = methods[method_id - 1];
  std::unique_ptr<ProtoMessage> decoded_req_args(
      method.request_proto_decoder(req.args_proto()));
  if (!decoded_req_args)
    return SendFrame(client, reply_frame);

  Deferred<ProtoMessage> deferred_reply;
  base::WeakPtr<HostImpl> host_weak_ptr = weak_ptr_factory_.GetWeakPtr();
  ClientID client_id = client->id;

  if (!req.drop_reply()) {
    deferred_reply.Bind([host_weak_ptr, client_id,
                         request_id](AsyncResult<ProtoMessage> reply) {
      if (!host_weak_ptr)
        return;  // The reply came too late, the HostImpl has gone.
      host_weak_ptr->ReplyToMethodInvocation(client_id, request_id,
                                             std::move(reply));
    });
  }

  auto peer_uid = client->GetPosixPeerUid();
  auto scoped_key = g_crash_key_uid.SetScoped(static_cast<int64_t>(peer_uid));
  service->client_info_ = ClientInfo(
      client->id, peer_uid, client->GetLinuxPeerPid(), client->GetMachineID());
  service->received_fd_ = &client->received_fd;
  method.invoker(service, *decoded_req_args, std::move(deferred_reply));
  service->received_fd_ = nullptr;
  service->client_info_ = ClientInfo();
}

As mentioned earlier, method.invoker is something like &_IPC_Invoker<ProducerPort, InitializeConnectionRequest, InitializeConnectionResponse, &ProducerPort::InitializeConnection>, and ProducerPort::InitializeConnection resolves to ProducerIPCService::InitializeConnection.
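
The invoker is a type-erasure trick: the method table stores one uniform function-pointer type, and a template instantiation per method casts the generic arguments back to the concrete types. The following is a minimal sketch of that technique, not the generated code; IpcInvoker, ProducerPortLike, InitRequest and Methods are hypothetical names, and the deferred reply parameter is left out for brevity.

```cpp
#include <cassert>
#include <string>
#include <vector>

struct ProtoMessage {
  virtual ~ProtoMessage() = default;
};
struct InitRequest : ProtoMessage {
  std::string producer_name;
};

struct Service {
  virtual ~Service() = default;
};
struct ProducerPortLike : Service {
  std::string last_producer;
  void InitializeConnection(const InitRequest& req) {
    last_producer = req.producer_name;
  }
};

// The uniform signature stored in the method table.
using Invoker = void (*)(Service*, const ProtoMessage&);

// One instantiation per (service, request type, method): it downcasts the
// generic arguments and calls the concrete member function.
template <typename TSvc, typename TReq, void (TSvc::*Method)(const TReq&)>
void IpcInvoker(Service* service, const ProtoMessage& request) {
  assert(service != nullptr);
  (static_cast<TSvc*>(service)->*Method)(static_cast<const TReq&>(request));
}

struct MethodEntry {
  const char* name;
  Invoker invoker;
};

// Stand-in for the method list of a generated service descriptor.
const std::vector<MethodEntry>& Methods() {
  static const std::vector<MethodEntry> kMethods = {
      {"InitializeConnection",
       &IpcInvoker<ProducerPortLike, InitRequest,
                   &ProducerPortLike::InitializeConnection>},
  };
  return kMethods;
}
```

The host only ever sees Service* and ProtoMessage, which is why a single dispatch routine can serve every service and method.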

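InitializeConnection first reads the client_info that was stored just before the call. Careful readers will notice that this is not reentrant across threads, because client_info is overwritten on every request. However, all these asynchronous events run on the task_runner's worker thread, so there is no multithreading problem.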
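
The "set before the call, reset after the call" handling of client_info can be sketched as follows. This is a minimal single-threaded model; MiniHost, MiniService and the simplified ClientInfo are hypothetical stand-ins that only carry a client id.

```cpp
#include <cassert>
#include <cstdint>

struct ClientInfo {
  ClientInfo() = default;
  explicit ClientInfo(std::uint64_t id) : client_id(id) {}
  bool is_valid() const { return client_id != 0; }
  std::uint64_t client_id = 0;
};

class MiniService {
 public:
  // Only meaningful while a method invocation is in progress.
  const ClientInfo& client_info() const { return client_info_; }

  // Stand-in for a method such as InitializeConnection().
  void Handle() {
    assert(client_info_.is_valid());
    last_seen_client = client_info().client_id;
  }

  std::uint64_t last_seen_client = 0;

 private:
  friend class MiniHost;
  ClientInfo client_info_;
};

class MiniHost {
 public:
  // Safe only because every invocation runs on the same thread, one at a
  // time: the per-call context is overwritten for each request.
  void Invoke(MiniService* service, std::uint64_t client_id) {
    service->client_info_ = ClientInfo(client_id);
    service->Handle();
    service->client_info_ = ClientInfo();  // Invalid outside of a call.
  }
};
```

Resetting the field after the call means a handler that stashes work for later cannot accidentally read another request's identity; it has to copy what it needs during the call.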

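It then checks whether this producer has already been registered, and otherwise creates a new RemoteProducer and puts it in producers_. Along the way it calls core_service_->ConnectProducer; core_service_ is the TracingService, the core object responsible for the tracing logic. Finally, response.Resolve invokes the reply callback, which, as the previous code block shows, is host_weak_ptr->ReplyToMethodInvocation.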

void ProducerIPCService::InitializeConnection(
    const protos::gen::InitializeConnectionRequest& req,
    DeferredInitializeConnectionResponse response) {
  const auto& client_info = ipc::Service::client_info();
  const ipc::ClientID ipc_client_id = client_info.client_id();
  PERFETTO_CHECK(ipc_client_id);

  if (producers_.count(ipc_client_id) > 0) {
    PERFETTO_DLOG(
        "The remote Producer is trying to re-initialize the connection");
    return response.Reject();
  }

  // Create a new entry.
  std::unique_ptr<RemoteProducer> producer(new RemoteProducer());
...
  // Copy the data fields to be emitted to trace packets into ClientIdentity.
  ClientIdentity client_identity(client_info.uid(), client_info.pid(),
                                 client_info.machine_id());
  // ConnectProducer will call OnConnect() on the next task.
  producer->service_endpoint = core_service_->ConnectProducer(
      producer.get(), client_identity, req.producer_name(),
      req.shared_memory_size_hint_bytes(),
      /*in_process=*/false, smb_scraping_mode,
      req.shared_memory_page_size_hint_bytes(), std::move(shmem),
      req.sdk_version());
...

  producers_.emplace(ipc_client_id, std::move(producer));
  // Because of the std::move() |producer| is invalid after this point.

  auto async_res =
      ipc::AsyncResult<protos::gen::InitializeConnectionResponse>::Create();
  async_res->set_using_shmem_provided_by_producer(using_producer_shmem);
  async_res->set_direct_smb_patching_supported(true);
  async_res->set_use_shmem_emulation(use_shmem_emulation);
  response.Resolve(std::move(async_res));
}

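First, what ConnectProducer does. It creates a ProducerEndpointImpl object, records it in producers_, posts a task that calls OnConnect on the endpoint's producer_, and returns the ProducerEndpointImpl. ProducerIPCService assigns this object to the RemoteProducer's service_endpoint. Both this layer and the layer above have a producers_ member, which is easy to confuse, so let's sort it out: TracingServiceImpl deals with the tracing logic, so ProducerEndpointImpl is presumably the tracing-side representation of a producer, while the RemoteProducer in the upper layer is the IPC-side one. We will check this later.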

std::unique_ptr<TracingService::ProducerEndpoint>
TracingServiceImpl::ConnectProducer(Producer* producer,
                                    const ClientIdentity& client_identity,
                                    const std::string& producer_name,
                                    size_t shared_memory_size_hint_bytes,
                                    bool in_process,
                                    ProducerSMBScrapingMode smb_scraping_mode,
                                    size_t shared_memory_page_size_hint_bytes,
                                    std::unique_ptr<SharedMemory> shm,
                                    const std::string& sdk_version) {
...
  const ProducerID id = GetNextProducerID();
...
  std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
      id, client_identity, this, task_runner_, producer, producer_name,
      sdk_version, in_process, smb_scraping_enabled));
  auto it_and_inserted = producers_.emplace(id, endpoint.get());
...
  // Producer::OnConnect() should run before Producer::OnTracingSetup(). The
  // latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
  auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
  task_runner_->PostTask([weak_ptr] {
    if (weak_ptr)
      weak_ptr->producer_->OnConnect();
  });

  return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
}
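
The two producers_ tables can be modelled side by side: the IPC layer keys its entries by client id and owns the endpoint, while the core keys a non-owning pointer by producer id. This is a minimal sketch; Core, Endpoint and IpcLayer are hypothetical names, and having the endpoint's destructor unregister itself from the core is an assumption of the sketch that the excerpts above do not show.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <utility>

using ClientID = std::uint64_t;
using ProducerID = std::uint16_t;

class Core;

class Endpoint {
 public:
  Endpoint(ProducerID id, Core* core) : id_(id), core_(core) {}
  ~Endpoint();
  ProducerID id() const { return id_; }

 private:
  ProducerID id_;
  Core* core_;
};

// Stand-in for the tracing core: keyed by ProducerID, does not own endpoints.
class Core {
 public:
  std::unique_ptr<Endpoint> ConnectProducer() {
    const ProducerID id = ++last_id_;
    std::unique_ptr<Endpoint> endpoint(new Endpoint(id, this));
    producers_.emplace(id, endpoint.get());
    return endpoint;
  }
  void Unregister(ProducerID id) { producers_.erase(id); }
  std::size_t size() const { return producers_.size(); }

 private:
  ProducerID last_id_ = 0;
  std::map<ProducerID, Endpoint*> producers_;
};

Endpoint::~Endpoint() {
  core_->Unregister(id_);  // Assumption of this sketch, see lead-in.
}

struct RemoteProducer {
  std::unique_ptr<Endpoint> service_endpoint;
};

// Stand-in for the IPC layer: keyed by ClientID, owns the RemoteProducer.
class IpcLayer {
 public:
  explicit IpcLayer(Core* core) : core_(core) {}
  bool InitializeConnection(ClientID client_id) {
    if (producers_.count(client_id) > 0)
      return false;  // Re-initialization is rejected.
    std::unique_ptr<RemoteProducer> producer(new RemoteProducer());
    producer->service_endpoint = core_->ConnectProducer();
    producers_.emplace(client_id, std::move(producer));
    return true;
  }
  void OnClientDisconnected(ClientID client_id) { producers_.erase(client_id); }

 private:
  Core* core_;
  std::map<ClientID, std::unique_ptr<RemoteProducer>> producers_;
};
```

Keeping ownership in the IPC layer ties the lifetime of the core's entry to the lifetime of the IPC connection.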

Next, the reply callback ReplyToMethodInvocation. It is simple: it sends the invoke_method_reply message.

void HostImpl::ReplyToMethodInvocation(ClientID client_id,
                                       RequestID request_id,
                                       AsyncResult<ProtoMessage> reply) {
  auto client_iter = clients_.find(client_id);
  if (client_iter == clients_.end())
    return;  // client has disconnected by the time we got the async reply.

  ClientConnection* client = client_iter->second.get();
  Frame reply_frame;
  reply_frame.set_request_id(request_id);
...
  auto* reply_frame_data = reply_frame.mutable_msg_invoke_method_reply();
...
  SendFrame(client, reply_frame, reply.fd());
}

Now ClientImpl::OnInvokeMethodReply. It mainly retrieves the service_proxy and calls EndInvoke.

void ClientImpl::OnInvokeMethodReply(QueuedRequest req,
                                     const Frame::InvokeMethodReply& reply) {
  base::WeakPtr<ServiceProxy> service_proxy = req.service_proxy;
  if (!service_proxy)
    return;
  std::unique_ptr<ProtoMessage> decoded_reply;
...
  const RequestID request_id = req.request_id;
  invoking_method_reply_ = true;
  service_proxy->EndInvoke(request_id, std::move(decoded_reply),
                           reply.has_more());
  invoking_method_reply_ = false;

  // If this is a streaming method and future replies will be resolved, put back
  // the |req| with the callback into the set of active requests.
  if (reply.has_more())
    queued_requests_.emplace(request_id, std::move(req));
}

EndInvoke simply runs the callback that was passed in when BeginInvoke was called. Looking back, what was the callback for InitializeConnection?

void ServiceProxy::EndInvoke(RequestID request_id,
                             std::unique_ptr<ProtoMessage> result,
                             bool has_more) {
  auto callback_it = pending_callbacks_.find(request_id);
  if (callback_it == pending_callbacks_.end()) {
    // Either we are getting a reply for a method we never invoked, or we are
    // getting a reply to a method marked drop_reply (that has been invoked
    // without binding any callback in the Defererd response object).
    PERFETTO_DFATAL("Unexpected reply received.");
    return;
  }
  DeferredBase& reply_callback = callback_it->second;
  AsyncResult<ProtoMessage> reply(std::move(result), has_more);
  reply_callback.Resolve(std::move(reply));
  if (!has_more)
    pending_callbacks_.erase(callback_it);
}
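
The pending_callbacks_ bookkeeping, including the has_more case used by streaming methods such as GetAsyncCommand, can be sketched as a map from request id to callback. MiniProxy and ReplyCallback below are hypothetical simplifications: the payload is a string instead of a decoded proto, and BeginInvoke takes the request id directly instead of obtaining it from the client.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>

using RequestID = std::uint64_t;
using ReplyCallback =
    std::function<void(const std::string& payload, bool has_more)>;

class MiniProxy {
 public:
  // Remember the callback until the matching reply arrives.
  void BeginInvoke(RequestID request_id, ReplyCallback on_reply) {
    assert(static_cast<bool>(on_reply));
    pending_callbacks_.emplace(request_id, std::move(on_reply));
  }

  // Runs the callback for |request_id|. A one-shot reply (has_more == false)
  // removes the entry; a streaming reply keeps it for the next message.
  // Returns false when no callback is pending for that id.
  bool EndInvoke(RequestID request_id,
                 const std::string& payload,
                 bool has_more) {
    auto it = pending_callbacks_.find(request_id);
    if (it == pending_callbacks_.end())
      return false;
    it->second(payload, has_more);
    if (!has_more)
      pending_callbacks_.erase(it);
    return true;
  }

  std::size_t pending() const { return pending_callbacks_.size(); }

 private:
  std::map<RequestID, ReplyCallback> pending_callbacks_;
};
```

A one-shot call like InitializeConnection disappears from the map after its single reply, while a streaming call stays registered for as long as replies carry has_more.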

Here is the relevant excerpt again: the callback calls ProducerIPCClientImpl::OnConnectionInitialized.

void ProducerIPCClientImpl::OnConnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  connected_ = true;

  ipc::Deferred<protos::gen::InitializeConnectionResponse> on_init;
  on_init.Bind(
      [this](ipc::AsyncResult<protos::gen::InitializeConnectionResponse> resp) {
        OnConnectionInitialized(
            resp.success(),
            resp.success() ? resp->using_shmem_provided_by_producer() : false,
            resp.success() ? resp->direct_smb_patching_supported() : false,
            resp.success() ? resp->use_shmem_emulation() : false);
      });

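Next, OnConnectionInitialized, which calls producer_->OnConnect(). Remember what producer_ is? It is the upper-layer event listener passed in when ProducerIPCClientImpl was created, the object that handles the actual producer logic; here it is HeapprofdProducer. The next step is registering the data source, which is directly tied to the tracing logic.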

void ProducerIPCClientImpl::OnConnectionInitialized(
    bool connection_succeeded,
    bool using_shmem_provided_by_producer,
    bool direct_smb_patching_supported,
    bool use_shmem_emulation) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // If connection_succeeded == false, the OnDisconnect() call will follow next
  // and there we'll notify the |producer_|. TODO: add a test for this.
  if (!connection_succeeded)
    return;
  is_shmem_provided_by_producer_ = using_shmem_provided_by_producer;
  direct_smb_patching_supported_ = direct_smb_patching_supported;
  // The tracing service may reject using shared memory and tell the client to
  // commit data over the socket. This can happen when the client connects to
  // the service via a relay service:
  // client <-Unix socket-> relay service <- vsock -> tracing service.
  use_shmem_emulation_ = use_shmem_emulation;
  producer_->OnConnect();

  // Bail out if the service failed to adopt our producer-allocated SMB.
  // TODO(eseckler): Handle adoption failure more gracefully.
  if (shared_memory_ && !is_shmem_provided_by_producer_) {
    PERFETTO_DLOG("Service failed adopt producer-provided SMB, disconnecting.");
    Disconnect();
    return;
  }
}

HeapprofdProducer::OnConnect shows that the next step is indeed registering the data source.

void HeapprofdProducer::OnConnect() {
  PERFETTO_DCHECK(state_ == kConnecting);
  state_ = kConnected;
  ResetConnectionBackoff();
  PERFETTO_LOG("Connected to the service, mode [%s].",
               mode_ == HeapprofdMode::kCentral ? "central" : "child");

  DataSourceDescriptor desc;
  desc.set_name(kHeapprofdDataSource);
  desc.set_will_notify_on_stop(true);
  endpoint_->RegisterDataSource(desc);
}

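HeapprofdProducer calls endpoint_->RegisterDataSource. On the client, endpoint_ is the object returned by ProducerIPCClient::Connect and stored through SetProducerEndpoint, i.e. the ProducerIPCClientImpl, which implements the TracingService::ProducerEndpoint interface. The same interface is implemented on the service side by TracingServiceImpl::ProducerEndpointImpl, shown here for comparison: it simply forwards to its service_, the TracingServiceImpl, and we will reach it again below.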

void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
    const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  service_->RegisterDataSource(id_, desc);
}

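Next, ProducerIPCClientImpl::RegisterDataSource. Its reply callback does almost nothing (it only logs a failure), which means that for the producer, initialization is finished once RegisterDataSource has been sent.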

void ProducerIPCClientImpl::RegisterDataSource(
    const DataSourceDescriptor& descriptor) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!connected_) {
    PERFETTO_DLOG(
        "Cannot RegisterDataSource(), not connected to tracing service");
  }
  protos::gen::RegisterDataSourceRequest req;
  *req.mutable_data_source_descriptor() = descriptor;
  ipc::Deferred<protos::gen::RegisterDataSourceResponse> async_response;
  async_response.Bind(
      [](ipc::AsyncResult<protos::gen::RegisterDataSourceResponse> response) {
        if (!response)
          PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
      });
  producer_port_->RegisterDataSource(req, std::move(async_response));
}

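Now the service side. ProducerIPCService::RegisterDataSource first looks up the RemoteProducer for the current request, which is the one created while handling InitializeConnection above, and then calls RegisterDataSource through its service_endpoint. Both this service_endpoint and the client-side endpoint_ are TracingService::ProducerEndpoint objects, but with different implementations: the client's endpoint_ is the ProducerIPCClientImpl, which sends the request over IPC, while the service's service_endpoint is the ProducerEndpointImpl returned by ConnectProducer, whose service_ is the TracingServiceImpl. So producer registration ultimately reaches TracingServiceImpl, the main business object.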

void ProducerIPCService::RegisterDataSource(
    const protos::gen::RegisterDataSourceRequest& req,
    DeferredRegisterDataSourceResponse response) {
  RemoteProducer* producer = GetProducerForCurrentRequest();
  if (!producer) {
    PERFETTO_DLOG(
        "Producer invoked RegisterDataSource() before InitializeConnection()");
    if (response.IsBound())
      response.Reject();
    return;
  }

  const DataSourceDescriptor& dsd = req.data_source_descriptor();
  GetProducerForCurrentRequest()->service_endpoint->RegisterDataSource(dsd);

  // RegisterDataSource doesn't expect any meaningful response.
  if (response.IsBound()) {
    response.Resolve(
        ipc::AsyncResult<protos::gen::RegisterDataSourceResponse>::Create());
  }
}

The call ends up in TracingServiceImpl::RegisterDataSource, whose main job is to add the data source to data_sources_.

void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
                                            const DataSourceDescriptor& desc) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (desc.name().empty()) {
    PERFETTO_DLOG("Received RegisterDataSource() with empty name");
    return;
  }

  ProducerEndpointImpl* producer = GetProducer(producer_id);
  if (!producer) {
    PERFETTO_DFATAL("Producer not found.");
    return;
  }

  // Check that the producer doesn't register two data sources with the same ID.
  // Note that we tolerate |id| == 0 because until Android T / v22 the |id|
  // field didn't exist.
  for (const auto& kv : data_sources_) {
    if (desc.id() && kv.second.producer_id == producer_id &&
        kv.second.descriptor.id() == desc.id()) {
      PERFETTO_ELOG(
          "Failed to register data source \"%s\". A data source with the same "
          "id %" PRIu64 " (name=\"%s\") is already registered for producer %d",
          desc.name().c_str(), desc.id(), kv.second.descriptor.name().c_str(),
          producer_id);
      return;
    }
  }

  PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
                producer_id, desc.name().c_str());

  auto reg_ds = data_sources_.emplace(desc.name(),
                                      RegisteredDataSource{producer_id, desc});

  ...
}
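
The registration rules visible above (reject an empty name, reject a repeated non-zero id from the same producer, otherwise store the entry under its name) can be sketched with a standard container. In this minimal sketch the use of std::multimap for data_sources_ is an assumption, and Descriptor and Registry are hypothetical simplified types.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

struct Descriptor {
  std::string name;
  std::uint64_t id;  // 0 means "no id", which is tolerated.
};

struct RegisteredDataSource {
  std::uint16_t producer_id;
  Descriptor descriptor;
};

class Registry {
 public:
  // Returns true when the data source was stored.
  bool Register(std::uint16_t producer_id, const Descriptor& desc) {
    if (desc.name.empty())
      return false;
    // A producer must not register two data sources with the same non-zero
    // id. Different producers may reuse an id.
    for (const auto& kv : data_sources_) {
      if (desc.id && kv.second.producer_id == producer_id &&
          kv.second.descriptor.id == desc.id) {
        return false;
      }
    }
    data_sources_.emplace(desc.name, RegisteredDataSource{producer_id, desc});
    return true;
  }

  // Several producers can register a data source under the same name.
  std::size_t CountByName(const std::string& name) const {
    return data_sources_.count(name);
  }

 private:
  std::multimap<std::string, RegisteredDataSource> data_sources_;
};
```

Keying by name with duplicates allowed fits the lookup a trace config needs: it names a data source, and every producer that registered that name is a candidate.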

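This completes the producer's startup and initialization. From here on, it is up to the consumer to trigger a trace, which will be covered in the next article.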