amqp-client:
1.依赖jar包
<dependency>
<groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency>2.生产者代码Send.
[java]
- package cn.slimsmart.rabbitmq.demo.helloword;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class Send {
- //消息队列名称
- private final static String QUEUE_NAME = "helloword";
- public static void main(String[] args) throws Exception {
- /**
- * 创建连接连接到MabbitMQ
- */
- ConnectionFactory factory = new ConnectionFactory();
- //设置MabbitMQ所在主机ip或者主机名
- factory.setHost("192.168.101.174");
- //指定用户 密码
- factory.setUsername("admin");
- factory.setPassword("admin");
- //指定端口
- factory.setPort(AMQP.PROTOCOL.PORT);
- //创建一个连接
- Connection connection = factory.newConnection();
- //创建一个频道
- Channel channel = connection.createChannel();
- //指定一个队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //发送的消息
- String message = "hello world!";
- //往队列中发出一条消息
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println("Sent Message:'" + message + "'");
- //关闭频道和连接
- channel.close();
- connection.close();
- }
- }
3.消费者代码Receive.java
[java]
- package cn.slimsmart.rabbitmq.demo.helloword;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class Receive {
- //消息队列名称
- private final static String QUEUE_NAME = "helloword";
- public static void main(String[] args) throws Exception {
- //打开连接和创建频道,与发送端一样
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.101.174");
- //指定用户 密码
- factory.setUsername("admin");
- factory.setPassword("admin");
- //指定端口
- factory.setPort(AMQP.PROTOCOL.PORT);
- //创建一个连接
- Connection connection = factory.newConnection();
- //创建一个频道
- Channel channel = connection.createChannel();
- //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //创建队列消费者
- QueueingConsumer consumer = new QueueingConsumer(channel);
- //指定消费队列
- channel.basicConsume(QUEUE_NAME, true, consumer);
- while (true)
- {
- //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println("Received Message:'" + message + "'");
- }
- }
- }
如果运行出现如下异常,可能创建的用户没有访问权限。
[java]
- Exception in thread "main" java.io.IOException
- at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
- at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
- at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
- at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:388)
- at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
- at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
- at cn.slimsmart.rabbitmq.demo.test.Test.main(Test.java:18)
- Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
- at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
- at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
- at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
- at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
- at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
- ... 4 more
- Caused by: java.net.SocketException: Connection reset
- at java.net.SocketInputStream.read(Unknown Source)
- at java.net.SocketInputStream.read(Unknown Source)
- at java.io.BufferedInputStream.fill(Unknown Source)
- at java.io.BufferedInputStream.read(Unknown Source)
- at java.io.DataInputStream.readUnsignedByte(Unknown Source)
- at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
- at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
- at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:515)
实例代码: