Perfetto Source Code Analysis: Basic Framework (2)
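Continuing from the previous post, let's look at what task_runner.Run() does.
The code is in src/base/unix_task_runner.cc.
Run() enters an infinite loop. On each iteration, while holding the lock, it computes the poll timeout and calls UpdateWatchTasksLocked. Recall from the previous post that the socket was added to watch_tasks_.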
void UnixTaskRunner::Run() {
PERFETTO_DCHECK_THREAD(thread_checker_);
created_thread_id_ = GetThreadId();
quit_ = false;
for (;;) {
int poll_timeout_ms;
{
std::lock_guard<std::mutex> lock(lock_);
if (quit_)
return;
poll_timeout_ms = GetDelayMsToNextTaskLocked();
UpdateWatchTasksLocked();
}
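Now look at the implementation of UpdateWatchTasksLocked. It walks watch_tasks_ and adds each fd to poll_fds_. On non-Windows platforms it returns early unless watch_tasks_changed_ is set.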
void UnixTaskRunner::UpdateWatchTasksLocked() {
PERFETTO_DCHECK_THREAD(thread_checker_);
#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
if (!watch_tasks_changed_)
return;
watch_tasks_changed_ = false;
#endif
poll_fds_.clear();
for (auto& it : watch_tasks_) {
PlatformHandle handle = it.first;
WatchTask& watch_task = it.second;
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
if (!watch_task.pending)
poll_fds_.push_back(handle);
#else
watch_task.poll_fd_index = poll_fds_.size();
poll_fds_.push_back({handle, POLLIN | POLLHUP, 0});
#endif
}
}
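The rebuild-on-change idea above can be sketched in isolation. This is a minimal illustration, not the Perfetto code: the WatchSet class and its AddWatch, RemoveWatch, Update, and IndexOf methods are hypothetical names, and it assumes a POSIX system that provides poll.h.

```cpp
#include <poll.h>

#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for the watch bookkeeping.
struct WatchTask {
  std::function<void()> callback;
  size_t poll_fd_index = 0;  // Position of this fd inside the pollfd array.
};

class WatchSet {
 public:
  void AddWatch(int fd, std::function<void()> callback) {
    WatchTask task;
    task.callback = std::move(callback);
    tasks_[fd] = std::move(task);
    changed_ = true;  // Mark dirty so the next Update() rebuilds the array.
  }

  void RemoveWatch(int fd) {
    tasks_.erase(fd);
    changed_ = true;
  }

  // Rebuilds the pollfd array only if the watch set changed since the last
  // call. Returns true if a rebuild happened.
  bool Update() {
    if (!changed_)
      return false;
    changed_ = false;
    poll_fds_.clear();
    for (auto& kv : tasks_) {
      kv.second.poll_fd_index = poll_fds_.size();
      poll_fds_.push_back({kv.first, POLLIN | POLLHUP, 0});
    }
    return true;
  }

  const std::vector<pollfd>& poll_fds() const { return poll_fds_; }
  size_t IndexOf(int fd) const { return tasks_.at(fd).poll_fd_index; }

 private:
  std::map<int, WatchTask> tasks_;
  std::vector<pollfd> poll_fds_;
  bool changed_ = false;
};
```

Storing the index next to each watch lets the loop map a pollfd entry back to its watch without searching, and the dirty flag avoids rebuilding the array on every iteration.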
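Back in Run(), the next step uses the blocking poll system call with a timeout. poll returns when an event occurs on one of the watched fds (for example the socket fd) or when the timeout expires. Run() then calls PostFileDescriptorWatches.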
platform::BeforeMaybeBlockingSyscall();
int ret = PERFETTO_EINTR(poll(
&poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
platform::AfterMaybeBlockingSyscall();
PERFETTO_CHECK(ret >= 0);
PostFileDescriptorWatches(0 /*ignored*/);
In PostFileDescriptorWatches, each fd that has a pending event (handle in the code) is bound to RunFileDescriptorWatch and pushed onto immediate_tasks_ via PostTask for later execution.
PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this, handle));
void UnixTaskRunner::PostTask(std::function<void()> task) {
bool was_empty;
{
std::lock_guard<std::mutex> lock(lock_);
was_empty = immediate_tasks_.empty();
immediate_tasks_.push_back(std::move(task));
}
if (was_empty)
WakeUp();
}
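The interaction between PostTask, WakeUp, and the poll loop can be illustrated with a small self-contained sketch. It is not the Perfetto implementation: MiniTaskRunner is a hypothetical class, it uses a pipe as the wake-up channel (an assumption made for brevity), and it only handles immediate tasks.

```cpp
#include <poll.h>
#include <unistd.h>

#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical, simplified task runner; not the Perfetto class.
class MiniTaskRunner {
 public:
  MiniTaskRunner() {
    int res = pipe(wake_pipe_);
    assert(res == 0);
    (void)res;
  }
  ~MiniTaskRunner() {
    close(wake_pipe_[0]);
    close(wake_pipe_[1]);
  }

  // Thread-safe. Wakes the loop only on the empty -> non-empty transition.
  void PostTask(std::function<void()> task) {
    bool was_empty;
    {
      std::lock_guard<std::mutex> lock(lock_);
      was_empty = tasks_.empty();
      tasks_.push_back(std::move(task));
    }
    if (was_empty)
      WakeUp();
  }

  // Asks the loop to exit once the tasks queued so far have run.
  void Quit() {
    PostTask([this] { quit_ = true; });
  }

  void Run() {
    quit_ = false;
    while (!quit_) {
      pollfd pfd = {wake_pipe_[0], POLLIN, 0};
      if (poll(&pfd, 1, /*timeout_ms=*/-1) < 0)
        continue;  // Typically EINTR: just retry.
      if (pfd.revents & POLLIN) {
        char buf[64];
        ssize_t n = read(wake_pipe_[0], buf, sizeof(buf));  // Drain wake-ups.
        (void)n;
      }
      RunPendingTasks();
    }
  }

 private:
  void WakeUp() {
    const char byte = 1;
    ssize_t n = write(wake_pipe_[1], &byte, 1);
    (void)n;
  }

  void RunPendingTasks() {
    while (!quit_) {
      std::function<void()> task;
      {
        std::lock_guard<std::mutex> lock(lock_);
        if (tasks_.empty())
          return;
        task = std::move(tasks_.front());
        tasks_.pop_front();
      }
      task();  // Runs without the lock held, so it may call PostTask().
    }
  }

  int wake_pipe_[2];
  std::mutex lock_;
  std::deque<std::function<void()>> tasks_;
  bool quit_ = false;  // Only touched on the thread that calls Run().
};
```

Writing to the wake-up channel only when the queue was empty keeps the number of wake-ups small: once the loop is awake it drains every queued task before blocking in poll again.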
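At the end of each iteration, Run() calls RunImmediateAndDelayedTask to execute tasks from immediate_tasks_ and delayed_tasks_. We have not yet seen where tasks are put into delayed_tasks_; that is left for a later discussion. The immediate task is run through RunTaskWithWatchdogGuard, which sets a maximum allowed execution time for the task and then runs it; we won't expand on it here. The task being executed is the RunFileDescriptorWatch that was just posted.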
void UnixTaskRunner::RunImmediateAndDelayedTask() {
// If locking overhead becomes an issue, add a separate work queue.
std::function<void()> immediate_task;
std::function<void()> delayed_task;
TimeMillis now = GetWallTimeMs();
{
std::lock_guard<std::mutex> lock(lock_);
if (!immediate_tasks_.empty()) {
immediate_task = std::move(immediate_tasks_.front());
immediate_tasks_.pop_front();
}
...
}
errno = 0;
if (immediate_task)
RunTaskWithWatchdogGuard(immediate_task);
...
}
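RunFileDescriptorWatch takes the callback out of the watch_task and runs it through RunTaskWithWatchdogGuard again, nested inside the outer guard. Remember where this callback was passed in? It was when the socket was added: the callback is UnixSocket::OnEvent() in src/base/unix_socket.cc.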
void UnixTaskRunner::RunFileDescriptorWatch(PlatformHandle fd) {
std::function<void()> task;
{
std::lock_guard<std::mutex> lock(lock_);
auto it = watch_tasks_.find(fd);
if (it == watch_tasks_.end())
return;
WatchTask& watch_task = it->second;
...
task = watch_task.callback;
}
errno = 0;
RunTaskWithWatchdogGuard(task);
}
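One detail worth noting in RunFileDescriptorWatch is that the callback is copied while the lock is held but invoked after the lock is released. The sketch below isolates that pattern; WatchRegistry and its methods are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical registry illustrating "copy under lock, run outside lock".
class WatchRegistry {
 public:
  void AddWatch(int fd, std::function<void()> callback) {
    std::lock_guard<std::mutex> lock(lock_);
    callbacks_[fd] = std::move(callback);
  }

  void RemoveWatch(int fd) {
    std::lock_guard<std::mutex> lock(lock_);
    callbacks_.erase(fd);
  }

  // Returns false if no watch is registered for |fd|.
  bool RunWatch(int fd) {
    std::function<void()> task;
    {
      std::lock_guard<std::mutex> lock(lock_);
      auto it = callbacks_.find(fd);
      if (it == callbacks_.end())
        return false;
      task = it->second;  // Copy, so the map entry may go away safely.
    }
    // The lock is released here, so the callback can call AddWatch() or
    // RemoveWatch() without deadlocking on the non-recursive mutex.
    task();
    return true;
  }

 private:
  std::mutex lock_;
  std::map<int, std::function<void()>> callbacks_;
};
```

For example, a callback that removes its own watch works with this design, whereas invoking it with the lock held would deadlock.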
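UnixSocket::OnEvent() handles the event differently depending on the socket state. The socket that was added for listening at the start is in the kListening state (the earlier new UnixSocket code confirms this), so we look at the kListening branch here. It calls accept, creates a new UnixSocket object to manage the connected socket, and then calls event_listener_->OnNewIncomingConnection. What is event_listener_? Looking again at the earlier new UnixSocket code, event_listener_ is the HostImpl object. One point to stress: there are separate HostImpl objects for the producer socket and the consumer socket.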
void UnixSocket::OnEvent() {
...
// New incoming connection.
if (state_ == State::kListening) {
// There could be more than one incoming connection behind each FD watch
// notification. Drain'em all.
for (;;) {
ScopedFile new_fd(
PERFETTO_EINTR(accept(sock_raw_.fd(), nullptr, nullptr)));
if (!new_fd)
return;
std::unique_ptr<UnixSocket> new_sock(new UnixSocket(
event_listener_, task_runner_, std::move(new_fd), State::kConnected,
sock_raw_.family(), sock_raw_.type(), peer_cred_mode_));
event_listener_->OnNewIncomingConnection(this, std::move(new_sock));
}
}
}
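Next, HostImpl::OnNewIncomingConnection adds the new client. Combined with the earlier code, each HostImpl object corresponds to either the producer socket or the consumer socket. That is why the new UnixSocket created above is still given the event_listener_ of the listening socket.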
void HostImpl::OnNewIncomingConnection(
base::UnixSocket*,
std::unique_ptr<base::UnixSocket> new_conn) {
PERFETTO_DCHECK_THREAD(thread_checker_);
std::unique_ptr<ClientConnection> client(new ClientConnection());
ClientID client_id = ++last_client_id_;
clients_by_socket_[new_conn.get()] = client.get();
client->id = client_id;
client->sock = std::move(new_conn);
client->sock->SetTxTimeout(socket_tx_timeout_ms_);
clients_[client_id] = std::move(client);
}
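Once a connection is established, the next step is sending and receiving data. Looking at UnixSocket::OnEvent() again, for a connected socket it calls OnDataAvailable in HostImpl.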
void UnixSocket::OnEvent() {
if (state_ == State::kDisconnected)
return; // Some spurious event, typically queued just before Shutdown().
if (state_ == State::kConnected)
return event_listener_->OnDataAvailable(this);
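OnDataAvailable deserializes the incoming data (not expanded here). The resulting unit is a Frame, i.e. perfetto::protos::gen::IPCFrame. This is the first place we see protobuf being used; the message is defined in protos/perfetto/ipc/wire_protocol.proto.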
void HostImpl::OnDataAvailable(base::UnixSocket* sock) {
PERFETTO_DCHECK_THREAD(thread_checker_);
auto it = clients_by_socket_.find(sock);
if (it == clients_by_socket_.end())
return;
ClientConnection* client = it->second;
BufferedFrameDeserializer& frame_deserializer = client->frame_deserializer;
...
for (;;) {
std::unique_ptr<Frame> frame = frame_deserializer.PopNextFrame();
if (!frame)
break;
OnReceivedFrame(client, *frame);
}
}
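The loop above pops complete frames from a buffer that may contain partial data. A minimal sketch of such a buffered deserializer follows. It is an illustration only: FrameBuffer is a hypothetical class, the payload is a plain string rather than a decoded IPCFrame, and the 4-byte size header in host byte order is an assumption of this sketch, not a statement about the exact Perfetto wire format.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>

// Assumed header layout for this sketch: a 4-byte payload size.
constexpr size_t kHeaderSize = sizeof(uint32_t);

// Hypothetical length-prefixed frame buffer.
class FrameBuffer {
 public:
  // Appends raw bytes as they arrive from the socket.
  void Append(const std::string& bytes) { buf_ += bytes; }

  // Returns the next complete payload, or nullptr if more bytes are needed.
  std::unique_ptr<std::string> PopNextFrame() {
    if (buf_.size() < kHeaderSize)
      return nullptr;
    uint32_t payload_size = 0;
    memcpy(&payload_size, buf_.data(), kHeaderSize);
    if (buf_.size() - kHeaderSize < payload_size)
      return nullptr;  // Header is complete but the payload is not.
    std::unique_ptr<std::string> payload(
        new std::string(buf_, kHeaderSize, payload_size));
    buf_.erase(0, kHeaderSize + payload_size);
    return payload;
  }

  // Produces the wire form of one frame: header followed by payload.
  static std::string Serialize(const std::string& payload) {
    uint32_t payload_size = static_cast<uint32_t>(payload.size());
    std::string out(reinterpret_cast<const char*>(&payload_size), kHeaderSize);
    return out + payload;
  }

 private:
  std::string buf_;
};
```

Returning a pointer rather than the string itself lets the caller distinguish "no complete frame yet" from a valid frame with an empty payload, which is why the caller loops until it gets a null result.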
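wire_protocol.proto defines the different messages. From them one can roughly guess the flow: a client binds to a service, the reply carries the methods the service provides, and the client then invokes those methods. OnReceivedFrame dispatches each kind of client request message to its handler. Let's look at BindService first.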
message IPCFrame {
// Client -> Host.
message BindService { optional string service_name = 1; }
// Host -> Client.
message BindServiceReply {
message MethodInfo {
optional uint32 id = 1;
optional string name = 2;
}
optional bool success = 1;
optional uint32 service_id = 2;
repeated MethodInfo methods = 3;
}
// Client -> Host.
message InvokeMethod {
// BindServiceReply.id.
optional uint32 service_id = 1;
// BindServiceReply.method.id.
optional uint32 method_id = 2;
// Proto-encoded request argument.
optional bytes args_proto = 3;
// When true the client specifies that a reply is not needed. The use case
// is a method with an empty, where the client doesn't care about the
// success/failure of the method invocation and rather prefers avoiding the
// IPC roundtrip + context switch associated with the reply.
optional bool drop_reply = 4;
}
...
In HostImpl::OnBindService, GetServiceByName is used to look up the ExposedService, which was also mentioned earlier.
void HostImpl::OnBindService(ClientConnection* client, const Frame& req_frame) {
// Binding a service doesn't do anything major. It just returns back the
// service id and its method map.
const Frame::BindService& req = req_frame.msg_bind_service();
Frame reply_frame;
reply_frame.set_request_id(req_frame.request_id());
auto* reply = reply_frame.mutable_msg_bind_service_reply();
const ExposedService* service = GetServiceByName(req.service_name());
if (service) {
reply->set_success(true);
reply->set_service_id(service->id);
uint32_t method_id = 1; // method ids start at index 1.
for (const auto& desc_method : service->instance->GetDescriptor().methods) {
Frame::BindServiceReply::MethodInfo* method_info = reply->add_methods();
method_info->set_name(desc_method.name);
method_info->set_id(method_id++);
}
}
SendFrame(client, reply_frame);
}
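Going back to when the ExposedService is added: it happens in ServiceIPCHostImpl::DoStart. Note that each producer_ipc_port (an element of producer_ipc_ports_) and consumer_ipc_port_ are HostImpl objects, and the services they expose are ProducerIPCService and ConsumerIPCService respectively.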
bool ServiceIPCHostImpl::DoStart() {
...
svc_ = TracingService::CreateInstance(std::move(shm_factory), task_runner_,
init_opts_);
...
for (auto& producer_ipc_port : producer_ipc_ports_) {
bool producer_service_exposed = producer_ipc_port->ExposeService(
std::unique_ptr<ipc::Service>(new ProducerIPCService(svc_.get())));
PERFETTO_CHECK(producer_service_exposed);
...
}
bool consumer_service_exposed = consumer_ipc_port_->ExposeService(
std::unique_ptr<ipc::Service>(new ConsumerIPCService(svc_.get())));
PERFETTO_CHECK(consumer_service_exposed);
return true;
}
ExposeService obtains the service name through GetDescriptor. But if you search ProducerIPCService or ConsumerIPCService for GetDescriptor, you won't find it.
bool HostImpl::ExposeService(std::unique_ptr<Service> service) {
PERFETTO_DCHECK_THREAD(thread_checker_);
const std::string& service_name = service->GetDescriptor().service_name;
if (GetServiceByName(service_name)) {
PERFETTO_DLOG("Duplicate ExposeService(): %s", service_name.c_str());
return false;
}
service->use_shmem_emulation_ =
sock() && !base::SockShmemSupported(sock()->family());
ServiceID sid = ++last_service_id_;
ExposedService exposed_service(sid, service_name, std::move(service));
services_.emplace(sid, std::move(exposed_service));
return true;
}
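The registration logic in ExposeService can be reduced to a small registry that rejects duplicate names and hands out increasing ids. The sketch below is illustrative: ServiceRegistry, Expose, and GetByName are hypothetical names, the service instance itself is omitted, and the linear scan in GetByName is an assumption made for simplicity.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Hypothetical, simplified record of an exposed service.
struct ExposedService {
  uint32_t id;
  std::string name;
};

class ServiceRegistry {
 public:
  // Returns false if a service with the same name is already exposed.
  bool Expose(const std::string& name) {
    if (GetByName(name))
      return false;
    uint32_t id = ++last_id_;  // Ids start at 1.
    services_.emplace(id, ExposedService{id, name});
    return true;
  }

  // Linear scan by name; returns nullptr if the name is unknown.
  const ExposedService* GetByName(const std::string& name) const {
    for (const auto& kv : services_) {
      if (kv.second.name == name)
        return &kv.second;
    }
    return nullptr;
  }

 private:
  std::map<uint32_t, ExposedService> services_;
  uint32_t last_id_ = 0;
};
```

Keying the map by id makes the later lookup by service_id cheap, while lookups by name only happen at bind time.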
The declaration of ProducerIPCService in src/tracing/ipc/service/producer_ipc_service.h shows that it inherits from the protobuf-generated ProducerPort.
class ProducerIPCService : public protos::gen::ProducerPort {
public:
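Searching for GetDescriptor in ProducerPort within the source tree still turns up nothing. This part is special: the code is produced by a protoc plugin when the protobuf sources are generated. The plugin code is in src/ipc/protoc_plugin/ipc_plugin.cc, and Android.bp shows how it is used.
Next, look at the generated code in out/soong/.intermediates/external/perfetto/perfetto_protos_perfetto_ipc_ipc_gen/gen/external/perfetto/protos/perfetto/ipc/producer_port.ipc.cc, where GetDescriptor can be found.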
const ::perfetto::ipc::ServiceDescriptor& ProducerPort::GetDescriptorStatic() {
static auto* instance = NewDescriptor();
return *instance;
}
// Host-side definitions.
ProducerPort::~ProducerPort() = default;
const ::perfetto::ipc::ServiceDescriptor& ProducerPort::GetDescriptor() {
return GetDescriptorStatic();
}
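Near the top of that file we can see what service_name is, along with the initialization of the important data structure desc->methods. Looking back at HostImpl::OnBindService, it is now clear where the methods returned to the client come from.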
auto* desc = new ::perfetto::ipc::ServiceDescriptor();
desc->service_name = "ProducerPort";
desc->methods.emplace_back(::perfetto::ipc::ServiceDescriptor::Method{
"InitializeConnection",
&_IPC_Decoder<InitializeConnectionRequest>,
&_IPC_Decoder<InitializeConnectionResponse>,
&_IPC_Invoker<ProducerPort, InitializeConnectionRequest, InitializeConnectionResponse, &ProducerPort::InitializeConnection>});
desc->methods.emplace_back(::perfetto::ipc::ServiceDescriptor::Method{
"RegisterDataSource",
&_IPC_Decoder<RegisterDataSourceRequest>,
&_IPC_Decoder<RegisterDataSourceResponse>,
&_IPC_Invoker<ProducerPort, RegisterDataSourceRequest, RegisterDataSourceResponse, &ProducerPort::RegisterDataSource>});
desc->methods.emplace_back(::perfetto::ipc::ServiceDescriptor::Method{
"UnregisterDataSource",
&_IPC_Decoder<UnregisterDataSourceRequest>,
&_IPC_Decoder<UnregisterDataSourceResponse>,
&_IPC_Invoker<ProducerPort, UnregisterDataSourceRequest, UnregisterDataSourceResponse, &ProducerPort::UnregisterDataSource>});
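perfetto::ipc::ServiceDescriptor::Method is a struct. It is easy to see that its last member, invoker, is the function that dispatches the request to the corresponding method implementation.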
class ServiceDescriptor {
public:
struct Method {
const char* name;
// DecoderFunc is pointer to a function that takes a string in input
// containing protobuf encoded data and returns a decoded protobuf message.
using DecoderFunc = std::unique_ptr<ProtoMessage> (*)(const std::string&);
// Function pointer to decode the request argument of the method.
DecoderFunc request_proto_decoder;
// Function pointer to decoded the reply argument of the method.
DecoderFunc reply_proto_decoder;
// Function pointer that dispatches the generic request to the corresponding
// method implementation.
using InvokerFunc = void (*)(Service*,
const ProtoMessage& /* request_args */,
DeferredBase /* deferred_reply */);
InvokerFunc invoker;
};
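The generated code fills the invoker field with instantiations of a function template that takes the method as a template argument. The following sketch shows how such a template can yield a plain function pointer per method. All names here (Service, Message, Invoker, EchoService, PingRequest, Method, MakeMethods) are hypothetical stand-ins, and the deferred reply argument is left out to keep the example short.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for the service and message base classes.
struct Service {
  virtual ~Service() = default;
};
struct Message {
  virtual ~Message() = default;
};

// Type-erased entry point stored in the method table.
using InvokerFunc = void (*)(Service*, const Message&);

// One instantiation per (service type, request type, method). It casts the
// generic arguments back to their concrete types and calls the method.
template <typename T, typename TReq, void (T::*MethodPtr)(const TReq&)>
void Invoker(Service* service, const Message& request) {
  (static_cast<T*>(service)->*MethodPtr)(static_cast<const TReq&>(request));
}

struct PingRequest : Message {
  int value = 0;
};

struct EchoService : Service {
  void Ping(const PingRequest& request) { last_value = request.value; }
  int last_value = -1;
};

struct Method {
  const char* name;
  InvokerFunc invoker;
};

// Builds the method table, similar in spirit to a generated descriptor.
std::vector<Method> MakeMethods() {
  return {{"Ping", &Invoker<EchoService, PingRequest, &EchoService::Ping>}};
}
```

Because the member function pointer is a template argument, each table entry is an ordinary function pointer, so the host can dispatch without knowing any concrete service or request type.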
Next, look at HostImpl::OnInvokeMethod. As seen earlier, InvokeMethod is also a message inside IPCFrame. Its handler uses the requested method_id to find the corresponding Method struct and then calls its invoker.
void HostImpl::OnInvokeMethod(ClientConnection* client,
const Frame& req_frame) {
...
const uint32_t method_id = req.method_id();
if (method_id == 0 || method_id > methods.size())
return SendFrame(client, reply_frame);
const ServiceDescriptor::Method& method = methods[method_id - 1];
...
method.invoker(service, *decoded_req_args, std::move(deferred_reply));
service->received_fd_ = nullptr;
service->client_info_ = ClientInfo();
}
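Method ids are handed out starting at 1 in OnBindService and mapped back to a vector index in OnInvokeMethod. A minimal sketch of that numbering scheme, using hypothetical helper names (MethodInfo, BuildMethodInfos, FindMethod) and plain strings in place of method descriptors:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical mirror of the (id, name) pairs sent in the bind reply.
struct MethodInfo {
  uint32_t id;
  std::string name;
};

// Assigns 1-based ids in declaration order, as done when replying to a bind.
std::vector<MethodInfo> BuildMethodInfos(
    const std::vector<std::string>& names) {
  std::vector<MethodInfo> infos;
  uint32_t method_id = 1;  // Ids start at 1, so 0 can mean "invalid".
  for (const std::string& name : names)
    infos.push_back(MethodInfo{method_id++, name});
  return infos;
}

// Maps a 1-based id back to the method. Returns nullptr for id 0 or for an
// id past the end of the table.
const std::string* FindMethod(const std::vector<std::string>& methods,
                              uint32_t method_id) {
  if (method_id == 0 || method_id > methods.size())
    return nullptr;
  return &methods[method_id - 1];
}
```

The bounds check before indexing matters because the id comes from the client and cannot be trusted.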
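For example, for the producer's InitializeConnection method, the corresponding function is ProducerIPCService::InitializeConnection. It calls ConnectProducer on core_service_ (the TracingService) to register the producer. The builtin producer registered earlier for lazy start uses the same method.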
// Called by the remote Producer through the IPC channel soon after connecting.
void ProducerIPCService::InitializeConnection(
const protos::gen::InitializeConnectionRequest& req,
DeferredInitializeConnectionResponse response) {
const auto& client_info = ipc::Service::client_info();
const ipc::ClientID ipc_client_id = client_info.client_id();
PERFETTO_CHECK(ipc_client_id);
if (producers_.count(ipc_client_id) > 0) {
PERFETTO_DLOG(
"The remote Producer is trying to re-initialize the connection");
return response.Reject();
}
...
// ConnectProducer will call OnConnect() on the next task.
producer->service_endpoint = core_service_->ConnectProducer(
producer.get(), client_identity, req.producer_name(),
req.shared_memory_size_hint_bytes(),
/*in_process=*/false, smb_scraping_mode,
req.shared_memory_page_size_hint_bytes(), std::move(shmem),
req.sdk_version());
That clears up the overall framework and communication mechanism of traced.
Below is a brief module diagram to aid understanding.

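ServiceIPCHostImpl is an object created in main. It contains a TracingService object and separate HostImpl objects for the producer and consumer sockets.
Each HostImpl contains a UnixSocket object that handles socket state and events; the actual handling calls back into methods of HostImpl.
Each HostImpl also contains a ProducerIPCService or ConsumerIPCService, which handles the actual client messages defined with protobuf.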