python多进程多线程

on under python
3 minute read

0x00 python性能优化

http://pfmiles.github.io/blog/python-speed-performance-tips/

0x01 为什么有人说Python的多线程是鸡肋

差不多是这样子.多线程目前仅用于网络多线程采集, 以及性能测试.
其它的语言也有类似的情况,线程本身的特点导致线程的适用范围是受限的.只有CPU过剩,而其它的任务很慢,此时用线程才是有
益的,可以很好平衡等待时间,提高并发性能.线程的问题主要是线程的安全稳定性.线程无法强制中止,同时线程与主进程共享内
存,可能会影响主进程的内存管理.在python里线程出问题,可能会导致主进程崩溃. 虽然python里的线程是操作系统的真实线程
那么怎么解决呢?通过我们用进程方式.子进程崩溃后,会完全的释放所有的内存和错误状态.所以进程更安全. 另外通过进程,
python可以很好的绕过GIL,这个全局锁问题. 但是进程也是有局限的.不要建立超过CPU总核数的进程,否则效率也不高.

简单的总结一下.
当我们想实现多任务处理时,首先要想到使用multiprocessing, 但是如果觉着进程太笨重,那么就要考虑使用线程. 如果多任务
处理中需要处理的太多了,可以考虑多进程,每个进程再采用多线程.如果还处理不要,就要使用轮询模式,比如使用poll event,
twisted等方式.如果是GUI方式,则要通过事件机制,或者是消息机制处理,GUI使用单线程.所以在python里线程不要盲目用, 也
不要滥用. 但是线程不安全是事实.如果仅仅是做几个后台任务,则可以考虑使用守护线程做.如果需要做一些危险操作,可能会
崩溃的,就用子进程去做. 如果需要高度稳定性,同时并发数又不高的服务.则强烈建议用多进程的multiprocessing模块实现.
在linux或者是unix里,进程的使用代价没有windows高.还是可以接受的.

                                                            --refer from baidu zhidao
more:
    https://www.zhihu.com/question/23474039

0x02 经典多线程学习理解

http://blog.csdn.net/a921800467b/article/details/8579915

0x03 网络io 阻塞|非阻塞 同步|异步

http://www.cnblogs.com/Anker/p/3254269.html

例子:

老张爱喝茶,废话不说,煮开水.
出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶).
1 老张把水壶放到火上,立等水开.(同步阻塞)
老张觉得自己有点傻
2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有.(同步非阻塞)
老张还是觉得自己有点傻,于是变高端了,买了把会响笛的那种水壶.水开之后,能大声发出嘀~~~~的噪音.
3 老张把响水壶放到火上,立等水开.(异步阻塞)
老张觉得这样傻等意义不大
4 老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶.(异步非阻塞)
老张觉得自己聪明了.

所谓同步异步,只是对于水壶而言.
普通水壶,同步;响水壶,异步.
虽然都能干活,但响水壶可以在自己完工之后,提示老张水开了.这是普通水壶所不能及的.
同步只能让调用者去轮询自己(情况2中),造成老张效率的低下.

所谓阻塞非阻塞,仅仅对于老张而言.
立等的老张,阻塞;看电视的老张,非阻塞.
情况1和情况3中老张就是阻塞的,媳妇喊他都不知道.虽然3中响水壶是异步的,可对于立等的老张没有太大的意义.所以一般
异步是配合非阻塞使用的,这样才能发挥异步的效用.

——来源网络,作者不明. 

0x03 实例

good
    http://www.2cto.com/kf/201504/395330.html 
    https://mugglecoding.gitbooks.io/qa/content/poolzen_yao_shi_yong.html

http://www.cnblogs.com/zhangchaoyang/articles/5188904.html 线程池实现
https://superxiaoxiong.github.io/2016/08/25/python-threadpool/
http://www.jb51.net/article/79631.htm
http://www.jb51.net/article/71911.htm
http://www.cnblogs.com/fnng/p/3670789.html
http://5ydycm.blog.51cto.com/115934/1435042
http://www.codexiu.cn/python/blog/939/

0x04 attention

1>threadpool

    threadpool线程池模块已经过时,官方如下说明,建议使用multiprocessing模块甚至是用asyncio这个模块来通过异步实现

    This module is OBSOLETE and is only provided on PyPI to support old projects that still use it. Please DO
    NOT USE IT FOR NEW PROJECTS! Use modern alternatives like the multiprocessing module in the standard
    library or even an asynchroneous approach with asyncio

    multiprocessing多进程多线程模块
    https://docs.python.org/2.7/library/multiprocessing.html(py官方doc)

    asyncio异步io模块(pyhton3.4新增模块,py2没有)
    https://docs.python.org/3/library/asyncio.html#module-asyncio

2>join()和setDaemon的区别

    http://blog.csdn.net/zhangzheng0413/article/details/41728869
    功能基本是相反的
        thread.join是要求所有子线程执行完,主线程才不阻塞

        多线程或是多进程中的join函数的功能将调用join函数的线程或是进程进入"阻塞模式"(这个阻塞模式是相对于线程或主进程的
        ),也即如果某个线程或进程调用了join函数,那么不运行完这个线程或进程的话,整个程序的执行流程就不会再进入到主线程中
        ,也即主线程被阻塞.

        thread.setDaemon(True)是将父线程设置为守护线程,也即主线程执行完后不管子线程有没有执行完,子线程直接结束,
        setDaemon要设置在thread.start()之前

3>pool.apply()和pool.map

eg.
from multiprocessing.dummy import Pool as ThreadPool
pool=ThreadPool(20)

for循环下的pool.apply()相当于pool.map

pool.apply()和pool.apply_async()的区别是:apply_async是并行处理,apply不是,必须等每个子线程执行完
    apply
    It blocks until the result is ready, so apply_async() is better suited for performing work in parallel.

0x05 important

multiprocessing模块是重点

multiprocessing pool源码解读
http://www.bingtel.wang/python%E5%A4%9A%E8%BF%9B%E7%A8%8B%E7%BC%96%E7%A8%8B%E4%B9%8Bmultiprocessing-pool%E6%BA%90%E7%A0%81%E8%A7%A3%E8%AF%BB.html

https://docs.python.org/2.7/library/multiprocessing.html(py官方英文doc)
http://python.usyiyi.cn/translate/python_278/library/index.html(中文doc)
http://stackoverflow.com/questions/4474940/unsupported-operand-types-exception-with-threappool-map-but-not-map

中文不详尽,英文中说到multiprocessing.dummy就是multiprocessing的复制版本,只不过multiprocessing.dummy作用于线程
"multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module."
所以参考multiprocessing的api来使用操作线程池即可

pool.close()
    防止任何更多的任务(其他没有安排过的任务,map过的任务不属于这个范围)被提交到线程池中.一旦所有的任务已经完成了工作线程将退出.

pool.terminate()
    立即停止线程池中的工作线程(无论任务有没有完成),经测试并不能停止所有的线程池中要进行的任务,应该
    是只能停止当前正在执行的一个函数过程(子线程)

    为了停止线程池中的所有线程,尤其是在爆破密码时,某一个线程已经爆破成功,为了达到"停止其他的将要执
    行的通过map加入到pool线程池中的任务"相同的效果,在每个线程中的开头判断是否已经爆破成功,成功则直
    接return,不成功则继续本线程应该完成的工作

pool.join()
    等待线程池中的工作线程退出,必须先使用pool.close()或者pool.terminate()才能调用join()

0x06 concurrent.futures(python3.2新增模块)

https://docs.python.org/3/library/concurrent.futures.html
官网的翻译如下
https://www.bwangel.me/2016/09/23/concurrent-futures/
ThreadPoolExecutor线程池
ProcessPoolExecutor进程池

good understanding on concurrent.futures module:
http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html

map和submit
http://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p08_perform_simple_parallel_programming.html

python GIL
https://www.youtube.com/watch?v=ph374fJqFPE

0x06 my easy_crawl code

def get_random_ua():
    #得到随机user-agent值
    f=open("dicts/user-agents.txt","r+")
    all_user_agents=f.readlines()
    f.close()
    random_ua_index=random.randint(0,len(all_user_agents)-1)
    ua=re.sub(r"(\s)$","",all_user_agents[random_ua_index])
    return ua

def get_random_x_forwarded_for():
    #得到随机x-forwarded-for值
    numbers = []
    while not numbers or numbers[0] in (10, 172, 192):
        numbers = random.sample(xrange(1, 255), 4)
    return '.'.join(str(_) for _ in numbers)


def get_request(uri):
    #发出get请求,返回值为一个字典,有三个键值对:eg.{"code":200,"title":None,"content":""}
    #code是int类型
    #title如果没有则返回None,有则返回str类型
    #content如果没有则返回""
    try:
        import mechanize
    except:
        os.system("pip install mechanize")
        import mechanize
    try:
        import cookielib
    except:
        os.system("pip install cookielib")
        import cookielib
    try:
        br = mechanize.Browser()
        br.set_cookiejar(cookielib.LWPCookieJar()) # Cookie jar
        br.set_handle_equiv(True) # Browser Option
        br.set_handle_gzip(False)# use to be True
        br.set_handle_redirect(True)
        br.set_handle_referer(True)
        br.set_handle_robots(False)
        br.set_handle_refresh(mechanize._http.HTTPRefreshProcessor(), max_time=1)
        ua=get_random_ua()
        x_forwarded_for=get_random_x_forwarded_for()
        br.addheaders = [('User-agent', '%s' % ua),('X-Forwarded-For','%s' % x_forwarded_for)]
        br.open(uri)
        br._factory.is_html = True
        code=br.response().code
        title=br.title()
        content=br.response().read()
    except mechanize.HTTPError as e:
        code=e.code
        content=e.read()
        soup=BeautifulSoup(content,"lxml")
        title=soup.title.string
    return_value={'code':code,'title':title,'content':content}
    #print return_value
    return return_value


def collect_uris_from_uri(uri):
#从uri所在的html内容中收集uri到uri队列
#返回值是一个字典,{'y1':y1,'y2':y2}
#y1是根据参数uri得到的html页面中的所有uri,是个列表类型
#y2是参数uri对应的三个关键元素,y2是个字典类型,eg.{"code":200,"title":None,"content":""}
    all_uris=[]
    result=get_request(uri)
    content=result['content']
    bs=BeautifulSoup(content,"lxml")
    for each in bs.find_all('a'):
        find_uri=each.get('href')
        all_uris.append(find_uri)
    return {'y1':all_uris,'y2':result}


class MyThread(threading.Thread):
    def __init__(self,func,args,name=''):
        threading.Thread.__init__(self)
        self.name=name
        self.func=func
        self.args=args
    def run(self):
        self.result=apply(self.func,self.args)
    def get_result():
        return self.result

def crawl_uri(uri):
    #爬虫,可获取uri对应网站的所有可抓取的uri和所有网页三元素:code,title,content
    uris_queue=Queue.Queue()
    uris_uri_keyvalues={}
    #uri和uri的三个元素的对应值为一个字典的键值对
    #eg{'http://www.baiud.com':{'code':code,'title':title,'content':content},"":{},"":{},...}
    #用于收集uri与对应的三个元素的值,收集后考虑可能统一存入数据库

    #收集相同域名网站内的uris
    domain_uris=[]
    #收集二级域名
    subdomain_uris=[]
    def task_finish_func():
        while True:
            current_uri=uris_queue.get()
            http_domain=get_http_domain_from_uri(current_uri)
            #eg.main_domain_prefix从http://www.freebuf.com中得到www
            #main_domain_key_value从http://www.freebuf.com中得到freebuf
            main_domain_prefix=re.search(r"http(s)?://([^\.]*)\.([^\./]*)",current_uri).group(2)
            main_domain_key_value=re.search(r"http(s)?://([^\.]*)\.([^\./]*)",current_uri).group(3)
            #print main_domain_prefix
            #print main_domain_key_value
            #raw_input()
            result=collect_uris_from_uri(current_uri)
            uris=result['y1']
            for each in uris:
                #collect_uris_from_uri得到的结果中的元素可能是None
                if each is not None:
                    tmp=get_http_domain_pattern_from_uri(http_domain)
                    http_domain_pattern=re.compile(r"%s" % tmp)
                    if re.match(http_domain_pattern,each):
                        each=re.sub(r"/$","",each)
                        if each not in domain_uris:
                            print each
                            domain_uris.append(each)
                            #print domain_uris
                            uris_queue.put(each)
                    sub_pattern=re.compile(r"http(s)?://[^\.]*(?<!%s)\.%s" % (main_domain_prefix,main_domain_key_value))
                    if re.match(sub_pattern,each):
                        each_subdomain=get_http_domain_from_uri(each)
                        if each_subdomain not in subdomain_uris:
                            #二级域名只将http_domain部分加入收藏,不放到队列里
                            print each_subdomain
                            subdomain_uris.append(each_subdomain)
            uris_uri_keyvalues['%s' % uri]=result
            uris_queue.task_done()


    #初始化
    uri=re.sub(r"(/{0,2}(\s){0,2})$","",uri)
    start_uris=[uri]
    http_domain=get_http_domain_from_uri(uri)
    if uri!=http_domain:
        start_uris.append(http_domain)
    for each in start_uris:
        uris_queue.put(each)

    '''传统多线程
    mythreads=[]
    start=time.time()
    for i in range(15):
        mythread=MyThread(task_finish_func,())
        mythreads.append(mythread)
    for i in range(15):
        mythreads[i].setDaemon(True)
        mythreads[i].start()
        print "%s threads started" % str(i)
    uris_queue.join()
    end=time.time()
    print end-start
    print len(domain_uris)
    #upon threads=200 ===> 38,16
    #threads=5 ===> 12,16
    #threads=10 4,16
    #threads=15 3.4,16
    '''

    '''单线程
    start=time.time()
    task_finish_func()
    uris_queue.join()
    end=time.time()
    print end-start
    print len(domain_uris)
    #thread=1 task_finish_func中while True改成while uris_queue.empty() is False,要不然不会执行到最后    19.4,16
    '''

    mythreads=[]
    start=time.time()
    pool=ThreadPool(15)
    for i in range(15):
        #这里比较特殊,因为函数task_finish_func是个无限循环的函数,所以就算这里开20个线程也会因为线程池中只有15个
        #位置使得一直只有15个线程在运行,多余的5个线程相当于没有设置一样
        pool.apply_async(task_finish_func,())
    pool.close()

    while 1:
        #这里的if取代下面的uris_queue.join(),有时爬虫被ban时,如果用uris_queue.join()会无限等待
        num_of_result=len(domain_uris)
        sleep(15)
        if len(domain_uris)==num_of_result:
            print "finished,if the number of uris you get is not big enough,you may be banned to crawl :("
            break

    #uris_queue.join()
    end=time.time()
    print end-start
    print len(domain_uris)
    #threads=15(pool_size=15,for_range=20) 4.15,16
    #threads=20(pool_size=20,for_range=20) 9.2,16
    #threads=10(pool_size=10,for_range=20) 4.2,16

python, coding, effective
home
github
archive
category