手写Rpc框架 - 导读
git仓库-all-rpc
GTIEE:https://gitee.com/quercus-sp204/all-rpc 【参考源码 yrpc】
1. Rpc概念
RPC 即远程过程调用(Remote Procedure Call) ,就是通过网络从远程计算机程序上请求服务。
- 本地调用抽象:允许程序像调用本地函数一样调用远程计算机上的函数。开发者无需编写复杂的网络通信代码来处理诸如建立连接、发送请求、等待响应等细节,只需关注业务逻辑的实现。例如,在一个分布式系统中,A 服务器上的程序需要调用 B 服务器上的某个函数来获取数据,使用 RPC 就可以像调用本地函数一样简单。
- 通信协议与序列化:为了实现这种跨网络的函数调用,RPC 框架通常会使用特定的通信协议(如 TCP/IP)来传输数据,并通过序列化和反序列化技术,将调用的参数和返回值转换为适合在网络上传输的格式。比如,将参数对象转换为字节流进行传输,在接收端再将字节流还原为对象。
那么,它的应用场景有哪些呢?我们平时用到了吗?
- 微服务架构:在微服务架构中,各个微服务之间通常需要相互通信来完成复杂的业务流程。例如,一个电商系统中,订单服务可能需要调用库存服务来检查商品库存,调用用户服务来验证用户信息等,RPC 可以高效地实现这些微服务间的通信。
- 分布式系统:在大型分布式系统中,不同节点可能负责不同的功能模块。例如,在搜索引擎系统中,索引构建节点和查询服务节点可能分布在不同的服务器上,通过 RPC 可以实现节点之间的协同工作。
说白了,就是不同服务之间的网络通信嘛。那你可能会问了,假如我的系统有User、Order、Shipment三个服务【爪哇SpringBoot编写的嚯】,如果User想要访问Order上面的函数,我只需要将此函数以Http接口的形式暴露出去,然后User那边使用RestTemplate来访问不就好了吗?何必这么费劲还要用框架呢?
仔细想一下,确实有那么点儿卵道理,但是又仔细一想,
- 你会发现,你使用RestTemplate的时候,需要手动处理请求的 URL 拼接、请求头设置、参数序列化和响应反序列化等操作,对了,还有异常也需要自己处理,需要自己处理调用失败的情况,如重试几次啊等等
- HTTP 协议是一种文本协议,存在较多的头部信息,在数据传输时会带来额外的开销。而rpc框架的协议通常相比http协议,是很轻量的。
- 如果Order部署在了多台机器上面,代码里面肯定是存了这些机器的地址,如果扩容或者缩容,都要修改代码,同时还需要我们手动选择调用哪一台机器上面的Order【需要手动实现负载均衡】
那么,rpc都可以完成这些,并且服务的地址信息那些啊,可以在rpc的注册中心拉取到,动态感知。
综上所述,一个基本的Rpc框架主要应该具有的能力是:
基础通信能力
-
稳定性与可靠性
- 连接管理:具备完善的连接池管理机制,对连接进行复用,减少频繁建立和销毁连接的开销,同时保证连接的稳定性。例如,在高并发场景下,能自动处理连接的超时、重连等问题
服务发现与治理能力
- 服务注册与发现: 1. 服务提供者能够在启动时自动将自己的服务信息(如服务名称、地址、端口等)注册到服务注册中心,方便服务消费者发现和调用。2.实时感知:调用方能够及时感知调用的服务信息并更新。
- 负载均衡: 如随机、轮询、最少活跃调用数、一致性哈希
- 服务容错
- 熔断机制:当服务提供者出现故障或响应时间过长时,能够自动熔断对该服务的调用,避免大量请求积压,影响整个系统的稳定性。
- 降级策略:在系统资源紧张或服务出现故障时,能够自动降级服务,提供默认的响应结果或采取其他临时措施,保证系统的基本可用性。
易用性与可扩展性
- 简单的编程模型:让开发者能够像调用本地函数一样调用远程服务,无需关注底层的网络通信细节,降低开发难度,提高开发效率
- 插件化与扩展性
- SPI 机制:具备良好的插件化架构,通过服务提供者接口(SPI)机制,允许开发者根据实际需求扩展框架的功能,如自定义序列化方式、负载均衡策略、过滤器等。
现在就按照上面的能力,来一个一个实现,最终将其组合成一个框架。
2. 角色
一个Rpc框架的大致角色分布:
服务提供者将自己的数据信息【例如,端口、ip、接口等信息提供给注册中心】(服务注册)
消费者从注册中心拉取到可用的服务信息(服务发现),然后选择一个合适的服务(负载均衡),发送网络请求【请求里面封装了需要调用的接口,参数等等】,
服务提供者接收到请求之后,本地调用方法,然后通过网络把响应结果过传输给消费者
最后消费者解析响应结果。
注册中心: (本文就选择zookeeper为注册中心)
3. 注册中心的接入
zookeeper的安装与启动,就不在这里赘述了。
思考注册中心的主要能力:【服务注册,服务发现】
定义接口
Registry接口
/*
* 注册中心的能力: 注册服务, 拉取服务列表
*/
public interface Registry {
/**
* 注册服务
* @param serviceConfig 服务的配置内容
*/
void register(ServiceConfig<?> serviceConfig);
/**
* 从注册中心拉取服务列表
* @param serviceName 服务的名称
* @return 服务的地址
*/
List<InetSocketAddress> lookup(String serviceName, String group);
}
ServiceConfig 封装服务信息的class
/*
服务信息
*/
public class ServiceConfig<T> {
// 接口的类型
/*
比如UserServiceImpl实现了UserService, 真实对象就是实现类,interfaceProvider就是UserService.class
*/
private Class<?> interfaceProvider;
private Object ref; // 真实对象
private String group = "default"; // 分组
// get set.....
}
Zookeeper注册中心的实现类 ZookeeperRegistry
服务注册在zookeeper上面的节点如图所示
trpc根节点
==▶消费者节点
==▼生产者节点
====▼接口的全限定名
======▼分组
========▼地址信息1...
@Slf4j
public class ZookeeperRegistry extends AbstractRegistry {
// 维护一个zk实例
private ZooKeeper zooKeeper;
public ZookeeperRegistry() {
this.zooKeeper = ZookeeperUtil.createZookeeper();
}
public ZookeeperRegistry(String connectString,int timeout) {
this.zooKeeper = ZookeeperUtil.createZookeeper(connectString,timeout);
}
@Override
public void register(ServiceConfig<?> service) {
// 服务名称的节点 ---- "/tprc-metadata/providers/接口全限定名"
String parentNode = Constant.BASE_PROVIDERS_PATH +"/"+service.getInterface().getName();
// 建立服务节点这个节点应该是一个持久节点
if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){
ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);
ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);
}
// 建立分组节点
parentNode = parentNode + "/" + service.getGroup();
if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){
ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);
ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);
}
// 创建本机的临时节点, ip:port ,
// 服务提供方的端口一般自己设定,我们还需要一个获取ip的方法
// ip我们通常是需要一个局域网ip,不是127.0.0.1,也不是ipv6
// 192.168.12.121
String node = parentNode + "/" + NetUtils.getIp() + ":" + TrpcBootstrap.getInstance().getConfiguration().getPort();
if(!ZookeeperUtil.exists(zooKeeper,node,null)){
ZookeeperNode zookeeperNode = new ZookeeperNode(node,null);
ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.EPHEMERAL);
}
log.info("服务{},注册ok",service.getInterface().getName());
}
/**
* 拉取合适的服务列表
* @param serviceName 服务名称
* @return 服务列表
*/
@Override
public List<InetSocketAddress> lookup(String serviceName,String group) {
// 1、找到服务对应的节点
String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName + "/" +group;
List<String> children = ZookeeperUtil.getChildren(zooKeeper, serviceNode, null);
// 获取了所有的可用的服务列表
List<InetSocketAddress> inetSocketAddresses = children.stream().map(ipString -> {
String[] ipAndPort = ipString.split(":");
String ip = ipAndPort[0];
int port = Integer.parseInt(ipAndPort[1]);
return new InetSocketAddress(ip, port);
}).toList();
if(inetSocketAddresses.isEmpty()){
throw new DiscoveryException("未发现任何可用的服务主机.");
}
return inetSocketAddresses;
}
}
上面的ZookeeperUtil是自定义的操作Zookeeper的工具类。-- 详情见源码里面的注释,值得说明一下,Zookeeper要先有父结点才能创建子节点,不能把路径直接写全了直接创建,故在源码里面会用createRoot方法初始化所有的父结点。
public static ZooKeeper createZookeeper(String connectPath, int timeout) {
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
.................
// 连接成功就创建根节点,检查是否存在rpc根节点 /trpc-metadata/providers && /trpc-metadata/consumers
createRoot(zooKeeper);
...............
} catch (IOException | InterruptedException e) {
log.info("创建zookeeper实例时发生异常:",e);
throw new ZookeeperException("创建zookeeper实例时发生异常");
}
}
至此,注册中心的两个基本方法就可以告一段落了。
rpc_236">4.Trpc框架启动器
既然是一个框架,那么,我们必然有一个入口来启动这一套流程。
①服务提供方
- 基本功能信息appName、registry
形如: all-tRpc-demo / demo-simple-provider / …/ProviderApplication.java 这样来启动我们的提供方。
public class ProviderApplication {
public static void main(String[] args) {
TrpcBootstrap.getInstance()
.appName("user-provider")
// 配置注册中心
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
// 扫描包下的类,然后批量发布接口
.scan("com.feng.demo")
// 启动服务
.start();
}
}
在TrpcBootstrap.java里面
@Slf4j
public class TrpcBootstrap {
// 单例,每个应用程序只有一个
private static final TrpcBootstrap trpcBootstrap = new TrpcBootstrap();
// 配置
private final RpcConfiguration configuration;
// 获取实例
public static TrpcBootstrap getInstance() {
return trpcBootstrap;
}
// 设置应用名称 *****
public TrpcBootstrap appName( String appName ) {
configuration.setAppName(appName);
return this;
}
// 配置注册中心 *****
public TrpcBootstrap registry( RegistryConfig registryConfig ) {
// 传递过来的参数registryConfig,还没有创建连接,在这里创建与注册中心的连接
registryConfig.createRegistryConnection(); // 创建注册中心的连接
configuration.setRegistryConfig(registryConfig); // 设置服务注册中心
return this;
}
}
RpcConfiguration是封装的配置信息
// 全局的配置类,代码配置 --> xml配置 --> 默认项
@Data
public class RpcConfiguration {
// 配置信息-->端口号
private int port = 8094;
// 配置信息-->应用程序的名字
private String appName = "default";
// 分组信息
private String group = "default";
// 配置信息-->注册中心
private RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181"); // 默认的
// 配置信息-->序列化协议
private String serializeType = "jdk";
// 配置信息-->压缩使用的协议
private String compressType = "gzip";
// 配置信息
@Getter
public IdGenerator idGenerator = new IdGenerator(1, 2);
// 配置信息-->负载均衡策略
private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
// 为每一个ip配置一个限流器
private final Map<SocketAddress, RateLimiter> everyIpRateLimiter = new ConcurrentHashMap<>(16);
// 为每一个ip配置一个断路器,熔断
private final Map<SocketAddress, CircuitBreaker> everyIpCircuitBreaker = new ConcurrentHashMap<>(16);
// 读xml,dom4j
public RpcConfiguration() {
............
}
}
// 里面又持有注册中心的类
@Slf4j
public class RegistryConfig {
// 定义连接的 url zookeeper://127.0.0.1:2181 redis://192.168.12.125:3306
private final String connectString;
// 持有一个 Registry
private Registry registry;
public RegistryConfig(String connectString) {
this.connectString = connectString;
}
public Registry getRegistry() {
if ( registry == null ) createRegistryConnection();
return registry;
}
/**
* 可以使用简单工厂来完成
* @return 具体的注册中心实例
*/
public void createRegistryConnection() {
if ( connectString == null ) throw new DiscoveryException("未配置注册中心!");
// 1、获取注册中心的类型
String registryType = getRegistryType(connectString,true).toLowerCase().trim();
log.info("【创建与注册中心的连接~~~】 注册中心的类型: {}", registryType);
// 2、通过类型获取具体注册中心
if( registryType.equals("zookeeper") ){
String host = getRegistryType(connectString, false);
this.registry = new ZookeeperRegistry(host, Constant.ZK_TIME_OUT);
} else if (registryType.equals("nacos")){
String host = getRegistryType(connectString, false);
this.registry = new NacosRegistry(host, Constant.ZK_TIME_OUT);
} else {
throw new DiscoveryException("未发现合适的注册中心。");
}
}
private String getRegistryType(String connectString,boolean ifType){
String[] typeAndHost = connectString.split("://");
if(typeAndHost.length != 2){
throw new RuntimeException("给定的注册中心连接url不合法");
}
if(ifType){
return typeAndHost[0];
} else {
return typeAndHost[1];
}
}
}
- 扫描接口并发布scan
// 扫描项目指定包下面的接口,并且将他们发布到注册中心
public TrpcBootstrap scan(String packageName) {
// 1. 获取指定包 path 下的所有类的全限定名
List<String> classNames = getAllClassName(packageName);
// 2.1 拿到所有标注了TrpcApi注解的类
List<? extends Class<?>> classes = getTrpcClassesByList(classNames);
// 2.2遍历这些构建实例
for (Class<?> clazz : classes) {
Class<?>[] interfaces = clazz.getInterfaces(); // 获取到他的接口
Object instance;
try {
instance = clazz.getConstructor().newInstance(); // 通过无参构造器创建一个实例
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException(e);
}
// 获取注解的group
TrpcApi annotation = clazz.getAnnotation(TrpcApi.class);
String group = annotation.group();
// 3.将这些接口发布
for (Class<?> anInterface : interfaces) {
ServiceConfig<?> serviceConfig = new ServiceConfig<>();
serviceConfig.setInterface(anInterface);
serviceConfig.setRef(instance);
serviceConfig.setGroup(group);
if (log.isDebugEnabled()){
log.debug("---->已经通过包扫描,将服务【{}】发布.",anInterface);
}
// 3、发布
publish(serviceConfig);
}
}
return this;
}
//
private TrpcBootstrap publish( ServiceConfig<?> service ) {
configuration.getRegistryConfig().getRegistry().register(service);
// 维护一个映射关系
SERVERS_LIST.put(service.getInterface().getName(), service);
return this;
}
具体可以看源码里面的实现
- 启动netty
public void start() {
// 1、创建eventLoop,老板只负责处理请求,之后会将请求分发至worker
EventLoopGroup boss = new NioEventLoopGroup(2);
EventLoopGroup worker = new NioEventLoopGroup(10);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
// 3.配置服务
bootstrap = bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 是核心,我们需要添加很多入站和出站的handler
socketChannel.pipeline().addLast(new LoggingHandler()) // 打印日志
.addLast(new TrpcRequestDecoder()) // 请求过来,需要解码
// 根据请求进行方法调用
.addLast(new MethodCallHandler())
.addLast(new TrpcResponseEncoder()) // 响应回去,需要编码
;
}
});
// 4.绑定端口
ChannelFuture channelFuture = bootstrap.bind(configuration.getPort()).sync();
// 5.关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
② 服务消费方
形如 all-tRpc-demo / demo-simple-consumer /…/ConsumerApplication.java
public class ConsumerApplication {
public static void main(String[] args) {
ReferenceConfig<UserService> reference = new ReferenceConfig<>();
reference.setReference(UserService.class);
// 1、连接注册中心
// 2、拉取服务列表
// 3、选择一个服务并建立连接
// 4、发送请求,携带一些信息(接口名,参数列表,方法的名字),获得结果
TrpcBootstrap.getInstance()
.appName("user-consumer")
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.reference(reference);
UserService userService = reference.get();
System.out.println("=======================================");
for (int i = 0; i < 10; i++) {
System.out.println("【rpc调用开始=============】");
// 开始时间
long start = System.currentTimeMillis();
List<User> users = userService.getUserByName("田小锋q" + i);
for (User user : users) {
System.out.println(user);
}
// 结束时间
long end = System.currentTimeMillis();
System.out.println("rpc执行耗时:" + (end - start));
System.out.println("【rpc调用=============结束-----】");
}
}
}
@Slf4j
public class ReferenceConfig<T> {
// 接口类型
private Class<T> interfaceRef;
// 注册中心
private Registry registry;
// 分组信息
private String group;
/**
* 代理设计模式
* @return 代理对象
*/
public T get() {
// 此处一定是使用动态代理完成了一些工作
ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); // 类加载器
Class<T>[] classes = new Class[]{interfaceRef}; // 接口类型
InvocationHandler handler = new ProxyConsumerInvocationHandler(registry,interfaceRef,group);
// 使用动态代理生成代理对象
Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);
return (T)helloProxy;
}
}
主要是jdk动态代理 在invoke里面实现我们的远程调用
5. 序列化&&压缩
在 RPC(远程过程调用)框架中,序列化和压缩是两个重要的概念,它们在数据传输过程中起着关键作用
序列化
上图里面可以看出来,序列化是将对象转换为字节流的过程,以便在网络上传输或存储到文件中。在 RPC 中,客户端调用远程服务时,需要将调用的参数对象序列化为字节流,通过网络发送到服务端;服务端接收到字节流后,再将其反序列化为对象进行处理。处理完后,又将结果对象序列化为字节流返回给客户端,客户端再反序列化得到结果。
常见的序列化方式
那么我们就定义一下序列化的接口
public interface Serializer {
/**
* 抽象的用来做序列化的方法
*/
byte[] serialize(Object object);
/**
* 反序列化的方法
*/
<T> T deserialize(byte[] bytes, Class<T> clazz);
}
1. JDK序列化
@Slf4j // lombok里面的日志注解
public class JdkSerializer implements Serializer {
@Override
public byte[] serialize(Object object) {
if (object == null) return null;
try (
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(baos);
) { // try - with写法
outputStream.writeObject(object);
byte[] result = baos.toByteArray();
if(log.isInfoEnabled()){
log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】",object,result.length);
}
return result;
} catch (IOException e) {
log.error("序列化对象【{}】时放生异常.",object);
throw new SerializeException(e);
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
if(bytes == null || clazz == null) return null;
try (
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(bais);
) {
Object object = objectInputStream.readObject();
if(log.isInfoEnabled()){
log.info("类【{}】已经完成了反序列化操作.",clazz);
}
return (T)object;
} catch (IOException | ClassNotFoundException e) {
log.error("反序列化对象【{}】时放生异常.",clazz);
throw new SerializeException(e);
}
}
}
2.JSON序列化
这里就用fastjson了
@Slf4j
public class JsonSerializer implements Serializer {
@Override
public byte[] serialize(Object object) {
if (object == null) return null;
byte[] result = JSON.toJSONBytes(object);
if (log.isInfoEnabled()) {
log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】", object, result.length);
}
return result;
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
if (bytes == null || clazz == null) return null;
T t = JSON.parseObject(bytes, clazz);
if (log.isInfoEnabled()) {
log.info("类【{}】已经完成了反序列化操作.", clazz);
}
return t;
}
}
3.Hessian序列化
Hessian是一个轻量级的、基于HTTP的RPC(远程过程调用)框架,由Resin开源提供。它使用一个简单的、基于二进制的协议来序列化对象,并通过HTTP进行传输。Hessian的设计目标是提供一种高效、可靠且易于使用的远程服务调用机制。
maven依赖
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>版本号</version> <!-- 4.0.66 -->
</dependency>
@Slf4j
public class HessianSerializer implements Serializer {
@Override
public byte[] serialize(Object object) {
if (object == null) return null;
try (
ByteArrayOutputStream baos = new ByteArrayOutputStream();
) {
Hessian2Output hessian2Output = new Hessian2Output(baos);
hessian2Output.writeObject(object);
hessian2Output.flush();
byte[] result = baos.toByteArray();
if(log.isInfoEnabled())
log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】",object,result.length);
return result;
} catch (IOException e) {
log.error("使用hessian进行序列化对象【{}】时放生异常.",object);
throw new SerializeException(e);
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
if(bytes == null || clazz == null) return null;
try (
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
) {
Hessian2Input hessian2Input = new Hessian2Input(bais);
T t = (T) hessian2Input.readObject();
if(log.isInfoEnabled()
log.info("类【{}】已经使用hessian完成了反序列化操作.",clazz);
return t;
} catch (IOException e) {
log.error("使用hessian进行反序列化对象【{}】时发生异常.",clazz);
throw new SerializeException(e);
}
}
}
4.序列化工厂
转念一想嚯,我们这是实现一个框架额,肯定是要有对外扩展的能力的,那么,我们就将序列化的所有方式默认加载到内存中去,通过一个工厂类来获取指定的序列化方法就可以了,然后暴露添加其他序列化方法的接口。
/**
* @version 1.0
* @Author txf
* @Date 2025/2/10 15:17
* @注释 序列化工厂
*/
@Slf4j
public class SerializerFactory {
private final static ConcurrentHashMap<String, ObjectWrapper<Serializer>> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);
private final static ConcurrentHashMap<Byte, ObjectWrapper<Serializer>> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);
static {
ObjectWrapper<Serializer> jdk = new ObjectWrapper<>((byte) 1, "jdk", new JdkSerializer());
ObjectWrapper<Serializer> json = new ObjectWrapper<>((byte) 2, "json", new JsonSerializer());
ObjectWrapper<Serializer> hessian = new ObjectWrapper<>((byte) 3, "hessian", new HessianSerializer());
SERIALIZER_CACHE.put("jdk",jdk);
SERIALIZER_CACHE.put("json",json);
SERIALIZER_CACHE.put("hessian",hessian);
SERIALIZER_CACHE_CODE.put((byte) 1, jdk);
SERIALIZER_CACHE_CODE.put((byte) 2, json);
SERIALIZER_CACHE_CODE.put((byte) 3, hessian);
}
/**
* 使用工厂方法获取一个SerializerWrapper
* @param serializeType 序列化的类型
* @return SerializerWrapper
*/
public static ObjectWrapper<Serializer> getSerializer(String serializeType) {
ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE.get(serializeType);
if(serializerWrapper == null){
log.error("未找到您配置的【{}】序列化工具,默认选用jdk的序列化方式。",serializeType);
return SERIALIZER_CACHE.get("jdk");
}
return SERIALIZER_CACHE.get(serializeType);
}
public static ObjectWrapper<Serializer> getSerializer(Byte serializeCode) {
ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE_CODE.get(serializeCode);
if(serializerWrapper == null){
log.error("未找到您配置的【{}】序列化工具,默认选用jdk的序列化方式。",serializeCode);
return SERIALIZER_CACHE.get("jdk");
}
return SERIALIZER_CACHE_CODE.get(serializeCode);
}
/**
* 新增一个新的序列化器
* @param serializerObjectWrapper 序列化器的包装
*/
public static void addSerializer(ObjectWrapper<Serializer> serializerObjectWrapper){
SERIALIZER_CACHE.put(serializerObjectWrapper.getName(),serializerObjectWrapper);
SERIALIZER_CACHE_CODE.put(serializerObjectWrapper.getCode(),serializerObjectWrapper);
}
}
压缩
压缩是指通过特定的算法对数据进行处理,减少数据的存储空间或传输带宽。在 RPC 中,对序列化后的字节流进行压缩可以进一步减少数据传输量,提高传输效率。说白了,就是传的少了。RPC嘛,将性能追求到极致!!!!
设计方式同序列化一样的。
public interface Compressor {
// 序列化后的字节数组压缩
byte[] compress(byte[] bytes);
// 解压缩
byte[] decompress(byte[] bytes);
}
Gzip压缩
@Slf4j
public class GzipCompressor implements Compressor {
@Override
public byte[] compress(byte[] bytes) {
try (
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos);
) {
gzipOutputStream.write(bytes);
gzipOutputStream.finish();
byte[] result = baos.toByteArray();
if(log.isInfoEnabled())
log.info("对字节数组进行了压缩长度由【{}】压缩至【{}】.",bytes.length,result.length);
return result;
} catch (IOException e){
log.error("对字节数组进行压缩时发生异常",e);
throw new CompressException(e);
}
}
@Override
public byte[] decompress(byte[] bytes) {
try (
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
GZIPInputStream gzipInputStream = new GZIPInputStream(bais);
) {
byte[] result = gzipInputStream.readAllBytes();
if(log.isInfoEnabled())
log.info("对字节数组进行了解压缩长度由【{}】变为【{}】.",bytes.length,result.length);
return result;
} catch (IOException e){
log.error("对字节数组进行压缩时发生异常",e);
throw new CompressException(e);
}
}
}
压缩工厂就不在这里写了。
导读部分结束了。。