整体框架图
如上图,
Client 分为以下几层:
Interface
→Stub
: 处理接口,将相关属性封装为metadata
Stub
→Proxy
: 通过调用信息,获取metadata
, 基于调用参数,生成代理类,然后传递Invocation
Filter
→ClientInvoker
:调用过滤器, 基于业务逻辑依次过滤Invocation
ClientInvoker
→TrasnportClient
: 最后经过ClientInvoker
, 然后开始处理Invocation
, 将Invocation
转化成Message
, 作为传递的参数。过程中会经过MessageIntercetor
从而定制不同的Message
。TransportClient
: 封装 客户端负载均衡 的逻辑,获取对应的客户端,然后将Message
转化成Request
, 然后发送到服务器端TransportClient
→End
:Client
收到响应,将之封装成Result
, 然后经过拦截器ResultInterceptor
将之转化为需要的数据接口,比如xxBean
, 最后返回给调用方,结束本次调用
Server 分为以下几层:
TransportServerHandler
→TransportServer
: 首先要接受到请求Request
, 因为不清楚会从什么样的方式接受到,所以抽象除了TransportServerHandler
将Request
转化成Message
后传递给Server
。核心的逻辑在Server
中处理。TransportServer
→Filter
:Server
转化Message
为Invocation
, 然后传递给Invoker
Filter
→ServerInvoker
: 和 Client 的第三部一致,也是过滤Invocation
ServerInvoker
: 最后一个为ServerInvoker
, 会获取到implement
, 然后执行调用逻辑
后面行文,会针对具体各个层级的细节,按照 关键类 、 类图 、 描述 来具体详述。
这里需要注意的是,当前的 RPC 支持以下三种方式
- 兼容 v1 版本的消息注入逻辑,接收需要之前版本的服务器框架支持。
- RPC+JSON 的框架,接收需要该框架支持。
- HTTP+JSON 的框架,接收只需要实现任意服务器框架即可,比如
SpringMVC
这里的 Server 只是代表 RPC 配套的 Server 的部分。如果是其他的方式, Server 部分的图是不符合的。
Client
Stub 层
Stub
public class Stub<T> {
private String serviceName;
private Class<T> serviceClass;
private final Map<AnyKey<?>, MetaData> metaDataMap = new HashMap<>();
// lazy
private T proxy;
}
Stub 会解析类,然后获取 name
class
metadata
然后等到第一次调用时,基于上文中的信息, 生成 proxy
,并缓存。
MetaData
public class MetaData {
private String service;
private String endpoint;
private Class<?> proxyClass;
private Method method;
private Type[] parameterTypes;
private Type returnType;
private TransferMode sendMode = TransferMode.Common;
private TransferMode retMode = TransferMode.Common;
private Attributes attributes = new Attributes();
在处理成 MetaData
的时候,
- 考虑到泛型抽象出了
Type
, - 考虑到发送,接受方案,抽象出
TransferMode
, - 考虑到可能存在的一些注解属性,还处于未知,所以,先处理成
Attributes
Proxy 层
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// gen or get invocation
MetaData behavior = servicePool.getMetaData(proxy, method);
Invocation invocation = behavior.derive(new Args(args));
RpcCall rpcCall = context.startInternalCall();
rpcCall.setInvocation(invocation);
Result result = clientInvoker.invoke(invocation);
// 简单设计,如果有必要后置,就给 result 加 hook
context.endCallIfComplete();
// 如果有异常,则抛出
if (result.hasEx()) {
throw result.getEx();
}
return result.getValue();
}
Filter 层
public Invoker build() {
Invoker last = originInvoker;
Iterator<Filter> filterIterator = filters.descendingIterator();
while (filterIterator.hasNext()) {
Filter filter = filterIterator.next();
Invoker next = last;
last = new FilterNode(next, filter);
}
return last;
}
创建过滤链,然后包装成 Invoker
, 然后依次过滤 Invocation
, 直到最后一个 ClientInvoker
ClientInvoker 层
public Result invoke(Invocation invocation) throws Exception {
try {
// get channel
TransportClient channel = this.clientProvider.getClient();
// genTemplate
Message message = messageParser.parse(invocation);
// interceptor select
InterceptorGroup group = interceptorSelector.select(message);
// intercept msg
MessageInterceptor msgInterceptor = group.getMsgInterceptor();
msgInterceptor.intercept(message);
// 通过数据管道 发送/获取 结果
Result result = channel.send(message);
// intercept result
ResultInterceptor resultInterceptor = group.getResultInterceptor();
resultInterceptor.intercept(result);
return result;
} catch (FuncException sre) {
return new CommonResult(sre.getCause());
} catch (Throwable e) {
return new CommonResult(e);
}
}
- 传入
Invocation
后,首先通过TransportMessageParser
转换为Message
- 然后经过一轮拦截器
MsgInterceptor
, - 然后经过
Client
发送Message
- 获取响应
Result
- 然后在经过一轮拦截器
ResultInterceptor
- 最后返回
MessageParser
public interface TransportMessageParser {
Message parse(Invocation invocation);
}
/**
* 放到 headers 里面的属性
*/
private Map<String, String> headers = new HashMap<>();
/**
* 放到 参数 的属性
*/
private Map<String, String> parameters = new HashMap<>();
private HttpMethod httpMethod;
private MediaType consumeType;
/**
* 默认为空
*/
private String path = "";
private byte[] body = new byte[0];
private Invocation invocation;
private final Attachments attachments = new Attachments();
public HttpMessage() {
}
取决于协议的不同,可以转化为不同的 Message
, 供后面 TransportClient
使用。
这里是 HttpMessage
, 包含 header, parameter, method(post/get), mediatype, path, body, attachments
MessageInterceptor
MessageInterceptor
主要是往 Message
中注入信息,比如 Body
, Parameter
等需要用到的部分。
其中转化为 Body
时,需要序列化框架,因此抽象出了 BodyCodec
来满足不同情况下,对 Body
的处理。BodyCodec
又依赖底层的 Codec
来进行相应的序列化。
RequestBodyInterceptor
@Override
public void intercept(Message msg) {
byte[] body = msg.getBody();
if (ArrayUtils.isEmpty(body)) {
try {
byte[] encode = bodyEncoder.encode(msg);
msg.setBody(encode);
} catch (Exception e) {
FineLoggerFactory.getLogger().error(e.getMessage(), e);
}
}
}
HttpRpcBodyCodec
@Override
public byte[] encode(Message message) throws Exception {
byte[] body = message.getBody();
MetaData metaData = message.getInvocation().getMetaData();
if (metaData.getSendMode().isCommon()) {
Invocation invocation = message.getInvocation();
Args args = invocation.getArgs();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
getEncoder().encode(baos, args);
return baos.toByteArray();
}
return body;
}
JsonCodec
@Override
public void encode(OutputStream out, Object obj) throws Exception {
objectMapper.writeValue(out, obj);
}
从上面的关键逻辑中可以看出
BodyInterceptor
: 负责赋值BodyCodec
负责解析Message
, 然后将选中的部分转化成bytes
- 参见上文中的
HttpRpcBodyCodec
, 这里是会将所有的Args
转化成Body
, 对应的服务器部分也是将Body
解析成Args
, 从而正常的使用。类似下图
- 参见上文中的
JsonCodec
负责处理任意Object
, 转化成 JSON 后写入OutputStream
中
ResultInterceptor
ResultInterceptor
和上文中的 MessageInterceptor
功能类似,都是拦截 Result
后进行一些处理。
目前是有三个实现,组合后处理不同的响应场景。
Transport 层
Provider
通过Connector
与ConnectionInfo
创建Client
Client
接收到Message
后,- 通过
ClientRequestConstructor
处理成对应的Request
值 Client
发送Request
, 接受响应Response
Client
将Response
转化为Result
@NotNull
private Result execute(HttpMessage message) throws Exception {
// 构建请求
HttpRequestBase request = requestConstructor.construct(message);
CloseableHttpResponse response = httpClient.execute(request);
// check status
StatusLine statusLine = response.getStatusLine();
int responseCode = statusLine.getStatusCode();
// check ex
if (responseCode >= HttpStatus.SC_BAD_REQUEST) {
CommonResult httpResult = new CommonResult();
String reason = statusLine.getReasonPhrase();
String msg = "status is " + responseCode + " reason is " + reason;
httpResult.setEx(new TransportException(msg));
return httpResult;
}
// get return bytes
Result result = toResult(message, response);
result.setInvocation(message.getInvocation());
return result;
}
注:考虑到客户端负载均衡的场景, 如果需要,直接在 Provider
这部分去进行实现即可。
Server
Codec
类图
2023-12-07 修改
接口设计
流程图
sequenceDiagram participant codec as RpcJsonCodec participant context as JsonCodecContext participant registry as JsonModuleRegistry participant B as Biz B->>registry: 注册 module codec->>context: 获取 codec 单例 context->>registry: 获取 module registry->>context: 返回 module context->>codec: 返回 codec 单例 B->>registry: 注册 module registry->>context: 失效 codec 单例 codec->>context: 获取 codec 单例 context->>registry: 获取 module registry->>context: 返回 module context->>codec: 获取 codec 单例
关键类
ResponseDeserializer
在服务器返回响应的时候,基于实际业务,我们返回的数据类型如下。
public class Response {
//响应状态
private int status;
//错误码
private String errorCode;
//错误信息
private String errorMsg;
//响应实体内容
private Object data;
}
这样的数据类型,如果不记录 data 的 class 类型, 很难进行反序列化。比如
ObjectMapper mapper = new ObjectMapper();
Response ok = Response.ok(new JSONObj("111"));
String value = mapper.writeValueAsString(ok);
Response response = mapper.readValue(value, Response.class);
private class JSONObj {
private String val1;
}
输出的结果如下
可以发现是无法将 value 正常的处理会原来的 JSONObj 的类的。
然而,如果将类的 class 写进去,可以解决这个问题,但会导致Java 中的反序列化漏洞
所以设计了如下结构 BoxType
public class BoxType extends Type {
private final Type self;
private final Type[] fields;
}
当需要反序列化时,传入 Type
(实际是 BoxType
)。
@Override
public Object decode(InputStream in, Type type) throws Exception {
try {
beforeDecode(type);
JsonParser parser = objectMapper.getFactory().createParser(in);
return JsonUtils.decode(parser, objectMapper.getTypeFactory(), type);
} finally {
afterDecode();
}
}
然后此 Type
会通过 ThreadLocal
保存下来。
private final ThreadLocal<Type> types;
protected void beforeDecode(Type type) {
types.set(type);
}
然后添加反序列化器 ResponseDeserializer
, 当遇到 Response.class
的时候,使用此序列化器
module.addDeserializer(Response.class, new ResponseDeserializer(types));
此序列化器,当处理到内部的 data
的时候,将 BoxType
里面存储的子结构获取出来。供反序列化的时候确认反序列化值。
case "data":
default:
// 处理序列化值
Type type = typeHolder.get();
if (type instanceof BoxType) {
BoxType structure = (BoxType) type;
Type[] fields = structure.getFields();
Type dataType = fields[0];
Object data = JsonUtils.decode(p, ctxt.getTypeFactory(), dataType);
response.setData(data);
}
ArgsRpcSerializer
public class Args {
private Object[] args;
}
Args 和上文的 Response 类似,这里就不赘述。
SafeAnnotationIntrospector
如上文所说,反序列化漏洞往往发生在
- 调用了 ObjectMapper.enableDefaultTyping()函数;
- 对要进行反序列化的类的属性使用了值为 JsonTypeInfo.Id.CLASS 的@JsonTypeInfo 注解;
- 对要进行反序列化的类的属性使用了值为 JsonTypeInfo.Id.MINIMAL_CLASS 的@JsonTypeInfo 注解;
所以为了限制用户的使用操作,防止出现异常,这里限制使用注解 JsonTypeInfo
不允许在
- Object[]
- Container<x,x,x>
- Object
上使用该注解。
测试如下
首先构造一个 Bean, 里面包含 @JsonTypeInfo, 并且为 Object
private static class ContainerBean {
@JsonTypeInfo(use = Id.CLASS)
private List<Object> list;
@JsonTypeInfo(use = Id.CLASS)
private Map<String, Object> map;
@JsonTypeInfo(use = Id.CLASS)
private Object box;
@JsonTypeInfo(use = Id.CLASS)
private Object[] boxArray;
@JsonTypeInfo(use = Id.CLASS)
private Object[][] boxArray2;
}
未处理前,结果为下
{
"list": [
{
"@class": "com.fr.workspace.rpc.codec.json.JsonUtilsTest$SimpleBean",
"val": 1
}
],
"map": {
"1": {
"@class": "com.fr.workspace.rpc.codec.json.JsonUtilsTest$SimpleBean",
"val": 1
}
},
"box": {
"@class": "com.fr.workspace.rpc.codec.json.JsonUtilsTest$BoxBean",
"bean": {
"val": 1
}
},
"boxArray": [
{
"@class": "com.fr.workspace.rpc.codec.json.JsonUtilsTest$BoxBean",
"bean": {
"val": 1
}
}
],
"boxArray2": [
[
"[Ljava.lang.Object;",
[
{
"bean": {
"val": 1
}
}
]
]
]
}
可以看到 map, list, object, object[] 默认都会将 class 记录下来。因此会出现反序列化问题。
通过以下方式改造
private static class SafeAnnotationIntrospector extends JacksonAnnotationIntrospector {
private static final long serialVersionUID = -7768565555692182793L;
@Override
protected TypeResolverBuilder<?> _findTypeResolver(MapperConfig<?> config, Annotated ann, JavaType baseType) {
JsonTypeInfo info = _findAnnotation(ann, JsonTypeInfo.class);
if (info != null) {
Id use = info.use();
if (use == Id.CLASS || use == Id.MINIMAL_CLASS) {
if (baseType.getRawClass() == Object.class) {
return null;
}
if ((baseType.isArrayType() && baseType.getContentType().getRawClass() == Object.class)) {
return null;
}
if (baseType.isContainerType()) {
int count = baseType.containedTypeCount();
for (int i = 0; i < count; i++) {
JavaType javaType = baseType.containedTypeOrUnknown(i);
if (javaType.getRawClass() == Object.class) {
return null;
}
}
}
}
}
return super._findTypeResolver(config, ann, baseType);
}
}
输出结果
{
"list": [
{
"val": 1
}
],
"map": {
"1": {
"val": 1
}
},
"box": {
"bean": {
"val": 1
}
},
"boxArray": [
{
"bean": {
"val": 1
}
}
],
"boxArray2": [
[
"[Ljava.lang.Object;",
[
{
"bean": {
"val": 1
}
}
]
]
]
}
可以看到不会有任何 class 属性, 存在序列化问题的部分,都过滤掉
Type
Note
如何正确读取参数的类型,以及当前的支持程度
上文有讲到,反序列化时,需要记录具体的 Type
类型
目前有两种类型
com.fr.workspace.rpc.invocation.type.ContainerType
任意 n 个泛型的类,包含 Map, List 等泛型形式com.fr.workspace.rpc.invocation.type.BoxType
盒类型,用于内部有 Object, 并且不指定泛型的类型。
限制为
- 不支持嵌套的泛型。比如 List<Map<a,b>>
JsonModuleProvider
Note
针对具体类的扩展逻辑,和正常的
@JsonDeserialize
的区别、优势
使用 Jackson 对具体的类进行序列化/反序列化时。有几种区别。
1、使用 SimpleModule
类似下图
ObjectMapper mapper = getMapper();
SimpleModule module = new SimpleModule(RpcJsonCodec.class.getSimpleName());
module.addSerializer(A.class, new ASerializer());
module.addDeserializer(A.class, new ADeserializer());
mapper.registerModule(module);
但是该方案有一个问题。
当继承 A
时,比如 AImpl
。这个时候,序列化时,可以捕获到对应的 Serializer
, 反序列化时,由于匹配规则 com.fr.third.fasterxml.jackson.databind.module.SimpleDeserializers#_find
private final JsonDeserializer<?> _find(JavaType type) {
if (_classMappings == null) {
return null;
}
return _classMappings.get(new ClassKey(type.getRawClass()));
}
只会匹配当前类型的 class, 所以导致如果要反序列化 AImpl
, 那么是匹配不到 A
的。
而由于我们有 XMLable
的场景,所以必然要判断当前是否是 XMLable
。 所以直接使用如上方案不可以。
但根据上文所述,只需要处理匹配规则即可。 所以通过继承后重写匹配规则,即可以解决上述问题。
SimpleDeserializers deserializers = new SimpleDeserializers() {
private static final long serialVersionUID = 7861280549299306670L;
@Override
public JsonDeserializer<?> findBeanDeserializer(JavaType type, DeserializationConfig config, BeanDescription beanDesc) throws JsonMappingException {
JavaType superType = type.findSuperType(Filter.class);
if (superType != null) {
return deserializer;
}
return super.findBeanDeserializer(type, config, beanDesc);
}
};
2、使用 @JsonDeserialize
@JsonSerialize
@JsonDeserialize(using="")
@JsonSerialize(using="")
private XMLable xmlObj;
该注解有以下几种方法
方法 | 描述 |
---|---|
contentConverter | 指定自定义的内容转换器类 |
converter | 指定自定义的转换器类 |
include | 指定序列化时要包含的属性 |
nullsUsing | 指定自定义的处理 null 值的序列化器类 |
keyUsing | 指定自定义的处理键的序列化器类,用在 Map 上 |
contentUsing | 指定自定义的处理内容信息的序列化器类,用在 List 上 |
using | 指定自定义的序列化器类 |
这个注解的问题有 1 个, 放在对象上时不能识别嵌套的内部 Class
Container {
Xmlable xmlable;
}
ContainerImpl extends Container {}
Bean {
@JsonSerialize(using=xmlSerializer)
@JsonDeserialize(using=xmlDeserializer)
ContainerImpl container;
}
这样的场景下,Container
内部的 xmlable 是不能被使用 xmlDeserializer
反序列化成 Xmlable
的。
只能改成
Container {
@JsonSerialize(using=xmlSerializer)
@JsonDeserialize(using=xmlDeserializer)
Xmlable xmlable;
}
这种需要改动的地方可能会很多,需要调用者自己考虑的比较多。
这里有有两种影响,
1、pure-http, 是必须考虑这种影响的。
2、rpc-http, 这种是尽量减少同学的感知。
或者
@JsonSerialize(using=xmlSerializer)
@JsonDeserialize(using=xmlDeserializer)
public interface Xmlable {
}
这种改动就导致所有的地方, xmlable 的序列化方式都会改变。
总结
第一种方式,影响的 args 会比较多,但是只针对当前的 mapper, 重新创建一个 mapper, 就不会有影响。 第二种方式,影响的 mapper 会比较多, 因为是通用的注解,所以所有的 mapper 都会被影响到,但是影响的 args 会比较少,只影响有注解的部分。
使用
关键类
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface Service {
/**
* 服务名称
*
* @return 名称
*/
String name() default "";
}
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@Inherited
public @interface Request {
/**
* 服务器端接受的类型
*
* @return 类型
*/
MediaType consume() default MediaType.APPLICATION_JSON;
/**
* 传递的方法,post/get/put/delete
** @return 方法
*/
HttpMethod method() default HttpMethod.POST;
/**
* 请求路径,可以叠加
* 使用在类上,则为 base
* 使用在方法上,则为 path
* 最后的路径 = base + path
** @return 路径
*/
String path();
}
flowchart TB A[service] --> B{exist} B -- yes --> C[v2] B -- no --> D[v1] C --> E[Request] E --> F{exist} F -- yes --> G[http] F -- no --> H[rpc]
RPC 方案
Note
在对应的类上添加 @Service, 代表是新方案。
@Service
public interface DataFetchService {
<T extends Base> DataFetchResult<T> getConfig(Class<T> clazz, String nameSpace);
class DataFetchResult<T extends Base> {
@JsonTypeInfo(use = Id.CLASS)
private List<T> entities;
public DataFetchResult() {
}
public DataFetchResult(List<T> entities) {
this.entities = entities;
}
public List<T> getEntities() {
return entities;
}
public void setEntities(List<T> entities) {
this.entities = entities;
}
}
}
该方案,只需要在客户端服务器端同时注册该接口即可。
Http 方案
@Service
@Request(path="/datafetch")
public interface DataFetchService {
@Request(path="/getConfig")
<T extends Base> DataFetchResult<T> getConfig(Class<T> clazz, String nameSpace);
}
该方案需要服务器端根据 path
实现对应的 Controller
@Controller
@RequestMapping(value = "/datafetch")
public class DataFetchController {
@RequestMapping(value = "/getConfig", method = RequestMethod.POST)
@ResponseBody
public Response getConfig(HttpServletRequest req,
HttpServletResponse res, @RequestParam Class<T> clazz, @RequestParam String nameSpace) throws Exception {
// get config
DataFetchResult<T> results = xxx;
return Response.ok(results);
}