城市直播房产教育博客汽车
投稿投诉
汽车报价
买车新车
博客专栏
专题精品
教育留学
高考读书
房产家居
彩票视频
直播黑猫
投资微博
城市上海
政务旅游

RedisStream类型的使用

8月16日 飞凤谷投稿
  一、背景
  最近在看redis这方面的知识,发现在redis5中产生了一种新的数据类型Stream,它和kafka的设计有些类似,可以当作一个简单的消息队列来使用。二、redis中Stream类型的特点是可持久化的,可以保证数据不丢失。支持消息的多播、分组消费。支持消息的有序性。三、Stream的结构
  解释:消费者组:ConsumerGroup,即使用XGROUPCREATE命令创建的,一个消费者组中可以存在多个消费者,这些消费者之间是竞争关系。同一条消息,只能被这个消费者组中的某个消费者获取。多个消费者之间是相互独立的,互不干扰。消费者:Consumer消费消息。lastdeliveredid:这个id保证了在同一个消费者组中,一个消息只能被一个消费者获取。每当消费者组的某个消费者读取到了这个消息后,这个lastdeliveredid的值会往后移动一位,保证消费者不会读取到重复的消息。pendingids:记录了消费者读取到的消息id列表,但是这些消息可能还没有处理,如果认为某个消息处理,需要调用ack命令。这样就确保了某个消息一定会被执行一次。消息内容:是一个键值对的格式。Stream中消息的ID:默认情况下,ID使用,redis可以自动生成一个,格式为时间戳序列号,也可以自己指定,一般使用默认生成的即可,且后生成的id号要比之前生成的大。四、Stream的命令1、XADD往Stream末尾添加消息1、命令格式xaddkey〔NOMKSTREAM〕〔MAXLENMINID〔〕threshold〔LIMITcount〕〕IDfieldvalue〔fieldvalue。。。〕
  2、举例
  xadd命令返回的是数据的id,xxyy(xx指的是毫秒数,yy指的是在这个毫秒内的第几条消息)1、向流中增加一条数据,127。0。0。1:6379xaddstreamkeyusernamezhangsan向streamkey这个流中增加一个username是zhangsan的数据表示自动生成id16359998589120返回的是ID127。0。0。1:6379keys1)streamkey可以看到stream自动创建了127。0。0。1:63792、向流中增加数据,不自动创建流127。0。0。1:6379xaddnotexistsstreamnomkstreamusernamelisi因为指定了nomkstream参数,而notexistsstream之前不存在,所以加入失败(nil)127。0。0。1:6379keys(emptyarray)127。0。0。1:63793、手动指定ID的值127。0。0。1:6379xaddstreamkey11usernamelisi此处id的值是自己传递的11,而不是使用自动生成11返回的是id的值127。0。0。1:63794、设置一个固定大小的Stream1、精确指定Stream的大小
  指定指定Stream的大小比模糊指定Stream的大小会稍微多少消耗一些性能。
  2、模糊指定Stream的大小127。0。0。1:6379xaddstreamkeymaxlen1firstfirst16360010341410127。0。0。1:6379xaddstreamkeymaxlen1secondsecond16360010445060127。0。0。1:6379xaddstreamkeymaxlen1thirdthird16360010578460127。0。0。1:6379xinfostreamstreamkey1)length2)(integer)33)radixtreekeys4)(integer)15)radixtreenodes6)(integer)27)lastgeneratedid8)163600105784609)groups10)(integer)011)firstentry12)1)163600103414102)1)first2)first13)lastentry14)1)163600105784602)1)third2)third127。0。0。1:6379
  模糊指定流的大小,可以看到指定的是1,实际上已经到了3。2、XRANGE查看Stream中的消息1、命令格式xrangekeystartend〔COUNTcount〕
  2、准备数据127。0。0。1:6379multiOK127。0。0。1:6379(TX)xaddstreamkeyusernamezhangsanQUEUED127。0。0。1:6379(TX)xaddstreamkeyusernamelisiQUEUED127。0。0。1:6379(TX)exec1)163600348170602)16360034817061127。0。0。1:6379xaddstreamkeyusernamewangwu16360034990550127。0。0。1:6379
  使用redis的事务操作,获取到同一毫秒产生的多条数据,时间戳一样,序列号不一样3、举例1、获取所有的数据(和的使用)127。0。0。1:6379xrangestreamkey1)1)163600348170602)1)username2)zhangsan2)1)163600348170612)1)username2)lisi3)1)163600349905502)1)username2)wangwu127。0。0。1:6379
  :表示最小id的值
  :表示最大id的值2、获取指定id范围内的数据,闭区间127。0。0。1:6379xrangestreamkey16360034817061163600349905501)1)163600348170612)1)username2)lisi2)1)163600349905502)1)username2)wangwu127。0。0。1:63793、获取指定id范围内的数据,开区间127。0。0。1:6379xrangestreamkey(16360034817060(163600349905501)1)163600348170612)1)username2)lisi127。0。0。1:6379
  (:表示开区间4、获取某个毫秒后所有的数据127。0。0。1:6379xrangestreamkey16360034817061)1)163600348170602)1)username2)zhangsan2)1)163600348170612)1)username2)lisi3)1)163600349905502)1)username2)wangwu127。0。0。1:6379
  直接写毫秒不写后面的序列号即可。5、获取单条数据127。0。0。1:6379xrangestreamkey16360034990550163600349905501)1)163600349905502)1)username2)wangwu127。0。0。1:6379
  start和end的值写的一样即可获取单挑数据。6、获取固定条数的数据127。0。0。1:6379xrangestreamkeycount11)1)163600348170602)1)username2)zhangsan127。0。0。1:6379
  使用count进行限制3、XREVRANGE反向查看Stream中的消息XREVRANGEkeyendstart〔COUNTcount〕
  使用方式和XRANGE类似,略。4、XDEL删除消息1、命令格式xdelkeyID〔ID。。。〕2、准备数据127。0。0。1:6379xaddstreamkeyusernamezhangsan16360041769240127。0。0。1:6379xaddstreamkeyusernamelisi16360041836380127。0。0。1:6379xaddstreamkeyusernamewangwu16360041892110127。0。0。1:63793、举例
  需求:往Stream中加入3条消息,然后删除第2条消息127。0。0。1:6379xdelstreamkey16360041836380(integer)1返回的是删除记录的数量127。0。0。1:6379xrangstreamkey127。0。0。1:6379xrangestreamkey1)1)163600417692402)1)username2)zhangsan2)1)163600418921102)1)username2)wangwu127。0。0。1:6379
  注意:
  需要注意的是,我们从Stream中删除一个消息,这个消息并不是被真正的删除了,而是被标记为删除,这个时候这个消息还是占据着内容空间的。如果所有Stream中所有的消息都被标记删除,这个时候才会回收内存空间。但是这个Stream并不会被删除。6、XLEN查看Stream中元素的长度1、命令格式xlenkey2、举例
  查看Stream中元素的长度127。0。0。1:6379xaddstreamkeyusernamezhangsan16360046905780127。0。0。1:6379xlenstreamkey(integer)1127。0。0。1:6379xlennotexistsstreamkey(integer)0127。0。0。1:6379
  注意:
  如果xlen后方的key不存在则返回0,否则返回元素的个数。7、XTRIM对Stream中的元素进行修剪1、命令格式xtrimkeyMAXLENMINID〔〕threshold〔LIMITcount〕2、准备数据127。0。0。1:6379xaddstreamkeyusernamezhangsan16360097454010127。0。0。1:6379multiOK127。0。0。1:6379(TX)xaddstreamkeyusernamelisiQUEUED127。0。0。1:6379(TX)xaddstreamkeyusernamewangwuQUEUED127。0。0。1:6379(TX)exec1)163600976395502)16360097639551127。0。0。1:6379xaddstreamkeyusernamezhaoliu16360097696250127。0。0。1:63793、举例1、maxlen精确限制127。0。0。1:6379xtrimstreamkeymaxlen2保留最后的2个消息(integer)2127。0。0。1:6379xrangestreamkey可以看到之前加入的2个消息被删除了1)1)163600976395512)1)username2)wangwu2)1)163600976962502)1)username2)zhaoliu127。0。0。1:6379
  上方的意思是,保留streamkey这个Stream中最后的2个消息。2、minid模糊限制
  minid是删除比这个id小的数据,本地测试的时候没有测试出来,略。8、XREAD独立消费消息
  XREAD只是读取消息,读取完之后并不会删除消息。使用XREAD读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。1、命令格式xread〔COUNTcount〕〔BLOCKmilliseconds〕STREAMSkey〔key。。。〕ID〔ID。。。〕
  2、准备数据127。0。0。1:6379xaddstreamkeyusernamezhangsan16360118013650127。0。0。1:6379xaddstreamkeyusernamelisi16360118062610127。0。0。1:6379xaddstreamkeyusernamewangwu16360118109050127。0。0。1:63793、举例1、获取用户名是wangwu的数据127。0。0。1:6379xreadstreamsstreamkey16360118062610此处写的是lisi的id,即读取到的数据需要是163601180626101)1)streamkey2)1)1)163601181090502)1)username2)wangwu2、获取2条数据127。0。0。1:6379xreadcount2streamsstreamkey001)1)streamkey2)1)1)163601180136502)1)username2)zhangsan2)1)163601180626102)1)username2)lisi127。0。0。1:6379
  count限制单次读取最后的消息,因为当前读取可能没有这么多。3、非阻塞读取Stream对尾的数据
  即读取队列尾的下一个消息,在非阻塞模式下始终是nil127。0。0。1:6379xreadstreamsstreamkey(nil)4、阻塞读取Stream对尾的数据
  注意:表示读取队列最新进来的一个消息,不是Stream的最后一个消息。是xreadblock执行后,再次使用xadd添加消息后,xreadblock才会返回。block0表示永久阻塞,当消息到来时,才接触阻塞。block1000表示阻塞1000ms,如果1000ms还没有消息到来,则返回nilxread进行顺序消费当使用xread进行顺序消息时,需要记住返回的消息id,同时下次调用xread时,需要将上次返回的消息id传递进去。xread读取消息,完全无视消费组,此时Stream就可以理解为一个普通的list。9、消费者组相关操作1、消费者组命令
  2、准备数据
  1、创建Stream的名称是streamkey
  2、创建2个消息,aa和bb127。0。0。1:6379xaddstreamkeyaaaa16363626191250127。0。0。1:6379xaddstreamkeybbbb163636262319103、创建消费者组1、创建一个从头开始消费的消费者组xgroupcreatestreamkey(Stream名)g1(消费者组名)00(表示从头开始消费)2、创建一个从Stream最新的一个消息消费的消费者组xgroupcreatestreamkeyg2
  表示从最后一个元素消费,不包括Stream中的最后一个元素,即消费最新的消息。4、创建一个从某个消息之后消费的消费者组xgroupcreatestreamkeyg31636362619125016363626191250这个是上方aa消息的id的值
  16363626191250某个消息的具体的ID,这个g3消费者组中的消息都是大于这个id的消息。3、从消费者中读取消息127。0。0。1:6379xreadgroupgroupg1(消费组名)c1(消费者名,自动创建)count3(读取3条)streamsstreamkey(Stream名)(从该消费者组中还未分配给另外的消费者的消息开始读取)1)1)streamkey2)1)1)163636261912502)1)aa2)aa2)1)163636262319102)1)bb2)bb127。0。0。1:6379xreadgroupgroupg2c1count3streamsstreamkey(nil)返回nil是因为g2消费组是从最新的一条信息开始读取(创建消费者组时使用了),需要在另外的窗口执行xadd命令,才可以再次读取到消息127。0。0。1:6379xreadgroupgroupg3c1count3streamsstreamkey只读取到一条消息是因为,在创建消费者组时,指定了aa消息的id,bb消息的id大于aa,所以读取出来了。1)1)streamkey2)1)1)163636262319102)1)bb2)bb127。0。0。1:63794、读取消费者的pending消息127。0。0。1:6379xgroupcreatestreamkeyg400OK127。0。0。1:6379xinfoconsumersstreamkeyg11)1)name2)c13)pending4)(integer)25)idle6)(integer)88792127。0。0。1:6379xinfoconsumersstreamkeyg4(emptyarray)127。0。0。1:6379xreadgroupgroupg1c1count1streamsstreamkey163636261912501)1)streamkey2)1)1)163636262319102)1)bb2)bb127。0。0。1:6379xreadgroupgroupg4c1count1block0streamsstreamkey163636261912501)1)streamkey2)(emptyarray)127。0。0。1:6379
  5、转移消费者的消息127。0。0。1:6379xpendingstreamkeyg110c11)1)163636261912502)c13)(integer)26861834)(integer)12)1)163636262319102)c13)(integer)1022744)(integer)7127。0。0。1:6379xpendingstreamkeyg110c2(emptyarray)127。0。0。1:6379xclaimstreamkeyg1c2102274163636262319101)1)163636262319102)1)bb2)bb127。0。0。1:6379xpendingstreamkeyg110c21)1)163636262319102)c23)(integer)176164)(integer)8127。0。0。1:6379
  也可以通过xautoclaim来实现。6、一些监控命令1、查看消费组中消费者的pending消息127。0。0。1:6379xpendingstreamkeyg110c21)1)163636262319102)c23)(integer)12476804)(integer)8127。0。0。1:63792、查看消费组中的消费者信息127。0。0。1:6379xinfoconsumersstreamkeyg11)1)name2)c13)pending4)(integer)15)idle6)(integer)14748642)1)name2)c23)pending4)(integer)15)idle6)(integer)1290069127。0。0。1:63793、查看消费组信息127。0。0。1:6379xinfogroupsstreamkey1)1)name2)g13)consumers4)(integer)25)pending6)(integer)27)lastdeliveredid8)163636262319102)1)name2)g23)consumers。。。。。。4、查看Stream信息127。0。0。1:6379xinfostreamstreamkey1)length2)(integer)23)radixtreekeys4)(integer)15)radixtreenodes6)(integer)27)lastgeneratedid8)163636262319109)groups10)(integer)411)firstentry12)1)163636261912502)1)aa2)aa13)lastentry14)1)163636262319102)1)bb2)bb127。0。0。1:6379
投诉 评论 转载

五官我们每个人,都随时随地随身带着五官,永不离开。眼、鼻、眉、耳、口,各有不同的姿态,各有所长,各有所短。眼睛在五官中,样子算得上漂亮。浅黑的眼线和洁白的眼底,中间亮晶晶的眼……能让人笑死的毒药是否存在笑死是一种罕见的死因,通常是由一阵大笑引起的心脏骤停或者窒息所致。有时候看网络小说会经常看到什么痒痒粉啊,腐蚀尸体的药水啊,还有一直让人疯狂大笑停止不了的药,这种药可能存……美国建立天地结合的卫星监视系统,网友呼吁给卫星都装上激光武器为什么美国卫星会突然靠近中国卫星,它的目的是什么?为什么网友呼吁要给卫星都装上激光武器?中国卫星面对美国卫星,又做了哪些应对措施呢?以至于让美国高喊:这很危险。我们都知道……RedisStream类型的使用一、背景最近在看redis这方面的知识,发现在redis5中产生了一种新的数据类型Stream,它和kafka的设计有些类似,可以当作一个简单的消息队列来使用。二、red……从小培养孩子的用眼习惯1、培养他们正确的写字、读书姿势,不要趴在桌子上或扭着身体。书本和眼睛应保持一市尺。学校课桌椅应适合学生身材。2、看书写字时间不宜过久,持续11。5小时后要有一个短时间的……美甲的画法教程图解创意俏皮活力美甲教程俏皮活力美甲画法教程有的女生认为可爱是一种充满活力年轻的风格,而质感是一种成熟稳重的风格,这两种风格是不能搭配在一起的。事实可不是这样,今天小编就带来一款可爱与质感共存的……自助共享茶室模式下的茶馆新业态,24小时在线不打烊无人值守模式是基于物联网智能设备与互联网技术结合而成。目前,无人值守模式正逐步渗透于各个行业,比如无人便利超市、无人自助餐厅、无人售货机(售卖酒水、饮料等)、无人自助洗车、无人……蜂蜜柠檬水减肥法详解想瘦身的看看减肥瘦身是这个时代的热门话题之一,所以关于减肥的方法也是各式各样层出不穷的,蜂蜜柠檬水减肥法就是其中的一种,不过这种减肥方式具体该怎么做效果好呢?柠檬蜂蜜水减肥法(一)……40!40!还是40!国乒不给面子全剃光头坚决给对手送蛋北京时间11月24日休斯顿世乒赛拉开战幕,在第一个比赛日中,国乒全线飘红,在出战的混双、男单、女单项目中全部取得胜利,轻松晋级。值得注意的是,本来按计划出战混双的林高远和王曼昱……恶劣!韩国短道黑脚绊倒武大靖任子威,中国痛失500米金牌当地时间11月20日,20212022赛季国际滑联短道速滑世界杯匈牙利德布勒森站展开第一个决赛日的争夺。中国队表现不错,任子威夺得男子1500米的金牌,他还与武大靖进入了500……CCTV5直播!辽宁PK首钢重头戏,林书豪回归首秀双塔对位让CBA第二阶段比赛将在12月25日正式打响,CBA官方也是公布了第二阶段的赛程。作为联盟战绩第一的辽宁男篮,第二阶段的赛程十分残酷,揭幕战就要面对北京首钢,之后还要面对广东、深……小满节气小满时节各地习俗大曝光二十四节气中的小满节气。为了迎接小满节气的到来,全国各地都开始紧锣密鼓的忙活起来,但是各地由于风俗习惯不同,所以对于小满节气养生也有着一定的影响。那么我国各地小满节气都各有什么……
女教师着装惹争议怎么穿才能为人师表8月12日金属现货报价铜价铝价铅价锌价锡价镍价钢铁生完宝宝性冷淡是怎么回事基于人事档案审核工作的思考芦荟清热解毒,使用时种种禁忌要牢记我和奶奶的故事董明珠又放狠话,格力手机不比苹果差准妈妈需重视夏日防暑周鸿祎数字安全元年即将开启,元宇宙风险升级冰冰造句用冰冰造句大全引领运动健康市场潮流,荣耀这两款穿戴设备为何让我爱不释手?心情不好问问自己这个问题吧高职护理生物化学课程的项目化教学设计构想探析祛斑用什么方法最好维生素能去祛斑用对了想不美都不小学年级组长工作总结属于自己的路作文700字失眠雅思口语八做八不做我的同学作文300字文滑雪肚子大喝减肥茶有用吗篮网GM哈登夏季一直减肥养伤可提前顶薪续约3年1。61亿手中造句用手中造句大全数据产品经理,该如何搭建数据平台?

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找