一、背景 最近在看redis这方面的知识,发现在redis5中产生了一种新的数据类型Stream,它和kafka的设计有些类似,可以当作一个简单的消息队列来使用。二、redis中Stream类型的特点是可持久化的,可以保证数据不丢失。支持消息的多播、分组消费。支持消息的有序性。三、Stream的结构 解释:消费者组:ConsumerGroup,即使用XGROUPCREATE命令创建的,一个消费者组中可以存在多个消费者,这些消费者之间是竞争关系。同一条消息,只能被这个消费者组中的某个消费者获取。多个消费者之间是相互独立的,互不干扰。消费者:Consumer消费消息。lastdeliveredid:这个id保证了在同一个消费者组中,一个消息只能被一个消费者获取。每当消费者组的某个消费者读取到了这个消息后,这个lastdeliveredid的值会往后移动一位,保证消费者不会读取到重复的消息。pendingids:记录了消费者读取到的消息id列表,但是这些消息可能还没有处理,如果认为某个消息处理,需要调用ack命令。这样就确保了某个消息一定会被执行一次。消息内容:是一个键值对的格式。Stream中消息的ID:默认情况下,ID使用,redis可以自动生成一个,格式为时间戳序列号,也可以自己指定,一般使用默认生成的即可,且后生成的id号要比之前生成的大。四、Stream的命令1、XADD往Stream末尾添加消息1、命令格式xaddkey〔NOMKSTREAM〕〔MAXLENMINID〔〕threshold〔LIMITcount〕〕IDfieldvalue〔fieldvalue。。。〕 2、举例 xadd命令返回的是数据的id,xxyy(xx指的是毫秒数,yy指的是在这个毫秒内的第几条消息)1、向流中增加一条数据,127。0。0。1:6379xaddstreamkeyusernamezhangsan向streamkey这个流中增加一个username是zhangsan的数据表示自动生成id16359998589120返回的是ID127。0。0。1:6379keys1)streamkey可以看到stream自动创建了127。0。0。1:63792、向流中增加数据,不自动创建流127。0。0。1:6379xaddnotexistsstreamnomkstreamusernamelisi因为指定了nomkstream参数,而notexistsstream之前不存在,所以加入失败(nil)127。0。0。1:6379keys(emptyarray)127。0。0。1:63793、手动指定ID的值127。0。0。1:6379xaddstreamkey11usernamelisi此处id的值是自己传递的11,而不是使用自动生成11返回的是id的值127。0。0。1:63794、设置一个固定大小的Stream1、精确指定Stream的大小 指定指定Stream的大小比模糊指定Stream的大小会稍微多少消耗一些性能。 2、模糊指定Stream的大小127。0。0。1:6379xaddstreamkeymaxlen1firstfirst16360010341410127。0。0。1:6379xaddstreamkeymaxlen1secondsecond16360010445060127。0。0。1:6379xaddstreamkeymaxlen1thirdthird16360010578460127。0。0。1:6379xinfostreamstreamkey1)length2)(integer)33)radixtreekeys4)(integer)15)radixtreenodes6)(integer)27)lastgeneratedid8)163600105784609)groups10)(integer)011)firstentry12)1)163600103414102)1)first2)first13)lastentry14)1)163600105784602)1)third2)third127。0。0。1:6379 模糊指定流的大小,可以看到指定的是1,实际上已经到了3。2、XRANGE查看Stream中的消息1、命令格式xrangekeystartend〔COUNTcount〕 2、准备数据127。0。0。1:6379multiOK127。0。0。1:6379(TX)xaddstreamkeyusernamezhangsanQUEUED127。0。0。1:6379(TX)xaddstreamkeyusernamelisiQUEUED127。0。0。1:6379(TX)exec1)163600348170602)16360034817061127。0。0。1:6379xaddstreamkeyusernamewangwu16360034990550127。0。0。1:6379 使用redis的事务操作,获取到同一毫秒产生的多条数据,时间戳一样,序列号不一样3、举例1、获取所有的数据(和的使用)127。0。0。1:6379xrangestreamkey1)1)163600348170602)1)username2)zhangsan2)1)163600348170612)1)username2)lisi3)1)163600349905502)1)username2)wangwu127。0。0。1:6379 :表示最小id的值 :表示最大id的值2、获取指定id范围内的数据,闭区间127。0。0。1:6379xrangestreamkey16360034817061163600349905501)1)163600348170612)1)username2)lisi2)1)163600349905502)1)username2)wangwu127。0。0。1:63793、获取指定id范围内的数据,开区间127。0。0。1:6379xrangestreamkey(16360034817060(163600349905501)1)163600348170612)1)username2)lisi127。0。0。1:6379 (:表示开区间4、获取某个毫秒后所有的数据127。0。0。1:6379xrangestreamkey16360034817061)1)163600348170602)1)username2)zhangsan2)1)163600348170612)1)username2)lisi3)1)163600349905502)1)username2)wangwu127。0。0。1:6379 直接写毫秒不写后面的序列号即可。5、获取单条数据127。0。0。1:6379xrangestreamkey16360034990550163600349905501)1)163600349905502)1)username2)wangwu127。0。0。1:6379 start和end的值写的一样即可获取单挑数据。6、获取固定条数的数据127。0。0。1:6379xrangestreamkeycount11)1)163600348170602)1)username2)zhangsan127。0。0。1:6379 使用count进行限制3、XREVRANGE反向查看Stream中的消息XREVRANGEkeyendstart〔COUNTcount〕 使用方式和XRANGE类似,略。4、XDEL删除消息1、命令格式xdelkeyID〔ID。。。〕2、准备数据127。0。0。1:6379xaddstreamkeyusernamezhangsan16360041769240127。0。0。1:6379xaddstreamkeyusernamelisi16360041836380127。0。0。1:6379xaddstreamkeyusernamewangwu16360041892110127。0。0。1:63793、举例 需求:往Stream中加入3条消息,然后删除第2条消息127。0。0。1:6379xdelstreamkey16360041836380(integer)1返回的是删除记录的数量127。0。0。1:6379xrangstreamkey127。0。0。1:6379xrangestreamkey1)1)163600417692402)1)username2)zhangsan2)1)163600418921102)1)username2)wangwu127。0。0。1:6379 注意: 需要注意的是,我们从Stream中删除一个消息,这个消息并不是被真正的删除了,而是被标记为删除,这个时候这个消息还是占据着内容空间的。如果所有Stream中所有的消息都被标记删除,这个时候才会回收内存空间。但是这个Stream并不会被删除。6、XLEN查看Stream中元素的长度1、命令格式xlenkey2、举例 查看Stream中元素的长度127。0。0。1:6379xaddstreamkeyusernamezhangsan16360046905780127。0。0。1:6379xlenstreamkey(integer)1127。0。0。1:6379xlennotexistsstreamkey(integer)0127。0。0。1:6379 注意: 如果xlen后方的key不存在则返回0,否则返回元素的个数。7、XTRIM对Stream中的元素进行修剪1、命令格式xtrimkeyMAXLENMINID〔〕threshold〔LIMITcount〕2、准备数据127。0。0。1:6379xaddstreamkeyusernamezhangsan16360097454010127。0。0。1:6379multiOK127。0。0。1:6379(TX)xaddstreamkeyusernamelisiQUEUED127。0。0。1:6379(TX)xaddstreamkeyusernamewangwuQUEUED127。0。0。1:6379(TX)exec1)163600976395502)16360097639551127。0。0。1:6379xaddstreamkeyusernamezhaoliu16360097696250127。0。0。1:63793、举例1、maxlen精确限制127。0。0。1:6379xtrimstreamkeymaxlen2保留最后的2个消息(integer)2127。0。0。1:6379xrangestreamkey可以看到之前加入的2个消息被删除了1)1)163600976395512)1)username2)wangwu2)1)163600976962502)1)username2)zhaoliu127。0。0。1:6379 上方的意思是,保留streamkey这个Stream中最后的2个消息。2、minid模糊限制 minid是删除比这个id小的数据,本地测试的时候没有测试出来,略。8、XREAD独立消费消息 XREAD只是读取消息,读取完之后并不会删除消息。使用XREAD读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。1、命令格式xread〔COUNTcount〕〔BLOCKmilliseconds〕STREAMSkey〔key。。。〕ID〔ID。。。〕 2、准备数据127。0。0。1:6379xaddstreamkeyusernamezhangsan16360118013650127。0。0。1:6379xaddstreamkeyusernamelisi16360118062610127。0。0。1:6379xaddstreamkeyusernamewangwu16360118109050127。0。0。1:63793、举例1、获取用户名是wangwu的数据127。0。0。1:6379xreadstreamsstreamkey16360118062610此处写的是lisi的id,即读取到的数据需要是163601180626101)1)streamkey2)1)1)163601181090502)1)username2)wangwu2、获取2条数据127。0。0。1:6379xreadcount2streamsstreamkey001)1)streamkey2)1)1)163601180136502)1)username2)zhangsan2)1)163601180626102)1)username2)lisi127。0。0。1:6379 count限制单次读取最后的消息,因为当前读取可能没有这么多。3、非阻塞读取Stream对尾的数据 即读取队列尾的下一个消息,在非阻塞模式下始终是nil127。0。0。1:6379xreadstreamsstreamkey(nil)4、阻塞读取Stream对尾的数据 注意:表示读取队列最新进来的一个消息,不是Stream的最后一个消息。是xreadblock执行后,再次使用xadd添加消息后,xreadblock才会返回。block0表示永久阻塞,当消息到来时,才接触阻塞。block1000表示阻塞1000ms,如果1000ms还没有消息到来,则返回nilxread进行顺序消费当使用xread进行顺序消息时,需要记住返回的消息id,同时下次调用xread时,需要将上次返回的消息id传递进去。xread读取消息,完全无视消费组,此时Stream就可以理解为一个普通的list。9、消费者组相关操作1、消费者组命令 2、准备数据 1、创建Stream的名称是streamkey 2、创建2个消息,aa和bb127。0。0。1:6379xaddstreamkeyaaaa16363626191250127。0。0。1:6379xaddstreamkeybbbb163636262319103、创建消费者组1、创建一个从头开始消费的消费者组xgroupcreatestreamkey(Stream名)g1(消费者组名)00(表示从头开始消费)2、创建一个从Stream最新的一个消息消费的消费者组xgroupcreatestreamkeyg2 表示从最后一个元素消费,不包括Stream中的最后一个元素,即消费最新的消息。4、创建一个从某个消息之后消费的消费者组xgroupcreatestreamkeyg31636362619125016363626191250这个是上方aa消息的id的值 16363626191250某个消息的具体的ID,这个g3消费者组中的消息都是大于这个id的消息。3、从消费者中读取消息127。0。0。1:6379xreadgroupgroupg1(消费组名)c1(消费者名,自动创建)count3(读取3条)streamsstreamkey(Stream名)(从该消费者组中还未分配给另外的消费者的消息开始读取)1)1)streamkey2)1)1)163636261912502)1)aa2)aa2)1)163636262319102)1)bb2)bb127。0。0。1:6379xreadgroupgroupg2c1count3streamsstreamkey(nil)返回nil是因为g2消费组是从最新的一条信息开始读取(创建消费者组时使用了),需要在另外的窗口执行xadd命令,才可以再次读取到消息127。0。0。1:6379xreadgroupgroupg3c1count3streamsstreamkey只读取到一条消息是因为,在创建消费者组时,指定了aa消息的id,bb消息的id大于aa,所以读取出来了。1)1)streamkey2)1)1)163636262319102)1)bb2)bb127。0。0。1:63794、读取消费者的pending消息127。0。0。1:6379xgroupcreatestreamkeyg400OK127。0。0。1:6379xinfoconsumersstreamkeyg11)1)name2)c13)pending4)(integer)25)idle6)(integer)88792127。0。0。1:6379xinfoconsumersstreamkeyg4(emptyarray)127。0。0。1:6379xreadgroupgroupg1c1count1streamsstreamkey163636261912501)1)streamkey2)1)1)163636262319102)1)bb2)bb127。0。0。1:6379xreadgroupgroupg4c1count1block0streamsstreamkey163636261912501)1)streamkey2)(emptyarray)127。0。0。1:6379 5、转移消费者的消息127。0。0。1:6379xpendingstreamkeyg110c11)1)163636261912502)c13)(integer)26861834)(integer)12)1)163636262319102)c13)(integer)1022744)(integer)7127。0。0。1:6379xpendingstreamkeyg110c2(emptyarray)127。0。0。1:6379xclaimstreamkeyg1c2102274163636262319101)1)163636262319102)1)bb2)bb127。0。0。1:6379xpendingstreamkeyg110c21)1)163636262319102)c23)(integer)176164)(integer)8127。0。0。1:6379 也可以通过xautoclaim来实现。6、一些监控命令1、查看消费组中消费者的pending消息127。0。0。1:6379xpendingstreamkeyg110c21)1)163636262319102)c23)(integer)12476804)(integer)8127。0。0。1:63792、查看消费组中的消费者信息127。0。0。1:6379xinfoconsumersstreamkeyg11)1)name2)c13)pending4)(integer)15)idle6)(integer)14748642)1)name2)c23)pending4)(integer)15)idle6)(integer)1290069127。0。0。1:63793、查看消费组信息127。0。0。1:6379xinfogroupsstreamkey1)1)name2)g13)consumers4)(integer)25)pending6)(integer)27)lastdeliveredid8)163636262319102)1)name2)g23)consumers。。。。。。4、查看Stream信息127。0。0。1:6379xinfostreamstreamkey1)length2)(integer)23)radixtreekeys4)(integer)15)radixtreenodes6)(integer)27)lastgeneratedid8)163636262319109)groups10)(integer)411)firstentry12)1)163636261912502)1)aa2)aa13)lastentry14)1)163636262319102)1)bb2)bb127。0。0。1:6379