博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python多进程模板
阅读量:6305 次
发布时间:2019-06-22

本文共 4266 字,大约阅读时间需要 14 分钟。

凑合着用吧,可形成多级生产消费者模式,如果子进程结果不需要收集向后传递可以直接使用进程池(本人对进程池不熟,如果有进程池构成生产消费者模式的样例求推荐,感激不尽)

#!/usr/bin/env python# -*- coding: utf-8 -*-# @Time    : 2018/2/27 11:40# @Author  : Andes# @Site    :import osimport tracebackfrom multiprocessing import Process, Queue, Value, Lockimport time# 收尾进程,需要收集上一段所有子进程结果时使用,特点是退出循环后无需更新进程结束标志def op_end():	##################	##本进程初始化区域##	##################	while True:		try:			[oe1, oen] = opn_queue.get(timeout=0.1)			a = oe1			b = oen		except Exception as e:			if opn_finishmark.value == 1:				if opn_queue.empty():					break				else:					continue			else:				continue	#################	##本进程收尾区域##	################## 如需收集所有子进程结果,则后接收尾进程,否则自身写成收尾进程形式(即退出循环后无需更新进程结束标志)def opn():	##################	##本进程初始化区域##	##################	while True:		try:			[on1, onn] = op2_queue.get(timeout=0.1)			a = on1			b = onn			opn_queue.put([a, b])		except Exception as e:			if op2_finishmark.value == 1:				if op2_queue.empty():					break				else:					continue			else:				continue	#################	##本进程收尾区域##	#################	lockn.acquire()	opn_countmark.value += 1	lockn.release()	if opn_countmark.value == num_opn:		opn_finishmark.value = 1def op2():	##################	##本进程初始化区域##	##################	while True:		try:			[o21, o2n] = op1_queue.get(timeout=0.1)			a = o21			b = o2n			op2_queue.put([a, b])		except Exception as e:			if op1_finishmark.value == 1:				if op1_queue.empty():					break				else:					continue			else:				continue	#################	##本进程收尾区域##	#################	lock2.acquire()	op2_countmark.value += 1	lock2.release()	if op2_countmark.value == num_op2:		op2_finishmark.value = 1def op1():	##################	##本进程初始化区域##	##################	while True:		try:			[o11, o1n] = input_queue.get(timeout=0.1)  # 从某队列取数据			a = o11			b = o1n			op1_queue.put([a, b])  # 将数据放入某队列		except Exception as e:			# 保证本进程正常退出			if input_finishmark.value == 1:				if input_queue.empty():					break				else:					continue			else:				continue	#################	##本进程收尾区域##	#################	# 下面操作为更新本批子进程完成数,保证op2批次的子进程正常退出,如无后续,则无需下面操作(即收尾进程)	lock1.acquire()	op1_countmark.value += 1	lock1.release()	if op1_countmark.value == num_op1:		op1_finishmark.value = 1def set_input(is1, isn):	t = 0	for r, d, f in os.walk(so_rt):		if len(f) != 0:			for fi in f:				if fi.endswith('.jpg') or fi.endswith('.jpeg') or fi.endswith('.png') or fi.endswith('.bmp') or \						fi.endswith('.JPG') or fi.endswith('.JPEG') or fi.endswith('.PNG') or fi.endswith(					'.BMP'):					t += 1					img_path = os.path.join(r, fi)					input_queue.put([img_path, t])	input_finishmark.value += 1	print('insert done {}'.format(t))so_rt = '/data_b/data_alpen/imgs'  # 图片根目录input_queue = Queue(20)  # 输入队列,存储图片路径,括号内为队列容纳的最大元素个数,太大容易爆内存,不填为无上限input_countmark = Value('i', 0)  # 输入完成数input_finishmark = Value('i', 0)  # 输入结束标志op1_queue = Queue(30)  # 中间进程1结果队列op1_countmark = Value('i', 0)  # 中间进程1完成数op1_finishmark = Value('i', 0)  # 中间进程1结束标志op2_queue = Queue(30)  # 中间进程2结果队列op2_countmark = Value('i', 0)  # 中间进程2完成数op2_finishmark = Value('i', 0)  # 中间进程2结束标志opn_queue = Queue(30)  # 中间进程n结果队列opn_countmark = Value('i', 0)  # 中间进程n完成数opn_finishmark = Value('i', 0)  # 中间进程n结束标志locki = Lock()  # 输入进程锁lock1 = Lock()  # 中间进程1锁lock2 = Lock()  # 中间进程2锁lockn = Lock()  # 中间进程n锁num_i = 1  # 输入进程数num_op1 = 5  # 中间进程1进程数num_op2 = 5  # 中间进程2进程数num_opn = 5  # 中间进程n进程数num_ope = 1  # 收尾进程数#&&&&&产生子进程格式为:# Process(target=func, args=(param1, param2,...,paramn,))  # func为子进程操作的函数,后面不带括号,传入参数的末尾建议加逗号# 若不传入参数则为:Process(target=func)#&&&&&中间进程理论上可以无限拼接def op_main():	st = time.time()	# 输入子进程	is1 = 0  # 输入进程参数1	isn = 0  # 输入进程参数n	pi = Process(target=set_input, args=(is1, isn,))	pi.start()	# 中间进程1	for p1 in xrange(0, num_op1):		pp1 = Process(target=op1)		pp1.start()	# 中间进程2	for p2 in xrange(0, num_op2):		pp2 = Process(target=op2)		pp2.start()	# 中间进程n	for pn in xrange(0, num_opn):		ppn = Process(target=opn)		ppn.start()	# 若需统计总时间,则收尾进程需使用.join()保证主进程在所有收尾进程结束后再继续往下执行,	# 若无需统计时间,则跟中间进程一样即可	pl = []	# 收尾进程	for pe in xrange(0, num_ope):		ppe = Process(target=op_end)		ppe.start()		pl.append(ppe)	for pli in pl:		pli.join()	et = time.time()	print('all costtime:{}'.format(et-st))def main():	op_main()if __name__ == '__main__':	main()

  

 

转载于:https://www.cnblogs.com/lxfnote/p/9994761.html

你可能感兴趣的文章
如何使用Gulp构建TypeScript
查看>>
黑科技是什么
查看>>
百度视觉团队斩获 ECCV Google AI 目标检测竞赛冠军,获奖方案全解读 | ECCV 2018
查看>>
开源 | Service Mesh 数据平面 SOFAMosn 深层揭秘
查看>>
安装好ubuntu后的操作
查看>>
jenkins 批量添加任务job
查看>>
花几分钟用Socket.io写一个简单的你画我猜小应用
查看>>
使用 angularjs 遇到的问题
查看>>
谁将引领新一代视频编码标准:HEVC、AVS2和AV1性能对比报告
查看>>
“怎么做好云迁移”? 深蓝云海资深架构师给你答案
查看>>
什么叫云计算?云计算通俗解释
查看>>
eNSP华为模拟器使用——(10)eNSP模拟防火墙
查看>>
杂乱笔记
查看>>
MyEclipse打不开jsp文件 报错“Failed to create the part's controls"
查看>>
PHP代码审计笔记--弱类型存在的安全问题
查看>>
LC3大会,开源人的狂欢,反哺的力量驱动技术革新
查看>>
巴伦周刊:天高任“云”飞,阿里云估值或超800亿美元
查看>>
题解 P3717 【[AHOI2017初中组]cover】
查看>>
MVC 区域内默认控制器不能访问(Multiple types were found that match the controller named ‘Index')...
查看>>
别再瞎猜测了,吴恩达妻子说了他不会加入Drive.ai
查看>>