最近工作中使用到了消息中間件,另外一個組的同事經(jīng)過評估選擇了Redis stream作為最終選擇。我自己寫的性能測試框架自然也需要接入這套消息系統(tǒng)。所以我也抓緊學習起來。
Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)結構。 Redis Stream 主要用于消息隊列(MQ,Message Queue),Redis 本身是有一個 Redis 發(fā)布訂閱 (pub/sub) 來實現(xiàn)消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現(xiàn)網(wǎng)絡斷開、Redis 宕機等,消息就會被丟棄。
之前還沒發(fā)現(xiàn)Redis還有這種使用方法,著實有點少見過怪了。照例我后面會進行一些基本功能的性能測試,下面分享基本功能的使用演示。
準備工作
依賴
如果想自己操作的話,請注意這個版本,因為在我找資料的過程中發(fā)現(xiàn),不同版本的API有不少的差異,算是踩了一些坑。如果你使用其他版本的redis.clients遇到代碼無法運行的時候,可以直接翻看源碼查看相關參數(shù)類型。
Maven依賴:
redis.clients jedis 4.2.3
Gradle依賴:
// https://mvnrepository.com/artifact/redis.clients/jedisimplementation group: ‘redis.clients’, name: ‘jedis’, version: ‘4.2.3’
Redis server版本:Redis 6.2.5。
XADD – 添加消息到末尾
如果key對應的隊列不存在,則會自動創(chuàng)建。
首先我們需要創(chuàng)建一個redis.clients.jedis.params.XAddParams,顧名思義就是查詢參數(shù),這里面有重要的參數(shù):redis.clients.jedis.params.XAddParams#maxLen表示設置隊列的長度,但是不常用。語法如下:
def len = XAddParams.xAddParams()
xadd API使用方式如下:
public static void main(String[] args) { def base = new RedisBase(“127.0.0.1”, 6379) Jedis jedis = base.getJedis() def len = XAddParams.xAddParams() def map = new HashMap() map.put(“FunTester”, Time.getDate() + TAB + 325) jedis.xadd(“fun”, len, map) jedis.close() }
XTRIM – 對流進行修剪,限制長度
這個API就是設置隊列長度。使用方式也非常簡單。
public static void main(String[] args) { def base = new RedisBase(“127.0.0.1”, 6379) Jedis jedis = base.getJedis() def xtrim = jedis.xtrim(“fun”, XTrimParams.xTrimParams().maxLen(10)) output(xtrim) jedis.close() }
返回值是丟棄的消息的數(shù)量。
XDEL – 刪除消息
這個就是刪除某個消息,使用更簡單了。
public static void main(String[] args) { def base = new RedisBase(“127.0.0.1”, 6379) Jedis jedis = base.getJedis() jedis.xdel(“fun”,new StreamEntryID(1653129389004,1)) jedis.close() }
XLEN – 獲取流包含的元素數(shù)量,即消息長度
話不多說了,使用如下:
jedis.xlen(“fun”)
XREAD – 以阻塞或非阻塞方式獲取消息列表
這個要著重介紹一下,因為我用的就是這個,首先我們需要創(chuàng)建一個redis.clients.jedis.params.XReadParams,這里有兩個參數(shù):redis.clients.jedis.params.XReadParams#count和redis.clients.jedis.params.XReadParams#block。前者控制返回數(shù)量,后者控制阻塞時間,如果時間小于0則認為不阻塞,等于0則一直會阻塞,小于0會報錯。不設置該參數(shù)責任無非阻塞模式。PS:數(shù)量不足不會造成阻塞。示例如下:
def block = XReadParams.xReadParams().count(3).block(1000)
還有我們需要redis.clients.jedis.Jedis#xread(redis.clients.jedis.params.XReadParams, java.util.Map)第二個參數(shù),這里常用的兩種:
Map entry = [“fun”: new StreamEntryID()]//獲取歷史消息 Map entry = [“fun”: StreamEntryID.LAST_ENTRY]//獲取在請求之后添加的消息
遍歷消息:
List<Map.Entry> xread = jedis.xread(block, entry) output(xread.size()) Map.Entry get = xread.get(0) def value = get.getValue() value.each { println(it.getID()) println(it.getFields().get(“FunTester”)) }
控制臺響應如下:
16:40:56.065 main redis連接池IP:127.0.0.1,端口:6379,超時設置:500016:40:56.280 main 11653725282325-02022-05-28 16:08:02 3251653725282325-12022-05-28 16:08:02 3251653725282325-22022-05-28 16:08:02 325
XRANGE – 獲取消息列表,會自動過濾已經(jīng)刪除的消息
這個API獲取某個范圍內(nèi)的消息,有個start和end的參數(shù),可以傳String類型的消息ID,也可以傳redis.clients.jedis.StreamEntryID,方法重載的比較多,有興趣可以翻一翻源碼。
jedis.xrange(“fun”, “1653129389045-0”, “1653129389047-0”)
后面會對Redis stream API進行性能測試,歡迎繼續(xù)關注FunTester。