perfetto源码解析-基础框架(3)
上篇介绍了traced运行流程和大体框架,本篇将分析producer的启动流程,以heapprofd为例
代码在src/profiling/memory
从main函数跟踪,很快就看到启动的主体部分。
int StartCentralHeapprofd() {
// We set this up before launching any threads, so we do not have to use a
// std::atomic for g_dump_evt.
g_dump_evt = new base::EventFd();
base::UnixTaskRunner task_runner;
base::Watchdog::GetInstance()->Start(); // crash on exceedingly long tasks
HeapprofdProducer producer(HeapprofdMode::kCentral, &task_runner,
/* exit_when_done= */ false);
int listening_raw_socket = GetListeningSocket();
auto listening_socket = base::UnixSocket::Listen(
base::ScopedFile(listening_raw_socket), &producer.socket_delegate(),
&task_runner, base::SockFamily::kUnix, base::SockType::kStream);
struct sigaction action = {};
action.sa_handler = [](int) { g_dump_evt->Notify(); };
// Allow to trigger a full dump by sending SIGUSR1 to heapprofd.
// This will allow manually deciding when to dump on userdebug.
PERFETTO_CHECK(sigaction(SIGUSR1, &action, nullptr) == 0);
task_runner.AddFileDescriptorWatch(g_dump_evt->fd(), [&producer] {
g_dump_evt->Clear();
producer.DumpAll();
});
producer.ConnectWithRetries(GetProducerSocket());
// TODO(fmayer): Create one producer that manages both heapprofd and Java
// producers, so we do not have two connections to traced.
JavaHprofProducer java_producer(&task_runner);
java_producer.ConnectWithRetries(GetProducerSocket());
task_runner.Run();
return 0;
}
这一部分首先启了个Watchdog,这部分不详细解释了,
然后创建了HeapprofdProducer对象,接着监听了/dev/socket/heapprofd,base::UnixSocket::Listen上一篇已经提到过,这里不展开,只是要注意,监听socket后续的事件会由producer.socket_delegate()处理,这部分也在后面讨论。
接着注册了信号处理函数处理SIGUSR1,处理方法就是由task_runner去运行producer.DumpAll()
然后执行producer.ConnectWithRetries去连接/dev/socket/traced_producer
下面同样又创建了JavaHprofProducer并去连接/dev/socket/traced_producer
接着就进入了task_runner处理各种异步事件
看HeapprofdProducer::ConnectWithRetries,设置了状态,执行ConnectService
void HeapprofdProducer::ConnectWithRetries(const char* socket_name) {
PERFETTO_DCHECK(state_ == kNotStarted);
state_ = kNotConnected;
ResetConnectionBackoff();
producer_sock_name_ = socket_name;
ConnectService();
}
再看ConnectService,这里能看到DataSource名称”android.heapprofd”
void HeapprofdProducer::ConnectService() {
SetProducerEndpoint(ProducerIPCClient::Connect(
producer_sock_name_, this, "android.heapprofd", task_runner_));
}
ProducerIPCClient::Connect里面直接创建ProducerIPCClientImpl对象,看它的构造函数,这里主要创建了Client的实例,然后调用了BindService。
ipc_channel_ =
ipc::Client::CreateInstance(std::move(conn_args), task_runner);
ipc_channel_->BindService(producer_port_->GetWeakPtr());
先看CreateInstance,这里可以看到ClientImpl,还记得前面service使用的HostImpl吗,这里正是和它对应的Client实现
std::unique_ptr<Client> Client::CreateInstance(ConnArgs conn_args,
base::TaskRunner* task_runner) {
std::unique_ptr<Client> client(
new ClientImpl(std::move(conn_args), task_runner));
return client;
}
接着看ClientImpl的构造函数,这里用的else里的TryConnect,而TryConnect调用了UnixSocket::Connect,注意这里的第二个参数this是EventListener,即ClientImpl处理socket事件的回调。
ClientImpl::ClientImpl(ConnArgs conn_args, base::TaskRunner* task_runner)
: socket_name_(conn_args.socket_name),
socket_retry_(conn_args.retry),
task_runner_(task_runner),
weak_ptr_factory_(this) {
if (conn_args.socket_fd) {
// Create the client using a connected socket. This code path will never hit
// OnConnect().
sock_ = base::UnixSocket::AdoptConnected(
std::move(conn_args.socket_fd), this, task_runner_, kClientSockFamily,
base::SockType::kStream, base::SockPeerCredMode::kIgnore);
} else {
// Connect using the socket name.
TryConnect();
}
}
void ClientImpl::TryConnect() {
PERFETTO_DCHECK(socket_name_);
sock_ = base::UnixSocket::Connect(
socket_name_, this, task_runner_, base::GetSockFamily(socket_name_),
base::SockType::kStream, base::SockPeerCredMode::kIgnore);
}
UnixSocket::Connect里首先创建了UnixSocket对象,然后调用了它的DoConnect
先看UnixSocket的构造函数,这里adopt_state传进来是kDisconnected,所以会创建一个socket,设为非阻塞,并且把它添加到task_runner_的监听列表中,还记得service socket这里做的是什么吗?前面提到service里adopt_state传进来是kListening。
state_ = State::kDisconnected;
if (adopt_state == State::kDisconnected) {
PERFETTO_DCHECK(!adopt_fd);
sock_raw_ = UnixSocketRaw::CreateMayFail(sock_family, sock_type);
if (!sock_raw_)
return;
} else if (adopt_state == State::kConnected) {
...
PERFETTO_CHECK(sock_raw_);
sock_raw_.SetBlocking(false);
WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
task_runner_->AddFileDescriptorWatch(sock_raw_.watch_handle(), [weak_ptr] {
if (weak_ptr)
weak_ptr->OnEvent();
});
上面的构造函数并没有连接service,这部分在DoConnect里做了,并且把state_从kDisconnected切换到了kConnecting,这里有一大段注释省略了,大概意思是解释了下面为什么要在task_runner_里执行一次OnEvent,OnEvent是socket状态有变化时的直接回调。
void UnixSocket::DoConnect(const std::string& socket_name) {
PERFETTO_DCHECK(state_ == State::kDisconnected);
// This is the only thing that can gracefully fail in the ctor.
if (!sock_raw_)
return NotifyConnectionState(false);
if (!sock_raw_.Connect(socket_name))
return NotifyConnectionState(false);
// At this point either connect() succeeded or started asynchronously
// (errno = EINPROGRESS).
state_ = State::kConnecting;
...
WeakPtr<UnixSocket> weak_ptr = weak_ptr_factory_.GetWeakPtr();
task_runner_->PostTask([weak_ptr] {
if (weak_ptr)
weak_ptr->OnEvent();
});
}
至此socket初始化和连接部分结束了,回到ProducerIPCClient::Connect那里,接着调用了ClientImpl::BindService,还记得前面socket的状态是什么吗?
是kConnecting,所以这里也就是把service_proxy添加到queued_bindings_等后续做处理
void ClientImpl::BindService(base::WeakPtr<ServiceProxy> service_proxy) {
if (!service_proxy)
return;
if (!sock_->is_connected()) {
queued_bindings_.emplace_back(service_proxy);
return;
}
...
}
这个service_proxy是什么呢,它的类型是protos::gen::ProducerPortProxy,也是protobuf自动生产
它是ProducerIPCClientImpl初始化时创建的用于和service收发消息的代理对象,并且看到这个对象接受了ProducerIPCClientImpl对象做为回调事件的处理
producer_port_(
new protos::gen::ProducerPortProxy(this /* event_listener */)),
这之后就会进入task_runner来处理各种异步事件,这里总结一下各个类的关系方便下面继续分析。
最外层的是HeapprofdProducer对象,它包含了对heap profiler的一些业务流的处理,这部分会在其他章节说明。
接下来是ProducerIPCClientImpl对象处理producer客户端相关的业务。它又是protos::gen::ProducerPortProxy的事件监听者。
然后是ClientImpl处理客户端业务。它会使用ProducerPortProxy来和服务端通信。
最后是UnixSocket处理socket事件。
刚才提到socket连接后进入了kConnecting状态,然后task_runner紧接着会回调UnixSocket::OnEvent
来看它的处理,这里正常会执行回调event_listener_->OnConnect(this, true /* connected */),而event_listener_就是ClientImpl
if (state_ == State::kConnecting) {
PERFETTO_DCHECK(sock_raw_);
int sock_err = EINVAL;
socklen_t err_len = sizeof(sock_err);
int res =
getsockopt(sock_raw_.fd(), SOL_SOCKET, SO_ERROR, &sock_err, &err_len);
if (res == 0 && sock_err == EINPROGRESS)
return; // Not connected yet, just a spurious FD watch wakeup.
if (res == 0 && sock_err == 0) {
if (peer_cred_mode_ == SockPeerCredMode::kReadOnConnect)
ReadPeerCredentialsPosix();
state_ = State::kConnected;
return event_listener_->OnConnect(this, true /* connected */);
}
PERFETTO_DLOG("Connection error: %s", strerror(sock_err));
Shutdown(false);
return event_listener_->OnConnect(this, false /* connected */);
}
看ClientImpl::OnConnect,这里又执行了BindService
auto queued_bindings = std::move(queued_bindings_);
queued_bindings_.clear();
for (base::WeakPtr<ServiceProxy>& service_proxy : queued_bindings) {
if (connected) {
BindService(service_proxy);
} else if (service_proxy) {
service_proxy->OnConnect(false /* success */);
}
}
接着看BindService另一部分,这里看到组装了带有bind_service消息的protos::gen::IPCFrame并且调用了SendFrame发送,然后把request放进了queued_requests_
RequestID request_id = ++last_request_id_;
Frame frame;
frame.set_request_id(request_id);
Frame::BindService* req = frame.mutable_msg_bind_service();
const char* const service_name = service_proxy->GetDescriptor().service_name;
req->set_service_name(service_name);
if (!SendFrame(frame)) {
PERFETTO_DLOG("BindService(%s) failed", service_name);
return service_proxy->OnConnect(false /* success */);
}
QueuedRequest qr;
qr.type = Frame::kMsgBindServiceFieldNumber;
qr.request_id = request_id;
qr.service_proxy = service_proxy;
queued_requests_.emplace(request_id, std::move(qr));
SendFrame下面就是做了序列化和发送,不细说了,然后看server端处理,也就是traced。
前面章节也提到过服务端收到数据会调用UnixSocket::OnEvent,然后调用它事件监听者即HostImpl的OnDataAvailable,然后调用OnReceivedFrame。
HostImpl::OnReceivedFrame比较简单,它这里会调用OnBindService,这里直接组装msg_bind_service_reply回复,回复中带了service id和method ids。
void HostImpl::OnBindService(ClientConnection* client, const Frame& req_frame) {
// Binding a service doesn't do anything major. It just returns back the
// service id and its method map.
const Frame::BindService& req = req_frame.msg_bind_service();
Frame reply_frame;
reply_frame.set_request_id(req_frame.request_id());
auto* reply = reply_frame.mutable_msg_bind_service_reply();
const ExposedService* service = GetServiceByName(req.service_name());
if (service) {
reply->set_success(true);
reply->set_service_id(service->id);
uint32_t method_id = 1; // method ids start at index 1.
for (const auto& desc_method : service->instance->GetDescriptor().methods) {
Frame::BindServiceReply::MethodInfo* method_info = reply->add_methods();
method_info->set_name(desc_method.name);
method_info->set_id(method_id++);
}
}
SendFrame(client, reply_frame);
}
接着看客户端的处理,还记得之前发请求时存过queued_requests_,这里先拿到reply对应的request,接着用request和reply调用了OnBindServiceReply
void ClientImpl::OnFrameReceived(const Frame& frame) {
auto queued_requests_it = queued_requests_.find(frame.request_id());
if (queued_requests_it == queued_requests_.end()) {
PERFETTO_DLOG("OnFrameReceived(): got invalid request_id=%" PRIu64,
static_cast<uint64_t>(frame.request_id()));
return;
}
QueuedRequest req = std::move(queued_requests_it->second);
queued_requests_.erase(queued_requests_it);
if (req.type == Frame::kMsgBindServiceFieldNumber &&
frame.has_msg_bind_service_reply()) {
return OnBindServiceReply(std::move(req), frame.msg_bind_service_reply());
}
if (req.type == Frame::kMsgInvokeMethodFieldNumber &&
frame.has_msg_invoke_method_reply()) {
return OnInvokeMethodReply(std::move(req), frame.msg_invoke_method_reply());
}
...
}
接着看OnBindServiceReply,这里主要就是用服务端发回的service id和method ids初始化service_proxy,即ProducerPortProxy,目的是请求服务端执行某方法时,确保方法存在。同时存入service_bindings_
void ClientImpl::OnBindServiceReply(QueuedRequest req,
const Frame::BindServiceReply& reply) {
base::WeakPtr<ServiceProxy>& service_proxy = req.service_proxy;
if (!service_proxy)
return;
const char* svc_name = service_proxy->GetDescriptor().service_name;
if (!reply.success()) {
PERFETTO_DLOG("BindService(): unknown service_name=\"%s\"", svc_name);
return service_proxy->OnConnect(false /* success */);
}
auto prev_service = service_bindings_.find(reply.service_id());
if (prev_service != service_bindings_.end() && prev_service->second.get()) {
PERFETTO_DLOG(
"BindService(): Trying to bind service \"%s\" but another service "
"named \"%s\" is already bound with the same ID.",
svc_name, prev_service->second->GetDescriptor().service_name);
return service_proxy->OnConnect(false /* success */);
}
// Build the method [name] -> [remote_id] map.
std::map<std::string, MethodID> methods;
for (const auto& method : reply.methods()) {
if (method.name().empty() || method.id() <= 0) {
PERFETTO_DLOG("OnBindServiceReply(): invalid method \"%s\" -> %" PRIu64,
method.name().c_str(), static_cast<uint64_t>(method.id()));
continue;
}
methods[method.name()] = method.id();
}
service_proxy->InitializeBinding(weak_ptr_factory_.GetWeakPtr(),
reply.service_id(), std::move(methods));
service_bindings_[reply.service_id()] = service_proxy;
service_proxy->OnConnect(true /* success */);
}
接着看service_proxy->OnConnect,ProducerPortProxy继承了ServiceProxy,所以这段代码在ServiceProxy里
还记得这里事件监听者是什么么,没错,是ProducerIPCClientImpl
void ServiceProxy::OnConnect(bool success) {
if (success) {
PERFETTO_DCHECK(service_id_);
return event_listener_->OnConnect();
}
return event_listener_->OnDisconnect();
}
来看ProducerIPCClientImpl::OnConnect,这里主要做了两件事,一是调用了ProducerPortProxy的InitializeConnection, 二是调用了GetAsyncCommand,并且分别注册了回调,GetAsyncCommand非常重要,它告诉服务端已经准备好接受后续命令,并且在回调中处理后续命令,例如SetupDataSource,StartTracing等。
void ProducerIPCClientImpl::OnConnect() {
PERFETTO_DCHECK_THREAD(thread_checker_);
connected_ = true;
ipc::Deferred<protos::gen::InitializeConnectionResponse> on_init;
on_init.Bind(
[this](ipc::AsyncResult<protos::gen::InitializeConnectionResponse> resp) {
OnConnectionInitialized(
resp.success(),
resp.success() ? resp->using_shmem_provided_by_producer() : false,
resp.success() ? resp->direct_smb_patching_supported() : false,
resp.success() ? resp->use_shmem_emulation() : false);
});
protos::gen::InitializeConnectionRequest req;
req.set_producer_name(name_);
req.set_shared_memory_size_hint_bytes(
static_cast<uint32_t>(shared_memory_size_hint_bytes_));
req.set_shared_memory_page_size_hint_bytes(
static_cast<uint32_t>(shared_memory_page_size_hint_bytes_));
...
req.set_sdk_version(base::GetVersionString());
producer_port_->InitializeConnection(req, std::move(on_init), shm_fd);
// Create the back channel to receive commands from the Service.
ipc::Deferred<protos::gen::GetAsyncCommandResponse> on_cmd;
on_cmd.Bind(
[this](ipc::AsyncResult<protos::gen::GetAsyncCommandResponse> resp) {
if (!resp)
return; // The IPC channel was closed and |resp| was auto-rejected.
OnServiceRequest(*resp);
});
producer_port_->GetAsyncCommand(protos::gen::GetAsyncCommandRequest(),
std::move(on_cmd));
// If there are pending Sync() requests, send them now.
for (const auto& pending_sync : pending_sync_reqs_)
Sync(std::move(pending_sync));
pending_sync_reqs_.clear();
}
接着看生成的ProducerPortProxy::InitializeConnection是如何实现的,这段代码在out/soong/.intermediates/external/perfetto/perfetto_protos_perfetto_ipc_ipc_gen/gen/external/perfetto/protos/perfetto/ipc/producer_port.ipc.cc,它调用了BeginInvoke,实现在ServiceProxy里,前面提到过ProducerPortProxy继承于ServiceProxy
void ProducerPortProxy::InitializeConnection(const InitializeConnectionRequest& request, DeferredInitializeConnectionResponse reply, int fd) {
BeginInvoke("InitializeConnection", request, ::perfetto::ipc::DeferredBase(std::move(reply)),
fd);
}
可以看到这里首先看需要执行的方法service端是否支持,也就是前面BindService返回的,可以认为这种做法是为了后续的扩展。
紧接着调用了ClientImpl的BeginInvoke,并且把收到回复的回调放进pending_callbacks_,上面InitializeConnection的回调就是ProducerIPCClientImpl::OnConnectionInitialized
void ServiceProxy::BeginInvoke(const std::string& method_name,
const ProtoMessage& request,
DeferredBase reply,
int fd) {
...
auto remote_method_it = remote_method_ids_.find(method_name);
RequestID request_id = 0;
const bool drop_reply = !reply.IsBound();
if (remote_method_it != remote_method_ids_.end()) {
request_id =
static_cast<ClientImpl*>(client_.get())
->BeginInvoke(service_id_, method_name, remote_method_it->second,
request, drop_reply, weak_ptr_factory_.GetWeakPtr(),
fd);
} else {
PERFETTO_DLOG("Cannot find method \"%s\" on the host", method_name.c_str());
}
...
pending_callbacks_.emplace(request_id, std::move(reply));
}
接着看ClientImpl::BeginInvoke,主要做了两件事,一是发送IPCFrame带invoke_method消息,一是将请求暂存到queued_requests_,queued_requests_的用处在BindService那里已经说过,后面就不展开了
RequestID ClientImpl::BeginInvoke(ServiceID service_id,
const std::string& method_name,
MethodID remote_method_id,
const ProtoMessage& method_args,
bool drop_reply,
base::WeakPtr<ServiceProxy> service_proxy,
int fd) {
RequestID request_id = ++last_request_id_;
Frame frame;
frame.set_request_id(request_id);
Frame::InvokeMethod* req = frame.mutable_msg_invoke_method();
...
if (!SendFrame(frame, fd)) {
PERFETTO_DLOG("BeginInvoke() failed while sending the frame");
return 0;
}
if (drop_reply)
return 0;
QueuedRequest qr;
...
queued_requests_.emplace(request_id, std::move(qr));
return request_id;
}
接下来看Server端的处理,从UnixSocket::OnEvent再到HostImpl::OnDataAvailable再到HostImpl::OnReceivedFrame前面有过分析
接着会调用到HostImpl::OnInvokeMethod,这个里面首先会检查请求发来service和method是否存在,接着取出method回调,还记得这是在哪里初始化的吗,在生成的producer_port.ipc.cc里
然后构造了延时回复消息的回调,接着在构造当前请求ClientInfo赋给service,这个地方的service是之前ExposeService生成的,然后调用method.invoker
void HostImpl::OnInvokeMethod(ClientConnection* client,
const Frame& req_frame) {
const Frame::InvokeMethod& req = req_frame.msg_invoke_method();
Frame reply_frame;
RequestID request_id = req_frame.request_id();
reply_frame.set_request_id(request_id);
reply_frame.mutable_msg_invoke_method_reply()->set_success(false);
auto svc_it = services_.find(req.service_id());
if (svc_it == services_.end())
return SendFrame(client, reply_frame); // |success| == false by default.
Service* service = svc_it->second.instance.get();
const ServiceDescriptor& svc = service->GetDescriptor();
const auto& methods = svc.methods;
const uint32_t method_id = req.method_id();
if (method_id == 0 || method_id > methods.size())
return SendFrame(client, reply_frame);
const ServiceDescriptor::Method& method = methods[method_id - 1];
std::unique_ptr<ProtoMessage> decoded_req_args(
method.request_proto_decoder(req.args_proto()));
if (!decoded_req_args)
return SendFrame(client, reply_frame);
Deferred<ProtoMessage> deferred_reply;
base::WeakPtr<HostImpl> host_weak_ptr = weak_ptr_factory_.GetWeakPtr();
ClientID client_id = client->id;
if (!req.drop_reply()) {
deferred_reply.Bind([host_weak_ptr, client_id,
request_id](AsyncResult<ProtoMessage> reply) {
if (!host_weak_ptr)
return; // The reply came too late, the HostImpl has gone.
host_weak_ptr->ReplyToMethodInvocation(client_id, request_id,
std::move(reply));
});
}
auto peer_uid = client->GetPosixPeerUid();
auto scoped_key = g_crash_key_uid.SetScoped(static_cast<int64_t>(peer_uid));
service->client_info_ = ClientInfo(
client->id, peer_uid, client->GetLinuxPeerPid(), client->GetMachineID());
service->received_fd_ = &client->received_fd;
method.invoker(service, *decoded_req_args, std::move(deferred_reply));
service->received_fd_ = nullptr;
service->client_info_ = ClientInfo();
}
前面提到过method.invoker对应的例如是这个 &_IPC_Invoker<ProducerPort, InitializeConnectionRequest, InitializeConnectionResponse, &ProducerPort::InitializeConnection>,ProducerPort::InitializeConnection对应的正是ProducerIPCService::InitializeConnection
这里先取出之前暂存的client_info,细心的会发现这里不是多线程可重入的,因为每次请求来,client_info都会更新,当然这些异步事件都是在task_runner的工作线程中执行,并没有多线程问题。
这里先判断producer有没有注册过,然后会注册一个新的RemoteProducer放进producers_中,这里会调用core_service_->ConnectProducer,core_service_即为TracingService也是负责tracing业务的核心对象。最后通过response.Resolve来调用回复消息的回调,再看上一段,回调为host_weak_ptr->ReplyToMethodInvocation
void ProducerIPCService::InitializeConnection(
const protos::gen::InitializeConnectionRequest& req,
DeferredInitializeConnectionResponse response) {
const auto& client_info = ipc::Service::client_info();
const ipc::ClientID ipc_client_id = client_info.client_id();
PERFETTO_CHECK(ipc_client_id);
if (producers_.count(ipc_client_id) > 0) {
PERFETTO_DLOG(
"The remote Producer is trying to re-initialize the connection");
return response.Reject();
}
// Create a new entry.
std::unique_ptr<RemoteProducer> producer(new RemoteProducer());
...
// Copy the data fields to be emitted to trace packets into ClientIdentity.
ClientIdentity client_identity(client_info.uid(), client_info.pid(),
client_info.machine_id());
// ConnectProducer will call OnConnect() on the next task.
producer->service_endpoint = core_service_->ConnectProducer(
producer.get(), client_identity, req.producer_name(),
req.shared_memory_size_hint_bytes(),
/*in_process=*/false, smb_scraping_mode,
req.shared_memory_page_size_hint_bytes(), std::move(shmem),
req.sdk_version());
...
producers_.emplace(ipc_client_id, std::move(producer));
// Because of the std::move() |producer| is invalid after this point.
auto async_res =
ipc::AsyncResult<protos::gen::InitializeConnectionResponse>::Create();
async_res->set_using_shmem_provided_by_producer(using_producer_shmem);
async_res->set_direct_smb_patching_supported(true);
async_res->set_use_shmem_emulation(use_shmem_emulation);
response.Resolve(std::move(async_res));
}
先看ConnectProducer做了什么,创建了ProducerEndpointImpl对象放进producers_,并且将它的OnConnect放进task_runner的执行列表,再返回ProducerEndpointImpl对象,ProducerIPCService里将这个对象赋给了RemoteProducer的service_endpoint,这里和上层都有producers_,容易看晕,来理一下,TracingServiceImpl因为和tracing业务有关,所以自然想到ProducerEndpointImpl是和tracing业务相关的,然后上层的RemoteProducer会想到是和IPC相关的。后面再看是不是。
std::unique_ptr<TracingService::ProducerEndpoint>
TracingServiceImpl::ConnectProducer(Producer* producer,
const ClientIdentity& client_identity,
const std::string& producer_name,
size_t shared_memory_size_hint_bytes,
bool in_process,
ProducerSMBScrapingMode smb_scraping_mode,
size_t shared_memory_page_size_hint_bytes,
std::unique_ptr<SharedMemory> shm,
const std::string& sdk_version) {
...
const ProducerID id = GetNextProducerID();
...
std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
id, client_identity, this, task_runner_, producer, producer_name,
sdk_version, in_process, smb_scraping_enabled));
auto it_and_inserted = producers_.emplace(id, endpoint.get());
...
// Producer::OnConnect() should run before Producer::OnTracingSetup(). The
// latter may be posted by SetupSharedMemory() below, so post OnConnect() now.
auto weak_ptr = endpoint->weak_ptr_factory_.GetWeakPtr();
task_runner_->PostTask([weak_ptr] {
if (weak_ptr)
weak_ptr->producer_->OnConnect();
});
return std::unique_ptr<ProducerEndpoint>(std::move(endpoint));
}
在看回复消息的回调ReplyToMethodInvocation,这里比较简单,发送了invoke_method_reply消息
void HostImpl::ReplyToMethodInvocation(ClientID client_id,
RequestID request_id,
AsyncResult<ProtoMessage> reply) {
auto client_iter = clients_.find(client_id);
if (client_iter == clients_.end())
return; // client has disconnected by the time we got the async reply.
ClientConnection* client = client_iter->second.get();
Frame reply_frame;
reply_frame.set_request_id(request_id);
...
auto* reply_frame_data = reply_frame.mutable_msg_invoke_method_reply();
...
SendFrame(client, reply_frame, reply.fd());
}
下面看ClientImpl::OnInvokeMethodReply,这里主要就是拿出service_proxy,并且调用EndInvoke
void ClientImpl::OnInvokeMethodReply(QueuedRequest req,
const Frame::InvokeMethodReply& reply) {
base::WeakPtr<ServiceProxy> service_proxy = req.service_proxy;
if (!service_proxy)
return;
std::unique_ptr<ProtoMessage> decoded_reply;
...
const RequestID request_id = req.request_id;
invoking_method_reply_ = true;
service_proxy->EndInvoke(request_id, std::move(decoded_reply),
reply.has_more());
invoking_method_reply_ = false;
// If this is a streaming method and future replies will be resolved, put back
// the |req| with the callback into the set of active requests.
if (reply.has_more())
queued_requests_.emplace(request_id, std::move(req));
}
看EndInvoke的处理,就是调用了一开始调用BeginInvoke时传入的回调,往前看InitializeConnection的回调是什么?
void ServiceProxy::EndInvoke(RequestID request_id,
std::unique_ptr<ProtoMessage> result,
bool has_more) {
auto callback_it = pending_callbacks_.find(request_id);
if (callback_it == pending_callbacks_.end()) {
// Either we are getting a reply for a method we never invoked, or we are
// getting a reply to a method marked drop_reply (that has been invoked
// without binding any callback in the Defererd response object).
PERFETTO_DFATAL("Unexpected reply received.");
return;
}
DeferredBase& reply_callback = callback_it->second;
AsyncResult<ProtoMessage> reply(std::move(result), has_more);
reply_callback.Resolve(std::move(reply));
if (!has_more)
pending_callbacks_.erase(callback_it);
}
再贴一次,回调是ProducerIPCClientImpl::OnConnectionInitialized
void ProducerIPCClientImpl::OnConnect() {
PERFETTO_DCHECK_THREAD(thread_checker_);
connected_ = true;
ipc::Deferred<protos::gen::InitializeConnectionResponse> on_init;
on_init.Bind(
[this](ipc::AsyncResult<protos::gen::InitializeConnectionResponse> resp) {
OnConnectionInitialized(
resp.success(),
resp.success() ? resp->using_shmem_provided_by_producer() : false,
resp.success() ? resp->direct_smb_patching_supported() : false,
resp.success() ? resp->use_shmem_emulation() : false);
});
接着看OnConnectionInitialized,这里就是调用了producer_->OnConnect();,还记得producer_是什么吗?就是在创建ProducerIPCClientImpl时传入的上层事件监听者,也是处理实际producer业务的对象,这里可以是HeapprofdProducer,因为接下来就是注册data source,这是与tracing业务直接相关的。
void ProducerIPCClientImpl::OnConnectionInitialized(
bool connection_succeeded,
bool using_shmem_provided_by_producer,
bool direct_smb_patching_supported,
bool use_shmem_emulation) {
PERFETTO_DCHECK_THREAD(thread_checker_);
// If connection_succeeded == false, the OnDisconnect() call will follow next
// and there we'll notify the |producer_|. TODO: add a test for this.
if (!connection_succeeded)
return;
is_shmem_provided_by_producer_ = using_shmem_provided_by_producer;
direct_smb_patching_supported_ = direct_smb_patching_supported;
// The tracing service may reject using shared memory and tell the client to
// commit data over the socket. This can happen when the client connects to
// the service via a relay service:
// client <-Unix socket-> relay service <- vsock -> tracing service.
use_shmem_emulation_ = use_shmem_emulation;
producer_->OnConnect();
// Bail out if the service failed to adopt our producer-allocated SMB.
// TODO(eseckler): Handle adoption failure more gracefully.
if (shared_memory_ && !is_shmem_provided_by_producer_) {
PERFETTO_DLOG("Service failed adopt producer-provided SMB, disconnecting.");
Disconnect();
return;
}
}
看HeapprofdProducer::OnConnect,能看出接着就是注册data source
void HeapprofdProducer::OnConnect() {
PERFETTO_DCHECK(state_ == kConnecting);
state_ = kConnected;
ResetConnectionBackoff();
PERFETTO_LOG("Connected to the service, mode [%s].",
mode_ == HeapprofdMode::kCentral ? "central" : "child");
DataSourceDescriptor desc;
desc.set_name(kHeapprofdDataSource);
desc.set_will_notify_on_stop(true);
endpoint_->RegisterDataSource(desc);
}
先是调用ProducerEndpointImpl::RegisterDataSource,这里很简单,这里的service_是执行ProducerIPCClient::Connect时传入的ProducerIPCClientImpl
void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
const DataSourceDescriptor& desc) {
PERFETTO_DCHECK_THREAD(thread_checker_);
service_->RegisterDataSource(id_, desc);
}
接着看ProducerIPCClientImpl::RegisterDataSource,可以看到这里的回调比较简单,也就是说对于producer,RegisterDataSource后初始化工作就结束了。
void ProducerIPCClientImpl::RegisterDataSource(
const DataSourceDescriptor& descriptor) {
PERFETTO_DCHECK_THREAD(thread_checker_);
if (!connected_) {
PERFETTO_DLOG(
"Cannot RegisterDataSource(), not connected to tracing service");
}
protos::gen::RegisterDataSourceRequest req;
*req.mutable_data_source_descriptor() = descriptor;
ipc::Deferred<protos::gen::RegisterDataSourceResponse> async_response;
async_response.Bind(
[](ipc::AsyncResult<protos::gen::RegisterDataSourceResponse> response) {
if (!response)
PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
});
producer_port_->RegisterDataSource(req, std::move(async_response));
}
接着看service端的处理,这里先拿到对应的的RemoteProducer,这个producer就是在上面处理InitializeConnection时创建的。 然后通过service_endpoint调用RegisterDataSource,这里的service_endpoint和上文client端的调用RegisterDataSource的endpoint_都是ProducerEndpointImpl对象,区别是传递的service_不一样,client端的service_是ProducerIPCClientImpl,而service端就是TracingServiceImpl。这里也能看到producer的注册最终会走到TracingServiceImpl这个主要的业务对象
void ProducerIPCService::RegisterDataSource(
const protos::gen::RegisterDataSourceRequest& req,
DeferredRegisterDataSourceResponse response) {
RemoteProducer* producer = GetProducerForCurrentRequest();
if (!producer) {
PERFETTO_DLOG(
"Producer invoked RegisterDataSource() before InitializeConnection()");
if (response.IsBound())
response.Reject();
return;
}
const DataSourceDescriptor& dsd = req.data_source_descriptor();
GetProducerForCurrentRequest()->service_endpoint->RegisterDataSource(dsd);
// RegisterDataSource doesn't expect any meaningful response.
if (response.IsBound()) {
response.Resolve(
ipc::AsyncResult<protos::gen::RegisterDataSourceResponse>::Create());
}
}
这里主要的处理就是将data source添加到data_sources_里
void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
const DataSourceDescriptor& desc) {
PERFETTO_DCHECK_THREAD(thread_checker_);
if (desc.name().empty()) {
PERFETTO_DLOG("Received RegisterDataSource() with empty name");
return;
}
ProducerEndpointImpl* producer = GetProducer(producer_id);
if (!producer) {
PERFETTO_DFATAL("Producer not found.");
return;
}
// Check that the producer doesn't register two data sources with the same ID.
// Note that we tolerate |id| == 0 because until Android T / v22 the |id|
// field didn't exist.
for (const auto& kv : data_sources_) {
if (desc.id() && kv.second.producer_id == producer_id &&
kv.second.descriptor.id() == desc.id()) {
PERFETTO_ELOG(
"Failed to register data source \"%s\". A data source with the same "
"id %" PRIu64 " (name=\"%s\") is already registered for producer %d",
desc.name().c_str(), desc.id(), kv.second.descriptor.name().c_str(),
producer_id);
return;
}
}
PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
producer_id, desc.name().c_str());
auto reg_ds = data_sources_.emplace(desc.name(),
RegisteredDataSource{producer_id, desc});
...
}
至此,producer的启动初始化工作就结束了,后面就是由consumer来触发trace了,将在下一篇介绍。