【RabbitMQ】插件
安装和使用
- 下载插件文件
- 将插件文件放入rabbitmq安装包的plugins目录下
- 查看插件列表
rabbitmq-plugins list
rabbitmq-plugins enable 插件名称
rabbitmq-plugins disable 插件名称
插件介绍
rabbitmq_delayed_message_exchange
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
// 建立连接
Connection conn = factory.newConnection();
// 创建消息通道
Channel channel = conn.createChannel();
// 声明x-delayed-message类型的exchange
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-delayed-type", "direct");
channel.exchangeDeclare("DELAY_EXCHANGE", "x-delayed-message", false, false, argss);
// 声明队列
channel.queueDeclare("DELAY_QUEUE", false, false, false, null);
// 绑定交换机与队列
channel.queueBind("DELAY_QUEUE", "DELAY_EXCHANGE", "DELAY_KEY");
System.out.println(" Waiting for message....");
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println("收到消息:[" + msg + "]\n接收时间:" + sf.format(new Date()));
}
};
// 开始从DELAY_QUEUE中获取消息
channel.basicConsume("DELAY_QUEUE", true, consumer);
}
评论区