侧边栏壁纸
博主头像
DJ's Blog博主等级

行动起来,活在当下

  • 累计撰写 133 篇文章
  • 累计创建 51 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

【RabbitMQ】插件

Administrator
2022-03-17 / 0 评论 / 0 点赞 / 66 阅读 / 2302 字

【RabbitMQ】插件

安装和使用

  • 下载插件文件
  • 将插件文件放入rabbitmq安装包的plugins目录下
  • 查看插件列表
rabbitmq-plugins list
  • 启用插件
rabbitmq-plugins enable 插件名称
  • 停用插件
rabbitmq-plugins disable 插件名称

插件介绍

rabbitmq_delayed_message_exchange

public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

    // 建立连接
    Connection conn = factory.newConnection();
    // 创建消息通道
    Channel channel = conn.createChannel();

    // 声明x-delayed-message类型的exchange
    Map<String, Object> argss = new HashMap<String, Object>();
    argss.put("x-delayed-type", "direct");
    channel.exchangeDeclare("DELAY_EXCHANGE", "x-delayed-message", false, false, argss);
    // 声明队列
    channel.queueDeclare("DELAY_QUEUE", false, false, false, null);
    // 绑定交换机与队列
    channel.queueBind("DELAY_QUEUE", "DELAY_EXCHANGE", "DELAY_KEY");
    System.out.println(" Waiting for message....");

    // 创建消费者
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String msg = new String(body, "UTF-8");
            SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            System.out.println("收到消息:[" + msg + "]\n接收时间:" + sf.format(new Date()));
        }
    };

    // 开始从DELAY_QUEUE中获取消息
    channel.basicConsume("DELAY_QUEUE", true, consumer);
}
0

评论区