Producer.java 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. package com.sf.message.publish;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Channel;
  4. import com.sf.util.RabbitMqUtils;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. import java.util.Scanner;
  8. public class Producer {
  9. private static String queueName = "hello1";
  10. public static void main(String[] args) throws Exception{
  11. Channel channel = RabbitMqUtils.getChannel();
  12. channel.queueDeclare(queueName, false, false, false, null);
  13. System.out.println("输入消息:");
  14. Scanner scanner = new Scanner(System.in);
  15. while (scanner.hasNext()){
  16. String message = scanner.next();
  17. // 发送一个持久化的消息
  18. // AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
  19. // .contentType("text/plain").deliveryMode(2).priority(1).build();
  20. // // void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
  21. // channel.basicPublish("", queueName, basicProperties, message.getBytes());
  22. // 发送一个带有header的消息
  23. // Map<String,Object> headers = new HashMap<>();
  24. // headers.put("location","here");
  25. // headers.put("time",System.currentTimeMillis());
  26. // AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().headers(headers).build();
  27. // channel.basicPublish("",queueName,basicProperties,message.getBytes());
  28. // 发送一个带有过期时间的消息
  29. AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
  30. .expiration("10000").build();
  31. channel.basicPublish("", queueName, basicProperties, message.getBytes());
  32. }
  33. }
  34. }