python中的多进程、多线程、协程
2018-12-17 17:32:16 0 举报
AI智能生成
多进程、多线程、协程
作者其他创作
大纲/内容
协程
概念及简介
协程从代码级实现并发,操作系统感知不到,粒度越细,开发难度越大
greenlet
模块导入:from greenlet import greenlet
创建对象:g1=greenlet(eat)
切换对象:g1.switch('egon')
缺点:该模块不能自动实现来回切换
示例代码
from greenlet import greenlet<br><br>def eat(name):<br> print('%s eat 1' %name)<br> g2.switch('egon')<br> print('%s eat 2' %name)<br> g2.switch()<br>def play(name):<br> print('%s play 1' %name)<br> g1.switch()<br> print('%s play 2' %name)<br><br>g1=greenlet(eat)<br>g2=greenlet(play)<br><br>g1.switch('egon')#可以在第一次switch时传入参数,以后都不需要
Gevent
gevent模块:import gevent
创建对象并执行:g1=gevent.spawn(eat)
控制
对象.join方式:g1.join()
模块.joinall方式:gevent.joinall([g1,g2])
gevent阻塞:gevent.sleep(1)
其他阻塞识别
from gevent import monkey;monkey.patch_all()
示例代码1
import gevent<br>def eat(name):<br> print('%s eat 1' %name)<br> gevent.sleep(2)<br> print('%s eat 2' %name)<br><br>def play(name):<br> print('%s play 1' %name)<br> gevent.sleep(1)<br> print('%s play 2' %name)<br><br><br>g1=gevent.spawn(eat,'egon')<br>g2=gevent.spawn(play,name='egon')<br>g1.join()<br>g2.join()<br>#或者gevent.joinall([g1,g2])<br>print('主')
示例代码2
from gevent import spawn,joinall,monkey;monkey.patch_all()<br>import time<br>def task(pid):<br> """<br> Some non-deterministic task<br> """<br> time.sleep(0.5)<br> print('Task %s done' % pid)<br><br>def synchronous(): # 同步<br> for i in range(10):<br> task(i)<br>def asynchronous(): # 异步<br> g_l=[spawn(task,i) for i in range(10)]<br> joinall(g_l)<br> print('DONE')<br> <br>if __name__ == '__main__':<br> print('Synchronous:')<br> synchronous()<br> print('Asynchronous:')<br> asynchronous()<br># 上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。<br># 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,<br># 后者阻塞当前流程,并执行所有给定的greenlet任务。<br>执行流程只会在 所有greenlet执行完后才会继续向下走。
多进程
multiprocess模块
创建进程
Process(group=None, target=None, name=None, args=(), kwargs={})
target:目标函数,不加括号
args=():通过元组将子进程需要参数传入
kwargs={‘k’:'v'}关键字传参
进程开启、控制和结束
P.start()开启子进程,调用Process类中的run()方法
P.join()控制主进程等待子线程结束后再往后执行,timeout可设置等待的最长时间
P.is_alive()判断子进程P是否还在运行
P.terminate()强制终止,可能导致P的子进程成为僵尸进程
实例代码
import time<br>from multiprocessing import Process<br><br>def f(name):<br> print('hello', name)<br> print('我是子进程')<br><br>if __name__ == '__main__':<br> p = Process(target=f, args=('bob',))<br> p.start()<br> time.sleep(1)<br> print('执行主进程的内容了')
进程依赖
守护进程
P.daemon=True放在p.start()之前
信号量
Semaphore模块:from multiprocessing importSemaphore
创建信号量对象:sem=Semaphore(4)
信号的获取,sem.acquire()
信号的释放,sem.release()
事件
Event模块:from multiprocessing import Event
e = Event()
e.set()
e.clear()
e.is_set()
进程间通信
进程间是数据隔离的,通过队列、通道实现进程间的通信
pipe
queue
Queue模块:from multiprocessing import Queue
q=Queue([maxsize])创建允许最大顶数为maxsize的共享队列
q.put(content)将数据放入栈中,栈满时发生阻塞;q.put_nowait(content)栈满时不会阻塞,但会报错
q.get()从栈中取出数据,栈空时阻塞;q.get_nowait()栈空时不会阻塞,但会报错
q.empty(),q.full()判断队列是否为空或满,满足条件是返回True
数据隔离和数据安全
进程间是数据隔离的,但这种隔离主要是指内存级别的,当多进程同时操作文件(硬盘)中的数据时还是存在数据安全问题,为了保证数据安全就要用到锁
锁
Lock模块:from multiprocessing import Process,Lock
创建锁对象:lock=Lock()
锁的获取,lock.acquire()
锁的释放,lock.release()
死锁
递归锁
Rlock
资源占用的问题
进程池
Pool模块
Pool模块:from multiprocessing import Pool
创建进程池对象:p=Pool(n),n为允许同时执行的最大进程数
进程开启:p.apply(work,args=(i,)),work为调用的函数,args传递参数;非阻塞执行为p.apply_async(func [, args [, kwargs]])
p.join()控制主进程在子进程结束后继续进行
示例代码
import os,time<br>from multiprocessing import Pool<br><br>def work(n):<br> print('%s run' %os.getpid())<br> time.sleep(3)<br> return n**2<br><br>if __name__ == '__main__':<br> p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务<br> res_l=[]<br> for i in range(10):<br> res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞<br> # 但不管该任务是否存在阻塞,同步调用都会在原地等着<br> print(res_l)
ProcessPoolExecutor
ProcessPoolExecutor模块:from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
创建对象:executor=ProcessPoolExecutor(max_workers=3)
开启进程(异步):future=executor.submit(task,i)
关闭进程:executor.shutdown(True)
获取进程结果:future.result()
回调函数:add_done_callback(fn)
map简化:executor.map(task,range(1,12))
对等于:<br># for i in range(11):<br> # future=executor.submit(task,i)
多线程
threading模块
创建线程
Thread模块:from threading import Thread
Thread(group=None, target=None, name=None, args=(), kwargs={})
target:目标函数,不加括号
args=():通过元组将线程程需要参数传入
kwargs={‘k’:'v'}关键字传参
线程开启、控制和结束
t.start()开启子线程,调用Thread类中的run()方法
t.join()控制主线程等待子线程结束后再往后执行,timeout可设置等待的最长时间
t.is_alive()判断子进程t是否还在运行
t.enumerate()返回正在运行的子线程列表
示例代码
from threading import Thread<br>import time<br>def sayhi(name):<br> time.sleep(2)<br> print('%s say hello' %name)<br><br>if __name__ == '__main__':<br> t=Thread(target=sayhi,args=('egon',))<br> t.start()<br> print('主线程')
线程依赖
守护线程
t.setDaemon(True)放在t.start()之前
信号量
Semaphore模块:from threading import Thread,Semaphore
创建信号量对象:sem=Semaphore(4)
信号的获取,sem.acquire()
信号的释放,sem.release()
示例代码:
from threading import Thread,Semaphore<br>import threading<br>import time<br># def func():<br># if sm.acquire():<br># print (threading.currentThread().getName() + ' get semaphore')<br># time.sleep(2)<br># sm.release()<br>def func():<br> sm.acquire()<br> print('%s get sm' %threading.current_thread().getName())<br> time.sleep(3)<br> sm.release()<br>if __name__ == '__main__':<br> sm=Semaphore(5)<br> for i in range(23):<br> t=Thread(target=func)<br> t.start()
事件
Event模块:from threading import Thread, Event
e = Event()
e.set()
e.clear()
e.is_set()
条件condition
Condition模块:from threading import Thread,Condition
con=Condition()
con.acquire()<br> con.notify(int(inp))<br> con.release()
con.acquire()<br> con.wait()<br> con.release()
数据共享和数据安全
进程间是数据共享的,当多线程同时操作数据时存在数据安全问题,为了保证数据安全就要用到锁
锁
Lock模块:from threading import Thread,Lock
创建锁对象:lock=Lock()
锁的获取,lock.acquire()
锁的释放,lock.release()
死锁
多线程或多进程争抢共同的资源,原因是设置的多把互斥锁不合理
递归锁
Rlock
科学家吃面:fork_lock = noodle_lock = RLock()
queue
线程中的Queue和进程的Queue不同,线程中更强调数据安全(通道+锁),而不是数据通信,它的导入是从独立的queue模块
Queue模块:from queue import Queue
q=Queue([maxsize])创建允许最大顶数为maxsize的共享队列
q.put(content)将数据放入栈中
q.get()从栈中取出数据
q.empty(),q.full()判断队列是否为空或满,满足条件是返回True
线程池
ThreadPoolExecutor
ThreadPoolExecutor模块:from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
创建对象:executor=ThreadPoolExecutor(max_workers=3)
开启线程(异步):future=executor.submit(task,i)
关闭线程:executor.shutdown(True)
获取线程结果:future.result()
回调函数:add_done_callback(fn)
map简化:executor.map(task,range(1,12))
对等于:<br># for i in range(11):<br> # future=executor.submit(task,i)
0 条评论
下一页