当前位置: 首页 > news >正文

Python高性能编程

 一、进程池和线程池

1.串行

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import time

import requests

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

start_time = time.time()

for url in url_lists:

    response = requests.get(url)

    print(response.text)

print("Runtime: {}".format(time.time()-start_time))

# Runtime: 1.95

  

2.多进程

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import time

import requests

from multiprocessing import Process

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    print(response.text)

if __name__ == '__main__':

    p_list = []

    start_time = time.time()

    for url in url_lists:

        p = Process(target=task, args=(url,))

        p_list.append(p)

        p.start()

    for in p_list:

        p.join()

    print("Runtime: {}".format(time.time() - start_time))

# Runtime: 1.91

 

3.进程池(1)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import time

import requests

from concurrent.futures import ProcessPoolExecutor

"""

Py2里    没有线程池   但是有进程池

Py3里    有线程池     有进程池

"""

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    print(response.content)

if __name__ == '__main__':

    start_time = time.time()

    pool = ProcessPoolExecutor(10)

    for url in url_lists:

        pool.submit(task,url)

    pool.shutdown(wait=True)

    print("Runtime: {}".format(time.time() - start_time))

# Runtime: 2.00

  

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

# 进程池 + 回调函数

import time

import requests

from concurrent.futures import ProcessPoolExecutor

"""

Py2里    没有线程池   但是有进程池

Py3里    有线程池     有进程池

"""

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    return response.content

def callback(future):

    print(future.result())

if __name__ == '__main__':

    start_time = time.time()

    pool = ProcessPoolExecutor(10)

    for url in url_lists:

        v = pool.submit(task,url)

        v.add_done_callback(callback)

    pool.shutdown(wait=True)

    print("Runtime: {}".format(time.time() - start_time))

  

3.进程池(2)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

import time

import requests

from  multiprocessing import Pool

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    return response.content

def callBackFunc(content):

    print(content)

if __name__ == '__main__':

    start_time = time.time()

    pool = Pool(10)

    for url in url_lists:

        pool.apply_async(func=task,args=(url,),callback=callBackFunc)

    pool.close()

    pool.join()

    print("Runtime: {}".format(time.time() - start_time))

# Runtime: 1.96

 

2019-03-06 補充

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

# Parallel 底层調用的是 multiprocessing

import time

from joblib import Parallel, delayed

def func(idx):

    print(idx)

    time.sleep(1)

    return {'idx':idx}

start_ts = time.time()

results = Parallel(-1)(

    delayed(func)(x) for in range(4)

)

print(results)

print('Runtime : {}'.format(time.time()-start_ts))

4.多线程

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import time

import requests

from threading import Thread

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    print(response.text)

if __name__ == '__main__':

    t_list = []

    start_time = time.time()

    for url in url_lists:

        t = Thread(target=task, args=(url,))

        t_list.append(t)

        t.start()

    for in t_list:

        t.join()

    print("Runtime: {}".format(time.time() - start_time))

# Runtime: 0.49

  

5.线程池

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import time

import requests

from concurrent.futures import ThreadPoolExecutor

"""

Py2里    没有线程池   但是有进程池

Py3里    有线程池     有进程池

"""

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    print(response.content)

if __name__ == '__main__':

    start_time = time.time()

    pool = ThreadPoolExecutor(10)

    for url in url_lists:

        pool.submit(task,url)

    pool.shutdown(wait=True)

    print("Runtime: {}".format(time.time() - start_time))

# Runtime: 0.51

  

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

# 线程池 + 回调函数

import time

import requests

from concurrent.futures import ThreadPoolExecutor

"""

Py2里    没有线程池   但是有进程池

Py3里    有线程池     有进程池

"""

url_lists = [

    'http://www.baidu.com',

    'http://fanyi.baidu.com',

    'http://map.baidu.com',

    'http://music.baidu.com/',

    'http://tieba.baidu.com',

    'http://v.baidu.com',

    'http://image.baidu.com',

    'http://zhidao.baidu.com',

    'http://news.baidu.com',

    'http://xueshu.baidu.com']

def task(url):

    response = requests.get(url)

    return response.content

def callback(future):

    print(future.result())

if __name__ == '__main__':

    start_time = time.time()

    pool = ThreadPoolExecutor(10)

    for url in url_lists:

        v = pool.submit(task,url)

        v.add_done_callback(callback)

    pool.shutdown(wait=True)

    print("Runtime: {}".format(time.time() - start_time))

  

二、异步非阻塞

1

2

3

4

5

6

7

"""

异步非阻塞/异步IO

    非阻塞: 不等待

      异步: 回调函数

    本质:一个线程完成并发操作(前提是执行过程中一定得有IO,这样才能让线程空闲出来去执行下一个任务)

"""

1.asyncio示例1

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

import asyncio

@asyncio.coroutine

def func1():

    print('before...func1......')

    yield from asyncio.sleep(2)

    print('end...func1......')

@asyncio.coroutine

def func2():

    print('before...func2......')

    yield from asyncio.sleep(1)

    print('end...func2......')

@asyncio.coroutine

def func3():

    print('before...func3......')

    yield from asyncio.sleep(3)

    print('end...func3......')

tasks = [func1(), func2(), func3()]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

### 结果 ###

before...func3......

before...func2......

before...func1......

end...func2......

end...func1......

end...func3......

2.asyncio示例2

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

####################################################################################

# async/await 是 python3.5中新加入的特性,将异步从原来的yield 写法中解放出来,变得更加直观;

# async 写在def前,替代了装饰器@asyncio.coroutine;await 替换了yield from;

####################################################################################

import asyncio

async def func1():

    print('before...func1......')

    await asyncio.sleep(2)

    print('end...func1......')

async def func2():

    print('before...func2......')

    await asyncio.sleep(1)

    print('end...func2......')

async def func3():

    print('before...func3......')

    await asyncio.sleep(3)

    print('end...func3......')

tasks = [func1(), func2(), func3()]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

  

3.asyncio示例3

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

import asyncio

@asyncio.coroutine

def fetch_async(host, url='/'):

    print(host, url)

    reader, writer = yield from asyncio.open_connection(host, 80)

    request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)

    request_header_content = bytes(request_header_content, encoding='utf-8')

    writer.write(request_header_content)

    yield from writer.drain()

    text = yield from reader.read()

    print(host, url, text)

    writer.close()

task_list = [

    fetch_async('www.cnblogs.com''/standby/'),

    fetch_async('www.cnblogs.com''/standby/p/7739797.html'),

    fetch_async('www.cnblogs.com''/wupeiqi/articles/6229292.html')

]

loop = asyncio.get_event_loop()

results = loop.run_until_complete(asyncio.gather(*task_list))

loop.close()

  

4.asyncio+aiohttp示例1

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import asyncio

import aiohttp

import async_timeout

async def fetch(session, url):

    with async_timeout.timeout(10):

        async with session.get(url) as response:

            return await response.text()

async def fetch_async(url):

    async with aiohttp.ClientSession() as session:

        html = await fetch(session, url)

        print(html)

tasks = [

    fetch_async('https://api.github.com/events'),

    fetch_async('http://aiohttp.readthedocs.io/en/stable/'),

    fetch_async('http://aiohttp.readthedocs.io/en/stable/client.html')]

event_loop = asyncio.get_event_loop()

results = event_loop.run_until_complete(asyncio.gather(*tasks))

event_loop.close()

或者

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

import asyncio

import aiohttp

import async_timeout

async def fetch_async(url):

    async with aiohttp.ClientSession() as session:

        with async_timeout.timeout(10):

            async with session.get(url) as resp:

                print(resp.status)

                print(await resp.text())

tasks = [

    fetch_async('https://api.github.com/events'),

    fetch_async('http://aiohttp.readthedocs.io/en/stable/'),

    fetch_async('http://aiohttp.readthedocs.io/en/stable/client.html')]

event_loop = asyncio.get_event_loop()

results = event_loop.run_until_complete(asyncio.gather(*tasks))

event_loop.close()

5.asyncio+requests示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

import asyncio

import requests

@asyncio.coroutine

def fetch_async(func, *args):

    loop = asyncio.get_event_loop()

    future = loop.run_in_executor(None, func, *args)

    response = yield from future

    print(response.url, response.content)

tasks = [

    fetch_async(requests.get'http://aiohttp.readthedocs.io/en/stable/'),

    fetch_async(requests.get'https://api.github.com/events')

]

loop = asyncio.get_event_loop()

results = loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

补充:

1

2

3

4

5

有时候会遇到 RuntimeError: Event loop is closed 这个错误

参考:https://stackoverflow.com/questions/45600579/asyncio-event-loop-is-closed

在 fetch_async 函数里添加一下语句即可

asyncio.set_event_loop(asyncio.new_event_loop())

6.gevent+requests示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

import gevent

import requests

from gevent import monkey

from gevent.pool import Pool

monkey.patch_all()

def fetch_async(method, url, req_kwargs):

    print(method, url, req_kwargs)

    response = requests.request(method=method, url=url, **req_kwargs)

    print(response.url, response.content)

# ##### 发送请求 #####

# gevent.joinall([

#     gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),

#     gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),

#     gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),

#     gevent.spawn(fetch_async, method='get', url='https://api.github.com/events', req_kwargs={}),

# ])

# ##### 发送请求(协程池控制最大协程数量) #####

# pool = Pool(None)

pool = Pool(3)

gevent.joinall([

    pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://api.github.com/events', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.baidu.com', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.ibm.com', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.intel.com', req_kwargs={}),

    pool.spawn(fetch_async, method='get', url='https://www.iqiyi.com', req_kwargs={}),

])

使用gevent协程并获取返回值示例:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

def get_single_chart_data_flux(single_data_param):

    import gevent

    from gevent import monkey

    from gevent.pool import Pool as gPool

    monkey.patch_socket()

    ip,port,timestamp,time_length,type_name,subtype_name,filter_str_list,appid,legend = single_data_param

    ModelClass = get_model_class(type_name)

    func = apps.get_app_config('serverdata').service.get_single_chart_data

    pool = gPool(len(filter_str_list))

    func_li = []

    for filter_str in filter_str_list:

        func_li.append(pool.spawn(func,ModelClass,ip,port,timestamp,time_length,subtype_name,filter_str,appid,legend))

    ret_li = gevent.joinall(func_li)

    # view_logger.debug(ret_li[0].get('value'))

    result_li = [{'filter_con':item.get('value')[2], 'value':item.get('value')[1], 'legend':item.get('value')[3]} for item in ret_li]

    return result_li

7.Twisted示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

from twisted.web.client import getPage, defer

from twisted.internet import reactor

def all_done(arg):

    reactor.stop()

def callback(contents,url):

    print(url,contents)

deferred_list = []

url_list = [

    'http://www.bing.com',

    'http://www.baidu.com',

    'https://www.python.org',

    'https://www.yahoo.com',

    'https://www.github.com'

]

start_time = time.time()

for url in url_list:

    deferred = getPage(bytes(url, encoding='utf8'))

    deferred.addCallback(callback,url)

    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)

dlist.addBoth(all_done)

reactor.run()

8.Tornado示例

以上均是Python内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率;

而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】 

三、自定义异步非阻塞模块

1.简单示例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

"""

异步非阻塞/异步IO

    非阻塞: 不等待

      异步: 回调函数

    本质:一个线程完成并发操作(前提是执行过程中一定得有IO,这样才能让线程空闲出来去执行下一个任务)

     

     

IO 多路复用  +  socket

    - IO多路复用:  select epoll 用于检测socket对象是否发生变化(是否连接成功,是否有数据到来)

    - socket  :  socket客户端

     

    - IO请求是不占用CPU的,计算型的才占用CPU

"""

import socket

import select

conn_list = []

input_list = []

for url in range(20):

    client = socket.socket()

    client.setblocking(False)

    try:

        client.connect(('61.135.169.121',80))

    except BlockingIOError as e:

        pass

    conn_list.append(client)

    input_list.append(client)

while True:

    rlist, wlist, errlist = select.select(input_list, conn_list, [], 0.05)

    for sock in wlist:

        sock.sendall(b"GET / HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n")

        conn_list.remove(sock)

    for sock in rlist:

        data = sock.recv(8192)

        sock.close()

        input_list.remove(sock)

        print(data)

    if not input_list:

        break

2.自定义异步非阻塞模块

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

#!/usr/bin/python3.5

# -*- coding:utf-8 -*-

"""

异步非阻塞/异步IO

    非阻塞: 不等待

      异步: 回调函数

    本质:一个线程完成并发操作(前提是执行过程中一定得有IO,这样才能让线程空闲出来去执行下一个任务)

     

     

IO 多路复用  +  socket

    - IO多路复用:  select epoll 用于检测socket对象是否发生变化(是否连接成功,是否有数据到来)

    - socket  :  socket客户端

     

    - IO请求是不占用CPU的,计算型的才占用CPU

"""

import socket

import select

from urllib import parse

class Request():

    def __init__(self,sock,url,callback):

        """

        初始化

        :param sock: client's socket

        :param callback: callback function

        :param url: page url which wanna crawling

        """

        self.sock = sock

        self.url = url

        self.callback = callback

    def fileno(self):

        return self.sock.fileno()

    @property

    def host(self):

        domain = parse.urlparse(self.url)

        return domain.netloc

    @property

    def pathinfo(self):

        domain = parse.urlparse(self.url)

        return domain.path

def async_request(url_list):

    conn_list = []

    input_list = []

    for li in url_list:

        sock = socket.socket()

        sock.setblocking(False)

        obj = Request(sock, li[0], li[1])

        try:

            sock.connect((obj.host,80))

        except BlockingIOError as e:

            pass

        conn_list.append(obj)

        input_list.append(obj)

    while True:

        # 监听socket是否已经发生变化 [request_obj,request_obj....request_obj]

        # 如果有请求连接成功:wlist = [request_obj,request_obj]

        # 如果有响应的数据:  rlist = [request_obj,request_obj....client100]

        rlist, wlist, errlist = select.select(input_list, conn_list, [], 0.05)

        for obj in wlist:

            # print("链接成功,发送请求...")

            # obj.sock.sendall("GET {0} HTTP/1.0\r\nHost: {1}\r\n\r\n".format(obj.pathinfo,obj.host).encode('utf-8'))

            obj.sock.sendall(bytes("GET {0} HTTP/1.0\r\nHost: {1}\r\n\r\n".format(obj.pathinfo,obj.host),encoding='utf-8'))

            conn_list.remove(obj)

        for obj in rlist:

            # print("获取响应...")

            data = obj.sock.recv(8192)

            obj.callback(data)

            obj.sock.close()

            input_list.remove(obj)

        if not input_list:

            break

if __name__ == '__main__':

    def callback1(data):

        print("cnblogs...", data)

    def callback2(data):

        print("csdn...", data)

    def callback3(data):

        print("tornadoweb...", data)

    url_list = [

        ['http://www.cnblogs.com/standby/p/7589055.html', callback1],

        ['http://www.cnblogs.com/wupeiqi/articles/6229292.html', callback1],

        ['http://blog.csdn.net/vip_wangsai/article/details/51997882', callback2],

        ['http://blog.csdn.net/hjhmpl123/article/details/53378068', callback2],

        ['http://blog.csdn.net/zcc_0015/article/details/50688145', callback2],

        ['http://www.tornadoweb.org/en/stable/guide.html', callback3],

        ['http://www.tornadoweb.org/en/stable/guide/async.html', callback3],

        ['http://www.tornadoweb.org/en/stable/guide/coroutines.html#python-3-5-async-and-await', callback3]

    ]

    async_request(url_list)

  

3.牛逼的异步IO模块

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

import select

import socket

import time

class AsyncTimeoutException(TimeoutError):

    """

    请求超时异常类

    """

    def __init__(self, msg):

        self.msg = msg

        super(AsyncTimeoutException, self).__init__(msg)

class HttpContext(object):

    """封装请求和相应的基本数据"""

    def __init__(self, sock, host, port, method, url, data, callback, timeout=5):

        """

        sock: 请求的客户端socket对象

        host: 请求的主机名

        port: 请求的端口

        port: 请求的端口

        method: 请求方式

        url: 请求的URL

        data: 请求时请求体中的数据

        callback: 请求完成后的回调函数

        timeout: 请求的超时时间

        """

        self.sock = sock

        self.callback = callback

        self.host = host

        self.port = port

        self.method = method

        self.url = url

        self.data = data

        self.timeout = timeout

        self.__start_time = time.time()

        self.__buffer = []

    def is_timeout(self):

        """当前请求是否已经超时"""

        current_time = time.time()

        if (self.__start_time + self.timeout) < current_time:

            return True

    def fileno(self):

        """请求sockect对象的文件描述符,用于select监听"""

        return self.sock.fileno()

    def write(self, data):

        """在buffer中写入响应内容"""

        self.__buffer.append(data)

    def finish(self, exc=None):

        """在buffer中写入响应内容完成,执行请求的回调函数"""

        if not exc:

            response = b''.join(self.__buffer)

            self.callback(self, response, exc)

        else:

            self.callback(self, None, exc)

    def send_request_data(self):

        content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s""" % (

            self.method.upper(), self.url, self.host, self.data,)

        return content.encode(encoding='utf8')

class AsyncRequest(object):

    def __init__(self):

        self.fds = []

        self.connections = []

    def add_request(self, host, port, method, url, data, callback, timeout):

        """创建一个要请求"""

        client = socket.socket()

        client.setblocking(False)

        try:

            client.connect((host, port))

        except BlockingIOError as e:

            pass

            # print('已经向远程发送连接的请求')

        req = HttpContext(client, host, port, method, url, data, callback, timeout)

        self.connections.append(req)

        self.fds.append(req)

    def check_conn_timeout(self):

        """检查所有的请求,是否有已经连接超时,如果有则终止"""

        timeout_list = []

        for context in self.connections:

            if context.is_timeout():

                timeout_list.append(context)

        for context in timeout_list:

            context.finish(AsyncTimeoutException('请求超时'))

            self.fds.remove(context)

            self.connections.remove(context)

    def running(self):

        """事件循环,用于检测请求的socket是否已经就绪,从而执行相关操作"""

        while True:

            r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)

            if not self.fds:

                return

            for context in r:

                sock = context.sock

                while True:

                    try:

                        data = sock.recv(8096)

                        if not data:

                            self.fds.remove(context)

                            context.finish()

                            break

                        else:

                            context.write(data)

                    except BlockingIOError as e:

                        break

                    except TimeoutError as e:

                        self.fds.remove(context)

                        self.connections.remove(context)

                        context.finish(e)

                        break

            for context in w:

                # 已经连接成功远程服务器,开始向远程发送请求数据

                if context in self.fds:

                    data = context.send_request_data()

                    context.sock.sendall(data)

                    self.connections.remove(context)

            self.check_conn_timeout()

if __name__ == '__main__':

    def callback_func(context, response, ex):

        """

        :param context: HttpContext对象,内部封装了请求相关信息

        :param response: 请求响应内容

        :param ex: 是否出现异常(如果有异常则值为异常对象;否则值为None)

        :return:

        """

        print(context, response, ex)

    obj = AsyncRequest()

    url_list = [

        {'host''www.google.com''port': 80, 'method''GET''url''/''data''''timeout': 5,

         'callback': callback_func},

        {'host''www.baidu.com''port': 80, 'method''GET''url''/''data''''timeout': 5,

         'callback': callback_func},

        {'host''www.bing.com''port': 80, 'method''GET''url''/''data''''timeout': 5,

         'callback': callback_func},

    ]

    for item in url_list:

        print(item)

        obj.add_request(**item)

    obj.running()

相关文章:

Python高性能编程

一、进程池和线程池 1.串行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import time import requests url_lists [ http://www.baidu.com, http://fanyi.baidu.com, http://map.baidu.com, http://music.baidu.com/, http://tieba.baid…...

MVVM模式下如何正确【视图绑定+数据】

概述 我如何&#xff08;不在后面的代码中使用代码&#xff09;自动绑定到我想要的视图&#xff1f;据我了解&#xff0c;如果正确完成&#xff0c;这就是模式应该如何工作。我可以使用主窗口 xaml 中的代码实现这一切&#xff0c;我甚至正确创建了一个资源字典&#xff08;因…...

外包测试3年,离职后成功入职华为,拿到offer的那天我泪目了....

一提及外包测试&#xff0c;大部分人的第一印象就是&#xff1a;工作强度大&#xff0c;技术含量低&#xff0c;没有归属感&#xff01;外包工作三年总体感受就是这份工作缺乏归属感&#xff0c;心里总有一种落差&#xff0c;进步空间不大&#xff0c;接触不到核心技术&#xf…...

Qt Study

按钮->点击->窗口->关闭窗口 connect(信号的发送者&#xff0c;发送具体信号&#xff0c;信号的接收者&#xff0c;信号的处理); 信号处理函数称为槽 信号槽的优点&#xff0c;松散耦合&#xff0c;信号发送端和接收端本身是没有关联的&#xff0c;通过connect连接…...

JS混淆技术探究及解密方法分析

随着Web技术的快速发展&#xff0c;JavaScript被广泛应用于网页开发、移动应用开发等领域。然而&#xff0c;JavaScript代码很容易被反编译、解密&#xff0c;这给保护网站和应用程序的安全性带来了严重的挑战。为了解决这个问题&#xff0c;JS混淆技术应运而生。JS混淆就是将J…...

智慧制硅厂 Web SCADA 生产线

我国目前是全球最大的工业硅生产国、消费国和贸易国&#xff0c;且未来该产业的主要增量也将来源于我国。绿色低碳发展已成为全球大趋势和国际社会的共识&#xff0c;随着我国“双碳”目标的推进&#xff0c;光伏产业链快速发展&#xff0c;在光伏装机需求的带动下&#xff0c;…...

案例09-数据类型不一致导致equals判断为false

一&#xff1a;背景介绍 在判断课程id和班级id的时候如果一致就像课程信息进行更新&#xff0c;如果不一致就插入一条新的数据。其实两个变量的值是一致的但是类型是不一致的。这就导致数据库中已经有一条这样的数据了&#xff0c;在判断的时候结果为false&#xff0c;就有插入…...

springsecurity中的类

Authentication AuthenticationProvider 每一个AuthenticationProvider对应一个Authentication 很多个AuthenticationProvider 由一个 ProviderManager管理 ProviderManager implements AuthenticationManager 一个ProviderManager有很多个 AuthenticationProvider Usern…...

k8s配置管理

一、configmap 1.1 configmap概述 Configmap 是 k8s 中的资源对象&#xff0c;用于保存非机密性的配置的&#xff0c;数据可以用 key/value 键值对的形式保存&#xff0c;也可通过文件的形式保存。 1.2 configmap作用 我们在部署服务的时候&#xff0c;每个服务都有自己的配置…...

技术官方文档中的代码是用什么展示的?代码高亮插件总结

****内容预警****菜鸟教程***大佬绕道我们经常看到各种技术官方文档&#xff0c;有很多代码展示的区域&#xff0c;用于我们复制粘贴代码&#xff0c;比如vue 的官网当我们需要自己实现这么一个网站的时候&#xff0c;我就开始手忙脚乱&#xff0c;这到底是咋实现的&#xff1f…...

2023年中职组网络安全竞赛——综合渗透测试解析

综合渗透测试 题目如下: PS:需求环境可私信博主,求个三连吧! 解析如下: 通过本地PC中的渗透测试平台KALI2020对服务器场景进行渗透攻击,获取到RSYNC服务所开放的端口,将RSYNC服务开放的端口数值进行MD5加密后作为FLAG提交(如MD5加密前:812);...

【全网最细PAT题解】【PAT乙】1044 火星数字(测试点2,测试点4详细解释)

题目链接 1044 火星数字 题目描述 火星人是以 13 进制计数的&#xff1a;地球人的 0 被火星人称为 tret。 地球人数字 1 到 12 的火星文分别为&#xff1a;jan, feb, mar, apr, may, jun, jly, aug, sep, oct, nov, dec。 火星人将进位以后的 12 个高位数字分别称为&#xff1a…...

rsync+xinetd+inotify+sersync

一、介绍 1.1、rsync 对比 scp 相同&#xff1a; 都有拷贝的功能不同&#xff1a; rsync:具有增量复制&#xff0c;每次复制的时候&#xff0c;会扫描对端是否在同路径下有我要发送的一样的文件或者目录&#xff0c;如果&#xff0c;如果存在&#xff0c;则不进行复制。边复制&…...

CSS - 扫盲

文章目录1. 前言2. CSS2.1 css 的引入方式2.2 选择器2.3 CSS 常用属性2.3.1 字体属性2.3.2 文本属性2.3.3 背景属性2.4 圆角矩形2.5 元素的显示模式2.6 盒子模型2.7 弹性布局1. 前言 上文我们简单 将 HTML 过了一遍 &#xff0c; 知道了 HTML 知识表示页面的结构和内容 &#x…...

ChatGPT能完全取代软件开发吗,看看它怎么回答?

最近网上一直疯传&#xff0c;ChatGPT 最可能取代的 10 种工作。具体包括①、技术类工作&#xff1a;程序员、软件工程师、数据分析师②、媒体类工作&#xff1a;广告、内容创作、技术写作、新闻③、法律类工作&#xff1a;法律或律师助理④、市场研究分析师⑤、教师⑥、金融类…...

Vue3学习笔记

一、Ref ref, isRef, shallowRef, triggerRef, customRef ref返回的是es6的一个class类&#xff0c;取值和修改都要加上.valueref 和 shallowRef不能一起写&#xff0c;会引起shallowRef的视图更新ref shallowRef triggerRef <template><div class"home&quo…...

【React】pro-mobile

1.项目介绍 实现react移动端项目 2.目标&#xff1a; 能够应用CRAReactMobxAntd-mobile开发C端项目掌握基于React的C端项目开发流程学会如何应用next优化项目 3.使用技术栈 脚手架&#xff1a;cra dva-cliumi 脚本&#xff1a;ts react版本&#xff1a;react v18 2022年更…...

Substrate 基础教程(Tutorials) -- 授权特定节点

五、授权特定节点 在添加可信节点中&#xff0c;您看到了如何使用一组已知的验证器节点构建一个简单的网络。该教程演示了一个简化版的许可网络&#xff08;permissioned network&#xff09;。在一个被许可的网络中&#xff0c;只有被授权的节点&#xff08;authorized nodes…...

使用qemu-img转换镜像格式

qemu功能强大&#xff0c;详细了解其功能请到官网查看 https://www.qemu.org/docs/master/system/images.html qemu-img能将RAW、qcow2、VMDK、VDI、VHD&#xff08;vpc&#xff09;、VHDX、qcow1或QED格式的镜像转换成VHD格式&#xff0c;也可以实现RAW和VHD格式的互相转换。 …...

Springboot怎么集成Thymeleaf模板引擎?

Thymeleaf介绍Thymeleaf&#xff0c;是一个XML/XHTML/HTML模板引擎&#xff0c;开源的java库&#xff0c;可以用于SpingMVC项目中&#xff0c;用于代替JSP、FreeMarker或者其他的模板引擎&#xff1b;页面与数据分离&#xff0c;提高了开发效率&#xff0c;让代码重用更容易。S…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机

这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机&#xff0c;因为在使用过程中发现 Airsim 对外部监控相机的描述模糊&#xff0c;而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置&#xff0c;最后在源码示例中找到了&#xff0c;所以感…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...

GitHub 趋势日报 (2025年06月06日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...

4. TypeScript 类型推断与类型组合

一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式&#xff0c;自动确定它们的类型。 这一特性减少了显式类型注解的需要&#xff0c;在保持类型安全的同时简化了代码。通过分析上下文和初始值&#xff0c;TypeSc…...

【Elasticsearch】Elasticsearch 在大数据生态圈的地位 实践经验

Elasticsearch 在大数据生态圈的地位 & 实践经验 1.Elasticsearch 的优势1.1 Elasticsearch 解决的核心问题1.1.1 传统方案的短板1.1.2 Elasticsearch 的解决方案 1.2 与大数据组件的对比优势1.3 关键优势技术支撑1.4 Elasticsearch 的竞品1.4.1 全文搜索领域1.4.2 日志分析…...

【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权

摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题&#xff1a;安全。文章将详细阐述认证&#xff08;Authentication) 与授权&#xff08;Authorization的核心概念&#xff0c;对比传统 Session-Cookie 与现代 JWT&#xff08;JS…...

41道Django高频题整理(附答案背诵版)

解释一下 Django 和 Tornado 的关系&#xff1f; Django和Tornado都是Python的web框架&#xff0c;但它们的设计哲学和应用场景有所不同。 Django是一个高级的Python Web框架&#xff0c;鼓励快速开发和干净、实用的设计。它遵循MVC设计&#xff0c;并强调代码复用。Django有…...