1。协议的作用 TCPIP中消息传输基于流的方式,没有边界 协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则2。Redis协议 如果我们要向Redis服务器发送一条setnameNyima的指令,需要遵守如下协议该指令一共有3部分,每条指令之后都要添加回车与换行符3r第一个指令的长度是33r第一个指令是set指令setr下面的指令以此类推4rnamer5rNyimar复制代码 客户端代码如下publicclassRedisClient{staticfinalLoggerlogLoggerFactory。getLogger(StudyServer。class);publicstaticvoidmain(String〔〕args){NioEventLoopGroupgroupnewNioEventLoopGroup();try{ChannelFuturechannelFuturenewBootstrap()。group(group)。channel(NioSocketChannel。class)。handler(newChannelInitializerSocketChannel(){OverrideprotectedvoidinitChannel(SocketChannelch){打印日志ch。pipeline()。addLast(newLoggingHandler(LogLevel。DEBUG));ch。pipeline()。addLast(newChannelInboundHandlerAdapter(){OverridepublicvoidchannelActive(ChannelHandlerContextctx)throwsException{回车与换行符finalbyte〔〕LINE{r,};获得ByteBufByteBufbufferctx。alloc()。buffer();连接建立后,向Redis中发送一条指令,注意添加回车与换行setnameNyimabuffer。writeBytes(3。getBytes());buffer。writeBytes(LINE);buffer。writeBytes(3。getBytes());buffer。writeBytes(LINE);buffer。writeBytes(set。getBytes());buffer。writeBytes(LINE);buffer。writeBytes(4。getBytes());buffer。writeBytes(LINE);buffer。writeBytes(name。getBytes());buffer。writeBytes(LINE);buffer。writeBytes(5。getBytes());buffer。writeBytes(LINE);buffer。writeBytes(Nyima。getBytes());buffer。writeBytes(LINE);ctx。writeAndFlush(buffer);}});}})。connect(newInetSocketAddress(localhost,6379));channelFuture。sync();关闭channelchannelFuture。channel()。close()。sync();}catch(InterruptedExceptione){e。printStackTrace();}finally{关闭groupgroup。shutdownGracefully();}}}复制代码 控制台打印结果1600〔nioEventLoopGroup21〕DEBUGio。netty。handler。logging。LoggingHandler〔id:0x28c994f1,L:127。0。0。1:60792R:localhost127。0。0。1:6379〕WRITE:34B0123456789abcdef000000002a330d0a24330d0a7365740d0a24340d3。。3。。set。。4。000000100a6e616d650d0a24350d0a4e79696d61。name。。5。。Nyima000000200d0a。。复制代码 Redis中查询执行结果 3。HTTP协议 HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec作为服务器端的解码器与编码器,来处理HTTP请求HttpServerCodec中既有请求的解码器HttpRequestDecoder又有响应的编码器HttpResponseEncoderCodec(CodeCombine)一般代表该类既作为编码器又作为解码器publicfinalclassHttpServerCodecextendsCombinedChannelDuplexHandlerHttpRequestDecoder,HttpResponseEncoderimplementsHttpServerUpgradeHandler。SourceCodec复制代码 服务器代码publicclassHttpServer{staticfinalLoggerlogLoggerFactory。getLogger(StudyServer。class);publicstaticvoidmain(String〔〕args){NioEventLoopGroupgroupnewNioEventLoopGroup();newServerBootstrap()。group(group)。channel(NioServerSocketChannel。class)。childHandler(newChannelInitializerSocketChannel(){OverrideprotectedvoidinitChannel(SocketChannelch){ch。pipeline()。addLast(newLoggingHandler(LogLevel。DEBUG));作为服务器,使用HttpServerCodec作为编码器与解码器ch。pipeline()。addLast(newHttpServerCodec());服务器只处理HTTPRequestch。pipeline()。addLast(newSimpleChannelInboundHandlerHttpRequest(){OverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,HttpRequestmsg){获得请求urilog。debug(msg。uri());获得完整响应,设置版本号与状态码DefaultFullHttpResponseresponsenewDefaultFullHttpResponse(msg。protocolVersion(),HttpResponseStatus。OK);设置响应内容byte〔〕bytesh1Hello,World!h1。getBytes(StandardCharsets。UTF8);设置响应体长度,避免浏览器一直接收响应内容response。headers()。setInt(CONTENTLENGTH,bytes。length);设置响应体response。content()。writeBytes(bytes);写回响应ctx。writeAndFlush(response);}});}})。bind(8080);}}复制代码 服务器负责处理请求并响应浏览器。所以只需要处理HTTP请求即可服务器只处理HTTPRequestch。pipeline()。addLast(newSimpleChannelInboundHandlerHttpRequest()复制代码 获得请求后,需要返回响应给浏览器。需要创建响应对象DefaultFullHttpResponse,设置HTTP版本号及状态码,为避免浏览器获得响应后,因为获得CONTENTLENGTH而一直空转,需要添加CONTENTLENGTH字段,表明响应体中数据的具体长度获得完整响应,设置版本号与状态码DefaultFullHttpResponseresponsenewDefaultFullHttpResponse(msg。protocolVersion(),HttpResponseStatus。OK);设置响应内容byte〔〕bytesh1Hello,World!h1。getBytes(StandardCharsets。UTF8);设置响应体长度,避免浏览器一直接收响应内容response。headers()。setInt(CONTENTLENGTH,bytes。length);设置响应体response。content()。writeBytes(bytes);复制代码 运行结果 浏览器 控制台请求内容1714〔nioEventLoopGroup22〕DEBUGio。netty。handler。logging。LoggingHandler〔id:0x72630ef7,L:0:0:0:0:0:0:0:1:8080R:0:0:0:0:0:0:0:1:55503〕READ:688B0123456789abcdef00000000474554202f66617669636f6e2e69636fGETfavicon。ico0000001020485454502f312e310d0a486f73743aHTTP1。1。。Host:00000020206c6f63616c686f73743a383038300dlocalhost:8080。000000300a436f6e6e656374696f6e3a206b6565。Connection:kee00000040702d616c6976650d0a507261676d613apalive。。Pragma:。。。。响应内容1716〔nioEventLoopGroup22〕DEBUGio。netty。handler。logging。LoggingHandler〔id:0x72630ef7,L:0:0:0:0:0:0:0:1:8080R:0:0:0:0:0:0:0:1:55503〕WRITE:61B0123456789abcdef00000000485454502f312e3120323030204f4b0dHTTP1。1200OK。000000100a436f6e74656e742d4c656e6774683a。ContentLength:000000202032320d0a0d0a3c68313e48656c6c6f22。。。。h1Hello000000302c20576f726c64213c2f68313e,World!h1复制代码4。自定义协议 组成要素魔数:用来在第一时间判定接收的数据是否为无效数据包版本号:可以支持协议的升级序列化算法:消息正文到底采用哪种序列化反序列化方式如:json、protobuf、hessian、jdk指令类型:是登录、注册、单聊、群聊跟业务相关请求序号:为了双工通信,提供异步能力正文长度消息正文编码器与解码器publicclassMessageCodecextendsByteToMessageCodecMessage{Overrideprotectedvoidencode(ChannelHandlerContextctx,Messagemsg,ByteBufout)throwsException{设置魔数4个字节out。writeBytes(newbyte〔〕{N,Y,I,M});设置版本号1个字节out。writeByte(1);设置序列化方式1个字节out。writeByte(1);设置指令类型1个字节out。writeByte(msg。getMessageType());设置请求序号4个字节out。writeInt(msg。getSequenceId());为了补齐为16个字节,填充1个字节的数据out。writeByte(0xff);获得序列化后的msgByteArrayOutputStreambosnewByteArrayOutputStream();ObjectOutputStreamoosnewObjectOutputStream(bos);oos。writeObject(msg);byte〔〕bytesbos。toByteArray();获得并设置正文长度长度用4个字节标识out。writeInt(bytes。length);设置消息正文out。writeBytes(bytes);}Overrideprotectedvoiddecode(ChannelHandlerContextctx,ByteBufin,ListObjectout)throwsException{获取魔数intmagicin。readInt();获取版本号byteversionin。readByte();获得序列化方式byteseqTypein。readByte();获得指令类型bytemessageTypein。readByte();获得请求序号intsequenceIdin。readInt();移除补齐字节in。readByte();获得正文长度intlengthin。readInt();获得正文byte〔〕bytesnewbyte〔length〕;in。readBytes(bytes,0,length);ObjectInputStreamoisnewObjectInputStream(newByteArrayInputStream(bytes));Messagemessage(Message)ois。readObject();将信息放入List中,传递给下一个handlerout。add(message);打印获得的信息正文System。out。println(魔数);System。out。println(magic);System。out。println(版本号);System。out。println(version);System。out。println(序列化方法);System。out。println(seqType);System。out。println(指令类型);System。out。println(messageType);System。out。println(请求序号);System。out。println(sequenceId);System。out。println(正文长度);System。out。println(length);System。out。println(正文);System。out。println(message);}}复制代码编码器与解码器方法源于父类ByteToMessageCodec,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类。此处使用了自定义类Message,代表消息publicclassMessageCodecextendsByteToMessageCodecMessage复制代码编码器负责将附加信息与正文信息写入到ByteBuf中,其中附加信息总字节数最好为2n,不足需要补齐。正文内容如果为对象,需要通过序列化将其放入到ByteBuf中解码器负责将ByteBuf中的信息取出,并放入List中,该List用于将信息传递给下一个handler 编写测试类publicclassTestCodec{staticfinalorg。slf4j。LoggerlogLoggerFactory。getLogger(StudyServer。class);publicstaticvoidmain(String〔〕args)throwsException{EmbeddedChannelchannelnewEmbeddedChannel();添加解码器,避免粘包半包问题channel。pipeline()。addLast(newLengthFieldBasedFrameDecoder(1024,12,4,0,0));channel。pipeline()。addLast(newLoggingHandler(LogLevel。DEBUG));channel。pipeline()。addLast(newMessageCodec());LoginRequestMessageusernewLoginRequestMessage(Nyima,123);测试编码与解码ByteBufbyteBufByteBufAllocator。DEFAULT。buffer();newMessageCodec()。encode(null,user,byteBuf);channel。writeInbound(byteBuf);}}复制代码测试类中用到了LengthFieldBasedFrameDecoder,避免粘包半包问题通过MessageCodec的encode方法将附加信息与正文写入到ByteBuf中,通过channel执行入站操作。入站时会调用decode方法进行解码 运行结果 Sharable注解 为了提高handler的复用率,可以将handler创建为handler对象,然后在不同的channel中使用该handler对象进行处理操作LoggingHandlerloggingHandlernewLoggingHandler(LogLevel。DEBUG);不同的channel中使用同一个handler对象,提高复用率channel1。pipeline()。addLast(loggingHandler);channel2。pipeline()。addLast(loggingHandler);复制代码 但是并不是所有的handler都能通过这种方法来提高复用率的,例如LengthFieldBasedFrameDecoder。如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题channel1中收到了一个半包,LengthFieldBasedFrameDecoder发现不是一条完整的数据,则没有继续向下传播此时channel2中也收到了一个半包,因为两个channel使用了同一个LengthFieldBasedFrameDecoder,存入其中的数据刚好拼凑成了一个完整的数据包。LengthFieldBasedFrameDecoder让该数据包继续向下传播,最终引发错误 为了提高handler的复用率,同时又避免出现一些并发问题,Netty中原生的handler中用Sharable注解来标明,该handler能否在多个channel中共享。 只有带有该注解,才能通过对象的方式被共享,否则无法被共享自定义编解码器能否使用Sharable注解 这需要根据自定义的handler的处理逻辑进行分析 我们的MessageCodec本身接收的是LengthFieldBasedFrameDecoder处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加Sharable注解的 但是实际情况我们并不能添加该注解,会抛出异常信息ChannelHandlercn。nyimac。study。day8。protocol。MessageCodecisnotallowedtobeshared因为MessageCodec继承自ByteToMessageCodec,ByteToMessageCodec类的注解如下 这就意味着ByteToMessageCodec不能被多个channel所共享的原因:因为该类的目标是:将ByteBuf转化为Message,意味着传进该handler的数据还未被处理过。所以传过来的ByteBuf可能并不是完整的数据,如果共享则会出现问题 如果想要共享,需要怎么办呢? 继承MessageToMessageDecoder即可。该类的目标是:将已经被处理的完整数据再次被处理。传过来的Message如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用Sharable注解了。实现方式与ByteToMessageCodec类似ChannelHandler。SharablepublicclassMessageSharableCodecextendsMessageToMessageCodecByteBuf,Message{Overrideprotectedvoidencode(ChannelHandlerContextctx,Messagemsg,ListObjectout)throwsException{。。。}Overrideprotectedvoiddecode(ChannelHandlerContextctx,ByteBufmsg,ListObjectout)throwsException{。。。}}