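Continuing from the previous post, let's look at what task_runner.Run() does.

The code is in src/base/unix_task_runner.cc.

As you can see, it enters an infinite loop and first calls UpdateWatchTasksLocked. Remember that earlier we saw the socket being added to watch_tasks_.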

void UnixTaskRunner::Run() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  created_thread_id_ = GetThreadId();
  quit_ = false;
  for (;;) {
    int poll_timeout_ms;
    {
      std::lock_guard<std::mutex> lock(lock_);
      if (quit_)
        return;
      poll_timeout_ms = GetDelayMsToNextTaskLocked();
      UpdateWatchTasksLocked();
    }

Here is the implementation of UpdateWatchTasksLocked. It walks watch_tasks_ and adds each fd to poll_fds_.

void UnixTaskRunner::UpdateWatchTasksLocked() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
  if (!watch_tasks_changed_)
    return;
  watch_tasks_changed_ = false;
#endif
  poll_fds_.clear();
  for (auto& it : watch_tasks_) {
    PlatformHandle handle = it.first;
    WatchTask& watch_task = it.second;
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
    if (!watch_task.pending)
      poll_fds_.push_back(handle);
#else
    watch_task.poll_fd_index = poll_fds_.size();
    poll_fds_.push_back({handle, POLLIN | POLLHUP, 0});
#endif
  }
}

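Back in Run, the next step uses the blocking system call poll with a timeout. poll returns when an event arrives on one of the socket fds or when the timeout expires, and then PostFileDescriptorWatches is called.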

    platform::BeforeMaybeBlockingSyscall();
    int ret = PERFETTO_EINTR(poll(
        &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
    platform::AfterMaybeBlockingSyscall();
    PERFETTO_CHECK(ret >= 0);
    PostFileDescriptorWatches(0 /*ignored*/);
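
To make the "rebuild poll_fds_, block in poll, react to ready fds" cycle concrete, here is a minimal sketch. MiniWatcher and all of its members are hypothetical names made up for illustration, not Perfetto types. It also runs the callbacks directly instead of posting them as tasks, and it does not retry on EINTR the way PERFETTO_EINTR does.

```cpp
#include <poll.h>
#include <unistd.h>

#include <functional>
#include <map>
#include <utility>
#include <vector>

// Hypothetical miniature of the watch bookkeeping; not Perfetto's real types.
struct MiniWatcher {
  std::map<int, std::function<void()>> watch_tasks;  // fd -> callback
  std::vector<pollfd> poll_fds;
  bool watch_tasks_changed = false;

  void AddWatch(int fd, std::function<void()> cb) {
    watch_tasks[fd] = std::move(cb);
    watch_tasks_changed = true;
  }

  // Rebuild the pollfd array only when the set of watches has changed.
  void UpdateWatchTasks() {
    if (!watch_tasks_changed)
      return;
    watch_tasks_changed = false;
    poll_fds.clear();
    for (const auto& kv : watch_tasks)
      poll_fds.push_back({kv.first, POLLIN | POLLHUP, 0});
  }

  // One loop iteration: block in poll() for up to timeout_ms, then run the
  // callback of every fd that reported an event. Returns the number of
  // callbacks that were run (0 on timeout or error).
  int RunOnce(int timeout_ms) {
    UpdateWatchTasks();
    int ret = poll(poll_fds.data(), static_cast<nfds_t>(poll_fds.size()),
                   timeout_ms);
    if (ret <= 0)
      return 0;
    int ran = 0;
    for (const pollfd& pfd : poll_fds) {
      if (!(pfd.revents & (POLLIN | POLLHUP)))
        continue;
      auto it = watch_tasks.find(pfd.fd);
      if (it != watch_tasks.end()) {
        it->second();
        ++ran;
      }
    }
    return ran;
  }
};
```

With a pipe as the watched fd, RunOnce(0) does nothing while the pipe is empty and runs the callback once a byte has been written to the other end.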

In PostFileDescriptorWatches, each fd that has a pending event (handle in the code) is bound to RunFileDescriptorWatch and pushed onto immediate_tasks_ to be run later.

    PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this, handle));

void UnixTaskRunner::PostTask(std::function<void()> task) {
  bool was_empty;
  {
    std::lock_guard<std::mutex> lock(lock_);
    was_empty = immediate_tasks_.empty();
    immediate_tasks_.push_back(std::move(task));
  }
  if (was_empty)
    WakeUp();
}
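
The "only wake up when the queue was empty" pattern in PostTask can be sketched as below. MiniTaskQueue is a hypothetical class for illustration. WakeUp() is not shown in this post, so the sketch just counts how often it would be called, assuming its job is to get the loop thread out of poll.

```cpp
#include <atomic>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical miniature of the immediate task queue; not Perfetto code.
class MiniTaskQueue {
 public:
  void PostTask(std::function<void()> task) {
    bool was_empty;
    {
      std::lock_guard<std::mutex> lock(lock_);
      was_empty = tasks_.empty();
      tasks_.push_back(std::move(task));
    }
    // If tasks were already queued, an earlier PostTask has already asked
    // for a wake-up, so a second one would be redundant.
    if (was_empty)
      ++wakeups_;  // Stand-in for WakeUp().
  }

  // Pops one task under the lock, then runs it with the lock released so
  // that the task itself may call PostTask without deadlocking.
  bool RunOne() {
    std::function<void()> task;
    {
      std::lock_guard<std::mutex> lock(lock_);
      if (tasks_.empty())
        return false;
      task = std::move(tasks_.front());
      tasks_.pop_front();
    }
    task();
    return true;
  }

  int wakeups() const { return wakeups_.load(); }

 private:
  std::mutex lock_;
  std::deque<std::function<void()>> tasks_;
  std::atomic<int> wakeups_{0};
};
```

Posting two tasks back to back requests a single wake-up, and the tasks then run in FIFO order.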

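At the end of Run, RunImmediateAndDelayedTask is called to run the tasks in immediate_tasks_ and delayed_tasks_. We have not yet seen where tasks are put into delayed_tasks_, so that is left for later. The immediate_task is run through RunTaskWithWatchdogGuard, which sets a maximum execution time for the task and then runs it. I won't expand on that here. The task that gets run is the RunFileDescriptorWatch we just saw.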

void UnixTaskRunner::RunImmediateAndDelayedTask() {
  // If locking overhead becomes an issue, add a separate work queue.
  std::function<void()> immediate_task;
  std::function<void()> delayed_task;
  TimeMillis now = GetWallTimeMs();
  {
    std::lock_guard<std::mutex> lock(lock_);
    if (!immediate_tasks_.empty()) {
      immediate_task = std::move(immediate_tasks_.front());
      immediate_tasks_.pop_front();
    }
    ...
  }

  errno = 0;
  if (immediate_task)
    RunTaskWithWatchdogGuard(immediate_task);
  ...
}

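RunFileDescriptorWatch takes the callback out of watch_task and runs it, once again nested inside RunTaskWithWatchdogGuard. Remember where this callback was passed in? It was when the socket was added, and the callback is UnixSocket::OnEvent() in src/base/unix_socket.cc.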

void UnixTaskRunner::RunFileDescriptorWatch(PlatformHandle fd) {
  std::function<void()> task;
  {
    std::lock_guard<std::mutex> lock(lock_);
    auto it = watch_tasks_.find(fd);
    if (it == watch_tasks_.end())
      return;
    WatchTask& watch_task = it->second;
...
    task = watch_task.callback;
  }
  errno = 0;
  RunTaskWithWatchdogGuard(task);
}

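UnixSocket::OnEvent() handles each state differently. The socket that was added for listening at the start is in the kListening state (you can confirm this in the earlier new UnixSocket snippet), so look at the kListening branch. It calls accept, creates a new UnixSocket object to manage the connected socket, and then calls event_listener_->OnNewIncomingConnection. What is event_listener_? Looking at the earlier new UnixSocket snippet again, event_listener_ is a HostImpl object. One point to stress: there is a separate HostImpl object for the producer socket and for the consumer socket.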

void UnixSocket::OnEvent() {
  ...
  // New incoming connection.
  if (state_ == State::kListening) {
    // There could be more than one incoming connection behind each FD watch
    // notification. Drain'em all.
    for (;;) {
      ScopedFile new_fd(
          PERFETTO_EINTR(accept(sock_raw_.fd(), nullptr, nullptr)));
      if (!new_fd)
        return;
      std::unique_ptr<UnixSocket> new_sock(new UnixSocket(
          event_listener_, task_runner_, std::move(new_fd), State::kConnected,
          sock_raw_.family(), sock_raw_.type(), peer_cred_mode_));
      event_listener_->OnNewIncomingConnection(this, std::move(new_sock));
    }
  }
}

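Next, HostImpl's OnNewIncomingConnection adds a new client. Combined with the earlier code, separate HostImpl objects correspond to the producer socket and the consumer socket. That is why the new UnixSocket above is still given the event_listener_ of the listening socket: the connection ends up registered with the HostImpl that owns that listening socket.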

void HostImpl::OnNewIncomingConnection(
    base::UnixSocket*,
    std::unique_ptr<base::UnixSocket> new_conn) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  std::unique_ptr<ClientConnection> client(new ClientConnection());
  ClientID client_id = ++last_client_id_;
  clients_by_socket_[new_conn.get()] = client.get();
  client->id = client_id;
  client->sock = std::move(new_conn);
  client->sock->SetTxTimeout(socket_tx_timeout_ms_);
  clients_[client_id] = std::move(client);
}

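Once connected, the next step is presumably sending and receiving data. Looking at UnixSocket::OnEvent() again, for a connected socket it calls OnDataAvailable in HostImpl.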

void UnixSocket::OnEvent() {
  if (state_ == State::kDisconnected)
    return;  // Some spurious event, typically queued just before Shutdown().

  if (state_ == State::kConnected)
    return event_listener_->OnDataAvailable(this);

OnDataAvailable deserializes the incoming data (not expanded here). The end result is a Frame, i.e. perfetto::protos::gen::IPCFrame. This is the first place we see protobuf being used. It is defined in protos/perfetto/ipc/wire_protocol.proto.

void HostImpl::OnDataAvailable(base::UnixSocket* sock) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto it = clients_by_socket_.find(sock);
  if (it == clients_by_socket_.end())
    return;
  ClientConnection* client = it->second;
  BufferedFrameDeserializer& frame_deserializer = client->frame_deserializer;
...
  for (;;) {
    std::unique_ptr<Frame> frame = frame_deserializer.PopNextFrame();
    if (!frame)
      break;
    OnReceivedFrame(client, *frame);
  }
}
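
The post skips the deserialization step, so here is a rough sketch of what a buffered frame deserializer can look like. It assumes each frame on the wire is a 4-byte little-endian length followed by that many payload bytes. That layout, the MiniFrameDeserializer name and the Encode helper are assumptions made for illustration, and the payload is kept as raw bytes rather than being parsed as an IPCFrame.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>

// Hypothetical length-prefixed frame buffer; not Perfetto's
// BufferedFrameDeserializer.
class MiniFrameDeserializer {
 public:
  // Called whenever more bytes arrive from the socket.
  void Append(const std::string& bytes) { buf_ += bytes; }

  // Returns the next complete payload, or nullptr if more bytes are needed.
  std::unique_ptr<std::string> PopNextFrame() {
    constexpr size_t kHeaderSize = 4;  // Assumed header size.
    if (buf_.size() < kHeaderSize)
      return nullptr;
    uint32_t len = 0;
    for (size_t i = 0; i < kHeaderSize; ++i) {
      len |= static_cast<uint32_t>(static_cast<unsigned char>(buf_[i]))
             << (8 * i);
    }
    if (buf_.size() - kHeaderSize < len)
      return nullptr;  // Header is complete but the payload is not yet.
    std::unique_ptr<std::string> frame(
        new std::string(buf_, kHeaderSize, len));
    buf_.erase(0, kHeaderSize + len);
    return frame;
  }

  // Helper for the example: prepends the assumed header to a payload.
  static std::string Encode(const std::string& payload) {
    std::string out;
    const uint32_t len = static_cast<uint32_t>(payload.size());
    for (int i = 0; i < 4; ++i)
      out.push_back(static_cast<char>((len >> (8 * i)) & 0xff));
    return out + payload;
  }

 private:
  std::string buf_;
};
```

This is why OnDataAvailable pops frames in a loop: one read from the socket may contain several complete frames, or only part of one.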

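Take a look at wire_protocol.proto. It defines the different messages, and you can roughly guess the flow: bind a service, get a reply listing the methods the service offers, then invoke a method, and so on. OnReceivedFrame maps each kind of client request message to its handler. Let's look at BindService first.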

message IPCFrame {
  // Client -> Host.
  message BindService { optional string service_name = 1; }

  // Host -> Client.
  message BindServiceReply {
    message MethodInfo {
      optional uint32 id = 1;
      optional string name = 2;
    }
    optional bool success = 1;
    optional uint32 service_id = 2;
    repeated MethodInfo methods = 3;
  }

  // Client -> Host.
  message InvokeMethod {
    // BindServiceReply.id.
    optional uint32 service_id = 1;

    // BindServiceReply.method.id.
    optional uint32 method_id = 2;

    // Proto-encoded request argument.
    optional bytes args_proto = 3;

    // When true the client specifies that a reply is not needed. The use case
    // is a method with an empty, where the client doesn't care about the
    // success/failure of the method invocation and rather prefers avoiding the
    // IPC roundtrip + context switch associated with the reply.
    optional bool drop_reply = 4;
  }
...

In OnBindService, GetServiceByName is used to look up the ExposedService, which was also mentioned earlier.

void HostImpl::OnBindService(ClientConnection* client, const Frame& req_frame) {
  // Binding a service doesn't do anything major. It just returns back the
  // service id and its method map.
  const Frame::BindService& req = req_frame.msg_bind_service();
  Frame reply_frame;
  reply_frame.set_request_id(req_frame.request_id());
  auto* reply = reply_frame.mutable_msg_bind_service_reply();
  const ExposedService* service = GetServiceByName(req.service_name());
  if (service) {
    reply->set_success(true);
    reply->set_service_id(service->id);
    uint32_t method_id = 1;  // method ids start at index 1.
    for (const auto& desc_method : service->instance->GetDescriptor().methods) {
      Frame::BindServiceReply::MethodInfo* method_info = reply->add_methods();
      method_info->set_name(desc_method.name);
      method_info->set_id(method_id++);
    }
  }
  SendFrame(client, reply_frame);
}
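
The bind step boils down to "look the service up by name and hand back its id plus a numbered list of methods". A minimal sketch of that bookkeeping follows. MiniHost and MiniBindReply are hypothetical types for illustration, and a service is reduced to a name and a list of method names.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reply type mirroring the fields of BindServiceReply.
struct MiniBindReply {
  bool success = false;
  uint32_t service_id = 0;
  std::vector<std::pair<uint32_t, std::string>> methods;  // (id, name)
};

// Hypothetical host-side registry; not Perfetto's HostImpl.
class MiniHost {
 public:
  // Registers a service under a unique name. Returns false on duplicates.
  bool ExposeService(const std::string& name,
                     std::vector<std::string> methods) {
    for (const auto& kv : services_) {
      if (kv.second.name == name)
        return false;
    }
    const uint32_t sid = ++last_service_id_;
    services_[sid] = Service{name, std::move(methods)};
    return true;
  }

  // Builds the reply for a bind request. Method ids start at 1, so id 0
  // can never refer to a valid method.
  MiniBindReply BindService(const std::string& name) const {
    MiniBindReply reply;
    for (const auto& kv : services_) {
      if (kv.second.name != name)
        continue;
      reply.success = true;
      reply.service_id = kv.first;
      uint32_t method_id = 1;
      for (const auto& method_name : kv.second.methods)
        reply.methods.emplace_back(method_id++, method_name);
      break;
    }
    return reply;
  }

 private:
  struct Service {
    std::string name;
    std::vector<std::string> methods;
  };
  std::map<uint32_t, Service> services_;
  uint32_t last_service_id_ = 0;
};
```

Binding an unknown name simply yields a reply with success left at false, which matches how OnBindService still sends reply_frame when service is null.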

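Now go back to where the ExposedService gets added: it happens in ServiceIPCHostImpl::DoStart. Note that each producer_ipc_port (an element of producer_ipc_ports_) and consumer_ipc_port_ are HostImpl objects. The services they expose are ProducerIPCService and ConsumerIPCService respectively.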

bool ServiceIPCHostImpl::DoStart() {
...
  svc_ = TracingService::CreateInstance(std::move(shm_factory), task_runner_,
                                        init_opts_);
...
  for (auto& producer_ipc_port : producer_ipc_ports_) {
    bool producer_service_exposed = producer_ipc_port->ExposeService(
        std::unique_ptr<ipc::Service>(new ProducerIPCService(svc_.get())));
    PERFETTO_CHECK(producer_service_exposed);
...
  }

  bool consumer_service_exposed = consumer_ipc_port_->ExposeService(
      std::unique_ptr<ipc::Service>(new ConsumerIPCService(svc_.get())));
  PERFETTO_CHECK(consumer_service_exposed);

  return true;
}

Look at ExposeService. It gets the service name through GetDescriptor. But if you search for GetDescriptor in ProducerIPCService or ConsumerIPCService, you won't find it.

bool HostImpl::ExposeService(std::unique_ptr<Service> service) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  const std::string& service_name = service->GetDescriptor().service_name;
  if (GetServiceByName(service_name)) {
    PERFETTO_DLOG("Duplicate ExposeService(): %s", service_name.c_str());
    return false;
  }
  service->use_shmem_emulation_ =
      sock() && !base::SockShmemSupported(sock()->family());
  ServiceID sid = ++last_service_id_;
  ExposedService exposed_service(sid, service_name, std::move(service));
  services_.emplace(sid, std::move(exposed_service));
  return true;
}

The declaration of ProducerIPCService is in src/tracing/ipc/service/producer_ipc_service.h, and it turns out to inherit from the protobuf-generated ProducerPort.

class ProducerIPCService : public protos::gen::ProducerPort {
 public:

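Searching for GetDescriptor in ProducerPort still turns up nothing in the checked-in sources. This part is unusual: the code is produced by a plugin that runs when the protobuf sources are generated. The plugin code is in src/ipc/protoc_plugin/ipc_plugin.cc, and Android.bp shows how it is used.

Now look at the generated code in out/soong/.intermediates/external/perfetto/perfetto_protos_perfetto_ipc_ipc_gen/gen/external/perfetto/protos/perfetto/ipc/producer_port.ipc.cc, where GetDescriptor can be found.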

const ::perfetto::ipc::ServiceDescriptor& ProducerPort::GetDescriptorStatic() {
  static auto* instance = NewDescriptor();
  return *instance;
}

// Host-side definitions.
ProducerPort::~ProducerPort() = default;

const ::perfetto::ipc::ServiceDescriptor& ProducerPort::GetDescriptor() {
  return GetDescriptorStatic();
}

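At the top of that file you can also see what service_name is, along with the initialization of the important data structure desc->methods. Going back to HostImpl::OnBindService, it is now clear where the methods returned to the client come from.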

  auto* desc = new ::perfetto::ipc::ServiceDescriptor();
  desc->service_name = "ProducerPort";

  desc->methods.emplace_back(::perfetto::ipc::ServiceDescriptor::Method{
     "InitializeConnection",
     &_IPC_Decoder<InitializeConnectionRequest>,
     &_IPC_Decoder<InitializeConnectionResponse>,
     &_IPC_Invoker<ProducerPort, InitializeConnectionRequest, InitializeConnectionResponse, &ProducerPort::InitializeConnection>});

  desc->methods.emplace_back(::perfetto::ipc::ServiceDescriptor::Method{
     "RegisterDataSource",
     &_IPC_Decoder<RegisterDataSourceRequest>,
     &_IPC_Decoder<RegisterDataSourceResponse>,
     &_IPC_Invoker<ProducerPort, RegisterDataSourceRequest, RegisterDataSourceResponse, &ProducerPort::RegisterDataSource>});

  desc->methods.emplace_back(::perfetto::ipc::ServiceDescriptor::Method{
     "UnregisterDataSource",
     &_IPC_Decoder<UnregisterDataSourceRequest>,
     &_IPC_Decoder<UnregisterDataSourceResponse>,
     &_IPC_Invoker<ProducerPort, UnregisterDataSourceRequest, UnregisterDataSourceResponse, &ProducerPort::UnregisterDataSource>});

perfetto::ipc::ServiceDescriptor::Method is a struct, and it is easy to see that its last member, invoker, is the corresponding callback function.

class ServiceDescriptor {
 public:
  struct Method {
    const char* name;

    // DecoderFunc is pointer to a function that takes a string in input
    // containing protobuf encoded data and returns a decoded protobuf message.
    using DecoderFunc = std::unique_ptr<ProtoMessage> (*)(const std::string&);

    // Function pointer to decode the request argument of the method.
    DecoderFunc request_proto_decoder;

    // Function pointer to decoded the reply argument of the method.
    DecoderFunc reply_proto_decoder;

    // Function pointer that dispatches the generic request to the corresponding
    // method implementation.
    using InvokerFunc = void (*)(Service*,
                                 const ProtoMessage& /* request_args */,
                                 DeferredBase /* deferred_reply */);
    InvokerFunc invoker;
  };
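
How can a plain function pointer like invoker end up calling a member function on a concrete service? One common trick is a function template that takes the member function pointer as a template argument. The sketch below shows the idea with made-up types (MiniService, MiniMessage, MiniInvoker, EchoService). It is not the generated _IPC_Invoker, and it replaces the deferred reply with a simple std::string output parameter.

```cpp
#include <string>

// Hypothetical base classes standing in for the generic service and
// message types.
struct MiniMessage {
  virtual ~MiniMessage() = default;
};
struct MiniService {
  virtual ~MiniService() = default;
};

// Type-erased invoker: the same signature for every method.
using MiniInvokerFunc = void (*)(MiniService*,
                                 const MiniMessage& /* request */,
                                 std::string* /* reply */);

// One instantiation per (service, request, method) triple. It downcasts the
// generic arguments and forwards to the concrete member function. The caller
// must guarantee that the dynamic types really match.
template <typename T,
          typename TReq,
          void (T::*Method)(const TReq&, std::string*)>
void MiniInvoker(MiniService* service,
                 const MiniMessage& request,
                 std::string* reply) {
  (static_cast<T*>(service)->*Method)(static_cast<const TReq&>(request),
                                      reply);
}

// A concrete request and service used only for this example.
struct EchoRequest : MiniMessage {
  std::string text;
};

struct EchoService : MiniService {
  void Echo(const EchoRequest& req, std::string* reply) {
    *reply = "echo:" + req.text;
  }
};

// Miniature method table entry with just a name and an invoker.
struct MiniMethod {
  const char* name;
  MiniInvokerFunc invoker;
};

inline MiniMethod MakeEchoMethod() {
  return MiniMethod{
      "Echo", &MiniInvoker<EchoService, EchoRequest, &EchoService::Echo>};
}
```

The host side only ever sees MiniInvokerFunc, so it can store methods of any service in one table and call them without knowing the concrete types.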

Next is HostImpl::OnInvokeMethod. We already saw that InvokeMethod is another message inside IPCFrame. Its handler uses the method_id from the request to find the matching Method struct and then calls its invoker.

void HostImpl::OnInvokeMethod(ClientConnection* client,
                              const Frame& req_frame) {
...
  const uint32_t method_id = req.method_id();
  if (method_id == 0 || method_id > methods.size())
    return SendFrame(client, reply_frame);

  const ServiceDescriptor::Method& method = methods[method_id - 1];
...
  method.invoker(service, *decoded_req_args, std::move(deferred_reply));
  service->received_fd_ = nullptr;
  service->client_info_ = ClientInfo();
}
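
The id check and the method_id - 1 indexing go together with the 1-based ids handed out in OnBindService. A small sketch of just that dispatch step is shown below. MiniMethodEntry and InvokeById are hypothetical names, and the invoker is simplified to a std::function from request bytes to reply bytes.

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Hypothetical method table entry; not Perfetto's ServiceDescriptor::Method.
struct MiniMethodEntry {
  std::string name;
  std::function<std::string(const std::string&)> invoker;
};

// Dispatches a request by 1-based method id. Returns false, leaving *reply
// untouched, when the id is 0 or past the end of the table.
bool InvokeById(const std::vector<MiniMethodEntry>& methods,
                uint32_t method_id,
                const std::string& args,
                std::string* reply) {
  if (method_id == 0 || method_id > methods.size())
    return false;
  *reply = methods[method_id - 1].invoker(args);
  return true;
}
```

Because valid ids start at 1, an InvokeMethod message with method_id left unset (0) is rejected instead of silently hitting the first method.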

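For example, for the producer's InitializeConnection method, the corresponding function is ProducerIPCService::InitializeConnection. It calls ConnectProducer on core_service_ (the TracingService) to register the producer. The builtin producer that was registered earlier for lazy start uses the same method.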

// Called by the remote Producer through the IPC channel soon after connecting.
void ProducerIPCService::InitializeConnection(
    const protos::gen::InitializeConnectionRequest& req,
    DeferredInitializeConnectionResponse response) {
  const auto& client_info = ipc::Service::client_info();
  const ipc::ClientID ipc_client_id = client_info.client_id();
  PERFETTO_CHECK(ipc_client_id);

  if (producers_.count(ipc_client_id) > 0) {
    PERFETTO_DLOG(
        "The remote Producer is trying to re-initialize the connection");
    return response.Reject();
  }
...
  // ConnectProducer will call OnConnect() on the next task.
  producer->service_endpoint = core_service_->ConnectProducer(
      producer.get(), client_identity, req.producer_name(),
      req.shared_memory_size_hint_bytes(),
      /*in_process=*/false, smb_scraping_mode,
      req.shared_memory_page_size_hint_bytes(), std::move(shmem),
      req.sdk_version());

That covers the overall framework and communication mechanism of traced.

Below is a simplified module diagram to help with understanding.

pic1

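ServiceIPCHostImpl is an object created in main. It contains a TracingService object and separate HostImpl objects for the producer and consumer sockets.

HostImpl contains a UnixSocket object that handles socket state and events. The actual handling calls back into methods on HostImpl.

HostImpl also contains a ProducerIPCService or a ConsumerIPCService, which handles the actual client messages defined with protobuf.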