摘要:前面几篇文章简单介绍了Thrift的基本知识和小案例。接下来这篇文章讲述在项目中使用Thrift框架作为RPC通信的时候,如何去封装Thrift,使我们在项目中使用的时候可以更简单易用。文中的代码不一定是真的项目应用中的代码,只是写了一个粗糙的例子,提供一个封装思想。
目标 :从前面的文章的几个例子可以看出,我们在调用一个接口的一个方法的时候,需要写多几行代码来设置RPC通信的信息,而我们调用本地的某个类的一个方法的时候,我们只需要2行代码就可以了,比如:1
2HelloService helloService = new HelloService();
helloService.sayHello();
我们肯定是希望在使用Thrift作为RPC通信的时候,不管是Server端还是Client端,都可以像上面调用本地方法一样,只需要2行代码就搞定。带着这个目的,我们通过下面的代码来实现它。
使用手段 :当我想到希望不管在Server端还是Client端,都用2行代码来实现Thrift调用的时候,我想到了Spring的AOP,我觉得跟它很相似,首先我们在调用一个方法的时候,需要设置一大堆东西,执行完方法的时候,又要把通讯关掉,这很符合AOP面向切面编程的思维。我们可以通过AOP把除了执行业务的代码给封装掉,让使用的时候只需要new一个对象然后就可以调用它的方法。AOP是用动态代理实现的,而动态代理跟反射息息相关。这个案例可以让我们重新回顾一下这些知识点。
IDL接口定义
HelloService.thrift1
2
3
4
5
6
7namespace java com.thrift.demo.service
service HelloService {
i32 sayInt(1:i32 param)
string sayString(1:string param)
bool sayBoolean(1:bool param)
void sayVoid()
}
TestService.thrift1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22struct Test {
1: required bool bool_; //required 必须填充也必须序列化
2: optional i8 i8_; //optional 不填充则不序列化
3: i16 i16_;
4: i32 i32_;
5: i64 i64_;
6: double double_;
7: string string_;
8: binary binary_;
}
service TestService {
void getVoid(1:Test test);
bool getBool(1:Test test);
i8 getI8(1:Test test);
i16 getI16(1:Test test);
i32 getI32(1:Test test);
i64 getI64(1:Test test);
double getDouble(1:Test test);
string getString(1:Test test);
binary getBinary(1:Test test);
}
Client端
ThriftProxy.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49public class ThriftProxy implements InvocationHandler {
//Client类
private Class clientClass;
//Service类
private Class classs;
private String[] hostPorts;
public Object newInstance(Class classs, String[] hostPorts) {
try {
this.clientClass = Class.forName(classs.getName() + "$Client");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
this.classs = classs;
this.hostPorts = hostPorts;
return Proxy.newProxyInstance(clientClass.getClassLoader(), clientClass.getInterfaces(), this);
}
@Override
public Object invoke(Object obj, Method method, Object[] objs) throws Throwable {
if (null != hostPorts && hostPorts.length > 0) {
for (String hostPort : hostPorts) {
if (null != hostPort && "" != hostPort.trim()) {
TSocket socket = null;
try {
int index = hostPort.indexOf(":");
String host = hostPort.substring(0, index);
String port = hostPort.substring(index+1, hostPort.length());
socket = new TSocket(host, Integer.valueOf(port));
TProtocol tProtocol = new TBinaryProtocol(socket);
TMultiplexedProtocol multiplexedProtocol = new TMultiplexedProtocol(tProtocol, classs.getSimpleName());
Class[] classes = new Class[]{TProtocol.class};
socket.open();
return method.invoke(clientClass.getConstructor(classes).newInstance(multiplexedProtocol), objs);
} finally {
if (null != socket) {
socket.close();
}
}
}
}
}
return null;
}
}
ThriftProxyFactory.java1
2
3
4
5
6
7
8public class ThriftProxyFactory {
public static Object newInstance(Class classT, String[] hostPorts) {
ThriftProxy thriftProxy = new ThriftProxy();
return thriftProxy.newInstance(classT, hostPorts);
}
}
Server端
ThriftServer.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public class ThriftServer {
public static void start(Class[] classes, String localhost, int port) {
//classes HelloServiceImpl.class
try {
Pattern pattern = Pattern.compile("^(.+)\\$Iface$");
Matcher matcher = null;
TServerSocket serverTransport = new TServerSocket(port);
Factory protocolFactory = new TBinaryProtocol.Factory();
TMultiplexedProcessor processors = new TMultiplexedProcessor();
for (int i=0; i<classes.length; i++) {
Class<?> classT = classes[i]; // classT HelloServiceImpl.class
Class<?>[] interfaces = classT.getInterfaces();
for (Class<?> interfacez : interfaces) { // interfacez HelloService.Iface
String interfacezName = interfacez.getName();
matcher = pattern.matcher(interfacezName);
if (matcher.find()) {
String classTName = matcher.group(1); //classTName HelloService
Object classTObject = Class.forName(classTName).newInstance(); //classTObject HelloService
Class iface = Class.forName(classTName + "$Iface");
Object object = classT.newInstance();
TProcessor processor = (TProcessor) Class.forName(classTName + "$Processor")
.getDeclaredConstructor(iface)
.newInstance(object);
processors.registerProcessor(classTObject.getClass().getSimpleName(), processor);
}
}
}
TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport);
args.protocolFactory(protocolFactory);
args.processor(processors);
args.minWorkerThreads(1000);
args.maxWorkerThreads(1000);
TServer server = new TThreadPoolServer(args);
System.out.println("开启thrift服务器,监听端口:9090");
server.serve();
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行
Server端启动1
2
3public static void startServer() {
ThriftServer.start(new Class[]{HelloServiceImpl.class, TestServiceImpl.class}, "localhost", 9090);
}
Client端启动
1 | public static void startClient() throws TException { |
结果
我们可以看出,Server端只需要ThriftServer.start(xxx)就可以将Thrift Server启动起来,而Client端也像在调用本地方法一样可以通过2行代码就实现调用远程接口的方法。