LieBrother

当才华撑不起野心时,应该静下心来学习;当能力驾驭不了目标时,应该沉下心来历练。


  • 首页

  • 归档

  • 分类

  • 标签

  • 关于

HTTP工具类

发表于 2017-03-21   |   分类于 HTTP   |     |   阅读次数

GitHub代码:HttpUtil.java

摘要:本文介绍在项目中用HttpClient来发送HTTP请求时,对HtttpClient进行的封装,使得我们在项目中使用HttpClient时可以更简单易用。

Maven项目中的pom.xml

添加HttpClient依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>

HttpUtil.java

实现了对HttpClient的封装,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
package com.csh.util.http;



import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.*;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Map;

/**
* @author James-CSH
* @since 3/14/2017 08:00 AM
*/
public class HttpUtil {
private PoolingHttpClientConnectionManager cm;
private String EMPTY_STR = "";
private String UTF_8 = "UTF-8";

public HttpUtil() {
init(20, 5);
}

public HttpUtil(int maxTotal, int maxPerRoute) {
init(maxTotal, maxPerRoute);
}

/**
* 初始化
* @param maxTotal 最大连接数
* @param maxPerRoute 每个路由最大连接数
*/
private synchronized void init(int maxTotal, int maxPerRoute) {
if (null == cm) {
cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(maxTotal);// 整个连接池最大连接数
cm.setDefaultMaxPerRoute(maxPerRoute);// 每路由最大连接数,默认值是2
}
}

private CloseableHttpClient getHttpClient() {
return HttpClients.custom().setConnectionManager(cm).build();
}

/**
* 设置请求头
* @param http 发送的HTTP请求
* @param headers 请求头
* @return 发送的HTTP请求
*/
private HttpRequestBase setHeaders(HttpRequestBase http, Map<String, Object> headers) {
if (null != headers) {
for (Map.Entry<String, Object> param : headers.entrySet()) {
http.addHeader(param.getKey(), String.valueOf(param.getValue()));
}
}
return http;
}

/**
* 将参数转为特定格式
* @param params 参数
* @return
*/
private ArrayList<NameValuePair> covertParams2NVPS(Map<String, Object> params) {
ArrayList<NameValuePair> pairs = new ArrayList<NameValuePair>();
if (null != params) {
for (Map.Entry<String, Object> param : params.entrySet()) {
pairs.add(new BasicNameValuePair(param.getKey(), String.valueOf(param.getValue())));
}
}
return pairs;
}

/**
* 处理Http请求
*
* @param request
* @return
*/
private String getResult(HttpRequestBase request) {
CloseableHttpClient httpClient = getHttpClient();
CloseableHttpResponse response = null;
try {
response = httpClient.execute(request);
HttpEntity entity = response.getEntity();
if (entity != null) {
String result = EntityUtils.toString(entity);
response.close();
return result;
}
} catch (ClientProtocolException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}

return EMPTY_STR;
}

/**
* some http request just return the headers. such as head
* @param request
* @return
*/
private Header[] getHeaders(HttpRequestBase request) {
CloseableHttpClient httpClient = getHttpClient();
CloseableHttpResponse response = null;
try {
response = httpClient.execute(request);
return response.getAllHeaders();
} catch (ClientProtocolException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}

/**
* 发送GET请求
* @param url 请求路径
* @return 请求结果
* @throws URISyntaxException
*/
public String get(String url) throws URISyntaxException {
return get(url, null);
}

/**
* 发送GET请求
* @param url 请求路径
* @param params 请求参数
* @return 请求结果
* @throws URISyntaxException
*/
public String get(String url, Map<String, Object> params) throws URISyntaxException {
return get(url, null, params);
}

/**
* 发送GET请求
* @param url 请求路径
* @param headers 请求头
* @param params 请求参数
* @return 请求结果
* @throws URISyntaxException
*/
public String get(String url, Map<String, Object> headers, Map<String, Object> params) throws URISyntaxException {
URIBuilder ub = new URIBuilder();
ub.setPath(url);
//设置参数
ub.setParameters(covertParams2NVPS(params));
HttpGet httpGet = new HttpGet(ub.build());
//设置请求头
httpGet = (HttpGet) setHeaders(httpGet, headers);

return getResult(httpGet);
}

/**
* 发送POST请求
* @param url 请求路径
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String post(String url) throws UnsupportedEncodingException {
return post(url, null);
}

/**
* 发送POST请求
* @param url 请求路径
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String post(String url, Map<String, Object> params) throws UnsupportedEncodingException {
return post(url, null, params);
}

/**
* 发送POST请求
* @param url 请求路径
* @param headers 请求头
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String post(String url, Map<String, Object> headers, Map<String, Object> params) throws UnsupportedEncodingException {
HttpPost httpPost = new HttpPost(url);
//设置参数
ArrayList<NameValuePair> pairs = covertParams2NVPS(params);
httpPost.setEntity(new UrlEncodedFormEntity(pairs, UTF_8));
//设置请求头
httpPost = (HttpPost) setHeaders(httpPost, headers);
return getResult(httpPost);
}

/**
* 发送PUT请求
* @param url 请求路径
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String put(String url) throws UnsupportedEncodingException {
return put(url, null);
}

/**
* 发送PUT请求
* @param url 请求路径
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String put(String url, Map<String, Object> params) throws UnsupportedEncodingException {
return put(url, null, params);
}

/**
* 发送PUT请求
* @param url 请求路径
* @param headers 请求头
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String put(String url, Map<String, Object> headers, Map<String, Object> params) throws UnsupportedEncodingException {
HttpPut httpPut = new HttpPut(url);
//设置请求参数
ArrayList<NameValuePair> pairs = covertParams2NVPS(params);
httpPut.setEntity(new UrlEncodedFormEntity(pairs, UTF_8));
//设置请求头
httpPut = (HttpPut) setHeaders(httpPut, headers);

return getResult(httpPut);
}

/**
* 发送PATCH请求
* @param url 请求路径
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String patch(String url) throws UnsupportedEncodingException {
return patch(url, null);
}

/**
* 发送PATCH请求
* @param url 请求路径
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String patch(String url, Map<String, Object> params) throws UnsupportedEncodingException {
return patch(url, null, params);
}

/**
* 发送PATCH请求
* @param url 请求路径
* @param headers 请求头
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String patch(String url, Map<String, Object> headers, Map<String, Object> params) throws UnsupportedEncodingException {
HttpPatch httpPatch = new HttpPatch(url);
//设置请求参数
ArrayList<NameValuePair> pairs = covertParams2NVPS(params);
httpPatch.setEntity(new UrlEncodedFormEntity(pairs, UTF_8));
//设置请求头
httpPatch = (HttpPatch) setHeaders(httpPatch, headers);

return getResult(httpPatch);
}

/**
* 发送DELETE请求
* @param url 请求路径
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String delete(String url) throws UnsupportedEncodingException, URISyntaxException {
return delete(url, null);
}

/**
* 发送DELETE请求
* @param url 请求路径
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
* @throws URISyntaxException
*/
public String delete(String url, Map<String, Object> params) throws UnsupportedEncodingException, URISyntaxException {
return delete(url, null, params);
}

/**
* 发送DELETE请求
* @param url 请求路径
* @param headers 请求头
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String delete(String url, Map<String, Object> headers, Map<String, Object> params) throws UnsupportedEncodingException, URISyntaxException {
URIBuilder ub = new URIBuilder();
ub.setPath(url);
//设置请求参数
ub.addParameters(covertParams2NVPS(params));
HttpDelete httpDelete = new HttpDelete(ub.build());
//设置请求头
httpDelete = (HttpDelete) setHeaders(httpDelete, headers);

return getResult(httpDelete);
}

/**
* 发送TRACE请求
* 回显服务器收到的请求,主要用于测试或诊断。
* @param url 请求路径
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String trace(String url) throws UnsupportedEncodingException, URISyntaxException {
return trace(url, null);
}

/**
* 发送TRACE请求
* 回显服务器收到的请求,主要用于测试或诊断
* @param url 请求路径
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String trace(String url, Map<String, Object> params) throws UnsupportedEncodingException, URISyntaxException {
return trace(url, null, params);
}

/**
* 发送TRACE请求
* 回显服务器收到的请求,主要用于测试或诊断
* @param url 请求路径
* @param headers 请求头
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
*/
public String trace(String url, Map<String, Object> headers, Map<String, Object> params) throws UnsupportedEncodingException, URISyntaxException {
URIBuilder ub = new URIBuilder();
ub.setPath(url);
//设置请求参数
ub.setParameters(covertParams2NVPS(params));
HttpTrace httpTrace = new HttpTrace(ub.build());
//设置请求头
httpTrace = (HttpTrace) setHeaders(httpTrace, headers);

return getResult(httpTrace);
}

/**
* 发送HEAD请求
* 类似于get请求,只不过返回的响应中没有具体的内容,用于获取报头
* @param url 请求路径
* @return 请求结果
* @throws UnsupportedEncodingException
* @throws URISyntaxException
*/
public Header[] head(String url) throws UnsupportedEncodingException, URISyntaxException {
return head(url, null);
}

/**
* 发送HEAD请求
* 类似于get请求,只不过返回的响应中没有具体的内容,用于获取报头
* @param url 请求路径
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
* @throws URISyntaxException
*/
public Header[] head(String url, Map<String, Object> params) throws UnsupportedEncodingException, URISyntaxException {
return head(url, null, params);
}

/**
* 发送HEAD请求
* 类似于get请求,只不过返回的响应中没有具体的内容,用于获取报头
* @param url 请求路径
* @param headers 请求头
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
* @throws URISyntaxException
*/
public Header[] head(String url, Map<String, Object> headers, Map<String, Object> params) throws UnsupportedEncodingException, URISyntaxException {
URIBuilder ub = new URIBuilder();
ub.setPath(url);
//设置请求参数
ub.setParameters(covertParams2NVPS(params));
HttpHead httpHead = new HttpHead(ub.build());
//设置请求头
httpHead = (HttpHead) setHeaders(httpHead, headers);

return getHeaders(httpHead);
}

/**
* 发送OPTIONS请求
* 允许客户端查看服务器的性能
* @param url 请求路径
* @return 请求结果
* @throws UnsupportedEncodingException
* @throws URISyntaxException
*/
public String options(String url) throws UnsupportedEncodingException, URISyntaxException {
return options(url, null);
}

/**
* 发送OPTIONS请求
* 允许客户端查看服务器的性能
* @param url 请求路径
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
* @throws URISyntaxException
*/
public String options(String url, Map<String, Object> params) throws UnsupportedEncodingException, URISyntaxException {
return options(url, null, params);
}

/**
* 发送OPTIONS请求
* 允许客户端查看服务器的性能
* @param url 请求路径
* @param headers 请求头
* @param params 请求参数
* @return 请求结果
* @throws UnsupportedEncodingException
* @throws URISyntaxException
*/
public String options(String url, Map<String, Object> headers, Map<String, Object> params) throws UnsupportedEncodingException, URISyntaxException {
URIBuilder ub = new URIBuilder();
ub.setPath(url);
//设置参数
ub.setParameters(covertParams2NVPS(params));
HttpOptions httpOptions = new HttpOptions(ub.build());
//设置请求头
httpOptions = (HttpOptions) setHeaders(httpOptions, headers);

return getResult(httpOptions);
}

}

Thrift封装

发表于 2017-03-16   |   分类于 Thrift   |     |   阅读次数

摘要:前面几篇文章简单介绍了Thrift的基本知识和小案例。接下来这篇文章讲述在项目中使用Thrift框架作为RPC通信的时候,如何去封装Thrift,使我们在项目中使用的时候可以更简单易用。文中的代码不一定是真的项目应用中的代码,只是写了一个粗糙的例子,提供一个封装思想。

目标 :从前面的文章的几个例子可以看出,我们在调用一个接口的一个方法的时候,需要写多几行代码来设置RPC通信的信息,而我们调用本地的某个类的一个方法的时候,我们只需要2行代码就可以了,比如:

1
2
HelloService helloService = new HelloService();
helloService.sayHello();

我们肯定是希望在使用Thrift作为RPC通信的时候,不管是Server端还是Client端,都可以像上面调用本地方法一样,只需要2行代码就搞定。带着这个目的,我们通过下面的代码来实现它。

使用手段 :当我想到希望不管在Server端还是Client端,都用2行代码来实现Thrift调用的时候,我想到了Spring的AOP,我觉得跟它很相似,首先我们在调用一个方法的时候,需要设置一大堆东西,执行完方法的时候,又要把通讯关掉,这很符合AOP面向切面编程的思维。我们可以通过AOP把除了执行业务的代码给封装掉,让使用的时候只需要new一个对象然后就可以调用它的方法。AOP是用动态代理实现的,而动态代理跟反射息息相关。这个案例可以让我们重新回顾一下这些知识点。

IDL接口定义

HelloService.thrift

1
2
3
4
5
6
7
namespace java com.thrift.demo.service 
service HelloService {
i32 sayInt(1:i32 param)
string sayString(1:string param)
bool sayBoolean(1:bool param)
void sayVoid()
}

TestService.thrift

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct Test {
1: required bool bool_; //required 必须填充也必须序列化
2: optional i8 i8_; //optional 不填充则不序列化
3: i16 i16_;
4: i32 i32_;
5: i64 i64_;
6: double double_;
7: string string_;
8: binary binary_;
}

service TestService {
void getVoid(1:Test test);
bool getBool(1:Test test);
i8 getI8(1:Test test);
i16 getI16(1:Test test);
i32 getI32(1:Test test);
i64 getI64(1:Test test);
double getDouble(1:Test test);
string getString(1:Test test);
binary getBinary(1:Test test);
}

Client端

ThriftProxy.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ThriftProxy implements InvocationHandler {

//Client类
private Class clientClass;
//Service类
private Class classs;
private String[] hostPorts;

public Object newInstance(Class classs, String[] hostPorts) {
try {
this.clientClass = Class.forName(classs.getName() + "$Client");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
this.classs = classs;
this.hostPorts = hostPorts;
return Proxy.newProxyInstance(clientClass.getClassLoader(), clientClass.getInterfaces(), this);
}

@Override
public Object invoke(Object obj, Method method, Object[] objs) throws Throwable {
if (null != hostPorts && hostPorts.length > 0) {
for (String hostPort : hostPorts) {
if (null != hostPort && "" != hostPort.trim()) {
TSocket socket = null;
try {
int index = hostPort.indexOf(":");
String host = hostPort.substring(0, index);
String port = hostPort.substring(index+1, hostPort.length());
socket = new TSocket(host, Integer.valueOf(port));

TProtocol tProtocol = new TBinaryProtocol(socket);
TMultiplexedProtocol multiplexedProtocol = new TMultiplexedProtocol(tProtocol, classs.getSimpleName());
Class[] classes = new Class[]{TProtocol.class};
socket.open();
return method.invoke(clientClass.getConstructor(classes).newInstance(multiplexedProtocol), objs);
} finally {
if (null != socket) {
socket.close();
}
}
}
}
}

return null;
}

}

ThriftProxyFactory.java

1
2
3
4
5
6
7
8
public class ThriftProxyFactory {

public static Object newInstance(Class classT, String[] hostPorts) {
ThriftProxy thriftProxy = new ThriftProxy();
return thriftProxy.newInstance(classT, hostPorts);
}

}

Server端

ThriftServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ThriftServer {

public static void start(Class[] classes, String localhost, int port) {
//classes HelloServiceImpl.class
try {
Pattern pattern = Pattern.compile("^(.+)\\$Iface$");
Matcher matcher = null;
TServerSocket serverTransport = new TServerSocket(port);
Factory protocolFactory = new TBinaryProtocol.Factory();
TMultiplexedProcessor processors = new TMultiplexedProcessor();

for (int i=0; i<classes.length; i++) {
Class<?> classT = classes[i]; // classT HelloServiceImpl.class
Class<?>[] interfaces = classT.getInterfaces();
for (Class<?> interfacez : interfaces) { // interfacez HelloService.Iface
String interfacezName = interfacez.getName();
matcher = pattern.matcher(interfacezName);
if (matcher.find()) {
String classTName = matcher.group(1); //classTName HelloService
Object classTObject = Class.forName(classTName).newInstance(); //classTObject HelloService
Class iface = Class.forName(classTName + "$Iface");
Object object = classT.newInstance();
TProcessor processor = (TProcessor) Class.forName(classTName + "$Processor")
.getDeclaredConstructor(iface)
.newInstance(object);
processors.registerProcessor(classTObject.getClass().getSimpleName(), processor);
}
}
}

TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport);
args.protocolFactory(protocolFactory);
args.processor(processors);
args.minWorkerThreads(1000);
args.maxWorkerThreads(1000);
TServer server = new TThreadPoolServer(args);
System.out.println("开启thrift服务器,监听端口:9090");
server.serve();
} catch (Exception e) {
e.printStackTrace();
}
}

}

运行

Server端启动

1
2
3
public static void startServer() {
ThriftServer.start(new Class[]{HelloServiceImpl.class, TestServiceImpl.class}, "localhost", 9090);
}

Client端启动

1
2
3
4
5
6
7
8
9
10
public static void startClient() throws TException {
String[] hostPorts = new String[]{"localhost:9090"};
TestService.Iface client = (TestService.Iface) ThriftProxyFactory.newInstance(TestService.class, hostPorts);
Test test = new Test();
test.setI32_(100000);
System.out.println(client.getI32(test));

HelloService.Iface hello = (Iface) ThriftProxyFactory.newInstance(HelloService.class, hostPorts);
hello.sayString("哈哈哈哈");
}

结果

我们可以看出,Server端只需要ThriftServer.start(xxx)就可以将Thrift Server启动起来,而Client端也像在调用本地方法一样可以通过2行代码就实现调用远程接口的方法。

参考文章
thrift生产环境服务端使用的正确姿势
Thrift 个人实战–Thrift 服务化 Client的改造

Thrift多接口支持例子

发表于 2017-02-27   |   分类于 Thrift   |     |   阅读次数

摘要:本文简单写一个Thrift支持多接口的代码

Thrift通过TMultiplexedProcessor来设置多个服务接口
下面是代码:

IDL接口

HelloService.thrift

1
2
3
4
5
6
7
namespace java com.thrift.demo.service 
service HelloService {
i32 sayInt(1:i32 param)
string sayString(1:string param)
bool sayBoolean(1:bool param)
void sayVoid()
}

TestService.thrift

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct Test {
1: required bool bool_; //required 必须填充也必须序列化
2: optional i8 i8_; //optional 不填充则不序列化
3: i16 i16_;
4: i32 i32_;
5: i64 i64_;
6: double double_;
7: string string_;
8: binary binary_;
}

service TestService {
void getVoid(1:Test test);
bool getBool(1:Test test);
i8 getI8(1:Test test);
i16 getI16(1:Test test);
i32 getI32(1:Test test);
i64 getI64(1:Test test);
double getDouble(1:Test test);
string getString(1:Test test);
binary getBinary(1:Test test);
}

接口实现类

HelloServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class HelloServiceImpl implements HelloService.Iface {

public int sayInt(int param) throws TException {
System.out.println("say int :" + param);
return param;
}

public String sayString(String param) throws TException {
System.out.println("say string :" + param);
return param;
}

public boolean sayBoolean(boolean param) throws TException {
System.out.println("say boolean :" + param);
return param;
}
public void sayVoid() throws TException {
System.out.println("say void ...");
}
}

TestServiceImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class TestServiceImpl implements TestService.Iface {

@Override
public void getVoid(Test test) throws TException {
System.out.println("TestServiceImpl + " + test.toString());
}

@Override
public boolean getBool(Test test) throws TException {
System.out.println("TestServiceImpl.getBool() + " + test.bool_);
return test.bool_;
}

@Override
public byte getI8(Test test) throws TException {
System.out.println("TestServiceImpl.getI8() + " + test.i8_);
return test.i8_;
}

@Override
public short getI16(Test test) throws TException {
System.out.println("TestServiceImpl.getI16() + " + test.i16_);
return test.i16_;
}

@Override
public int getI32(Test test) throws TException {
System.out.println("TestServiceImpl.getI32() + " + test.i32_);
return test.i32_;
}

@Override
public long getI64(Test test) throws TException {
System.out.println("TestServiceImpl.getI64() + " + test.i64_);
return test.i64_;
}

@Override
public double getDouble(Test test) throws TException {
System.out.println("TestServiceImpl.getDouble() + " + test.double_);
return test.double_;
}

@Override
public String getString(Test test) throws TException {
System.out.println("TestServiceImpl.getString() + " + test.string_);
return test.string_;
}

@Override
public ByteBuffer getBinary(Test test) throws TException {
System.out.println("TestServiceImpl.getBinary() + " + test.binary_);
return test.binary_;
}

}

Server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThriftServer {
public static void main(String[] args) {
try {
TServerSocket serverTransport = new TServerSocket(9090);
Factory protocolFactory = new TBinaryProtocol.Factory();
//创建多个服务Processor
Processor<TestService.Iface> processorTest = new TestService.Processor<TestService.Iface>(new TestServiceImpl());
com.thrift.demo01.service.HelloService.Processor<Iface> processorHello = new HelloService.Processor<HelloService.Iface>(new HelloServiceImpl());

//将服务注册到TMultiplexedProcessor中
TMultiplexedProcessor processors = new TMultiplexedProcessor();
processors.registerProcessor("testService", processorTest);
processors.registerProcessor("helloService", processorHello);

TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport)
.protocolFactory(protocolFactory)
.processor(processors)
.minWorkerThreads(1000)
.maxWorkerThreads(1000);
TServer server = new TThreadPoolServer(args);
System.out.println("开启thrift服务器,监听端口:9090");
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}

Client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ThriftClient {
public static void testTMultiplexedProcessor() {
try {
TTransport transport = new TSocket("localhost", 9090);
TProtocol protocol = new TBinaryProtocol(transport);
//通过TMultiplexedProtocol获取对应的服务
TMultiplexedProtocol protocolTest = new TMultiplexedProtocol(protocol, "testService");
TMultiplexedProtocol protocolHello = new TMultiplexedProtocol(protocol, "helloService");

//创建Client调用服务接口的方法
TestService.Client client = new TestService.Client(protocolTest);
HelloService.Client clientHello = new HelloService.Client(protocolHello);
transport.open();

//调用接口方法
Test test = new Test();
test.setBool_(true);
test.setDouble_(5.0);
System.out.println("Client getBool()" + client.getBool(test));
System.out.println("Client getDouble()" + client.getDouble(test));

System.out.println("HelloService sayString :" + clientHello.sayString("哈哈哈"));
transport.close();
} catch (TTransportException e) {
e.printStackTrace();
} catch (TException te) {
te.printStackTrace();
}
}
}

运行结果:

Server端:

开启thrift服务器,监听端口:9090
TestServiceImpl.getBool() + true
TestServiceImpl.getDouble() + 5.0
say string :哈哈哈

Client端:

Received 1
Client getBool()true
Received 2
Client getDouble()5.0
Received 1
HelloService sayString :哈哈哈

通过这个方法,就可以在启动一个服务中同时提供多个接口给客户端调用,这种方式在项目开发中可能会用的更多。

参考文章:
Thrift对多接口服务的支持

Thrift Server介绍及代码实现

发表于 2017-02-27   |   分类于 Thrift   |     |   阅读次数

摘要:本文介绍Thrift几个Server,以及其代码的实现。

Thrift为服务器端提供了多种工作模式,本文中将涉及以下5中工作模式:TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadedSelectorServer,这5中工作模式的详细工作原理如下:

TSimpleServer

介绍

TSimpleServer的工作模式只有一个工作线程,循环监听新请求的到来并完成对请求的处理,它只是在简单的演示时候使用,它的工作方式如下图所示:

TSimpleServer

TSimpleServer的工作模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,但是一次只能接收和处理一个socket连接,效率比较低,主要用于演示Thrift的工作过程,在实际开发过程中很少用到它。

代码

上篇博客已经有了
Thrift 简单例子

TNonblockingServer

介绍

TNonblockingServer工作模式,该模式也是单线程工作,但是该模式采用NIO的方式,所有的socket都被注册到selector中,在一个线程中通过seletor循环监控所有的socket,每次selector结束时,处理所有的处于就绪状态的socket,对于有数据到来的socket进行数据读取操作,对于有数据发送的socket则进行数据发送,对于监听socket则产生一个新业务socket并将其注册到selector中,如下图所示:

TNonblockingServer

上图中读取数据之后的业务处理就是根据读取到的调用请求,调用具体函数完成处理,只有完成函数处理才能进行后续的操作;

TNonblockingServer模式优点:

相比于TSimpleServer效率提升主要体现在IO多路复用上,TNonblockingServer采用非阻塞IO,同时监控多个socket的状态变化;

TNonblockingServer模式缺点:

TNonblockingServer模式在业务处理上还是采用单线程顺序来完成,在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行。

代码

Server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThriftServer {

public static void main(String[] args) {
try {
// 设置服务器端口
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
// 设置二进制协议工厂
Factory protocolFactory = new TBinaryProtocol.Factory();
// 处理器关联业务实现
Processor<TestService.Iface> processor = new TestService.Processor<TestService.Iface>(
new TestServiceImpl());
TNonblockingServer.Args args = new TNonblockingServer.Args(serverTransport)
.processor(processor)
.protocolFactory(protocolFactory);
TServer server = new TNonblockingServer(args);
System.out.println("开启thrift服务器,监听端口:9090");
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}

}

Client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ThriftClient {
public static void main(String[] args) {
try {
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
TAsyncClientManager clientManager = new TAsyncClientManager();
TNonblockingSocket transport = new TNonblockingSocket("localhost", 9090);
TestService.AsyncClient client = new TestService.AsyncClient(protocolFactory, clientManager, transport);

Test test = new Test();
test.setBool_(true);
test.setI8_((byte) 1);
test.setI16_((short) 2);
test.setI32_(3);
test.setI64_(4L);
test.setDouble_(5.0);
test.setString_("abc");
test.setBinary_("abc".getBytes());

CountDownLatch latch = new CountDownLatch(1);
client.getBool(test, new AsynCallback(latch));

boolean wait;
try {
wait = latch.await(1, TimeUnit.SECONDS);
System.out.println("latch.await =:" + wait);
} catch (InterruptedException e) {
e.printStackTrace();
}

transport.close();

} catch (IOException e) {
e.printStackTrace();
} catch (TException e) {
e.printStackTrace();
}
}
}

运行结果:
Client端:

Received 0
onComplete
AsynCall result =:true
latch.await =:true

Server端:

开启thrift服务器,监听端口:9090
TestServiceImpl.getBool() + true

THsHaServer模式(半同步半异步)

介绍

THsHaServer类是TNonblockingServer类的子类,在TNonblockingServer模式中,采用一个线程来完成对所有socket的监听和业务处理,造成了效率的低下,THsHaServer模式的引入则是部分解决了这些问题。THsHaServer模式中,引入一个线程池来专门进行业务处理,如下图所示;

THsHaServer

THsHaServer的优点:

与TNonblockingServer模式相比,THsHaServer在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升;

THsHaServer的缺点:

由上图可以看出,主线程需要完成对所有socket的监听以及数据读写的工作,当并发请求数较大时,且发送数据量较多时,监听socket上新连接请求不能被及时接受。

代码

Server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ThriftServer {
public static void main(String[] args) {
try {
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
TTransportFactory transportFactory = new TFramedTransport.Factory();
Factory protocolFactory = new TBinaryProtocol.Factory();
Processor<TestService.Iface> processor = new TestService.Processor<TestService.Iface>(
new TestServiceImpl());
Args args = new Args(serverTransport)
.processor(processor)
.transportFactory(transportFactory)
.protocolFactory(protocolFactory);
TServer server = new THsHaServer(args);
System.out.println("开启thrift服务器,监听端口:9090");
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}

Client 端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ThriftClient {
public static void main(String[] args) {
try {
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
TAsyncClientManager clientManager = new TAsyncClientManager();
TNonblockingSocket transport = new TNonblockingSocket("localhost", 9090);
TestService.AsyncClient client = new TestService.AsyncClient(protocolFactory, clientManager, transport);

Test test = new Test();
test.setBool_(true);
test.setI8_((byte) 1);
test.setI16_((short) 2);
test.setI32_(3);
test.setI64_(4L);
test.setDouble_(5.0);
test.setString_("abc");
test.setBinary_("abc".getBytes());

CountDownLatch latch = new CountDownLatch(1);
client.getBool(test, new AsynCallback(latch));

boolean wait;
try {
wait = latch.await(1, TimeUnit.SECONDS);
System.out.println("latch.await =:" + wait);
} catch (InterruptedException e) {
e.printStackTrace();
}

transport.close();

} catch (IOException e) {
e.printStackTrace();
} catch (TException e) {
e.printStackTrace();
}
}
}

异步处理类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class AsynCallback implements AsyncMethodCallback<Boolean>{  
private CountDownLatch latch;

public AsynCallback(CountDownLatch latch) {
this.latch = latch;
}

public void onComplete(Boolean response) {
System.out.println("onComplete");
try {
// Thread.sleep(1000L * 1);
System.out.println("AsynCall result =:" + response);
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}

public void onError(Exception exception) {
System.out.println("onError :" + exception.getMessage());
latch.countDown();
}
}

运行结果:
Client端:

Received 0
onComplete
AsynCall result =:true
latch.await =:true

Server端:

开启thrift服务器,监听端口:9090
TestServiceImpl.getBool() + true

TThreadPoolServer

介绍

TThreadPoolServer模式采用阻塞socket方式工作,,主线程负责阻塞式监听“监听socket”中是否有新socket到来,业务处理交由一个线程池来处理,如下图所示:

TThreadPoolServer

TThreadPoolServer模式优点:

线程池模式中,数据读取和业务处理都交由线程池完成,主线程只负责监听新连接,因此在并发量较大时新连接也能够被及时接受。线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池及时处理,性能也非常高。

TThreadPoolServer模式缺点:

线程池模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。

代码

Server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public ThriftServer {
public static void main(String[] args) {
try {
TServerSocket serverTransport = new TServerSocket(9090);
Factory protocolFactory = new TBinaryProtocol.Factory();
Processor<TestService.Iface> processor = new TestService.Processor<TestService.Iface>(new TestServiceImpl());
TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport)
.protocolFactory(protocolFactory)
.processor(processor)
.minWorkerThreads(1000).maxWorkerThreads(1000);
TServer server = new TThreadPoolServer(args);
System.out.println("开启thrift服务器,监听端口:9090");
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}

Client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public ThriftClient {
public static void main(String[] args) {
try {
// 设置调用的服务地址-端口
TTransport transport = new TSocket("localhost", 9090);
// 使用二进制协议
TProtocol protocol = new TBinaryProtocol(transport);
// 使用的接口
TestService.Client client = new TestService.Client(protocol);
// 打开socket
transport.open();

Test test = new Test();
test.setBool_(true);
test.setI8_((byte) 1);
test.setI16_((short) 2);
test.setI32_(3);
test.setI64_(4L);
test.setDouble_(5.0);
test.setString_("abc");
test.setBinary_("abc".getBytes());

System.out.println("Client getBool()" + client.getBool(test));
System.out.println("Client getI8()" + client.getI8(test));
System.out.println("Client getI16()" + client.getI16(test));
System.out.println("Client getI32()" + client.getI32(test));
System.out.println("Client getI64()" + client.getI64(test));
System.out.println("Client getDouble()" + client.getDouble(test));
System.out.println("Client getString()" + client.getString(test));
System.out.println("Client getBinary()" + client.getBinary(test));
transport.close();
} catch (TTransportException e) {
e.printStackTrace();
} catch (TException te) {
te.printStackTrace();
}
}
}

运行结果:
Client端:

Received 1
Client getBool()true
Received 2
Client getI8()1
Received 3
Client getI16()2
Received 4
Client getI32()3
Received 5
Client getI64()4
Received 6
Client getDouble()5.0
Received 7
Client getString()abc
Received 8
Client getBinary()java.nio.HeapByteBuffer[pos=0 lim=3 cap=3]

Server端:

开启thrift服务器,监听端口:9090
TestServiceImpl.getBool() + true
TestServiceImpl.getI8() + 1
TestServiceImpl.getI16() + 2
TestServiceImpl.getI32() + 3
TestServiceImpl.getI64() + 4
TestServiceImpl.getDouble() + 5.0
TestServiceImpl.getString() + abc
TestServiceImpl.getBinary() + java.nio.HeapByteBuffer[pos=0 lim=3 cap=3]

TThreadedSelectorServer

介绍

TThreadedSelectorServer模式是目前Thrift提供的最高级的模式,它内部有如果几个部分构成:

  1. 一个AcceptThread线程对象,专门用于处理监听socket上的新连接;
  2. 若干个SelectorThread对象专门用于处理业务socket的网络I/O操作,所有网络数据的读写均是有这些线程来完成;
  3. 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
  4. 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求读取之后,交个ExecutorService线程池中的线程完成此次调用的具体执行;

TThreadedSelectorServer

如上图所示,TThreadedSelectorServer模式中有一个专门的线程AcceptThread用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread线程中来完成,因此能够快速对网络I/O进行读写操作,能够很好地应对网络I/O较多的情况;TThreadedSelectorServer对于大部分应用场景性能都不会差,因此,如果实在不知道选择哪种工作模式,使用TThreadedSelectorServer就可以。

代码

Server端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ThriftServer {
public static void main(String[] args) {
try {
// 设置服务器端口
TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);
// 设置二进制协议工厂
Factory protocolFactory = new TBinaryProtocol.Factory();
// 处理器关联业务实现
Processor<TestService.Iface> processor = new TestService.Processor<TestService.Iface>(new TestServiceImpl());
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverTransport);
args.processor(processor);
args.protocolFactory(protocolFactory);
TServer server = new TThreadedSelectorServer(args);
System.out.println("开启thrift服务器,监听端口:9090");
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}

Client端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ThriftClient {
public static void main(String[] args) {
try {
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
TAsyncClientManager clientManager = new TAsyncClientManager();
TNonblockingSocket transport = new TNonblockingSocket("localhost", 9090);
TestService.AsyncClient client = new TestService.AsyncClient(protocolFactory, clientManager, transport);

Test test = new Test();
test.setBool_(true);
test.setI8_((byte) 1);
test.setI16_((short) 2);
test.setI32_(3);
test.setI64_(4L);
test.setDouble_(5.0);
test.setString_("abc");
test.setBinary_("abc".getBytes());

CountDownLatch latch = new CountDownLatch(1);
client.getBool(test, new AsynCallback(latch));

boolean wait;
try {
wait = latch.await(1, TimeUnit.SECONDS);
System.out.println("latch.await =:" + wait);
} catch (InterruptedException e) {
e.printStackTrace();
}

transport.close();

} catch (IOException e) {
e.printStackTrace();
} catch (TException e) {
e.printStackTrace();
}
}
}

异步处理类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class AsynCallback implements AsyncMethodCallback<Boolean>{  
private CountDownLatch latch;

public AsynCallback(CountDownLatch latch) {
this.latch = latch;
}

public void onComplete(Boolean response) {
System.out.println("onComplete");
try {
// Thread.sleep(1000L * 1);
System.out.println("AsynCall result =:" + response);
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}

public void onError(Exception exception) {
System.out.println("onError :" + exception.getMessage());
latch.countDown();
}
}

运行结果:
Client端:

Received 0
onComplete
AsynCall result =:true
latch.await =:true

Server端:

开启thrift服务器,监听端口:9090
TestServiceImpl.getBool() + true

上面代码接口的编写:
testService.thrift

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct Test {
1: required bool bool_; //required 必须填充也必须序列化
2: optional i8 i8_; //optional 不填充则不序列化
3: i16 i16_;
4: i32 i32_;
5: i64 i64_;
6: double double_;
7: string string_;
8: binary binary_;
}


service TestService {
void getVoid(1:Test test);
bool getBool(1:Test test);
i8 getI8(1:Test test);
i16 getI16(1:Test test);
i32 getI32(1:Test test);
i64 getI64(1:Test test);
double getDouble(1:Test test);
string getString(1:Test test);
binary getBinary(1:Test test);
}

参考文章
由浅入深了解Thrift(三)——Thrift server端的几种工作模式分析

Thrift 简单例子

发表于 2017-02-22   |   分类于 Thrift   |     |   阅读次数

个人博客原文:Thrift 简单例子

摘要:本文简单的举了个例子,怎么用Thrift去实现一个RPC调用。

编写IDL接口

HelloService.thrift

1
2
3
4
5
6
7
8
namespace java com.thrift.demo.service 

service HelloService {
i32 sayInt(1:i32 param)
string sayString(1:string param)
bool sayBoolean(1:bool param)
void sayVoid()
}

用Thrift编译器编译成对应的类

运行下面命令,生成HelloService.java类

1
thrift-0.10.0 -gen java HelloService.thrift

生成的HelloService.java内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.thrift.demo01.service;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.10.0)", date = "2017-02-13")
public class HelloService {

public interface Iface {

public int sayInt(int param) throws org.apache.thrift.TException;

public java.lang.String sayString(java.lang.String param) throws org.apache.thrift.TException;

public boolean sayBoolean(boolean param) throws org.apache.thrift.TException;

public void sayVoid() throws org.apache.thrift.TException;

}

//......省略了很多代码
}

编写真正业务逻辑实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class HelloServiceImpl implements HelloService.Iface {

public int sayInt(int param) throws TException {
System.out.println("say int :" + param);
return param;
}

public String sayString(String param) throws TException {
System.out.println("say string :" + param);
return param;
}

public boolean sayBoolean(boolean param) throws TException {
System.out.println("say boolean :" + param);
return param;
}
public void sayVoid() throws TException {
System.out.println("say void ...");
}
}

编写Server端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThriftServer {
public static void main(String[] args) {
try {
// 设置服务器端口
TServerSocket serverTransport = new TServerSocket(9090);
// 设置二进制协议工厂
Factory protocolFactory = new TBinaryProtocol.Factory();
// 处理器关联业务实现
Processor<HelloService.Iface> processor = new HelloService.Processor<HelloService.Iface>(
new HelloServiceImpl());
//使用单线程标准阻塞I/O模型
TServer.Args simpleArgs = new TServer.Args(serverTransport)
.processor(processor)
.protocolFactory(protocolFactory);
TServer server = new TSimpleServer(simpleArgs);
System.out.println("开启thrift服务器,监听端口:9090");
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
}

编写Client端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThriftClient {
public static void main(String[] args) {
try {
// 设置调用的服务地址-端口
TTransport transport = new TSocket("localhost", 9090);
// 使用二进制协议
TProtocol protocol = new TBinaryProtocol(transport);
// 使用的接口
HelloService.Client client = new HelloService.Client(protocol);
// 打开socket
transport.open();
client.sayBoolean(true);
client.sayString("Hello world");
client.sayInt(20141111);
client.sayVoid();
transport.close();
} catch (TTransportException e) {
e.printStackTrace();
} catch (TException te) {
te.printStackTrace();
}
}
}

运行结果

先运行Server端,再运行Client端
服务端输出:

开启thrift服务器,监听端口:9090
say boolean :true
say string :Hello world
say int :20141111
say void …

到这里,就实现了一个Thrift的RPC调用例子

1…151617…24
LieBrother

LieBrother

当才华撑不起野心时,应该静下心来学习;当能力驾驭不了目标时,应该沉下心来历练。

120 日志
38 分类
138 标签
© 2016 - 2019 LieBrother
由 Hexo 强力驱动
主题 - NexT.Mist
本站访客数人次  |  本站总访问量次