到目前为止,我们学习的套接字都是阻塞的,这就意味着,如果我们想实现并发,比如一个服务器同时(感觉上)服务多个客户端,就需要用多线程或者多进程或者协程来解决。
当然,使用多进程或多线程来解决阻塞问题,优点是简单清晰,但是耗费系统资源,效率也不是甚好,如果使用协程确实是一个很好的方法,同学们可以用协程实现阻塞套接字的并发问题。
但是,在协程这种技术没有出现之前,主流的解放方案是什么呢?比如 nginx,nodejs,tornado 都没有使用多线程,它们都是一个进程只启动一个线程,当然你可以启动多个进程,但是,哪怕是一个进程它们也实现了并发。
那些主流的服务器或框架是如何用单线程实现并发的,首先,前提条件是保证没有阻塞行为存在,所以,我们要把套接字弄成非阻塞的,然后,他们对每个网络 IO 进行轮询,如果有消息就处理,没有消息就忽略轮询下一个网络 IO。
总之,单线程情况下(不使用协程),用轮询的方式是实现并发的技术,用非阻塞行为是实现并发的前提条件。
注意,我们这儿说的阻塞行为一般是指 IO 密集型(网络 IO 或 磁盘 IO),而非计算密集型(CPU)。
调用套接字的 setblocking(False)
语句,即可把套接字变成非阻塞套接字。
''' 服务器端 ''' import socket ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # tcp的套接字 ss.setblocking(False) # recv,send,connect,accept 都不是阻塞的 ss.bind(('127.0.0.1', 9999)) ss.listen(5) sockslist = [] # 存储客户端的套接字 # 每个客户业务执行的时间粒度比较大 # 并发(是一种感觉) # 异步 while True: try: serversock, addr = ss.accept() # 协议栈缓冲区没有套接字,抛出异常 sockslist.append(serversock) # 来客户端连接把连上的套接字入队列 except: pass for sock in sockslist: # 循环遍历已连上的套接字做读写操作 try: data = sock.recv(1024).decode("utf-8") # 任务(对接一个客户端一个任务)异步并发(任务) if not data: print("客户端退出") sock.close() sockslist.remove(sock) else: print(data) sock.send("hello,我是老鸟python服务器".encode("utf-8")) except: pass
''' 客户端 ''' import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('127.0.0.1', 9999)) while True: client_data = input("请输入数据:") s.send(client_data.encode("utf-8")) try: recvdata = s.recv(1024).decode("utf-8") print(recvdata) except: s.close() break s.close()
我们先启动服务器,然后启动多个客户端,你会发现我们的单线程服务器可以同时(感觉上)和多个客户端进行通信。
由于这种单线程异步轮询结合非阻塞套接字的网络架构是编写网络服务器的主流方式,以至于,出现了好多成熟的网络模型,比如 select 模型,poll 模型,epoll 模型等等。nginx,nodejs,tornado 都是使用 epoll 模型。
打个比方,在 2.4G 内存,1 核 cpu 上,如果我们使用多线程处理并发,一般在 2000-4000 个客户端就会感觉不平滑了;如果是我们自己单线程轮询或者使用 select 或 poll 模型,大概可以服务 8000 个左右的客户端;如果我们采用 epoll 模型,则可以服务器 80 万个客户端左右。
首先我们以 select 模型为例,select 模型和 poll 模型一样,都是基于事件驱动的,也就是说,我们的网络 IO 行为(读写)都是通过事件通知告知的。
''' 服务器端 ''' import select import socket import queue # select模型:只有两种事件(读事件和写事件),读事件包含对方的connect,send,close;写事件是我们的send server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) # 设置非阻塞套接字 # 设置多路复用 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ('127.0.0.1', 9898) server.bind(server_address) server.listen(5) # 监听套接字,对方的connect对我们的套接字来说也是读事件到来 inputs = [server] # 我们要写的socket outputs = [] # 字典存放阻塞的队列 QUEUE message_queues = {} # 超时时间,暂时不用 timeout = 400 while inputs: print("等待下一个事件的到来") readable, writable, exceptional = select.select(inputs, outputs, inputs) for s in readable: # 读事件到来,包含对方connect,send 和 close 或异常退出 if s is server: # 如果是对方connect connection, client_address = s.accept() print("欢迎到来", client_address) connection.setblocking(0) inputs.append(connection) # 加入已经连上的某个客户端的套接字 message_queues[connection] = queue.Queue() # 给该套接字对应一个发送消息的队列 else: # 对方的send或者对方的close或者异常退出 try: data = s.recv(1024).decode("utf-8") if data: # 对方send print(" received ", data, "from ", s.getpeername()) message_queues[s].put(data) if s not in outputs: outputs.append(s) # 触发写事件 else: # 对方close print("对方关闭", client_address) if s in outputs: outputs.remove(s) inputs.remove(s) s.close() del message_queues[s] # 移除消息队列 except: # 异常退出 print("客户端异常退出", client_address) if s in outputs: outputs.remove(s) inputs.remove(s) s.close() del message_queues[s] # 移除消息队列 for s in writable: # 写事件 try: next_msg = message_queues[s].get_nowait() except queue.Empty: print(" ", s.getpeername(), u'空队列') outputs.remove(s) else: print(u" 发送 ", next_msg, u" 到 ", s.getpeername()) s.send(next_msg.encode("utf-8")) for s in exceptional: print(" 异常情况 ", s.getpeername()) inputs.remove(s) # 停止监听 if s in outputs: outputs.remove(s) s.close() del message_queues[s] # 移除消息队列
''' 客户端 ''' import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('127.0.0.1', 9898)) while True: data = input("请输入数据:") s.send(data.encode("utf-8")) try: data = s.recv(1024).decode("utf-8") print(data) except: s.close() s.close()
同样,我们先启动服务器端,然后启动多个客户端,我们发现单线程的服务器可以同时(感觉上)对多个客户端进行并发服务。
epoll 模型在 Linux2.6 之后才出现的 ,它具备了 select 模型和 poll 模型的一切优点,公认为性能最好的多路 IO 就绪通知方法。epoll 模型目前只支持 linux 系统,他是 linux 系统内核实现的。
''' 服务器端 ''' import socket, select server = socket.socket() server.bind(("127.0.0.1", 1688)) server.listen(5) msgs = [] fd_socket = {server.fileno(): server} epoll = select.epoll() # 注册服务器的 写就绪 epoll.register(server.fileno(), select.EPOLLIN) while True: for fd, event in epoll.poll(): sock = fd_socket[fd] print(fd, event) # 返回的是文件描述符 需要获取对应socket if sock == server: # 如果是服务器 就接受请求 client, addr = server.accept() # 注册客户端写就绪 epoll.register(client.fileno(), select.EPOLLIN) # 添加对应关系 fd_socket[client.fileno()] = client # 读就绪 elif event == select.EPOLLIN: data = sock.recv(2018).decode("utf-8") if not data: # 注销事件 epoll.unregister(fd) # 关闭socket sock.close() # 删除socket对应关系 del fd_socket[fd] print(" somebody fuck out...") continue print(data.decode("utf-8")) # 读完数据 需要把数据发回去所以接下来更改为写就绪=事件 epoll.modify(fd, select.EPOLLOUT) # 记录数据 msgs.append((sock,data.upper())) elif event == select.EPOLLOUT: for item in msgs[:]: if item[0] == sock: sock.send(item[1].encode("utf-8")) msgs.remove(item) # 切换关注事件为写就绪 epoll.modify(fd,select.EPOLLIN)
''' 客户端 ''' # 创建客户端socket对象 import socket clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 服务端IP地址和端口号元组 server_address = ('127.0.0.1',1688) # 客户端连接指定的IP地址和端口号 clientsocket.connect(server_address) while True: # 输入数据 data = raw_input('please input:') if data == "q": break if not data: continue # 客户端发送数据 clientsocket.send(data.encode("utf-8")) #客户端接收数据 server_data = clientsocket.recv(1024) print ('客户端收到的数据:',server_data) # 关闭客户端socket clientsocket.close()
大家注意,epoll 模型目前只有 linux 系统支持,所以上面的案例可以在 linux 系统实验。
我们把 select 模型和 epoll 模型做一次对比。
select 模型,需要遍历 socket 列表,频繁的对等待队列进行添加移除操作。
select 模型,数据到达后还需要给遍历所有 socket 才能获知哪些 socket 有数据两个操作消耗的时间随着要监控的 socket 的数量增加而大大增加,处于效率考虑才规定了最大只能监视 1024 个 socket。
epoll要解决的问题。
避免频繁的对等待队列进行操作。
避免遍历所有 socket 对于第一个问题 epoll,采取的方案是,将对等待队列的维护和,阻塞进程这两个操作进行拆分,第二个问题是 select 中进程无法获知哪些 socket 是有数据的所以需要遍历。
epoll 为了解决这个问题,在内核中维护了一个就绪列表。
会编写非阻塞套接字实现单线程异步并发。
会使用 select 模型。
会使用 epoll 模型。
利用协程实现单线程阻塞套接字的并发,编写服务器和客户端案例。
js异步就是单线程异步轮询的