本文最后更新于2 分钟前,文中所描述的信息可能已发生改变。
使用Netty框架构建一个简单的服务端与客户端程序进行通信
依赖项
xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.2.0.Alpha1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
自定义消息实体
请求实体
java
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
@ToString
public class RpcRequest {
private String interfaceName;
private String methodName;
}
响应实体
java
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
@ToString
public class RpcResponse {
private String message;
}
服务端
java
@Slf4j
public class RpcServer{
private int port;
public static void main(String[] args) {
new RpcServer(8888).run();
}
public RpcServer(int port) {
this.port = port;
}
public void run(){
//创建处理i/o事件的线程池
//处理链接的建立
NioEventLoopGroup boss = new NioEventLoopGroup();
//处理存活链接的事件
NioEventLoopGroup work = new NioEventLoopGroup();
KryoSerializer kryoSerializer = new KryoSerializer();
//创建启动辅助类
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss,work)
.channel(NioServerSocketChannel.class)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,
// 减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY,true)
// 是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE,true)
//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,
// 如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG,128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
socketChannel.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
//绑定端口,等待绑定成功,ip不写默认获取本地ip
ChannelFuture f = b.bind(port).sync();
//等到服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("服务端出错:{}",e.getMessage());
e.printStackTrace();
}finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
自定义服务端处理器
java
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 一个具有原子性的整型变量,被当作线程安全计数器
*/
private static final AtomicInteger atomicInteger = new AtomicInteger(1);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
RpcRequest request = (RpcRequest) msg;
log.info("服务端接收到消息:[{}] 共计接收次数:[{}]",request,atomicInteger.getAndIncrement());
RpcResponse response = RpcResponse.builder().message("服务端响应").build();
//将响应数据发送给客户端,并添加一个默认的关闭连接管道的处理
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} finally {
//销毁对象msg,因为其是一个引用计数对象
//如果该对象被writeAndFlush则不需要显示释放,Netty会帮我们释放
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("服务端Handler出错:{}",cause.getMessage());
cause.printStackTrace();
ctx.close();
}
}
客户端
java
@Slf4j
public class RpcClient {
private String ip;
private int port;
private static final Bootstrap b = new Bootstrap();
public static void main(String[] args) {
RpcClient rpcClient = new RpcClient("127.0.0.1", 8888);
for (int i = 0; i < 4; i++) {
log.info("=====start======");
RpcRequest msg = RpcRequest.builder().interfaceName("interface").methodName("method").build();
RpcResponse response = rpcClient.sendMessage(msg);
log.info("收到消息:{}",response);
log.info("======end=====");
}
}
//初始化资源
static {
//处理I/O操作的多线程事件循环处理器
NioEventLoopGroup boss = new NioEventLoopGroup();
KryoSerializer kryoSerializer = new KryoSerializer();
b.group(boss)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000)
.handler(new ChannelInitializer<SocketChannel>() {
//配置新创建的通道
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//配置处理器链pipline
//RpcResponse -> ByteBuf
socketChannel.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
//ByteBuf -> RpcRequest
socketChannel.pipeline().addLast(new NettyKryoEncoder(kryoSerializer,RpcRequest.class));
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
}
public RpcClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
public RpcResponse sendMessage(RpcRequest request){
try {
ChannelFuture f = b.connect(ip,port).sync();
log.info("客户端连接:{}:{}",ip,port);
Channel channel = f.channel();
if(channel!=null){
channel.writeAndFlush(request).addListener(future -> {
if (future.isSuccess()){
log.info("客户端发送消息:[ {} ]",request);
}else {
log.warn("客户端发送消息失败:{}",future.cause());
}
});
//阻塞等待,直到channel关闭
channel.closeFuture().sync();
//取出管道中保存的相应数据返回
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
return channel.attr(key).get();
}
} catch (InterruptedException e) {
log.error("客户端出错:{}",e.getMessage());
e.printStackTrace();
}
return null;
}
}
自定义客户端处理器
java
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
RpcResponse response = (RpcResponse) msg;
log.info("收到服务端回复并绑定到AttributeMap:{}",response);
//创建一个管道自定义属性的key对象
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
//通过key获得与key关联的属性对象并设置其值
ctx.channel().attr(key).set(response);
//关闭通道
ctx.channel().close();
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("客户端Handler出错:{}",cause.getMessage());
cause.printStackTrace();
//关闭连接管道
ctx.close();
}
}
结合Kory进行序列化
依赖项
xml
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.45</version>
</dependency>
定义序列化接口
java
public interface Serializer {
/**
* 序列化
*
* @param obj 要序列化的对象
* @return 字节数组
*/
byte[] serialize(Object obj);
/**
* 反序列化
*
* @param bytes 序列化后的字节数组
* @param clazz 类
* @param <T>
* @return 反序列化的对象
*/
<T> T deserialize(byte[] bytes, Class<T> clazz);
}
实现序列化接口并重写方法
java
public class KryoSerializer implements Serializer {
private static final Logger logger = LoggerFactory.getLogger(KryoSerializer.class);
//kryo不是线程安全的,所有使用本地线程变量存储
private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(()->{
Kryo kryo = new Kryo();
kryo.register(RpcRequest.class);
kryo.register(RpcResponse.class);
//默认值为true,是否关闭注册行为,关闭之后可能存在序列化问题,一般推荐设置为 true
kryo.setReferences(true);
//默认值为false,是否关闭循环引用,可以提高性能,但是一般不推荐设置为 true
kryo.setRegistrationRequired(false);
return kryo;
});
@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
Output output = new Output(outputStream);
Kryo kryo = kryoThreadLocal.get();
kryo.writeObject(output,obj);
kryoThreadLocal.remove();
return output.toBytes();
}catch (Exception e){
e.printStackTrace();
logger.warn("序列化对象 {} 失败",obj);
return null;
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes)) {
Input input = new Input(inputStream);
Kryo kryo = kryoThreadLocal.get();
T t = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return t;
}catch (Exception e){
e.printStackTrace();
logger.warn("反序列化为 {} 类型失败",clazz.getName());
return null;
}
}
}
编写自定义编码
java
@AllArgsConstructor
public class NettyKryoEncoder extends MessageToByteEncoder<Object> {
private final Serializer serializer;
/**
* 要编码的对象类型
*/
private final Class<?> genericClass;
/**
* 将对象转换为字节码然后写入到 ByteBuf 对象中
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
if(genericClass.isInstance(o)){//是对象
//序列化为字节码
byte[] bytes = serializer.serialize(o);
//写入数据长度
byteBuf.writeInt(bytes.length);
//写入字节数组
byteBuf.writeBytes(bytes);
}
}
}
编写自定义解码器
java
@AllArgsConstructor
public class NettyKryoDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(NettyKryoDecoder.class);
private final Serializer serializer;
/**
* 要解码的对象类型
*/
private final Class<?> genericClass;
/**
* Netty传输的消息长度也就是对象序列化后对应的字节数组的大小,存储在 ByteBuf 头部
*/
private static final int BODY_LENGTH = 4;
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes()>=BODY_LENGTH){
//标记读指针的位置
byteBuf.markReaderIndex();
int len = byteBuf.readInt();
if(len<0 || byteBuf.readableBytes()<0){
logger.error("数据长度或byteBuf没有可读字节");
return;
}
//可读字节数小于消息长度,表示消息不完整,重置读指针
if(byteBuf.readableBytes()<len){
byteBuf.resetReaderIndex();
return;
}
//正常情况
byte[] bytes = new byte[len];
byteBuf.readBytes(bytes);
Object target = serializer.deserialize(bytes, genericClass);
list.add(target);
logger.info("成功反序列化对象:{}",genericClass.getName());
}
}
}