Go BIO/NIO探讨(2):net库对socket/bind/listen/accept的封装
2023-1-14 22:1:13 Author: Go语言中文网(查看原文) 阅读量:35 收藏

前面一篇文章提到,Go内置的 net/http中使用了Blocking IO,主要体现在两层 for 循环。但真的是这样吗?

本文我们看看 Go net库中 Server.ListenAndServe 的实现细节。

net.Listen("tcp", addr) 方法通过系统调用 socket、bind、listen 生成 net.Listener 对象,在后面的for 循环中,通过系统调用 accept 等待新的tcp conn,将其包装成一个 conn 对象,在新的 goroutine 中对这个conn进行处理。这里是典型的 per goroutine per connection 模型。这个环节看起来是阻塞的,但创建 socket 时设置了syscall.SOCK_NONBLOCK,对后来有什么影响?

// net/http/server.go struct Server
func (srv *Server) ListenAndServe() error {
ln, err := net.Listen("tcp", addr)
// ... 省略部分代码
return srv.Serve(ln)
}

func (srv *Server) Serve(l net.Listener) error {
for {
// ...
rw, err := l.Accept()
// ...
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
}

net.Listener

net.Listen 触发一系列的系统调用(主要是 socket、bind、listen),生成一个 net.Listener 对象。这个函数创建两类Listener: TCP 支持跨机器的网络通信,UNIX支持本机的多进程通信。

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
// ... 省略部分代码
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
// ... 省略部分代码
}

由于两者都是先触发 syscall.Socket,我们从 socket 系统调用的视角来看两者的区别。

// https://man7.org/linux/man-pages/man2/socket.2.html
#include <sys/socket.h>
int socket(int family, int type, int protocol);

socket() 创建一个用于网络通信的endpoint,并返回对应的套接字,也叫socket file descriptor。它是一个 int 值,Linux C代码里一般用 sockfd 作为变量名,而 Go net库里一般用 fd 作为变量名。

第一个参数 family 参数用来指定通信的协议族(protocol family),常用的enum值有:

  1. AF_UNIX/AF_LOCAL: Unix域协议, 用于本机的进程间通信

  2. AF_INET: IPV4协议

  3. AF_INET6: IPV6协议

  4. AF_ROUTE: 路由套接字

  5. 全量Enum定义在Linux <sys/socket.h> 下

第二个参数 type 参数用来指定通信语义,常用enum值有:

  1. SOCK_STREAM=1: 基于TCP, 提供有序、可靠、双向、基于连接的字节流,不限制消息长度,支持消息的优先级传输;

  2. SOCK_DGRAM=2: 基于UDP, 支持数据报,不是基于连接的、不保证可靠性,且消息的最大长度是固定的;

  3. SOCK_RAW=3: 支持通过原始的网络协议访问

  4. SOCK_RDM=4:

  5. SOCK_SEQPACKET=5: 基于TCP, 提供有序、可靠、双向、基于连接的字节流,但消息的最大长度是固定的,超出的部分会被丢弃;

除了这几个,还有两个enum值在 Go net/http 被用到了,分别是:

  1. SOCK_NONBLOCK: 设置 accept 和 read/write操作为 O_NONBLOCK, 对应的场景有:

  • 接收连接 accept: 同步模式下没有新连接时, 线程会被休眠, 异步模式下会返回EWOULDBLOCK/EAGAIN错误

  • read类操作: 同步模式下socket缓冲区没有数据可读时, 线程会被休眠, 异步模式下会返回EWOULDBLOCK/EAGAIN错误

  • write类操作: 同步模式下socket缓冲区已满无法写入时, 线程会被休眠, 异步模式下会返回EWOULDBLOCK/EAGAIN错误

  1. SOCK_CLOEXEC: 由于fork时,子进程默认拷贝父进程的数据空间、堆、栈等,当然也包含socket, 通过设置这个flag, 可以保证fork出来的子进程不持有父进程创建的socket.

第三个参数 protocol 指定通信协议,对于domain=AF_INET/AF_INET6来说,常见的enum值有 IPPROTO_TCP IPPROTO_UDP,全量.

socket() 返回一个 socket file descriptor,但并没有协议和地址与其关联。对于tcp client端而言,可以由系统随机指定一个端口;对于一个 tcp server 而言,必须设置一个公开可访问的ip地址和端口。bind函数实现了这个功能:

#include <sys/socket.h>

int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

// sockaddr 包含
struct sockaddr {
sa_family_t sa_family;
char sa_data[14];
}

其中 sockfd 参数是 socket函数的返回值,后面两个参数指定协议类型和地址。

当socket被创建以后,它并不能被动地接收创建连接请求,此时它只能作为一个client使用。要被动地接收请求,转化为server,需要依赖 listen函数。该函数调用以后,sockfd的状态会从 closed 转换为 listen (netstat 命令可以进行查看)。listen函数的声明如下:

#include <sys/socket.h>

int listen(int sockfd, int backlog);

第一个参数 sockfd 是 socket函数的返回值,第二个参数指定了处于ESTABLISHED状态的sockets的队列大小(从Linux 2.2起), 而不是处于 SYNC_RCVD状态的sockets队列的大小。这里提到的两个状态在TCP连接的三次握手中有所定义:

backlog 默认值是0x80即128,通常可以配置在文件 /proc/sys/net/ipv4/tcp_max_syn_backlog 中,同时受到/proc/sys/net/core/somaxconn 的限制。在 Go 中,这个参数可以通过 func maxListenerBacklog() int 获取。如果队列满了,Client端会收到 ECONNREFUSED 错误,即 connection refused。

小结一下,Linux操作系统层面创建一个tcp server,走的逻辑是:

  1. socket函数创建一个套接字 sockfd,默认状态是 Closed

  2. bind函数绑定 sockfd 与特定的协议地址,比如 tcp 0.0.0.0:8080

  3. listen函数修改sockfd的状态为 LISTEN,内核开始监听套接字,三次握手建立连接

回到 Go net 库的处理流程,我们关注的函数是 sysListener.listenTCP:

// net/tcpsock_posix.go struct sysListener
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

对于一个 tcp server,实例 sl 可以被这样赋值,系统调用被封装在函数 internetSocket 里:

sl := &sysListener{
ListenConfig: *lc, // lc 是默认值
network: network, // string "tcp"
address: address, // string "0.0.0.0:8080"
}

对于一个 tcp server,函数 internetSocket 接收到的参数可以是:

func internetSocket(
ctx context.Context, // context.Background()
net string, // "tcp"
laddr sockaddr, // &TCPAddr{IP:"0.0.0.0",Port:8080,Zone:"不知道是啥"}, DNS服务读取的地址
raddr sockaddr, // nil, os=aix|windows|openbsd && mode="dail" 才需要
sotype int, // syscall.SOCK_STREAM
proto int, // 0
mode string, // "listen"
ctrlFn func(string, string, syscall.RawConn) error // sl.ListenConfig.Control
) (fd *netFD, err error) {

函数 internetSocket 同样只是一层封装,内部调用的是函数 socket。函数socket内部按照顺序调用了socket/bind/listen,返回一个套接字,这个套接字使用network poller,支持异步IO。函数 socket的主要逻辑如下:

// 通过sysSocket执行socket系统调用
// 返回一个套接字s
s, err := sysSocket(family, sotype, proto)

// 封装int类型的套接字为一个结构体
if fd, err = newFD(s, family, sotype, net); err != nil {
// ...省略部分代码

// 对于 SOCK_STREAM, SOCK_SEQPACKET类型,调用bind和listen
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil

对于linux tcp server 而言,函数 sysSocket 的关键只有一行代码:

s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)

SOCK_STREAM、SOCK_NONBLOCK 和 SOCK_CLOEXEC 的语义前面已经讲过,不再赘述。

socketFunc 定义存放在 net/hook_unix.go 里,与listenFunc在一块:

// Placeholders for socket system calls.
socketFunc func(int, int, int) (int, error) = syscall.Socket
connectFunc func(int, syscall.Sockaddr) error = syscall.Connect
listenFunc func(int, int) error = syscall.Listen
getsockoptIntFunc func(int, int, int) (int, error) = syscall.GetsockoptInt

通过sysSocket拿到套接字以后,通过函数newFD将其封装成一个结构体,类型是 *net.netFD:

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}

其中,结构体内部poll.FD定义了读写的逻辑,它封装了6个系统调用:

readSyscallName     = "read"
readFromSyscallName = "recvfrom"
readMsgSyscallName = "recvmsg"
writeSyscallName = "write"
writeToSyscallName = "sendto"
writeMsgSyscallName = "sendmsg"

在创建套接字时,已经设置了 SOCK_NONBLOCK flag,如果没有可用的连接,读写数据时,会收到 EWOULDBLOCK/EAGAIN 错误。Go net库的处理是等待一段时间,我们看其中一个例子:

// ReadMsgInet4 is ReadMsg, but specialized for syscall.SockaddrInet4.
func (fd *FD) ReadMsgInet4(p []byte, oob []byte, flags int, sa4 *syscall.SockaddrInet4) (int, int, int, error) {
if err := fd.readLock(); err != nil {
return 0, 0, 0, err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, 0, 0, err
}
for {
n, oobn, sysflags, err := unix.RecvmsgInet4(fd.Sysfd, p, oob, flags, sa4)
if err != nil {
if err == syscall.EINTR {
continue
}
// TODO(dfc) should n and oobn be set to 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, oobn, sysflags, err
}
}

回到 func (sl *sysListener) listenTCP 方法,函数 internetSocket 返回一个套接字结构体的实例,用来构建 TCPListener 对象 &TCPListener{fd: fd, lc: sl.ListenConfig}。后面 accept tcp conn 时,会用到 net.netFD 的 accept 方法,后者只是封装了 poll.DF 的 Accept 方法。

回到 net/http 下的 struct Server 的 ListenAndServe 方法,它包含两步:

  1. net.Listen 方法获取 ln *TCPListener

  2. srv.Serve(ln)

前面详细说明了第一步的细节,后面我们看第二步如何Serve。

TCPListenr.Accept

对于 linux下的 tcp server,系统调用 accept 发生在 socket、bind、listen 之后,它从内核中的 ESTABLISHED 队列中获取一个建立完成的链接。通过函数socket生成的套接字sockfd可以是阻塞或非阻塞(NONBLOCK),它的函数声明如下:

#include <sys/socket.h>

int accept(int sockfd, struct sockaddr *restrict addr,
socklen_t *restrict addrlen);

#define _GNU_SOURCE /* See feature_test_macros(7) */
#include <sys/socket.h>

int accept4(int sockfd, struct sockaddr *restrict addr,
socklen_t *restrict addrlen, int flags);

对于阻塞/非阻塞的套接字, accept 的表现并不相同:

  1. 阻塞的sockfd: 调用方会一直被阻塞,直到有一个ESTABLISHED的tcp conn

  2. 非阻塞的sockfd: 函数accept会返回 EAGAIN 或 EWOULDBLOCK 的错误

Go net库使用的是非阻塞的套接字,我们看这部分代码的逻辑:

// net/net.go struct Server
func (srv *Server) Serve(l net.Listener) error {
for {
rw, err := l.Accept()
// ... 省略部分代码

这里 ln 的类型是 *TCPListener, 其方法Accept的定义如下:

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

struct TCPListener 的结构如下, accept 依赖成员变量 fd *net.netFD,它通过 pdf poll.FD 的 Accept 方法获取client端的套接字,并封装成一个 net.netFD 对象:

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}

func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
// ... 省略部分代码

if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}

继续看 poll.FD 的方法 Accept。它内部是一个 for 循环,先尝试通过系统调用accpt4 获取一个套接字,结果会有下面几种情况:

  1. 获取成功, err == nil, 函数直接return即可

  2. syscall.EINTR 表示系统调用期间收到操作系统的信号,但并没有实质的错误发生,所以选择重试

  3. syscall.EAGAIN 表示目前并没有establish新的tcp conn,处理是通过 waitRead 将当前goroutine挂起,等待被唤醒

  4. syscall.ECONNABORTED 表示远程连接已经在ESTABLISHED队列,还没有被Accept时,client端放弃连接

  5. 其他错误: 返回一个错误

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()

if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
continue
case syscall.EAGAIN:
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}

// 代码路径: internal/poll/sock_cloexec.go

// Wrapper around the accept system call that marks the returned file
// descriptor as nonblocking and close-on-exec.
func accept(s int) (int, syscall.Sockaddr, string, error) {
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// ... 省略部分代码

另外可以看到,accept4 系统调用时,传入了 SOCK_NONBLOCK 和 SOCK_CLOEXEC 两个 flag,socket 系统调用也使用了这两个 flag。

通过 ln.Accept 获取到ESTABLISHED连接的套接字以后,就可以对远端的client进行服务了。

在本文中,总共有两类套接字(socket):

  1. server端监听的套接字, 通过socket系统调用创建。它的生命周期和server同样长

  2. 已连接的套接字, 通过accept系统调用创建。它的生命周期比较短,尤其是对于应用层是HTTP短链接的情况

在第一篇文章"Go BIO/NIO探讨(1):Gin框架中如何处理HTTP请求"中,我们提到了两层 for 循环,本文只是讲了第一层。从阻塞、非阻塞的角度来看,TCPListener.Accept 方法看起来是block的实现,但底层的套接字和系统调用设置了 NONBLOCK flag,可以说是基于 NONBLOCK 的方式实现的。单纯从网络的视角看,这称得上是 Non-blocking IO 了。


推荐阅读

福利
我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。


文章来源: http://mp.weixin.qq.com/s?__biz=MzAxMTA4Njc0OQ==&mid=2651453975&idx=1&sn=7471cbf3a5510be90815dfffa28ac86a&chksm=80bb24e5b7ccadf3633dc31621ea538f7ce89f51627796f3374d166dec36adae1568d7ebe83b#rd
如有侵权请联系:admin#unsafe.sh