凑合着用吧,可形成多级生产消费者模式,如果子进程结果不需要收集向后传递可以直接使用进程池(本人对进程池不熟,如果有进程池构成生产消费者模式的样例求推荐,感激不尽)
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Time : 2018/2/27 11:40# @Author : Andes# @Site :import osimport tracebackfrom multiprocessing import Process, Queue, Value, Lockimport time# 收尾进程,需要收集上一段所有子进程结果时使用,特点是退出循环后无需更新进程结束标志def op_end(): ################## ##本进程初始化区域## ################## while True: try: [oe1, oen] = opn_queue.get(timeout=0.1) a = oe1 b = oen except Exception as e: if opn_finishmark.value == 1: if opn_queue.empty(): break else: continue else: continue ################# ##本进程收尾区域## ################## 如需收集所有子进程结果,则后接收尾进程,否则自身写成收尾进程形式(即退出循环后无需更新进程结束标志)def opn(): ################## ##本进程初始化区域## ################## while True: try: [on1, onn] = op2_queue.get(timeout=0.1) a = on1 b = onn opn_queue.put([a, b]) except Exception as e: if op2_finishmark.value == 1: if op2_queue.empty(): break else: continue else: continue ################# ##本进程收尾区域## ################# lockn.acquire() opn_countmark.value += 1 lockn.release() if opn_countmark.value == num_opn: opn_finishmark.value = 1def op2(): ################## ##本进程初始化区域## ################## while True: try: [o21, o2n] = op1_queue.get(timeout=0.1) a = o21 b = o2n op2_queue.put([a, b]) except Exception as e: if op1_finishmark.value == 1: if op1_queue.empty(): break else: continue else: continue ################# ##本进程收尾区域## ################# lock2.acquire() op2_countmark.value += 1 lock2.release() if op2_countmark.value == num_op2: op2_finishmark.value = 1def op1(): ################## ##本进程初始化区域## ################## while True: try: [o11, o1n] = input_queue.get(timeout=0.1) # 从某队列取数据 a = o11 b = o1n op1_queue.put([a, b]) # 将数据放入某队列 except Exception as e: # 保证本进程正常退出 if input_finishmark.value == 1: if input_queue.empty(): break else: continue else: continue ################# ##本进程收尾区域## ################# # 下面操作为更新本批子进程完成数,保证op2批次的子进程正常退出,如无后续,则无需下面操作(即收尾进程) lock1.acquire() op1_countmark.value += 1 lock1.release() if op1_countmark.value == num_op1: op1_finishmark.value = 1def set_input(is1, isn): t = 0 for r, d, f in os.walk(so_rt): if len(f) != 0: for fi in f: if fi.endswith('.jpg') or fi.endswith('.jpeg') or fi.endswith('.png') or fi.endswith('.bmp') or \ fi.endswith('.JPG') or fi.endswith('.JPEG') or fi.endswith('.PNG') or fi.endswith( '.BMP'): t += 1 img_path = os.path.join(r, fi) input_queue.put([img_path, t]) input_finishmark.value += 1 print('insert done {}'.format(t))so_rt = '/data_b/data_alpen/imgs' # 图片根目录input_queue = Queue(20) # 输入队列,存储图片路径,括号内为队列容纳的最大元素个数,太大容易爆内存,不填为无上限input_countmark = Value('i', 0) # 输入完成数input_finishmark = Value('i', 0) # 输入结束标志op1_queue = Queue(30) # 中间进程1结果队列op1_countmark = Value('i', 0) # 中间进程1完成数op1_finishmark = Value('i', 0) # 中间进程1结束标志op2_queue = Queue(30) # 中间进程2结果队列op2_countmark = Value('i', 0) # 中间进程2完成数op2_finishmark = Value('i', 0) # 中间进程2结束标志opn_queue = Queue(30) # 中间进程n结果队列opn_countmark = Value('i', 0) # 中间进程n完成数opn_finishmark = Value('i', 0) # 中间进程n结束标志locki = Lock() # 输入进程锁lock1 = Lock() # 中间进程1锁lock2 = Lock() # 中间进程2锁lockn = Lock() # 中间进程n锁num_i = 1 # 输入进程数num_op1 = 5 # 中间进程1进程数num_op2 = 5 # 中间进程2进程数num_opn = 5 # 中间进程n进程数num_ope = 1 # 收尾进程数#&&&&&产生子进程格式为:# Process(target=func, args=(param1, param2,...,paramn,)) # func为子进程操作的函数,后面不带括号,传入参数的末尾建议加逗号# 若不传入参数则为:Process(target=func)#&&&&&中间进程理论上可以无限拼接def op_main(): st = time.time() # 输入子进程 is1 = 0 # 输入进程参数1 isn = 0 # 输入进程参数n pi = Process(target=set_input, args=(is1, isn,)) pi.start() # 中间进程1 for p1 in xrange(0, num_op1): pp1 = Process(target=op1) pp1.start() # 中间进程2 for p2 in xrange(0, num_op2): pp2 = Process(target=op2) pp2.start() # 中间进程n for pn in xrange(0, num_opn): ppn = Process(target=opn) ppn.start() # 若需统计总时间,则收尾进程需使用.join()保证主进程在所有收尾进程结束后再继续往下执行, # 若无需统计时间,则跟中间进程一样即可 pl = [] # 收尾进程 for pe in xrange(0, num_ope): ppe = Process(target=op_end) ppe.start() pl.append(ppe) for pli in pl: pli.join() et = time.time() print('all costtime:{}'.format(et-st))def main(): op_main()if __name__ == '__main__': main()