Python 多线程
创始人
2025-06-01 07:18:00

文章目录

  • 一、简介
    • 1.1 多线程的特性
    • 1.2 GIL
  • 二、线程
    • 1.2 单线程
    • 1.3 多线程
  • 三、线程池
    • 3.1 pool.submit
    • 3.2 pool.map
  • 四、Lock(线程锁)
    • 4.1 无锁导致的线程资源异常
    • 4.2 有锁
  • 五、Event(事件)
    • 5.1 简介
    • 5.2 示例
  • 六、Queue(队列)
    • 6.1 简介
    • 6.2 生产者 & 消费者
  • 七、Condition(条件锁)
    • 7.1 简介
    • 7.2 notify 单任务通信
    • 7.3 notify_all 多任务通信
  • 八、Semaphore(信号量)
    • 8.1 简介
    • 8.2 示例

一、简介

说起python线程,说少也少,比如线程怎么启动,获取结果,阻塞等;还有线程池的两种运行方式以及使用的一些案例。说多的话,又会涉及到Lock,Rlock,Queue,Condition,Event等很多东西。

这边博客先留着慢慢写,主要介绍线程和线程池的使用,后面吧所有线程内部的东西都深入说一下,为什么要用到这些。

1.1 多线程的特性

Python多线程是Python的一个重要特性,它允许程序同时执行多个线程。Python中的线程是轻量级的,它们共享内存空间,因此创建和销毁线程的开销很小。

多线程切换的效率是以时间为指标的,因为线程切换需要保存当前线程的状态并加载下一个线程的状态,这个过程需要花费一定的时间。在多线程切换的过程中,如果线程的数量过多,那么线程切换的时间就会占用大量的CPU时间,从而导致程序的执行效率降低。因此,在编写多线程程序时,需要合理的控制线程的数量,避免线程切换的时间过长。

1.2 GIL

Python GIL (Global Interpreter Lock)是Python解释器的一个特性,它是一种互斥锁,用于保护Python解释器的内部数据结构。在任何时刻,只有一个线程可以执行Python字节码。这意味着,即使在多核CPU上运行Python程序,也只能使用一个核。

这个特性对于CPU密集型任务来说是一个瓶颈,因为它不能充分利用多核CPU的优势。但是对于I/O密集型任务来说,Python GIL并不是一个问题,因为在I/O操作期间,Python解释器会释放GIL,以便其他线程可以执行Python字节码。如果你想充分利用多核CPU,可以使用多进程或者使用其他语言编写CPU密集型任务的代码。

二、线程

1.2 单线程

import threading
import requestsdef task(url):resp = requests.get(url=url).textprint(len(resp))thread = threading.Thread(target=task, args=('https://www.baidu.com', ))
thread.start()    # 启动这个线程
thread.join()     # 阻塞线程print('end')

1.3 多线程

这里有个点记一下,join是会阻塞任务的,只有当线程全部都跑完,才会向下执行;如果你需要在执行线程的时候响应外部请求,那么只start即可。

import threading
import requestsdef task(url):resp = requests.get(url=url).textprint(f'url length: ', len(resp))urls = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{num}.png' for num in range(1, 6)]
threads = []
for url in urls:thread = threading.Thread(target=task, args=(url, ))thread.start()threads.append(thread)for thread in threads:thread.join()

三、线程池

线程池主要是解决多任务并发的实效性问题,简单的说就是让任务跑的很快,系统资源利用率更高。线程池对于运行多个线程的优点主要就是线程的启动和关闭有资源的开销,而线程池则可以复用线程。你就当作TCP中客户端TIME-WAIT状态的连接重新用于新的TCP连接,一个道理。本质都是为了减少资源开销。

那为啥要用线程池,而不用进程池,区别就是

  1. 线程池中的线程启动开销更低,切换也更快
  2. 线程池主要是为了解决IO的问题(文件IO,网络IO等)
  3. 进程池主要是为了解决CPU密集的问题(数据运算,数据处理等)

python中线程池的库是concurrent.futures,线程池有两种使用方式,submit和map

3.1 pool.submit

下面是我业务场景中要通过线程池启动两个线程,并且要将返回的值拿过来对用户进行响应

一个是获取阿里云SLB的QPS
一个是获取阿里云SLB的RT

这个案例很好说明为什么我要用线程池中的 pool.submit 方法而不是 pool.map,结论是都能实现,submit更方便

  1. 我的任务很少,不去太考虑线程的重用和开销问题
  2. 明确知道需要两个指标,一个是QPS,一个是RT
  3. pool.submit的结果可以通过result()直接拿到
  4. pool.map 返回的是一个generator对象,用起来会更麻烦
# submit获取结果
qps = resp_qps.result()
rt = resp_rt.result()# map获取结果
results = pool.map(func, data)
for result in results:print(result)
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor()def get_slb_metric(ali_obj, metric_name, delay, dimensions):""" 获取阿里云指标数据 """period = 60namespace = 'acs_slb_dashboard'timestamp = int(time.time())start_time = timestamp_to_str(timestamp=timestamp - delay - period)end_time = timestamp_to_str(timestamp=timestamp - delay)response = ali_obj.describe_metric_data_request(namespace=namespace,metric_name=metric_name,start_time=start_time,end_time=end_time,period=period,dimensions=dimensions,)return json.loads(response['Datapoints'])def get_slb_metric_data(ali_obj, delay, dimensions):""" 获取阿里云指标数据公共方法 """resp_metric = {'success': False,'msg': None,'retry': 0,'data': {'qps': None,'rt': None,}}while resp_metric['retry'] < 5:resp_metric['retry'] += 1resp_qps = pool.submit(get_slb_metric, ali_obj, 'Qps', delay, dimensions)resp_rt = pool.submit(get_slb_metric, ali_obj, 'Rt', delay, dimensions)qps = resp_qps.result()rt = resp_rt.result()if qps and rt:resp_metric['success'] = Trueresp_metric['data']['qps'] = qps[0]['Average']resp_metric['data']['rt'] = rt[0]['Average']return resp_metricdelay += 10resp_metric['msg'] = f'请求异常: \n\n qps或rt数据为空,请延长周期,最后一次delay为: {delay}s, 递增10s请求5次,未成功获取到数据'return resp_metric

3.2 pool.map

那什么时候用pool.map,以我的习惯是当这个任务是固定的,在批量处理数据的情况下,pool.map特别方便。

  1. 需要处理的任务非常多,需要考虑线程开销且复用
  2. 对于任务的执行结果不需要按类型来区分,批量获取

那具体用在什么场景下呢?下面是我一个业务场景,ECI配置中心,里面记录了一条条项目的配置信息,且ECI项目的POD数量是动态的,里面记录了获取POD的URL,需要实时请求。eci_list的方法需要读取配置中心,

这个时候我的任务数是不固定的,有多少个业务需要ECI,那就有多少个URL需要去请求,我总不能一个一个手动去创建线程,并且,所有业务URL的接口给我返回的数据格式是固定的,我也只会取固定的值,那么pool.map就非常方便了。批量请求接口,批量处理结果。

def get_pod_info(data):# 获取项目信息resp_data = {'success': False,'msg': None,'data': data}try:url = data.get('url')headers = {'contene-type': 'application/json'}project = data.get('project')profile = data.get('profile')sign = data.get('sign')req_data = {'sign': sign,'project': project,'profile': profile}resp = requests.post(url=url, headers=headers, data=json.dumps(req_data))if resp.status_code == 200:resp_data['data']['pod'] = 100resp_data['data']['open'] = Trueresp_data['success'] = Trueelse:resp_data['msg'] = f'ECI业务接口异常: {resp.text}\n\n地址: {url}\n\n项目: {project}\n\n标签: {profile}'except Exception as err:resp_data['msg'] = f'ECI运维接口异常: {err}\n\n地址: {url}\n\n项目: {project}\n\n标签: {profile}'finally:return resp_datadef eci_list():if request.method == 'GET':resp_data = {'success': False,'msg': None,'data': None}try:# 获取项目列表queryset = OpsEci.query.all()data = list()for obj in queryset:_obj_host = CloudHost.query.filter_by(resource_type='kubernetes', instance_id=obj.kubernetes).first()_kube_host = re.search('\d+\.\d+\.\d+\.\d+', _obj_host.private_ip[0]).group()_data = {'project': obj.project,'profile': obj.profile,'kubeconfig': f"/home/tomcat/.kube/kubeconfig/{_kube_host}",'token': obj.token,'address': obj.address,'url': obj.url,'sign': obj.sign}data.append(_data)# 处理项目列表with ThreadPoolExecutor() as pool:results = pool.map(get_pod_info, data)results_list = list()for result in results:project = result['data']['project']profile = result['data']['profile']app = project if profile == 'jst' else f'{project}-{profile}'if result.get('success'):results_list.append({'status': True,'open': result['data']['open'],'pod': result['data']['pod'],'ding_token': result['data']['token'],'app': app,'kubeconfig_path': result['data']['kubeconfig']})else:results_list.append({'status': False,'app': app,})resp_data['success'] = Trueresp_data['data'] = results_listexcept Exception as err:resp_data['msg'] = f'获取ECI项目信息异常: {err}'finally:return resp_data

四、Lock(线程锁)

4.1 无锁导致的线程资源异常

当线程没有锁以后,不同的线程使用共享资源会出现不可预估的后果

我们期望的情况

王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余:  200
取钱失败: 余额:  200

可能会出现的情况

王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余:  200
取钱成功, 剩余:  -600

无锁代码,这里为了能稳定复现,特别加了sleep

from concurrent.futures import ThreadPoolExecutor
import threading
import timepool = ThreadPoolExecutor()def bank(amount):global balanceprint(f'王二狗第{threading.current_thread().name}次取钱')if balance > amount:time.sleep(0.1)balance = balance - amountprint('取钱成功, 剩余: ', balance)else:print('取钱失败: 余额: ', balance)if __name__ == '__main__':balance = 1000t1 = threading.Thread(target=bank, args=(800,), name='1')t2 = threading.Thread(target=bank, args=(800,), name='2')t1.start()t2.start()

4.2 有锁

如果在使用线程共享资源的时候,给资源加上锁,那么我们每次运行的结果都是一致的

王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余:  200
取钱失败: 余额:  200

有锁代码,在操作线程共享资源的时候,给资源上锁

import threading
import timepool = ThreadPoolExecutor()
lock = threading.Lock()def bank(amount):global balanceprint(f'王二狗第{threading.current_thread().name}次取钱')with lock:if balance > amount:time.sleep(0.1)balance = balance - amountprint('取钱成功, 剩余: ', balance)else:print('取钱失败: 余额: ', balance)if __name__ == '__main__':balance = 1000t1 = threading.Thread(target=bank, args=(800,), name='1')t2 = threading.Thread(target=bank, args=(800,), name='2')t1.start()t2.start()

五、Event(事件)

5.1 简介

Event是python中的一个同步原语,用于线程之间的通信。event有两种状态,分别是set和clear。当event处于set状态时,调用wait方法的线程会立即返回,否则会一直阻塞,直到event被set。

5.2 示例

下面例子中,创建了一个event事件,两个人worker。worker_b调用了event.wait()方法,这会使worker线程阻塞,直到event被set。当worker_a开始运行后,将event置为set后,worker_b结束阻塞,开始运行。

import threading
import timeevent = threading.Event()def worker_a():print(f'{time.time()}: worker_a 等待运行')print(f'{time.time()}: worker_a 开始运行')event.set()def worker_b():print(f'{time.time()}: worker_b 等待运行')event.wait()print(f'{time.time()}: worker_b 开始运行')if __name__ == '__main__':t_a = threading.Thread(target=worker_a)t_b = threading.Thread(target=worker_b)t_b.start()time.sleep(1)t_a.start()

执行结果

1679301511.736171: worker_b 等待运行
1679301512.741683: worker_a 等待运行
1679301512.741767: worker_a 开始运行
1679301512.741956: worker_b 开始运行

六、Queue(队列)

6.1 简介

Python中的Queue模块提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后进先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。

6.2 生产者 & 消费者

下面是一个使用Queue模块实现多线程的示例,其中包括了生产者和消费者两个线程,生产者向队列中添加元素,消费者从队列中取出元素,而他们都是用队列queue.Queue();除此之外,还有queue.LifoQueue,queue.PriorityQueue。

import threading
import queue
import timeclass Producer(threading.Thread):def __init__(self, queue):threading.Thread.__init__(self)self.queue = queuedef run(self):for i in range(100):self.queue.put(i)time.sleep(0.3)print('producer put end')class Consumer(threading.Thread):def __init__(self, queue):threading.Thread.__init__(self)self.queue = queuedef run(self):while True:if self.queue.empty():time.sleep(1)print('queue is empty, waiting ...')else:print(f'consumer get {self.queue.get()}')time.sleep(0.1)if __name__ == '__main__':q = queue.Queue()producer = Producer(q)consumer = Consumer(q)producer.start()consumer.start()

七、Condition(条件锁)

7.1 简介

在 Python 中,可以使用 threading.Condition 实现条件锁。Condition 对象提供了 acquire() 和 release() 方法,与 Lock 对象的方法类似。此外,Condition 对象还提供了 wait()、notify() 和 notify_all() 方法,用于线程间的协调。具体使用方法可以参考 Python 官方文档中的 threading.Condition 部分。

在 threading.Condition 中,wait() 方法会释放锁并挂起当前线程,直到另一个线程调用 notify() 或 notify_all() 方法唤醒它。notify() 方法会随机唤醒一个挂起的线程,而 notify_all() 方法会唤醒所有挂起的线程。需要注意的是,wait() 方法只能在已经获得锁的情况下调用,否则会抛出 RuntimeError 异常。

在使用 threading.Condition 时,通常需要先获得一个 Lock 对象,然后使用这个 Lock 对象创建一个 Condition 对象。在需要等待某个条件时,调用 Condition 对象的 wait() 方法;在满足条件时,调用 notify() 或 notify_all() 方法唤醒等待的线程

7.2 notify 单任务通信

import threading
import timecondition = threading.Condition()class Master(threading.Thread):"""主任务类,执行过后等待子任务响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:print(self.name, '-----任务开始-----')print(self.name, '事件A处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '事件B处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '事件C处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '事件D处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '-----任务结束-----')class Worker(threading.Thread):"""子任务类,等待主任务通知并响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:self.cond.wait()print(self.name, '事件A已响应, 请继续')self.cond.notify()self.cond.wait()print(self.name, '事件B已响应, 请继续')self.cond.notify()self.cond.wait()print(self.name, '事件C已响应, 请继续')self.cond.notify()self.cond.wait()print(self.name, '事件D已响应, 请继续')self.cond.notify()if __name__ == '__main__':master = Master('master', condition)worker = Worker('worker', condition)worker.start()time.sleep(1)master.start()

执行结果

master -----任务开始-----
master 事件A处理完毕, 等待worker响应...
worker 事件A已响应, 请继续
master 事件B处理完毕, 等待worker响应...
worker 事件B已响应, 请继续
master 事件C处理完毕, 等待worker响应...
worker 事件C已响应, 请继续
master 事件D处理完毕, 等待worker响应...
worker 事件D已响应, 请继续
master -----任务结束-----

7.3 notify_all 多任务通信

import threading
import timecondition = threading.Condition()class Master(threading.Thread):"""主任务类,执行过后等待子任务响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:print(self.name, '前置准备工作结束, 通知子任务开始任务...')time.sleep(1)self.cond.notify_all()class Worker(threading.Thread):"""子任务类,等待主任务通知并响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:print(self.name, '准备就绪, 等待调度...')self.cond.wait()print(self.name, '接收到主任务通知, 开始执行任务')print(self.name, '任务A执行完成')print(self.name, '任务B执行完成')print(self.name, '任务C执行完成')if __name__ == '__main__':master = Master('master', condition)worker_a = Worker('worker-a', condition)worker_b = Worker('worker-b', condition)worker_c = Worker('worker-c', condition)worker_a.start()worker_b.start()worker_c.start()time.sleep(0.3)master.start()
worker-a 准备就绪, 等待调度...
worker-b 准备就绪, 等待调度...
worker-c 准备就绪, 等待调度...
master 前置准备工作结束, 通知子任务开始任务...
worker-a 接收到主任务通知, 开始执行任务
worker-a 任务A执行完成
worker-a 任务B执行完成
worker-a 任务C执行完成
worker-b 接收到主任务通知, 开始执行任务
worker-b 任务A执行完成
worker-b 任务B执行完成
worker-b 任务C执行完成
worker-c 接收到主任务通知, 开始执行任务
worker-c 任务A执行完成
worker-c 任务B执行完成
worker-c 任务C执行完成

八、Semaphore(信号量)

8.1 简介

Semaphore用于控制对共享资源的访问。semaphore维护一个内部计数器。该计数器可以通过 acquire() 和 release() 两个方法来增加和减少。当计数器为0时,acquire() 方法将会被阻塞,直到其他线程调用 release() 方法位置。

semaphore.acquire() 将会使计数器-1,当计数器为0则会阻塞当前线程
semaphore.release() 将会时计数器+1,以便有更多的资源去使用计数器

8.2 示例

下面模拟10个任务运行的情况,同时运行三个线程,通过Semaphore进行控制线程数。可以发现,通过semaphore即可控制线程的worker

import threading
import timesemaphore = threading.Semaphore(3)def task():"""任务机"""with semaphore:print(time.strftime('%H:%M:%S'), threading.current_thread().name, '开始执行...')time.sleep(2)if __name__ == '__main__':for i in range(10):t = threading.Thread(target=task)t.start()

执行结果

18:12:05 Thread-1 开始执行...
18:12:05 Thread-2 开始执行...
18:12:05 Thread-3 开始执行...
18:12:07 Thread-4 开始执行...
18:12:07 Thread-5 开始执行...
18:12:07 Thread-6 开始执行...
18:12:09 Thread-7 开始执行...
18:12:09 Thread-9 开始执行...
18:12:09 Thread-8 开始执行...
18:12:11 Thread-10 开始执行...

相关内容

热门资讯

关键时刻,日美关系出现重大转折... 作者 | 无疆编辑 | 剑书日 美关系出现一个重大转折,影响可能不亚于贸易协议。就在最近,曾被拜登坚...
深圳半山巨宅底价成交,3749... “老李,听说了吗?大鹏那栋别墅被人底价捡走了!” “真的假的?那个叶老板花大钱建的城堡,现在才卖三千...
郑商所就丙烯期货和期权合约及期... 5月30日,郑州商品交易所就丙烯期货和期权合约及期货业务细则公开征求意见。这意味着我国产量最大的烯烃...
上涨!金饰价格重回1000元大... 随着国际黄金价格上涨,国内金饰价格重新升至每克千元以上。 6月2日,老凤祥金饰品为1000元/克,这...
晶振行业落后产能出清,利好头部... 晶振行业出清信号明显,*ST东晶筹划控制权变更,惠伦晶体也曾两次筹划控制权变更。我们关注到*ST东晶...
港股或陷入“五穷六绝”,投资者... 6月2日,海外地缘政治各种不确定因素引发全球市场大跌,美国总统特朗普关税政策不确定,俄乌冲突变数加大...
中信证券前员工趋同交易超千万,... 2025.06.02本文字数:2005,阅读时长大约4分钟作者 |第一财经 周楠地方证监局最新披露的...
一财社论:无序“价格战”没有赢... 在我国汽车工业从高速发展迈入高质量发展的关键阶段,根治以无序“价格战”为代表的“内卷式”竞争迫在眉睫...
原创 A... 第一:A股假期最后一天,全球市场集体杀跌 今天早上,全球市场全线杀跌。日经指数一度杀跌近1.5%,中...
【股评】6月将会是A股市场在新... 这个端午节,与A股市场有关的系列重要信息,将影响6月开盘后相关板块和股票的走势,现整理如下,供大家参...
百亿钛白粉龙头跨界失利,拟终止... 钛白粉龙头跨界失利,拟终止年产能50万吨磷酸铁项目投资。6月2日,中核华原钛白股份有限公司(中核钛白...
海南发展:6个月内控股股东将增... 6月2日,海南发展(002163.SZ)发布公告,公司控股股东海南省发展控股有限公司(以下简称“海南...
泸州老窖高层变动,履职十年董事... 微成都报道日前,泸州老窖(000568.SZ)发布公告称,公司董事会近日收到董事钱旭递交的书面辞职报...
借壳上市需要什么条件?快速融资... 资本市场从来都是一个充满博弈的舞台,而借壳上市则是其中最富争议的玩法之一。有人将其视为企业快速登陆资...
重塑胶膜行业格局?这对“胡润百... 文 | 华夏能源网 近日,咨询机构Infolink公布了2024年全球光伏胶膜出货排行榜。百佳年代...
港股6月首日“V型”反弹,券商... 每经记者:王海慜 每经编辑:彭水萍 5月全球股市整体飘红,A股市场收复关税冲击造成的缺口。6 月2日...
“遛娃经济”撬动端午消费新增长... 在深圳锦绣中华,小朋友们包粽子、制香囊。南方日报记者 鲁力 通讯员 陈爱 摄 “别整什么旅游KPI了...
利好突袭,集体大涨!发生了什么... 这个板块今天集体大涨,有何利好?6月2日,在港股市场上,数字货币概念股集体拉升,连连数字盘中一度上涨...
“妖王”惠城环保3年15倍,扣... 在5月30日股价创下历史新高后,惠城环保2022年11月以来的累计涨幅超过15倍,系同时间段A股市场...
不慌!港股深 v,a50 期指... 今天早盘港股大跳水,搞得很多股民对节后行情比较担忧。但港股盘中触底反弹,收盘已收回盘中跌幅,a50期...
刚刚,杀出一个医药大白马,挑战... 重重挑战激发无限动力!集采、竞争,仿佛是医药企业头顶的两座大山。连恒瑞医药、华东医药都不免因此出现短...
香港四大家族之一,有麻烦了……... 原创 刘博团队又一家知名大房企,惊爆财务困境。今天,香港四大家族之一——郑裕彤家族控股的、在香港上市...
002875,明日停牌!节前已... 2025.06.02本文字数:279,阅读时长大约1分钟A股“童装第一股”安奈儿正筹划公司控制权变更...
争议升级!两家上市公司深夜互相... 5月中旬以来,玻尿酸、重组胶原蛋白之争引发热议,频频登上网络热搜,且有愈演愈烈之势。6月1日晚间,医...
两宗券商“老鼠仓”曝光:中信证... 地方证监局最新披露的罚单,让两起券商从业人员“老鼠仓”案件浮出水面,2家龙头券商中信证券、华泰证券牵...
土耳其主要银行业指数上涨1.5... 每经AI快讯,6月2日,土耳其主要银行业指数上涨1.5%。
【12366问答】离境退税相关... 12366 热点梳理 问答来啦 退税商店在向境外旅客开具《离境退税申请单》后,如发生境外旅客退货等...
高凌信息:终止发行股份购买资产... 新京报贝壳财经讯 高凌信息6月2日发布公告,公司于2024年12月启动筹划实施重大资产重组,拟通过发...
赛诺菲斥资超90亿美元收购罕见... 当地时间6月2日,法国制药巨头赛诺菲与美国生物药企Blueprint Medicines公司宣布达成...
黔源电力:董事长罗涛因工作原因... 新京报贝壳财经讯 黔源电力6月2日晚间公告,公司董事长罗涛因工作原因申请辞去公司董事长、董事、董事会...