本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下:

应用场景
为什么要用redis?
二进制存储、java序列化传输、IO连接数高、连接频繁
一、序列化
这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口;
其代码如下:
package Utils;
import java.io.*;
/**
* Created by Kinglf on 2016/10/17.
*/
public class ObjectUtil {
/**
* 对象转byte[]
* @param obj
* @return
* @throws IOException
*/
public static byte[] object2Bytes(Object obj) throws IOException{
ByteArrayOutputStream bo=new ByteArrayOutputStream();
ObjectOutputStream oo=new ObjectOutputStream(bo);
oo.writeObject(obj);
byte[] bytes=bo.toByteArray();
bo.close();
oo.close();
return bytes;
}
/**
* byte[]转对象
* @param bytes
* @return
* @throws Exception
*/
public static Object bytes2Object(byte[] bytes) throws Exception{
ByteArrayInputStream in=new ByteArrayInputStream(bytes);
ObjectInputStream sIn=new ObjectInputStream(in);
return sIn.readObject();
}
}
二、消息类(实现Serializable接口)
package Model;
import java.io.Serializable;
/**
* Created by Kinglf on 2016/10/17.
*/
public class Message implements Serializable {
private static final long serialVersionUID = -389326121047047723L;
private int id;
private String content;
public Message(int id, String content) {
this.id = id;
this.content = content;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
三、Redis的操作
利用redis做队列,我们采用的是redis中list的push和pop操作;
结合队列的特点:
只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则 Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list药push或 pop的对象仅需要转换成byte[]即可
java采用Jedis进行Redis的存储和Redis的连接池设置
上代码:
package Utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Created by Kinglf on 2016/10/17.
*/
public class JedisUtil {
private static String JEDIS_IP;
private static int JEDIS_PORT;
private static String JEDIS_PASSWORD;
private static JedisPool jedisPool;
static {
//Configuration自行写的配置文件解析类,继承自Properties
Configuration conf=Configuration.getInstance();
JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
JEDIS_PORT=conf.getInt("jedis.port",6379);
JEDIS_PASSWORD=conf.getString("jedis.password",null);
JedisPoolConfig config=new JedisPoolConfig();
config.setMaxActive(5000);
config.setMaxIdle(256);
config.setMaxWait(5000L);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setMinEvictableIdleTimeMillis(60000L);
config.setTimeBetweenEvictionRunsMillis(3000L);
config.setNumTestsPerEvictionRun(-1);
jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
}
/**
* 获取数据
* @param key
* @return
*/
public static String get(String key){
String value=null;
Jedis jedis=null;
try{
jedis=jedisPool.getResource();
value=jedis.get(key);
}catch (Exception e){
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally {
close(jedis);
}
return value;
}
private static void close(Jedis jedis) {
try{
jedisPool.returnResource(jedis);
}catch (Exception e){
if(jedis.isConnected()){
jedis.quit();
jedis.disconnect();
}
}
}
public static byte[] get(byte[] key){
byte[] value = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
value = jedis.get(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return value;
}
public static void set(byte[] key, byte[] value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.set(key, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static void set(byte[] key, byte[] value, int time) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.set(key, value);
jedis.expire(key, time);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static void hset(byte[] key, byte[] field, byte[] value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hset(key, field, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static void hset(String key, String field, String value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hset(key, field, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 获取数据
*
* @param key
* @return
*/
public static String hget(String key, String field) {
String value = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
value = jedis.hget(key, field);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return value;
}
/**
* 获取数据
*
* @param key
* @return
*/
public static byte[] hget(byte[] key, byte[] field) {
byte[] value = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
value = jedis.hget(key, field);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return value;
}
public static void hdel(byte[] key, byte[] field) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hdel(key, field);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 存储REDIS队列 顺序存储
* @param key reids键名
* @param value 键值
*/
public static void lpush(byte[] key, byte[] value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.lpush(key, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 存储REDIS队列 反向存储
* @param key reids键名
* @param value 键值
*/
public static void rpush(byte[] key, byte[] value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.rpush(key, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
* @param key reids键名
* @param destination 键值
*/
public static void rpoplpush(byte[] key, byte[] destination) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.rpoplpush(key, destination);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
/**
* 获取队列数据
* @param key 键名
* @return
*/
public static List lpopList(byte[] key) {
List list = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
list = jedis.lrange(key, 0, -1);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return list;
}
/**
* 获取队列数据
* @param key 键名
* @return
*/
public static byte[] rpop(byte[] key) {
byte[] bytes = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
bytes = jedis.rpop(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return bytes;
}
public static void hmset(Object key, Map hash) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hmset(key.toString(), hash);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static void hmset(Object key, Map hash, int time) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hmset(key.toString(), hash);
jedis.expire(key.toString(), time);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static List hmget(Object key, String... fields) {
List result = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
result = jedis.hmget(key.toString(), fields);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return result;
}
public static Set hkeys(String key) {
Set result = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
result = jedis.hkeys(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return result;
}
public static List lrange(byte[] key, int from, int to) {
List result = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
result = jedis.lrange(key, from, to);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return result;
}
public static Map hgetAll(byte[] key) {
Map result = null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
result = jedis.hgetAll(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return result;
}
public static void del(byte[] key) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.del(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
}
public static long llen(byte[] key) {
long len = 0;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.llen(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
}
return len;
}
}
四、Configuration主要用于读取Redis的配置信息
package Utils;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* Created by Kinglf on 2016/10/17.
*/
public class Configuration extends Properties {
private static final long serialVersionUID = -2296275030489943706L;
private static Configuration instance = null;
public static synchronized Configuration getInstance() {
if (instance == null) {
instance = new Configuration();
}
return instance;
}
public String getProperty(String key, String defaultValue) {
String val = getProperty(key);
return (val == null || val.isEmpty()) ? defaultValue : val;
}
public String getString(String name, String defaultValue) {
return this.getProperty(name, defaultValue);
}
public int getInt(String name, int defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
}
public long getLong(String name, long defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
}
public float getFloat(String name, float defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
}
public double getDouble(String name, double defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
}
public byte getByte(String name, byte defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
}
public Configuration() {
InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
try {
this.loadFromXML(in);
in.close();
} catch (IOException ioe) {
}
}
}
五、测试
import Model.Message;
import Utils.JedisUtil;
import Utils.ObjectUtil;
import redis.clients.jedis.Jedis;
import java.io.IOException;
/**
* Created by Kinglf on 2016/10/17.
*/
public class TestRedisQueue {
public static byte[] redisKey = "key".getBytes();
static {
try {
init();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void init() throws IOException {
for (int i = 0; i < 1000000; i++) {
Message message = new Message(i, "这是第" + i + "个内容");
JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
}
}
public static void main(String[] args) {
try {
pop();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void pop() throws Exception {
byte[] bytes = JedisUtil.rpop(redisKey);
Message msg = (Message) ObjectUtil.bytes2Object(bytes);
if (msg != null) {
System.out.println(msg.getId() + "----" + msg.getContent());
}
}
}
每执行一次pop()方法,结果如下: <br>1----这是第1个内容 <br>2----这是第2个内容 <br>3----这是第3个内容 <br>4----这是第4个内容
总结
至此,整个Redis消息队列的生产者和消费者代码已经完成
1.Message 需要传送的实体类(需实现Serializable接口)
2.Configuration Redis的配置读取类,继承自Properties
3.ObjectUtil 将对象和byte数组双向转换的工具类
4.Jedis 通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
# java
# Redis实现消息队列
# redis
# 消息队列
# 消息队列实现
# 一口气说出Java 6种延时队列的实现方法(面试官也得服)
# 详解Java阻塞队列(BlockingQueue)的实现原理
# java中栈和队列的实现和API的用法(详解)
# Java 队列实现原理及简单实现代码
# 解析Java中PriorityQueue优先级队列结构的源码及用法
# 剖析Java中阻塞队列的实现原理及应用场景
# java实现消息队列的两种方式(小结)
# 详解Java消息队列-Spring整合ActiveMq
# java优先队列PriorityQueue中Comparator的用法详解
# Java中和队列相关的基本操作
# 连接池
# 这是
# 键名
# 序列化
# 键值
# 主要是
# 的是
# 都要
# 给大家
# 要用
# 弹出
# 写了
# 转化为
# 转换成
# 主要用于
# 配置文件
# 大家多多
# 只允许
# 连接数
# 客户端
相关文章:
,怎么在广州志愿者网站注册?
建站主机与虚拟主机有何区别?如何选择最优方案?
上海网站制作网站建设公司,建筑电工证网上查询系统入口?
如何破解联通资金短缺导致的基站建设难题?
制作网站的模板软件,网站怎么建设?
c# 服务器GC和工作站GC的区别和设置
广平建站公司哪家专业可靠?如何选择?
成都网站制作报价公司,成都工业用气开户费用?
如何制作算命网站,怎么注册算命网站?
江苏网站制作公司有哪些,江苏书法考级官方网站?
网站制作公司哪里好做,成都网站制作公司哪家做得比较好,更正规?
Android自定义listview布局实现上拉加载下拉刷新功能
模具网站制作流程,如何找模具客户?
邀请函制作网站有哪些,有没有做年会邀请函的网站啊?在线制作,模板很多的那种?
公司网站设计制作厂家,怎么创建自己的一个网站?
视频网站app制作软件,有什么好的视频聊天网站或者软件?
较简单的网站制作软件有哪些,手机版网页制作用什么软件?
网站制作知乎推荐,想做自己的网站用什么工具比较好?
小型网站建站如何选择虚拟主机?
韩国代理服务器如何选?解析IP设置技巧与跨境访问优化指南
大连网站制作费用,大连新青年网站,五年四班里的视频怎样下载啊?
如何选择服务器才能高效搭建专属网站?
高防网站服务器:DDoS防御与BGP线路的AI智能防护方案
网站制作服务平台,有什么网站可以发布本地服务信息?
网站制作外包价格怎么算,招聘网站上写的“外包”是什么意思?
如何快速启动建站代理加盟业务?
制作宣传网站的软件,小红书可以宣传网站吗?
行程制作网站有哪些,第三方机票电子行程单怎么开?
已有域名和空间,如何快速搭建网站?
深圳网站制作培训,深圳哪些招聘网站比较好?
网站广告牌制作方法,街上的广告牌,横幅,用PS还是其他软件做的?
建站主机如何选?性能与价格怎样平衡?
广州营销型建站服务商推荐:技术优势与SEO优化解析
如何通过IIS搭建网站并配置访问权限?
建站org新手必看:2024最新搭建流程与模板选择技巧
湖南网站制作公司,湖南上善若水科技有限公司做什么的?
C++如何编写函数模板?(泛型编程入门)
如何用wdcp快速搭建高效网站?
大连网站设计制作招聘信息,大连投诉网站有哪些?
网站制作公司排行榜,抖音怎样做个人官方网站
专业网站建设制作报价,网页设计制作要考什么证?
香港服务器如何优化才能显著提升网站加载速度?
广州网站建站公司选择指南:建站流程与SEO优化关键词解析
网站制作哪家好,cc、.co、.cm哪个域名更适合做网站?
深入理解Android中的xmlns:tools属性
图册素材网站设计制作软件,图册的导出方式有几种?
南阳网站制作公司推荐,小学电子版试卷去哪里找资源好?
建站之星导航如何优化提升用户体验?
定制建站方案优化指南:企业官网开发与建站费用解析
如何在腾讯云服务器快速搭建个人网站?
*请认真填写需求信息,我们会在24小时内与您取得联系。