Spring Boot与RabbitMQ的整合消息确认

Java 发表评论
消息生产者和消费者
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.UUID;

/**
 * Demo controller that is both a RabbitMQ producer and a consumer.
 *
 * <p>Producer side: {@code /rabbitMq/sendMq} publishes a batch of messages to
 * {@code myqueue}, each tagged with a random {@link CorrelationData} id. Confirm and
 * return callbacks are registered on the admin's template.
 * NOTE(review): publisher confirms/returns must also be enabled on the
 * {@link ConnectionFactory}, which is configured outside this class — confirm.
 *
 * <p>Consumer side: a {@link SimpleMessageListenerContainer} on the same queue runs in
 * {@link AcknowledgeMode#MANUAL} and acks (or nacks with requeue) each delivery itself.
 *
 * Created by yangliu on 2018/4/8.
 */
@Controller
@RequestMapping("/rabbitMq")
public class TestController {
    private static final Logger logger = LoggerFactory.getLogger(TestController.class);

    /** Queue used by the demo endpoint and consumed by {@link #messageContainer}. */
    private static final String DEMO_QUEUE = "myqueue";

    /** Number of messages published per call to {@link #send(String)}. */
    private static final int DEMO_MESSAGE_COUNT = 100;

    @Autowired
    RabbitAdmin rabbitAdmin;

    /**
     * Creates the {@link RabbitAdmin} and configures its template exactly once.
     *
     * <p>The callbacks are registered here and nowhere else: {@code RabbitTemplate} accepts
     * only one confirm callback and one return callback, and rejects a second, different
     * instance with an {@code IllegalStateException}. {@code mandatory} is enabled here as
     * well so that unroutable messages are returned regardless of whether the queue had to
     * be declared first.
     *
     * @param connectionFactory connection factory the admin and its template use
     * @return the configured admin
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        admin.getRabbitTemplate().setMandatory(true);
        admin.getRabbitTemplate().setConfirmCallback(new MsgSendConfirmCallBack());
        admin.getRabbitTemplate().setReturnCallback(new MsgSendReturnCallback());
        return admin;
    }

    /**
     * Publishes {@value #DEMO_MESSAGE_COUNT} numbered messages to the demo queue.
     *
     * @param name request parameter echoed back in the response; not part of the messages
     * @return a greeting containing {@code name} and the time the request started
     * @throws Exception if declaring the queue/exchange or sending a message fails
     */
    @RequestMapping("/sendMq")
    @ResponseBody
    public String send(String name) throws Exception {
        String context = "hello "+name+" --" + new Date();
        for (int i = 1; i <= DEMO_MESSAGE_COUNT; i++) {
            String sendStr = "第["+i+"]个 hello  --" + new Date();
            logger.debug("HelloSender: {}", sendStr);
            sendMessage(DEMO_QUEUE, sendStr);
        }
        return context;
    }

    /**
     * Approach 1: declare the exchange, the queue and their binding dynamically through
     * {@link RabbitAdmin}.
     *
     * <p>When {@code getQueueProperties} reports no such queue, a queue, a topic exchange and
     * a binding (routing key = {@code queueName}) are declared. Otherwise the template's
     * default queue, exchange and routing key are pointed at the given names.
     *
     * @param exchangeName name of the topic exchange
     * @param queueName    name of the queue; also used as the routing key
     */
    protected void declareBinding(String exchangeName, String queueName) {
        if (rabbitAdmin.getQueueProperties(queueName) == null) {
            // Queue flags, in constructor order:
            //   durable=true      -> the queue survives a broker restart
            //   exclusive=false   -> visible to all connections, not only the declaring one
            //   autoDelete=false  -> the queue is kept when it has no consumers
            Queue queue = new Queue(queueName, true, false, false, null);
            rabbitAdmin.declareQueue(queue);
            TopicExchange exchange = new TopicExchange(exchangeName);
            rabbitAdmin.declareExchange(exchange);
            // Bind the queue to the exchange, routed by the queue name.
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(queueName);
            rabbitAdmin.declareBinding(binding);
        } else {
            // These only set template defaults; sendMessage passes exchange and routing key
            // explicitly and does not depend on them.
            rabbitAdmin.getRabbitTemplate().setQueue(queueName);
            rabbitAdmin.getRabbitTemplate().setExchange(exchangeName);
            rabbitAdmin.getRabbitTemplate().setRoutingKey(queueName);
        }
    }

    /**
     * Sends one message, making sure the queue/exchange/binding exist first.
     *
     * <p>The queue name doubles as exchange name and routing key. Each message carries a
     * random UUID as correlation id, which is handed to the confirm callback.
     *
     * @param queueName target queue (also the exchange name and routing key)
     * @param message   payload to convert and send
     * @throws Exception if declaration or publishing fails
     */
    public void sendMessage(String queueName, String message) throws Exception {
        declareBinding(queueName, queueName);
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitAdmin.getRabbitTemplate().convertAndSend(queueName, queueName, message, correlationId);
        logger.debug("[rabbitmq-sendMessage]queueName:{} ,uuid:{},msg:{}",queueName,correlationId.getId(),message);
    }

    /**
     * Consumer: a single-consumer listener container on the demo queue with manual acks.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(new Queue(DEMO_QUEUE, true, false, false, null));
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        // Manual mode: the listener below must ack or nack every delivery itself.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                // Decode explicitly instead of relying on the platform default charset.
                // NOTE(review): assumes producers encode String payloads as UTF-8 — confirm.
                String payload = new String(message.getBody(), StandardCharsets.UTF_8);
                try {
                    // Business logic goes here.
                    logger.info("消费 receive msg : {}", payload);
                    // multiple=false: acknowledge only this delivery, not all earlier ones.
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    logger.error("消费失败: {}", payload, e);
                    // Negative-ack this delivery only and put it back on the queue.
                    // NOTE(review): requeue=true redelivers a permanently failing message
                    // indefinitely; consider a dead-letter exchange.
                    try {
                        channel.basicNack(deliveryTag, false, true);
                    } catch (IOException nackFailure) {
                        logger.error("basicNack failed for deliveryTag {}", deliveryTag, nackFailure);
                    }
                }
            }
        });
        return container;
    }

    /*
     * Acknowledgement cheat sheet.
     *
     * // multiple=false acks only the current delivery; true acks everything the consumer
     * // has received up to and including it
     * channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
     * // negative ack, requeue=true puts the message back on the queue
     * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
     * // reject a single message
     * channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
     *
     * Message did not reach the exchange     -> confirm callback, ack=false
     * Message reached the exchange           -> confirm callback, ack=true
     * Exchange routed it to a queue          -> no return callback
     * Exchange could not route it to a queue -> return callback (requires mandatory=true,
     *                                           otherwise there is no callback and the
     *                                           message is lost)
     */
}

路由失败后的 return 回调(消息已到达 exchange,但无法路由到任何 queue 时触发,需设置 mandatory=true):

 

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
 * Return callback that prints a returned message together with the reply code, reply
 * text, exchange and routing key passed to it.
 */
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
    /**
     * Prints the details of a returned message to standard output.
     *
     * @param message    the returned message
     * @param replyCode  reply code passed to the callback
     * @param replyText  reply text passed to the callback
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        // Decode explicitly rather than with the platform default charset, which garbles
        // non-ASCII payloads on hosts whose default is not UTF-8.
        // NOTE(review): assumes the producer encoded the body as UTF-8 — confirm.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("确认后回调return--message:" + body + ",replyCode:" + replyCode + ",replyText:"
                + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
    }
}

发送确认(confirm)回调:broker 确认消息是否到达 exchange 后触发:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;

/**
 * Confirm callback that prints whether a published message was acked or nacked.
 */
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     * Prints the outcome of a publisher confirm to standard output.
     *
     * @param correlationData correlation data supplied when the message was sent; may be
     *                        {@code null} when the sender did not provide any
     * @param ack             {@code true} for an ack, {@code false} for a nack
     * @param cause           cause passed to the callback, printed as-is
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息确认成功cause"+cause);
        } else {
            // Handle the lost message here. Messages sent without CorrelationData arrive
            // with a null argument, so guard the id lookup instead of throwing an NPE.
            String id = correlationData == null ? null : correlationData.getId();
            System.out.println("消息确认失败:"+id+"#cause"+cause);
        }
    }
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注

昵称 *