Getting Started
如果有homebrew的话,直接执行以下命令即可,brew会处理相关依赖(https://thrift.apache.org/docs/install/)。
brew install thrift
或者可以从源码安装。
下载tar包 https://thrift.apache.org/download
参考 https://thrift.apache.org/docs/BuildingFromSource
先写一个例子,目录结构如下:
├── pom.xml
├── src
│ ├── main
│ │ ├── java
│ │ └── resources
│ └── test
│ └── java
└── thrift
├── Common.thrift
└── ShopService.thrift
pom.xml中添加以下依赖:
thrift目录下创建两个thrift文件:
Common.thrift
namespace java me.kavlez.thrift.service
service BaseService {
string echoServiceName()
}
ShopService.thrift
include "Common.thrift"
namespace java me.kavlez.thrift.service
struct Shop {
1: required i32 id,
2: required string name
}
struct Item {
1: required i32 id,
2: required string name = "unknown",
3: required string detail,
4: required Shop shop
}
service ShopService extends Common.BaseService {
Shop queryShopInfo(1: i32 id),
bool isValidShop(1: Shop shop),
set
}
Thrift提供了多个语言的生成器实现,按照thrift文件生成java类,生成代码命令的用法如下:
thrift -r --gen
其中-r即recursive,如果在文件中通过include关键字引用了其他文件,-r选项可以一并生成被引用的文件。
例如上面ShopService.thrift中的:
include Common.thrift
默认情况下,代码会在gen-
生成后再拷贝有点麻烦,直接生成到代码目录下,在工程目录下执行以下命令:
thrift -r --gen java --out src/main/java thrift/ShopService.thrift
执行后src/main/java/目录下生成me/kavlez/thrift/service/目录,以及4个java文件。
在service目录下创建impl,提供接口实现:
package me.kavlez.thrift.service.impl;
import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.service.Item;
import me.kavlez.thrift.service.Shop;
import me.kavlez.thrift.service.ShopService;
import org.apache.thrift.TException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* Created by Kavlez.Kim@gmail.com
*/
@Slf4j
public class ShopServiceImpl implements ShopService.Iface {
@Override
public Shop queryShopInfo(int id) throws TException {
return new Shop(id, "DMC_".concat(String.valueOf(id)));
}
@Override
public boolean isValidShop(Shop shop) throws TException {
return shop != null;
}
@Override
public Set
if (shopId < 1) {
return Collections.emptySet();
}
Set
Shop shop = new Shop(1101, "DMC");
for (int i = 0; i < 8; i++) {
Item item = new Item(shopId + i, "sample_".concat(String.valueOf(shopId + i))
, "this is sample_".concat(String.valueOf(i))
, shop);
items.add(item);
}
return items;
}
@Override
public String echoServiceName() throws TException {
return "alo! this is shop service!";
}
}
除了业务实现,我们需要额外做两件事情——构建Server和Client。
构建Server,也就是为Server指定Transparent、Protocol、Processor:
package me.kavlez.thrift.server;
import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.service.ShopService;
import me.kavlez.thrift.service.impl.ShopServiceImpl;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
/**
* Created by Kavlez.Kim@gmail.com
*/
@Slf4j
public class SimpleServerHolder {
public static TServer buildServer() {
TServerSocket serverSocket = null;
try {
serverSocket = new TServerSocket(8081);
} catch (TTransportException e) {
e.printStackTrace();
}
TProcessor tprocessor = new ShopService.Processor
TServer.Args tArgs = new TServer.Args(serverSocket);
tArgs.protocolFactory(new TCompactProtocol.Factory());
tArgs.processor(tprocessor);
TServer server = new TSimpleServer(tArgs);
return server;
}
public static void main(String[] args) {
TServer server = SimpleServerHolder.buildServer();
log.info("server ready...");
server.serve();
}
}
相应地,构建Client:
package me.kavlez.thrift.client;
import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.service.Item;
import me.kavlez.thrift.service.ShopService;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import java.util.Set;
/**
* Created by Kavlez.Kim@gmail.com
*/
@Slf4j
public class SimpleClientHolder {
private TTransport transport;
public ShopService.Client buildClient(String serverAddr, int serverPort, int timeout) throws TException {
this.transport = new TSocket(serverAddr, serverPort, timeout);
TProtocol protocol = new TCompactProtocol(transport);
transport.open();
ShopService.Client client = new ShopService.Client(protocol);
return client;
}
public static void main(String[] args) {
SimpleClientHolder simpleClientHolder = new SimpleClientHolder();
ShopService.Client client = null;
try {
client = simpleClientHolder.buildClient("localhost", 8081, 1000);
Set
log.info("return items = {}", String.valueOf(items));
} catch (TException e) {
e.printStackTrace();
}
if (null != simpleClientHolder.transport) {
simpleClientHolder.transport.close();
}
}
}
依次运行Server和Client,输出正常。
IDL (Interface Description Language)
提供服务的第一步是用IDL编写Thrift文件,IDL几乎可以描述接口所需的所有元素,接口定义中包括以下内容:
namespace
每个thrift文件都在自己的命名空间中,多个thrift文件可以用同一个命名空间作为标识,并指定要使用的语言的generator。
例如:
namespace java me.kavlez.thrift.service
namespace php tutorial
基本类型
类型说明
bool
布尔类型
i8 (byte)
8-bit 有符号整型,对应java的byte
i16
16-bit 有符号整型,对应java的short
i32
32-bit 有符号整型,对应java的int
i64
64-bit 有符号整型,对应java的long
double
64-bit 浮点类型,对应java的double
string
字符串
binary
Blob (byte array)
结构体
用于定义一个对象类型。
字段默认为optional,可以声明required。
字段可以设置默认值。
结构体之间可以互相引用。
0.9.2开始可以引用自身。
struct Shop {
1: required i32 id,
2: required string name
}
struct Item {
1: required i32 id,
2: required string name = "unknown",
3: required string detail,
4: required Shop shop
}
枚举
值是可选项,枚举不能嵌套;基本上就是K、V的形式,不能描述太复杂的枚举类。
enum Numberz {
ONE = 1,
TWO,
THREE,
FIVE = 5,
SIX,
EIGHT = 8
}
常量
可以自定义常量,像Map、List这样的复杂结构可以用json表示。
const i32 INT_CONST = 1234; // a
const map
const list
容器类型
不支持异构容器,容器的元素类型必须一致。
元素类型可以是service以外的任何类型。
类型说明
map
Map from one type to another
list
Ordered list of one type
set
Set of unique elements of one type
自定义异常
语法上和struct相似,生成后的代码,不同语言各有各的实现方式。
exception IllegalShopException {
1: i32 errorCode,
2: string message,
3: Shop shop
}
service
一个函数集合,语法和java定义接口的语法类似,下面是一些例子。
service ThriftTest {
/**
* 无返回,空参数列表
*/
void testVoid(),
/**
* 声明返回类型、参数
*/
string testString(1: string thing),
/**
* 返回结构体
*/
Shop queryShopInfo(1: i32 id),
/**
* 结构体作为参数
*/
bool isValidShop(1: Shop shop),
/**
* ...
*/
set
/**
* 抛出异常
*/
bool changeShopStatus(1: i32 shopId) throws(1: IllegalShopException err),
/**
* 多异常
*/
bool changeItemStatus(1: i32 itemId) throws(1: IllegalShopException shopErr,2:IllegalItemException itemErr),
/**
* oneway表示该方法在客户端发起请求后不会等待响应,返回类型必须为void
*/
oneway void sendMessage(1:i32 shopId,2:string message)
}
thrift working stack
用Thrift构建服务和客户端,架构如下:
+-------------------+ +-------------------+
| Server | | Client |
| | | |
| +---------------+ | | +---------------+ |
| | | | | | | |
| | your code | | | | your code | |
| +---------------+ | | +---------------+ |
| | Service | | | | Service | |
| | processor | | | | Client | |
| +---------------+ | | +---------------+ |
| | | | | | | |
| | Protocol | | | | Protocol | |
| +---------------+ | | +---------------+ |
| | | | | | | |
| | Transport |<--------->| Transport | |
| +---------------+ | | +---------------+ |
+-------------------+ +-------------------+
生成的接口类中大致包括三样,分别是Iface、Client、Processor。
另外还有Server、Transport、Protocol。
Transport
在RPC框架的语境下谈传输层很容易只想到网络通信,但Transport表述的并不只是网络通信。
不如说Transport是多种IO的抽象,其不仅限于网络IO。
比如,基础的TIOStreamTransport,以及其两个子类,TSocket和TZlibTransport。
TSocket在上面的例子中作为TBinaryProtocol依赖的transport类型,与Server的TServerSocket进行通信。
但后者是封装了InflaterInputStream和DeflaterOutputStream,其InputStream并不要求是SocketInputStream。
从开发角度来讲,如果将一个TMemoryBuffer对象传入Protocol,并以此创建某个service对应的Client,再调用相应接口。
整个过程在代码上并没有什么限制,只是运行时抛出org.apache.thrift.TApplicationException。
Protocol
protocol依赖transport,决定双方以什么协议通信,同时也是通信内容的载体。
org.apache.thrift.protocol.TProtocol中的方法声明里,一系列readXX和writeXX,在具体实现中通常都是通过transport来完成。
以TJSONProtocol为例,其实现的TProtocol的所有write方法都是以几个私有的write方法组织起来。
比如,writeI32和writeI64都是通过私有方法writeJSONInteger,而writeJSONInteger则是由实例化时传入的trasnport进行write。
Processor
构建自己的server时需要在tArgs提供一个Processor,比如本文中的ShopService.Processor。
(p.s. 如果需要提供多个Processor,比如再加一个ItemService,则使用TMultiplexedProcessor即可。)
Server通过Processor执行业务逻辑代码,文件中描述的每个函数作为ProcessFunction子类进行实例化,放入Processor的processMap中。
Server收到请求,从输入的protocol中读取方法名,根据方法名从processMap中拿到对应的ProcessFunction;
通过ProcessFunction的process方法执行业务逻辑,过程大体分为3步:
从protocol读入请求参数,构建参数对象;
传入参数,本地执行业务方法。假设方法名为"getItems",调用结果则为getItems_success;
将结果写入protocol,调用protocol.writeXX;
Client
像本文中,指定Transport和Protocol,构建ShopService.Client,客户端通过Client对象像调用本地方法一样调用queryItems;
在ShopService中,Client类同样实现了ShopService.Iface中的方法,以queryItems为例,其实现如下:
public Shop queryShopInfo(int id) throws org.apache.thrift.TException {
send_queryShopInfo(id);
return recv_queryShopInfo();
}
在send_queryShopInfo,构建该函数对应的xx_args对象,将其写入oprot,并通过oprot.tranport进行flush;
相应地,recv_queryShopInfo就是从iport中读取函数的返回值,构建该函数对应的queryShopInfo_result对象。
Server
将Transport、Protocol和Processor集合在一起就是一个完整的Server,父类TServer提供了唯一的抽象方法——serve()。
以TSimpleServer为例,serve中通过java.lang.ServerSocket的accept获取client Socket并转为client Transport,以此获取相应的Processor、创建相应的inputTransport、outputTransport和iProt、oProt。
(p.s. 默认的TProcessorFactory没有子类,其getProcessor(Transport)和并没有通过transport来获取processor。可以用来扩展,比如用一个server提供多版本服务之类的。)
剩下的工作由Processor进行处理,从iPort读入请求信息并构造TMessage,找到相应的ProcessFunction并执行其process方法,这个在上面说过。
Thrift为TServer提供了3种实现:
TSimpleServer: 单线程ServerSocket实现,仅用于测试;
TThreadPoolServer: 封装了ThreadPoolExecutor,用内部类WorkerProcess表示单个请求,通过每个WorkerProcess对象的transport获取相应的Processor和Protocol,调用业务代码并返回;
AbstractNonblockingServer: 非阻塞server抽象类,其serve()方法即整个过程的skeleton,serve()中调用的方法交给其子类提供具体实现。
public void serve() {
// start any IO threads
if (!startThreads()) {
return;
}
// start listening, or exit
if (!startListening()) {
return;
}
setServing(true);
// this will block while we serve
waitForShutdown();
setServing(false);
// do a little cleanup
stopListening();
}
AbstractNonblockingServer的3个子类,分别为:
TNonblockingServer: 实现父类的startThreads(),启动selector线程(也就是SelectAcceptThread,父类声明了protected final Selector selector),开始轮询SelectedKeys,检查状态并进行相应处理:
if (key.isAcceptable()) {
handleAccept();
} else if (key.isReadable()) {
handleRead(key);
} else if (key.isWritable()) {
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
另外,使用TNonblockingServer时transport必须为TFramedTransport,以此保证能正确读取单次方法调用。
THsHaServer: "HsHa",即"Half-Sync/Half-Async",是TNonblockingServer的子类。
工作流程和TNonblockingServer相似,主要区别在与handleRead()。
handleRead中完成读取后,另外一项重要的工作就是requestInvoke(buffer),也就是执行processor.process(iProt,oProt)。
不过,TNonblockingServer是单线程执行,而THsHaServer则是通过线程池。
将FrameBuffer装进Invocation(其run方法即frameBuffer.invoke()),提交给线程池处理。
线程池参数的默认值如下:
corePoolSize = 5;
maximumPoolSize = Integer.MAX_VALUE;
keepAliveTime = 60;
workQueue = new LinkedBlockingQueue
TThreadedSelectorServer: 进一步加强HsHaServer,用一个AcceptThread接收所有连接请求,并担任负载均衡的角色。
负载均衡的工作由构造器参数中的SelectorThreadLoadBalancer进行,该类只提供了一种实现——对已注册的selector线程列表进行round robin。
AcceptThread处理连接时,通过SelectorThreadLoadBalancer选出selector线程,将接收到的socketChannel放入selector线程的队列中。
虽然TThreadedSelectorServer的requestInvoke也是使用线程池进行,但线程池的默认配置和THsHaServer不同,默认时为corePoolSize为5的FixedThreadPool。
如果corePoolSize小为0,则由caller线程执行。
最后,把之前的例子修改一下,看看效果。
AbstractTServerHolder.java
package me.kavlez.thrift.server;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TTransportException;
public abstract class AbstractTServerHolder {
private TServer tServer;
public abstract TServer build() throws TTransportException;
}
ThreadedSelectorServerHolder.java
package me.kavlez.thrift.server;
import me.kavlez.thrift.service.ShopService;
import me.kavlez.thrift.service.impl.ShopServiceImpl;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TTransportException;
public class ThreadedSelectorServerHolder extends AbstractTServerHolder {
@Override
public TServer build() throws TTransportException {
TNonblockingServerTransport transport = new TNonblockingServerSocket(8090);
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport);
ShopService.Processor
= new ShopService.Processor
args.processor(shopServiceProcessor)
.protocolFactory(new TBinaryProtocol.Factory())
.transportFactory(new TFramedTransport.Factory());
TServer server = new TThreadedSelectorServer(args);
return server;
}
}
Launcher.java
package me.kavlez.thrift;
import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.client.AbstractShopServiceClientHolder;
import me.kavlez.thrift.client.NonBlockingClientHolder;
import me.kavlez.thrift.client.ShopServiceClientAgent;
import me.kavlez.thrift.server.AbstractTServerHolder;
import me.kavlez.thrift.server.ThreadedSelectorServerHolder;
import me.kavlez.thrift.service.Item;
import me.kavlez.thrift.service.ShopService;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TTransportException;
import java.io.FileNotFoundException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class Launcher {
static class TServerClientHolderPair {
private AbstractTServerHolder tServerHolder;
private Class extends AbstractShopServiceClientHolder> clientHolderClass;
public TServerClientHolderPair(AbstractTServerHolder tServerHolder, Class extends AbstractShopServiceClientHolder> clientHolderClass) {
this.tServerHolder = tServerHolder;
this.clientHolderClass = clientHolderClass;
}
}
public static void main(String[] args) throws InterruptedException, TTransportException, FileNotFoundException {
final AbstractTServerHolder serverHolder = new ThreadedSelectorServerHolder();
final TServer tServer = serverHolder.build();
ExecutorService executorService = Executors.newCachedThreadPool();
Future> serverFuture = executorService.submit(new Runnable() {
@Override
public void run() {
tServer.serve();
}
});
Thread.sleep(100);
int times = 10;
final CountDownLatch countDownLatch = new CountDownLatch(times);
class ShopServiceClientTask implements Runnable {
@Override
public void run() {
AbstractShopServiceClientHolder clientHolder = null;
clientHolder = new NonBlockingClientHolder();
try {
ShopService.Iface shopService = new ShopServiceClientAgent(clientHolder.build());
for (int i = 0; i < 1000; i++) {
Set
log.info("return items = {}", String.valueOf(items));
}
} catch (TException e) {
log.info("thread name={} get TException", Thread.currentThread().getName(), e);
} finally {
clientHolder.close();
countDownLatch.countDown();
}
}
}
long start = System.currentTimeMillis();
for (int i = 0; i < times; i++) {
executorService.submit(new ShopServiceClientTask());
}
countDownLatch.await();
log.info("used {} ms ", System.currentTimeMillis() - start);
tServer.setShouldStop(true);
tServer.stop();
executorService.shutdown();
}
}