首页 > 后端开发 > 最新文章

RPC魔法揭秘:从原理到BRPC实战,用C++玩转分布式通信

CSDN博客 2026-05-12 22:26:46 人看过

本篇摘要

本文从RPC核心概念出发,阐释其“透明远程调用”的本质与工作原理,对比主流框架后聚焦百度开源的C++高性能RPC框架BRPC,详解其安装、Echo服务示例代码(含客户端/服务端实现),并延伸介绍基于ETCD的服务注册发现与信道管理封装,完整呈现分布式通信方案落地过程。

一.什么是rpc

简单理解

RPC(远程过程调用)就是让程序调用远程  服务器 上的功能,像调用本地函数一样简单,不用管网络传输细节。比如你手机App点“下单”,实际是调用电商服务器的“创建订单”功能(client端调用rpc服务函数请求端),RPC帮你隐藏了网络请求、数据打包等复杂操作,开发者只需写一句类似 createOrder() 的代码(rpc服务端某服务),系统自动完成远程调用并返回结果。

核心特点

透明性:调用远程服务就像调用本地函数,开发者无需关心网络通信细节。

跨进程/跨机器通信:RPC通常用于不同进程(甚至不同主机)之间的功能调用。

支持多种协议和传输方式:比如HTTP、TCP、自定义协议等,常用高性能协议如gRPC(基于HTTP/2和Protocol Buffers)。

常与序列化技术配合:调用参数和返回结果需要序列化成字节流在网络中传输,常用格式如JSON、Protobuf、Thrift等。

RPC 工作原理

客户端调用:客户端代码中调用一个看似本地的方法(如 userService.getUser(id))。

代理/Stub:客户端通过一个代理对象(Stub)把调用转换成网络请求,包括方法名、参数等。

序列化:将方法名、参数等信息序列化成二进制或文本格式(如JSON/Protobuf)。

网络传输:通过底层网络协议(如TCP/HTTP)把数据发送到服务端。

服务端处理:服务端接收请求,反序列化得到方法名和参数,找到对应的函数并执行。

返回结果:服务端将执行结果序列化后通过网络返回给客户端,客户端再反序列化得到最终结果。

常见 RPC 框架

gRPC:Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers,支持多语言。

Thrift:Facebook 开源的跨语言 RPC 框架,支持多种传输协议和数据格式。

Dubbo:阿里巴巴开源的 Java RPC 框架,广泛用于微服务架构。

brpc:百度开源的高性能 C++ RPC 框架,支持多种协议和多线程模型。

JSON-RPC / XML-RPC:基于 JSON 或 XML 的轻量级 RPC 协议,常用于 Web 服务。

典型使用场景

微服务架构服务之间通过 RPC 互相调用,实现功能解耦和分布式部署

分布式系统:比如分布式存储、计算任务调度、数据库中间层等。

前后端分离/服务化:后端将核心功能封装成 RPC 服务,供前端、App 或其他服务调用。

简单概括下:

可以理解成在rpc服务中添加一个或者多个服务,然后把rpc服务部署在某个ip+port处运行(这里也可以分散开来部署在不同主机等,只要用户能拿到对应ip+port并且rpc服务正常注册并运行即可);然后客户就可以通过调用rpc某个服务的rpc请求调用对象通过构建对应信道进行向对应rpc服务发起请求然后收到应答等。

二.BRPC介绍

是什么?

BRPC是百度开源的C++专用RPC框架,让不同服务器上的C++程序像调用本地函数一样快速通信,支撑了百度万亿级请求的核心服务。

比gRPC强在哪?

更快:延迟更低、每秒能处理的请求更多(尤其适合C++高并发场景)。

更简单专为C++设计,不用学复杂配置,接口更直观

更灵活:支持多种协议,调试和功能扩展更方便(比如动态调参数)。

更省事:依赖少,部署简单,和百度工具链直接兼容。

简单说C++项目追求性能和省心,选BRPC;需要多语言支持,用gRPC。

三.基于brpc实现简单的服务调用

上面说了对应的brpc的优点,因此这里可以看出更适合用brpc而不是grpc来实现。

brpc安装教程

安装步骤:

安装依赖:

sudo apt-get install -y git g++ make libssl-dev libprotobuf-dev librocksdb-dev libprotoc-dev protobuf-compiler libleveldb-dev

安装 brpc:

git clone https://github.com/apache/brpc.git cd brpc/ mkdir build && cd build cmake -DCMAKE_INSTALL_PREFIX=/usr .. cmake --build . -j6 make sudo make install

简单实现客户端向brpc服务端口请求服务完成应答过程(以echo回显为例)

首先先说下对应思路:

rpc服务端进行对应echo服务注册(通过Protobuf协议为服务端与客户端生成对应类,然后服务端填充对应的echo功能完成添加进入rpc-server中),最后启动对应server,也就是把rpc服务部署在对应ip+port处(即包含对应echo服务功能)。

然后对应客户端通过约定好的Protobuf协议生成的对应调用相关rpc服务的rpc请求接口进行调用(拿对应rpc服务信道进行初始化(rpc服务部署ip+port以及对应选项来初始化对应信道));调用rpc服务请求接口发送对应请求等。

rpc服务端收到对应请求就去拿着请求调用用户注册进来的服务函数进行操作,最后填充好答复以及其他相关信息发送回去。

客户端收到对应答复后进行解析拿到结果以及其他信息,然后调用回调函数进行操作或者通过答复结果进行其他处理操作。

下面看下博主总结的关于这个过程的图:

在这里插入图片描述

简单来说分为rpc服务端与rpc客户端(具体操作如下):

1·rpc服务端:关闭默认日志+构建服务对象向rpc-server中添加+启动rpc服务器。

2·rpc客户端:初始化信道+构建并发送rpc请求对象+调用回调恢复(可选)。

测试效果

在这里插入图片描述

rpc-server注册完对应的echo服务后启动在这等待。

在这里插入图片描述

服务端接收到客户端的请求进行相关服务处理(echo服务处理函数)。

在这里插入图片描述

rpc-client通过特定接口进行rpc请求,之后拿到对应响应答复后异步调用回调进行操作。

代码汇总

对应代码及详细注释:

1.Protobuf用于后续设定对应rpc服务:

syntax="proto3"; package example; option cc_generic_services = true; message EchoRequest {    string message = 1; } message EchoResponse {    string message = 1; } // 搭建对应的服务,然后为这个服务实例化出模版echo基类提供用户填充 service EchoService {    rpc Echo(EchoRequest) returns (EchoResponse); }

2.makefile(注意这里链接的所有库及顺序):

all :server client server:server.cc  echo.pb.cc g++ -g -std=c++17 $^ -o $@ -L/usr/local/lib -lspdlog -lfmt  -letcd-cpp-api -lcpprest -lbrpc -lgflags   -lssl -lcrypto -lprotobuf -lleveldb client:client.cc echo.pb.cc g++ -g -std=c++17 $^ -o $@ -L/usr/local/lib -lspdlog -lfmt  -letcd-cpp-api -lcpprest -lbrpc -lgflags  -lssl -lcrypto  -lprotobuf  -lleveldb .PHONY:clean clean: rm -r server client # brpc库依赖glags库,故必须把gflags链接在brpc库后面,否则报错(这里链接的Protobuf库的版本必须和protoc的时候是一样的否则报错) #-L/usr/local/lib  当前没有对应库再去系统默认的地方链接找库

3.rpc-client::

#include <brpc/channel.h> #include <thread> #include "echo.pb.h" void clientcallback(brpc::Controller *cntl, ::example::EchoResponse *response) {    //把指针直接给智能指针对象此时直接被智能指针保存地址即可,智能指针就不用在内部new了,出了函数作用自动帮忙把客户端构建对象析构释放:    std::unique_ptr<brpc::Controller> cntl_guard(cntl);    std::unique_ptr<example::EchoResponse> resp_guard(response);    if (cntl->Failed() == true)    {        std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;        return;    }    std::cout << "收到响应: " << response->message() << std::endl; } int main() {    // 客户端调用rpc接口通过信道与rpc服务端进行通信    brpc::ChannelOptions options;    options.connect_timeout_ms = -1; // 连接等待超时时间,-1表示一直等待    options.timeout_ms = -1;         // rpc请求等待超时时间,-1表示一直等待    options.max_retry = 3;           // 请求重试次数    options.protocol = "baidu_std";  // 序列化协议,默认使用baidu_std    brpc::Channel channel;    auto ret = channel.Init("127.0.0.1:8080", &options);    if (ret == -1)    {        std::cout << "初始化信道失败!\n";        return -1;    }    // 构建EchoService_Stub对象用于向服务端rpc发请求:    example::EchoService_Stub stub(&channel);    brpc::Controller *cntl = new brpc::Controller();    example::EchoResponse *rsp = new example::EchoResponse();    example::EchoRequest req;    req.set_message("你好rpc!");    auto closure = google::protobuf::NewCallback(clientcallback, cntl, rsp);    //异步线程出动走closure调用也就是回调函数的处理,主线程收到答复填充完接着往下走:    //主线程这里收到对应答复进行填充好对应的内容如rsp cntl等然后如果closure不为空就获取对应回调通知然后派异步线程去执行对应clientcallback回调函数处理即可    stub.Echo(cntl, &req, rsp, closure); //向rpcserver发送对应req,然后接收到response填充rsp及cntl,之后再去调用closure这个可调用对象即调用clientcallback      std::cout << "异步调用!\n";    std::this_thread::sleep_for(std::chrono::seconds(3));    return 0; }

4.rpc-server:

#include <brpc/server.h> #include <butil/logging.h> #include "echo.pb.h" class EchoService : public example::EchoService {    void Echo(google::protobuf::RpcController *controller,              const ::example::EchoRequest *request,              ::example::EchoResponse *response,              ::google::protobuf::Closure *done)    {        // 创建一个 brpc::ClosureGuard对象,用于管理 done的生命周期。当 rpc_guard对象析构时,会自动调用 done,通知框架RPC处理完成。        brpc::ClosureGuard rpc_guard(done);//这里析构的时候调用对应的done->run()构建对应答复信息来通知请求方可以进行自己的回调处理了        std::cout << "收到消息:" << request->message() << std::endl;        std::string str = request->message() + "--这是响应!!";        response->set_message(str);    }    //调用完对应自定义函数echo后由rpc发送对应答复集合。 }; int main() {    // 关闭brpc默认输出日志:    logging::LoggingSettings logset;    logset.logging_dest = logging::LoggingDestination::LOG_TO_NONE;    logging::InitLogging(logset);    // 创建rpc服务器注册对应服务:    brpc::Server server;    EchoService echo;    server.AddService(&echo, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);    // 添加选项并启动rpc服务:    brpc::ServerOptions options;    options.idle_timeout_sec = -1; // 连接空闲超时时间-超时后连接被关闭    options.num_threads = 1;       // io线程数量    auto ret = server.Start(8080, &options);//rpc服务端对应的echo服务启动了:也就是收到对应echo的resquest就去调用对应Echo函数完成response构建然后自动发送    if (ret == -1)    {        std::cout << "启动服务器失败!\n";        return -1;    }    server.RunUntilAskedToQuit(); // 修改等待运行结束      return 0; }

四.封装每个服务的channels及所有服务管理者

rpc 调用这里的封装,因为不同的服务调用使用的是不同的 Stub,这个封装起来的意义不大,因此这里封装的是每个服务占用的信道集合和管理起来所有服务。

封装思想(需要封装的两大类分别是service-channelsservicesmanager):

1·ServiceChannels:


首先要知道注册中心注册(etcd)注册对应服务是以服务+单例名称+ip-port来注册的,也就是说明一个指定服务可以有多个单例来运行(即多个rpc服务,可以是在不同ip+port处,因此就有了多个信道概念),得出结论是一个服务对应多个服务单例。


ServiceChannels就是管理一个服务间所有关于它的实例信道(也就是站在某个服务的所有单例对象的信道管理者角度)。


因此这个类就需要对应信道的增删查等功能,完成对应主机名与服务信道的映射,以及采取RR轮转方式使用对应服务的信道。

2·servicesmanager:

这个其实就是把上面的service-channels管理起来,暴露给外面使用的,并集合对应etcd的watch监控来用,当用户线设定好要关系你的服务后,把上线和下线逻辑函数交给watch进行回调,也就是当有服务上下线后(某个服务的单例)判断是否为当前用户关心来完成对应增加删除对应服务信道而已。

当用户进行选择服务的时候,采取RR轮转选取对应服务的某个信道。

过程图如下:

在这里插入图片描述

实现代码即对应注释(channel.hpp):

#pragma once #include <brpc/channel.h> #include <string> #include <vector> #include <unordered_map> #include <mutex> #include "log.hpp" // 使用指针管理对象都:: // etcd通知某个服务上线或者下线是通过host:ip+port通知的,然后这边收到就要进行对应的管理,故进行host与对应信道映射。 // 管理一个服务间所有关于它的实例信道(也就是站在某个服务的所有单例对象的信道管理者角度) class ServiceChannels { public:    using Ptr = std::shared_ptr<ServiceChannels>;    using channelptr = std::shared_ptr<brpc::Channel>;    ServiceChannels(const std::string &name) : _service_name(name), _idx(0) {}    void Append(const std::string &host)    {        std::unique_lock<std::mutex> lock(_mtx);        auto it = _hosts.find(host);        if (it == _hosts.end())        {            // 构建host对应的信道进行初始化:            std::shared_ptr<brpc::Channel> pchannel = std::make_shared<brpc::Channel>();            brpc::ChannelOptions options;            options.connect_timeout_ms = -1;            options.timeout_ms = -1;            options.max_retry = 3;            options.protocol = "baidu_std";            auto ok = pchannel->Init(host.c_str(), &options);            if (ok == -1)            {                LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);                return;            }            // 完成信道这边的管理:            _hosts[host] = pchannel;            _channels.push_back(pchannel);        }    }    void Remove(const std::string &host)    {        std::unique_lock<std::mutex> lock(_mtx);        auto it = _hosts.find(host);        if (it == _hosts.end())        {            LOG_WARN("{}-{}节点删除信道时,没有找到信道信息!", _service_name, host);            return;        }        for (auto vit = _channels.begin(); vit != _channels.end(); vit++)        {            if (*vit == it->second)            {                _channels.erase(vit);                break;            }        }        _hosts.erase(host);    }    channelptr Choose()    {        std::unique_lock<std::mutex> lock(_mtx);        if (!_channels.size())            return channelptr();        // 存在进行轮询输出:        int32_t index = _idx++ % _channels.size();        return _channels[index];    } private:    std::mutex _mtx;    std::string _service_name;    std::unordered_map<std::string, channelptr> _hosts; // 主机名(ip+port)与brpc信道之间映射关系    std::vector<channelptr> _channels;                  // 维护所有的channel    int32_t _idx;                                       // 用于RR轮转策略 }; // 所有服务的管理者,负责接收到etcd服务通知进行对应注册删除服务(或者为服务注册单例对象也就是信道等)等 class ServiceManager { public:    using Ptr = std::shared_ptr<ServiceManager>;    ServiceManager() {}    // 先声明,我关注哪些服务的上下线,不关心的就不需要管理了    void Cared(const std::string &service_name)    {        std::unique_lock<std::mutex> lock(_mtx);        _care_services.insert(service_name);    }    // etcd监视的回调函数    void OnlineService(const std::string &service_instance_name, const std::string &host)    {        const std::string &service_name = GetService(service_instance_name);        auto service = ServiceChannels::Ptr(); // 空的服务信道集        {            std::unique_lock<std::mutex> lock(_mtx);            auto fit = _care_services.find(service_name);            if (fit == _care_services.end())            {                LOG_DEBUG("{}-{} 服务上线了,但是当前并不关心!", service_name, host);                return;            }            // 说明关心对应服务,如果服务存在就添加对应信道即可,不存在就创建服务再添加对应信道:            auto sit = _services.find(service_name);            if (sit == _services.end())            {                service = std::make_shared<ServiceChannels>(service_name);                _services.insert(std::make_pair(service_name, service));            }            else            {                service = sit->second;            }        }        if (!service)        {            LOG_ERROR("新增 {} 服务管理节点失败!", service_name);            return;        }        service->Append(host);        LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);    }    void UnonlineService(const std::string &service_instance_name, const std::string &host)    {        const std::string &service_name = GetService(service_instance_name);        auto service = ServiceChannels::Ptr(); // 空的服务信道集        {            std::unique_lock<std::mutex> lock(_mtx);            auto fit = _care_services.find(service_name);            if (fit == _care_services.end())            {                LOG_DEBUG("{}-{} 服务下线了,但是当前并不关心!", service_name, host);                return;            }            // 从服务集合中查找进行删除:            auto sit = _services.find(service_name);            if (sit == _services.end())            {                LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);                return;            }            else            {                service = sit->second;            }        }        service->Remove(host);        LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);    }    ServiceChannels::channelptr ChooseService(const std::string &servicename)    {        std::unique_lock<std::mutex> lock(_mtx);        auto sit = _services.find(servicename);        if (sit == _services.end())        {            LOG_ERROR("当前没有能够提供 {} 服务的节点!", servicename);            return ServiceChannels::channelptr();        }        return sit->second->Choose();    } private:    const std::string GetService(const std::string &service_instance_name)    {        auto pos = service_instance_name.find_last_of('/');        if (pos == std::string::npos)            return service_instance_name; // 一般来说不会找不到,注册进去的都是包含实例        return service_instance_name.substr(0, pos);    }    std::mutex _mtx;    std::unordered_map<std::string, ServiceChannels::Ptr> _services; // 添加进来的服务都是与业务有关的,也就是关心的    std::unordered_set<std::string> _care_services; };

五.基于etcd实现服务上下线监控来完成brpc服务调用

上面封装好了对应的channel.hpp,下面就使用它结合之前封装的etcd.hpp以及复用下brpc的简单echo服务组合起来使用测试下( 说白了其实就是brpc的echo服务的  server              与client端套用下然后结合封装etcd代码以及改变下获取channel方式而已。)。

说下大致思路:

实现register与discovery两个程序也就是对应的添加rpc服务启动rpc-server+etcd注册echo服务借助servicesmanager获取对应服务信道+进行rpc服务请求发送

逻辑过程图:

在这里插入图片描述

测试效果

在这里插入图片描述

一开始启动对应的rpc客户端,发现etcd中没有关心的服务即对应实例,只能看到之前在etcd服务器中注册的其他服务,但是是不相关的故不进行调用。

在这里插入图片描述

此时启动对应rpc-server也就是给rpc服务器添加对应服务并部署,给etcd注册进去对应服务后开始运行等待。

在这里插入图片描述

此时服务端收到对应的rpc客户端发来的请求,然后进行调用echo服务进行构建答复发送回去。

在这里插入图片描述

客户端收到服务端发送来的答复进行解析处理。

代码汇总

https://gitee.com/etcd_brpc(点我跳转)

六.本篇小结

RPC通过隐藏网络细节实现跨进程调用,BRPC凭借高性能与C++友好性成为优选;本文从理论到实践,结合ETCD服务治理,为分布式系统通信提供了可落地的参考路径。

版权声明:倡导尊重与保护知识产权。未经许可,任何人不得复制、转载、或以其他方式使用本站《原创》内容,违者将追究其法律责任。本站文章内容,部分图片来源于网络,如有侵权,请联系我们修改或者删除处理。

编辑推荐

热门文章