【项目实战典型案例】16.消息队列作用和意义
创始人
2025-05-28 13:41:56

目录

  • 一:背景介绍
  • 二:消息队列
    • 消息队列简介
    • 解耦
    • 异步
    • 流量削峰
    • 原理
      • 1.ArrayBockingQueue:
      • 2.Socket
      • 3.SeverSocket
      • 4.Java IO操作——BufferedReader
      • 5.java.io.PrintWriter
  • 三:实现过程
    • 解耦和异步
    • 流量削峰
  • 四:总结

一:背景介绍

在这里插入图片描述

二:消息队列

消息队列简介

  • MQ全程为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。
  • 消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削峰等问题。

解耦

譬如签到送积分,签到和送积分是两个操作。签到产生了很重要的数据,它可以把消息发送到MQ,然后积分系统需要该数据,从MQ中直接获取即可。这样签到系统就做到了和积分系统解耦,不必担心积分系统挂了怎么办,是不是需要重试等,而这些都可以在积分系统内部自己实现,再者,如果以后另外一套系统也需要该签到数据,直接从MQ中获取即可,实际上与签单系统已无关系。

异步

当做到解耦后,实现异步就是自然而然的事情,如果签到只需要1ms,而送积分,或者其他操作需要500ms,那不可能等所有操作完成之后再去返回数据给用户,这样就做到了异步。

流量削峰

削峰是指当并发访问高峰期,通过MQ达到限流的目的,从而减少对数据库MySQL的压力

原理

1.ArrayBockingQueue:

ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,底层以数组的形式保存数据(实际上可看作一个循环数组)
ArrayBockingQueue使用场景:
1.先进先出队列:头是先进队的元素,尾是后进队的元素
2.有界队列:初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作
3.队列不支持空元素
阻塞式队列方法的四种形式:
在这里插入图片描述

2.Socket

创建一个Socket类的实例,用它来发送和接收字节流,发送时调用getOutputStream方法获取一个java.io.OutputStream对象,接收远程对象发送来的信息可以调用getInputStream方法来返回一个java.io.InputStream对象

3.SeverSocket

ServerSocket与Socket不同,ServerSocket是等待客户端的请求,一旦获得一个连接请求,就创建一个Socket示例来与客户端进行通信。

4.Java IO操作——BufferedReader

可以接收任意长度的数据,并且避免乱码的产生

5.java.io.PrintWriter

输出流、字符打印流

三:实现过程

解耦和异步

消息处理中心 Broker

public class Broker {private final static int MAX_SIZE = 3;private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);public static void produce(String msg){if(messageQueue.offer(msg)){System.out.println("已成功向消息处理中心发送消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size());} else{System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");}System.out.println("-----------------------------");}public static String consume(){String msg = messageQueue.poll();if(msg != null){System.out.println("已经消费的消息:" + msg + ",当前暂存消息的数量是:" + messageQueue.size());} else {System.out.println("消息处理中心内没有可供消费的消息!");}System.out.println("-----------------------------");return msg;}
}

BrokerSever用来提供Broker类得对外服务,BrokerSever类实现Runnable接口,实现run方法。用new Thread(Runnable target).start()方法来启动

public class BrokerSever implements Runnable{public static int SERVICE_PORT = 9999;private final Socket socket;public BrokerSever(Socket socket){this.socket = socket;}@Overridepublic void run() {try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())){while (true){String str = in.readLine();if (str == null){continue;}System.out.println("接收到原始数据: " + str);if (str.equals("CONSUME")){String message = Broker.consume();out.println(message);out.flush();}else {Broker.produce(str);}}} catch (Exception e){e.printStackTrace();}}public static void main(String[] args) throws Exception{ServerSocket server = new ServerSocket(SERVICE_PORT);while(true){BrokerSever brokerServer = new BrokerSever(server.accept());new Thread(brokerServer).start();}}
}

ProduceClient消息生产者

public class ProduceClient {public static void main(String[] args) throws Exception{MyClient client = new MyClient();client.produce("hello World.");}
}

ConsumeClient消息消费者

public class ConsumeClient {public static void main(String[] args) throws Exception{MyClient client = new MyClient();String message = client.consume();System.out.println("获得的消息为: " + message);}
}

MyClient与消息服务器进行通信

public class MyClient {public static void produce(String message) throws Exception{Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);try(PrintWriter out = new PrintWriter(socket.getOutputStream())){out.println(message);out.flush();}}public static String consume() throws Exception{Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())){out.println("CONSUME");out.flush();String message = in.readLine();return message;}}
}

流量削峰

定义一个消息生产者

@Test
public void test() throws Exception {for (int i = 0; i < 1000 ; i++) {rabbitTemplate.convertAndSend("test-queue ",  "消息发送);}Thread.sleep(1000 * 1000);
}

使用@RabbitListener注解定义一个消息消费者

@Component
@RabbitListener(queuesToDeclare = @Queue(name = "test-queue"))
public class Consumer {private int count = 0;@RabbitHandlerpublic void receive(String msg, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {Thread.sleep(1000);System.out.println("=====消息处理===>");channel.basicAck(deliveryTag, true);System.out.println("current count is:" + ++count);} catch (Exception e) {}}
}

运行效果
在这里插入图片描述
Ready:待消费的消息总数

Unacked:待应答的消息总数。

Total:总数 Ready+Unacked

四:总结

大家可以参考一下这篇博客:消息队列的作用和整体介绍

相关内容

热门资讯

V观财报|“四连板”均瑶健康:...   中新经纬5月29日电 29日,均瑶健康再发股票交易异常波动公告。  公告显示,均瑶健康于5月28...
OceanMind海睿思受邀参... 近日,由江苏省工业和信息化厅、宜兴市人民政府指导,宜兴市工业和信息化局、...
SpringBoot Rabb... SpringCloud 大型系列课程正在制作中,欢迎大家关注与提意见。 程序员每天的C...
为了赶走烂物业,我被举报到纪委... 我本是为了维护自身权益,试图赶走那令人不满的烂物业。然而,未曾想此举竟让我陷入了意想不到的境地。不知...
*ST围海:签订1.82亿元重... 新京报贝壳财经讯 5月29日,*ST围海公告,公司与文成县水利发展有限公司签署了“文成县城防洪提升工...
宇树回应更名“股份有限公司” ... 5月29日,有消息称,宇树科技向合作伙伴发布通知称,因公司发展需要,杭州宇树科技有限公司即日起名称变...
美股七巨头交出超预期一季报:提... 围绕关税和AI(人工智能)两大话题,美股科技“七巨头”发布了基本超出预期的一季报。近日,美股科技“七...
LuaJIT 常量数组(con... 从LuaJIT Bytecode介绍中可知道,Bytecode关于常量操作的指令均为D...
js 数组中的对象去掉重复的对...  id相同的,保留第一个,其它的删除this.defaultFileLi...
许昌,几线城市?请看2025年... 第一财经·新一线城市研究所 5月28日发布 《2025新一线城市魅力排行榜》 在中国内地337座地级...
拼多多股价波动:牺牲短期业绩,... 5月27日晚,拼多多最新一季度财报发布,营收约957亿人民币,同比增长10%,已经是最近几个季度的最...
额度放开了,再选一遍美债基金~ 转自:懒猫的丰收日 最近,好多美债基金放开了额度, 也有不少小伙伴问美债基金,那就再筛一遍,大概是这...
广州农商银行:唤醒农村沉睡资源... 随着乡村振兴战略的深入实施,农村“三块地”——耕地、宅基地和集体经营性建设用地的改革与盘活,正在成为...
中国艾欧智能团队远程操控机器人... 在2025年度ICRA双臂机器人能力边界挑战赛上,深圳艾欧智能公司凭借其卓越表现,从全球88支参赛队...
同仁堂资本新局 拆分上市在下什... 暌违十多年的资本运作。 《投资者网》蔡俊 暌违十多年,同仁堂(600085.SH,下称“公司”)...
集中上市!增量资金来了 增量资金持续驰援市场,新发ETF密集上市。5月以来,23只ETF上市,9只ETF即将上市,这些ETF...
3.类型、存储和变量 目录 3.1 C#程序是一组类型声明  3.2 类型是一种模板  3.3 实例化类型  3.4 数...
圆通速递今日大宗交易折价成交7... 5月29日,圆通速递大宗交易成交795万股,成交额1.01亿元,占当日总成交额的15.22%,成交价...
最新!宇树科技改名了 证券时报消息,5月29日,宇树科技向合作伙伴发布通知称,因公司发展需要,杭州宇树科技有限公司即日起名...
持续改进与回归 系列文章目录 第一章 敏捷核心知识点 第二章 敏捷宣言与原则 第三章 价值驱动交付-优先级排序&增量...
ST先锋:6月3日起撤销其他风... 5月29日晚间,宁波先锋新材料股份有限公司(ST先锋,300163.SZ)公告,公司股票将于5月30...
双指针 -876. 链表的中间... 开始一个专栏,写自己的博客 双指针,也算是作为自己的笔记吧!...
100%原价买飞天茅台!茅台机... 红星资本局5月29日消息,遵义茅台机场官微发布消息,5月30日-6月30日期间,茅台机场开启2个乘机...
restTemplate未设置... 背景 昨天发版遇到个线上问题,由于运维操作放量时隔离机器过多,导致只有大...
第一批痴迷中古的年轻人,已经袪... 第一批痴迷中古的年轻人,曾如朝圣般追寻那些古老的物件与文化。他们沉浸在中古的世界里,仿佛找到了心灵的...
公司的经销商去年为何减少400... 新京报贝壳财经讯(记者阎侠)5月29日,记者自酒鬼酒发布的投资者关系活动记录表获悉,公司正面临行业环...
试题 历届真题 循环小数【第十... 试题 历届真题 循环小数【第十一届】【决赛】【Python】 题目来源:第十一届蓝桥杯...
读懂IPO|中润光能还未上市业... 本文来源:时代商学院 作者:彭元重 图源:图虫创意来源|时代商业研究院作者|彭元重编辑|郑琳A股折戟...
金科股份管理层:管理人已经启动... 5月29日,金科地产集团股份有限公司(*ST金科,000656.SZ)召开2024年度业绩说明会。金...
vivo 短视频用户访问体验优... 作者:vivo 互联网运维团队- Hu Tao 本文介绍了vivo短视频用户访问体验...