Netty简单使用

本文总阅读量
本文最后更新于2 分钟前,文中所描述的信息可能已发生改变。

使用Netty框架构建一个简单的服务端与客户端程序进行通信

依赖项

xml
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.2.0.Alpha1</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
</dependency>

自定义消息实体

请求实体

java
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
@ToString
public class RpcRequest  {
    private String interfaceName;
    private String methodName;
}

响应实体

java
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Data
@ToString
public class RpcResponse  {
    private String message;
}

服务端

java
@Slf4j
public class RpcServer{
    private int port;

    public static void main(String[] args) {
        new RpcServer(8888).run();
    }

    public RpcServer(int port) {
        this.port = port;
    }
    public void run(){
        //创建处理i/o事件的线程池
        //处理链接的建立
        NioEventLoopGroup boss = new NioEventLoopGroup();
        //处理存活链接的事件
        NioEventLoopGroup work = new NioEventLoopGroup();
        KryoSerializer kryoSerializer = new KryoSerializer();
        //创建启动辅助类
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss,work)
                    .channel(NioServerSocketChannel.class)
                    // TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,
                    // 减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    // 是否开启 TCP 底层心跳机制
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    //表示系统用于临时存放已完成三次握手的请求的队列的最大长度,
                    // 如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
                    .option(ChannelOption.SO_BACKLOG,128)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
                            socketChannel.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            //绑定端口,等待绑定成功,ip不写默认获取本地ip
            ChannelFuture f = b.bind(port).sync();
            //等到服务端监听端口关闭
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("服务端出错:{}",e.getMessage());
            e.printStackTrace();
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

自定义服务端处理器

java
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 一个具有原子性的整型变量,被当作线程安全计数器
     */
    private static final AtomicInteger atomicInteger = new AtomicInteger(1);
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            RpcRequest request = (RpcRequest) msg;
            log.info("服务端接收到消息:[{}] 共计接收次数:[{}]",request,atomicInteger.getAndIncrement());
            RpcResponse response = RpcResponse.builder().message("服务端响应").build();
            //将响应数据发送给客户端,并添加一个默认的关闭连接管道的处理
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        } finally {
            //销毁对象msg,因为其是一个引用计数对象
            //如果该对象被writeAndFlush则不需要显示释放,Netty会帮我们释放
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("服务端Handler出错:{}",cause.getMessage());
        cause.printStackTrace();
        ctx.close();
    }
}

客户端

java
@Slf4j
public class RpcClient {
    private String ip;
    private int port;
    private static final Bootstrap b = new Bootstrap();

    public static void main(String[] args) {
        RpcClient rpcClient = new RpcClient("127.0.0.1", 8888);
        for (int i = 0; i < 4; i++) {
            log.info("=====start======");
            RpcRequest msg = RpcRequest.builder().interfaceName("interface").methodName("method").build();
            RpcResponse response = rpcClient.sendMessage(msg);
            log.info("收到消息:{}",response);
            log.info("======end=====");
        }
    }
    //初始化资源
    static {
        //处理I/O操作的多线程事件循环处理器
        NioEventLoopGroup boss = new NioEventLoopGroup();
        KryoSerializer kryoSerializer = new KryoSerializer();
        b.group(boss)
                .channel(NioSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    //配置新创建的通道
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //配置处理器链pipline
                        //RpcResponse -> ByteBuf
                        socketChannel.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
                        //ByteBuf -> RpcRequest
                        socketChannel.pipeline().addLast(new NettyKryoEncoder(kryoSerializer,RpcRequest.class));
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
    }

    public RpcClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public RpcResponse sendMessage(RpcRequest request){
        try {
            ChannelFuture f = b.connect(ip,port).sync();
            log.info("客户端连接:{}:{}",ip,port);
            Channel channel = f.channel();
            if(channel!=null){
                channel.writeAndFlush(request).addListener(future -> {
                    if (future.isSuccess()){
                        log.info("客户端发送消息:[ {} ]",request);
                    }else {
                        log.warn("客户端发送消息失败:{}",future.cause());
                    }
                });
                //阻塞等待,直到channel关闭
                channel.closeFuture().sync();
                //取出管道中保存的相应数据返回
                AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
                return channel.attr(key).get();
            }
        } catch (InterruptedException e) {
            log.error("客户端出错:{}",e.getMessage());
            e.printStackTrace();
        }
        return null;
    }

}

自定义客户端处理器

java
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            RpcResponse response = (RpcResponse) msg;
            log.info("收到服务端回复并绑定到AttributeMap:{}",response);
            //创建一个管道自定义属性的key对象
            AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
            //通过key获得与key关联的属性对象并设置其值
            ctx.channel().attr(key).set(response);
            //关闭通道
            ctx.channel().close();
        } finally {
            ReferenceCountUtil.release(msg);
        }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("客户端Handler出错:{}",cause.getMessage());
        cause.printStackTrace();
        //关闭连接管道
        ctx.close();
    }
}

结合Kory进行序列化

依赖项

xml
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>5.5.0</version>
</dependency>
<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.45</version>
</dependency>

定义序列化接口

java
public interface Serializer {
    /**
     * 序列化
     *
     * @param obj 要序列化的对象
     * @return 字节数组
     */
    byte[] serialize(Object obj);

    /**
     * 反序列化
     *
     * @param bytes 序列化后的字节数组
     * @param clazz
     * @param <T>
     * @return 反序列化的对象
     */
    <T> T deserialize(byte[] bytes, Class<T> clazz);
}

实现序列化接口并重写方法

java
public class KryoSerializer implements Serializer {
    private static final Logger logger = LoggerFactory.getLogger(KryoSerializer.class);

    //kryo不是线程安全的,所有使用本地线程变量存储
    private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(()->{
        Kryo kryo = new Kryo();
        kryo.register(RpcRequest.class);
        kryo.register(RpcResponse.class);
        //默认值为true,是否关闭注册行为,关闭之后可能存在序列化问题,一般推荐设置为 true
        kryo.setReferences(true);
        //默认值为false,是否关闭循环引用,可以提高性能,但是一般不推荐设置为 true
        kryo.setRegistrationRequired(false);
        return kryo;
    });
    @Override
    public byte[] serialize(Object obj) {
        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
            Output output = new Output(outputStream);
            Kryo kryo = kryoThreadLocal.get();
            kryo.writeObject(output,obj);
            kryoThreadLocal.remove();
            return output.toBytes();
        }catch (Exception e){
            e.printStackTrace();
            logger.warn("序列化对象 {} 失败",obj);
            return null;
        }
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        try (ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes)) {
            Input input = new Input(inputStream);
            Kryo kryo = kryoThreadLocal.get();
            T t = kryo.readObject(input, clazz);
            kryoThreadLocal.remove();
            return t;
        }catch (Exception e){
            e.printStackTrace();
            logger.warn("反序列化为 {} 类型失败",clazz.getName());
            return null;
         }
    }
}

编写自定义编码

java
@AllArgsConstructor
public class NettyKryoEncoder extends MessageToByteEncoder<Object> {
    private final Serializer serializer;
    /**
     * 要编码的对象类型
     */
    private final Class<?> genericClass;
    /**
     * 将对象转换为字节码然后写入到 ByteBuf 对象中
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        if(genericClass.isInstance(o)){//是对象
            //序列化为字节码
            byte[] bytes = serializer.serialize(o);
            //写入数据长度
            byteBuf.writeInt(bytes.length);
            //写入字节数组
            byteBuf.writeBytes(bytes);
        }

    }
}

编写自定义解码器

java
@AllArgsConstructor
public class NettyKryoDecoder extends ByteToMessageDecoder {
    private static final Logger logger = LoggerFactory.getLogger(NettyKryoDecoder.class);

    private final Serializer serializer;
    /**
     * 要解码的对象类型
     */
    private final Class<?> genericClass;
    /**
     * Netty传输的消息长度也就是对象序列化后对应的字节数组的大小,存储在 ByteBuf 头部
     */
    private static final int BODY_LENGTH = 4;

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if(byteBuf.readableBytes()>=BODY_LENGTH){
            //标记读指针的位置
            byteBuf.markReaderIndex();
            int len = byteBuf.readInt();
            if(len<0 || byteBuf.readableBytes()<0){
                logger.error("数据长度或byteBuf没有可读字节");
                return;
            }
            //可读字节数小于消息长度,表示消息不完整,重置读指针
            if(byteBuf.readableBytes()<len){
                byteBuf.resetReaderIndex();
                return;
            }

            //正常情况
            byte[] bytes = new byte[len];
            byteBuf.readBytes(bytes);
            Object target = serializer.deserialize(bytes, genericClass);
            list.add(target);
            logger.info("成功反序列化对象:{}",genericClass.getName());
        }
    }
}
Spring中Bean的生命周期及其扩展点
上传图片至七牛云
Valaxy v0.18.5 驱动 | 主题 - Yun v0.18.5
本站总访问量
本站访客数 人次
本站已运行0 天0 小时0 分0 秒后缀