消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削峰等问题。
譬如签到送积分,签到和送积分是两个操作。签到产生了很重要的数据,它可以把消息发送到MQ,然后积分系统需要该数据,从MQ中直接获取即可。这样签到系统就做到了和积分系统解耦,不必担心积分系统挂了怎么办,是不是需要重试等,而这些都可以在积分系统内部自己实现,再者,如果以后另外一套系统也需要该签到数据,直接从MQ中获取即可,实际上与签单系统已无关系。
当做到解耦后,实现异步就是自然而然的事情,如果签到只需要1ms,而送积分,或者其他操作需要500ms,那不可能等所有操作完成之后再去返回数据给用户,这样就做到了异步。
削峰是指当并发访问高峰期,通过MQ达到限流的目的,从而减少对数据库MySQL的压力
ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,底层以数组的形式保存数据(实际上可看作一个循环数组)
ArrayBockingQueue使用场景:
1.先进先出队列:头是先进队的元素,尾是后进队的元素
2.有界队列:初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作
3.队列不支持空元素
阻塞式队列方法的四种形式:
创建一个Socket类的实例,用它来发送和接收字节流,发送时调用getOutputStream方法获取一个java.io.OutputStream对象,接收远程对象发送来的信息可以调用getInputStream方法来返回一个java.io.InputStream对象
ServerSocket与Socket不同,ServerSocket是等待客户端的请求,一旦获得一个连接请求,就创建一个Socket示例来与客户端进行通信。
可以接收任意长度的数据,并且避免乱码的产生
输出流、字符打印流
消息处理中心 Broker
public class Broker {private final static int MAX_SIZE = 3;private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);public static void produce(String msg){if(messageQueue.offer(msg)){System.out.println("已成功向消息处理中心发送消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size());} else{System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");}System.out.println("-----------------------------");}public static String consume(){String msg = messageQueue.poll();if(msg != null){System.out.println("已经消费的消息:" + msg + ",当前暂存消息的数量是:" + messageQueue.size());} else {System.out.println("消息处理中心内没有可供消费的消息!");}System.out.println("-----------------------------");return msg;}
}
BrokerSever用来提供Broker类得对外服务,BrokerSever类实现Runnable接口,实现run方法。用new Thread(Runnable target).start()方法来启动
public class BrokerSever implements Runnable{public static int SERVICE_PORT = 9999;private final Socket socket;public BrokerSever(Socket socket){this.socket = socket;}@Overridepublic void run() {try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())){while (true){String str = in.readLine();if (str == null){continue;}System.out.println("接收到原始数据: " + str);if (str.equals("CONSUME")){String message = Broker.consume();out.println(message);out.flush();}else {Broker.produce(str);}}} catch (Exception e){e.printStackTrace();}}public static void main(String[] args) throws Exception{ServerSocket server = new ServerSocket(SERVICE_PORT);while(true){BrokerSever brokerServer = new BrokerSever(server.accept());new Thread(brokerServer).start();}}
}
ProduceClient消息生产者
public class ProduceClient {public static void main(String[] args) throws Exception{MyClient client = new MyClient();client.produce("hello World.");}
}
ConsumeClient消息消费者
public class ConsumeClient {public static void main(String[] args) throws Exception{MyClient client = new MyClient();String message = client.consume();System.out.println("获得的消息为: " + message);}
}
MyClient与消息服务器进行通信
public class MyClient {public static void produce(String message) throws Exception{Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);try(PrintWriter out = new PrintWriter(socket.getOutputStream())){out.println(message);out.flush();}}public static String consume() throws Exception{Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())){out.println("CONSUME");out.flush();String message = in.readLine();return message;}}
}
定义一个消息生产者
@Test
public void test() throws Exception {for (int i = 0; i < 1000 ; i++) {rabbitTemplate.convertAndSend("test-queue ", "消息发送);}Thread.sleep(1000 * 1000);
}
使用@RabbitListener注解定义一个消息消费者
@Component
@RabbitListener(queuesToDeclare = @Queue(name = "test-queue"))
public class Consumer {private int count = 0;@RabbitHandlerpublic void receive(String msg, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {Thread.sleep(1000);System.out.println("=====消息处理===>");channel.basicAck(deliveryTag, true);System.out.println("current count is:" + ++count);} catch (Exception e) {}}
}
运行效果
Ready:待消费的消息总数
Unacked:待应答的消息总数。
Total:总数 Ready+Unacked
大家可以参考一下这篇博客:消息队列的作用和整体介绍