
mgo is the open-source Go driver for connecting to MongoDB. Its source is about 26k lines of code, concise and of high quality, and well worth studying.


#### mgo's connection model

Connecting to MongoDB with mgo can be done in either of the following two ways:

```go
session, err := mgo.Dial(host)
```

or

```go
session, err := mgo.DialWithInfo(info)
```

This session is used globally. In a real program we typically start a goroutine to handle each request; the goroutines reuse the connection through session.Clone() and release it with session.Close() when they are done.
If concurrency is very high and the other goroutines have not yet released their sessions, the current goroutine must either wait or create a new session.
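
A minimal sketch of this pattern (the HTTP handler and the database/collection names here are just examples):

```go
package main

import (
	"log"
	"net/http"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

var globalSession *mgo.Session

func handler(w http.ResponseWriter, r *http.Request) {
	// Clone the global session for this request and put its socket back when done.
	sess := globalSession.Clone()
	defer sess.Close()

	var doc bson.M
	if err := sess.DB("test").C("person").Find(bson.M{"name": "foo"}).One(&doc); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Write([]byte("ok\n"))
}

func main() {
	var err error
	globalSession, err = mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatal(err)
	}
	defer globalSession.Close()

	http.HandleFunc("/", handler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```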


#### Test method

Start 10,000 goroutines in the program and call time.Sleep(10 * time.Second) in each of them. The point of sleeping 10 seconds is to hold the connection and not release it, so we can watch mgo keep creating new connections. The connection count can be checked by running `db.serverStatus().connections` in the mongodb shell, or directly on the client machine with `netstat -natp | grep app_name | wc -l`.
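
A minimal sketch of such a test program (it assumes a local mongod; each goroutine runs one query so that its session keeps a socket reserved during the sleep):

```go
package main

import (
	"log"
	"sync"
	"time"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

func main() {
	root, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatal(err)
	}
	defer root.Close()

	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sess := root.Clone()
			// Run one query so the session reserves a socket, then hold it
			// for 10 seconds instead of closing the session right away.
			var doc bson.M
			sess.DB("test").C("person").Find(bson.M{}).One(&doc)
			time.Sleep(10 * time.Second)
			sess.Close()
		}()
	}
	wg.Wait()
}
```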

With the sleep in place, so connections are not released, the results are:
- 100 concurrent: mongodb gains 100 connections
- 10k concurrent: mongodb gains roughly 4390 to 4700 connections
- 50k concurrent: same as above
- 100k concurrent: same as above

mgo's default pool limit is 4096. Under high concurrency (beyond the pool limit), if sessions are never closed (here I use sleep to keep the connections from being released), the connection count quickly reaches 4096 and blocks all other requests, so whenever you clone or copy a session, always use defer session.Close() to release it. The pool limit parameter (PoolLimit) caps the total number of connections: once the limit is hit, the current goroutine sleeps and retries until a socket can be acquired. At high concurrency the check is racy, so a few extra connections get created; in my tests the peak was a bit over 4700, i.e. roughly 600 connections beyond the pool limit.
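
For reference, a sketch of raising that limit through DialWithInfo (the address and the value 8192 are just examples):

```go
package main

import (
	"log"
	"time"

	"gopkg.in/mgo.v2"
)

func main() {
	info := &mgo.DialInfo{
		Addrs:     []string{"localhost:27017"},
		Timeout:   10 * time.Second,
		PoolLimit: 8192, // per-server socket limit; 0 means the default of 4096
	}
	session, err := mgo.DialWithInfo(info)
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
	// Alternatively, the limit can be changed later with session.SetPoolLimit(8192).
}
```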

Run `testmgo 4096 > res.txt` to create 4096 goroutines, print a log line whenever an actual dial happens, and redirect all logs into res.txt for analysis.

$ grep "dial" res.txt | wc -l
104

$ grep "n > 0" res.txt | wc -l
3989

So there were only 104 actual dials, i.e. only 104 physical socket connections were created, while 3989 acquisitions reused an existing socket. 3989 + 104 = 4093.
How few sockets get created depends on how long the queries take: if a query finishes quickly, its socket is released quickly and can be reused; if a query is slow, its socket stays occupied, and a request that arrives while no socket is free creates a new one. When the number of sockets in use exceeds the pool limit, errPoolLimit is returned, and the caller sleeps 0.1s before acquiring/creating a socket again.

The code:

```go
s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
if err == errPoolLimit {
	if !warnedLimit {
		warnedLimit = true
		log("WARNING: Per-server connection limit reached.")
	}
	time.Sleep(100 * time.Millisecond) // note: sleeps here, then retries
	continue
}
```

Note: if sessions are never closed, later requests find no socket to reuse and can only create new connections.


#### Walking through the connection flow in the source

```
sess := session.Copy()                         -> copySession()
c := sess.DB("test").C("person")               ->
c.Find().One()                                 ->
s.acquireSocket()                              ->
s.cluster.AcquireSocket()                      ->
server.AcquireSocket(poolLimit, socketTimeout) ->
server.Connect(timeout)                        ->
net.DialTimeout("tcp", server.ResolvedAddr, timeout)
```

The core functions:
(1). session.DialWithInfo()
(2). cluster.newCluster()
(3). session.newSession()


###### (1). session.DialWithInfo()

DialWithInfo establishes a new session to the cluster identified by info. Dial("localhost:27017") also ends up calling DialWithInfo() internally.

```go
func DialWithInfo(info *DialInfo) (*Session, error) {
	addrs := make([]string, len(info.Addrs))
	for i, addr := range info.Addrs {
		p := strings.LastIndexAny(addr, "]:")
		if p == -1 || addr[p] != ':' {
			// XXX This is untested. The test suite doesn't use the standard port.
			addr += ":27017"
		}
		addrs[i] = addr
	}
	cluster := newCluster(addrs, info.Direct, info.FailFast, dialer{info.Dial, info.DialServer}, info.ReplicaSetName)
	session := newSession(Eventual, cluster, info.Timeout) // note: the consistency mode is Eventual at initialization
	session.defaultdb = info.Database
	...
	if info.PoolLimit > 0 { // info.PoolLimit > 0 means the caller set PoolLimit on info; otherwise it stays 0
		session.poolLimit = info.PoolLimit // use the user-defined PoolLimit if any, otherwise keep the default of 4096
	}
	cluster.Release() // note: the cluster is already released here, ref-1
	// People get confused when we return a session that is not actually established to any servers yet (e.g. what if url was wrong).
	// So, ping the server to ensure there's someone there, and abort if it fails.
	if err := session.Ping(); err != nil {
		session.Close()
		return nil, err
	}
	session.SetMode(Strong, true) // once everything is done, the consistency mode is switched to Strong
	return session, nil
}
```

###### (2). cluster.newCluster()

```go
func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string) *mongoCluster {
	cluster := &mongoCluster{
		userSeeds:  userSeeds, // the seed servers, used to discover the topology
		references: 1,         // note: references is already 1 at creation time, and cluster.syncServersLoop() adds another,
		direct:     direct,    // so once a cluster has been created its ref count is always at least 1
		failFast:   failFast,
		dial:       dial,
		setName:    setName,
	}
	cluster.serverSynced.L = cluster.RWMutex.RLocker()
	cluster.sync = make(chan bool, 1)
	stats.cluster(+1)
	go cluster.syncServersLoop() // syncs servers here and discovers all servers in the topology
	return cluster
}
```

###### (3). session.newSession()

Creates a new Session. The function:

```go
func newSession(consistency Mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
	cluster.Acquire()
	session = &Session{
		cluster_:    cluster,
		syncTimeout: timeout,
		sockTimeout: timeout,
		poolLimit:   4096,
	}
	debugf("New session %p on cluster %p", session, cluster)
	session.SetMode(consistency, true)
	session.SetSafe(&Safe{})
	session.queryConfig.prefetch = defaultPrefetch
	return session
}
```

A session has two sockets, masterSocket and slaveSocket; if masterSocket is nil, slaveOk is true.

Note: neither socket is initialized in newSession(), so both remain nil.
Only when a query actually hits the database does session.acquireSocket() create/acquire a socket!

For example:

```go
c := session.DB("lanxin").C("user_message_to_audit_t")
var res interface{}
err = c.Find(bson.M{"audit_message_id": 99}).One(&res)
```

collection.Find() returns a Query object.
The Query's One() method (around line 3032) calls socket, err := session.acquireSocket(true);
this is the point where a socket is actually created!

The session's acquireSocket is defined around line 4360.

The relevant code:

```go
func (q *Query) One(result interface{}) (err error) {
	q.m.Lock()
	session := q.session
	op := q.op // Copy.
	q.m.Unlock()
	socket, err := session.acquireSocket(true)
	if err != nil {
		return err
	}
	defer socket.Release()
	op.limit = -1
	session.prepareQuery(&op)
	expectFindReply := prepareFindOp(socket, &op, 1)
	data, err := socket.SimpleQuery(&op)
	if err != nil {
		return err
	}
	if data == nil {
		return ErrNotFound
	}
	if expectFindReply {
		var findReply struct {
			Ok     bool
			Code   int
			Errmsg string
			Cursor cursorData
		}
		err = bson.Unmarshal(data, &findReply)
		if err != nil {
			return err
		}
		if !findReply.Ok && findReply.Errmsg != "" {
			return &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
		}
		if len(findReply.Cursor.FirstBatch) == 0 {
			return ErrNotFound
		}
		data = findReply.Cursor.FirstBatch[0].Data
	}
	if result != nil {
		err = bson.Unmarshal(data, result)
		if err == nil {
			debugf("Query %p document unmarshaled: %#v", q, result)
		} else {
			debugf("Query %p document unmarshaling failed: %#v", q, err)
			return err
		}
	}
	return checkQueryError(op.collection, data)
}
```

Or:

```go
err = c.Find(bson.M{"audit_message_id": 99}).All(&res)
```

This calls the Query's All() method; internally All() calls q.Iter().All(),
and Iter() calls socket, err := session.acquireSocket(true) to acquire a socket!

The relevant code:

```go
func (q *Query) Iter() *Iter {
	q.m.Lock()
	session := q.session
	op := q.op
	prefetch := q.prefetch
	limit := q.limit
	q.m.Unlock()
	iter := &Iter{
		session:  session,
		prefetch: prefetch,
		limit:    limit,
		timeout:  -1,
	}
	iter.gotReply.L = &iter.m
	iter.op.collection = op.collection
	iter.op.limit = op.limit
	iter.op.replyFunc = iter.replyFunc()
	iter.docsToReceive++
	socket, err := session.acquireSocket(true)
	if err != nil {
		iter.err = err
		return iter
	}
	defer socket.Release()
	session.prepareQuery(&op)
	op.replyFunc = iter.op.replyFunc
	if prepareFindOp(socket, &op, limit) {
		iter.findCmd = true
	}
	iter.server = socket.Server()
	err = socket.Query(&op)
	if err != nil {
		// Must lock as the query is already out and it may call replyFunc.
		iter.m.Lock()
		iter.err = err
		iter.m.Unlock()
	}
	return iter
}
```

Next, let's walk through the code that handles the connection itself.


###### (1). session.acquireSocket()

session.acquireSocket() first checks whether a masterSocket or slaveSocket already exists. If both are nil, it calls cluster.AcquireSocket(). Once the session has the newly created socket, it calls setSocket(socket); setSocket() calls socket.Acquire(), which returns the serverInfo, uses serverInfo.Master to decide whether the server is a master or a slave, and sets s.masterSocket or s.slaveSocket accordingly.

```go
func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
	// Read-only lock to check for previously reserved socket.
	s.m.RLock()
	// If there is a slave socket reserved and its use is acceptable, take it as long
	// as there isn't a master socket which would be preferred by the read preference mode.
	if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
		socket := s.slaveSocket
		socket.Acquire()
		s.m.RUnlock()
		return socket, nil
	}
	if s.masterSocket != nil {
		socket := s.masterSocket
		socket.Acquire()
		s.m.RUnlock()
		return socket, nil
	}
	s.m.RUnlock()
	// No go. We may have to request a new socket and change the session,
	// so try again but with an exclusive lock now.
	s.m.Lock()
	defer s.m.Unlock()
	if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) {
		s.slaveSocket.Acquire()
		return s.slaveSocket, nil
	}
	if s.masterSocket != nil {
		s.masterSocket.Acquire()
		return s.masterSocket, nil
	}
	// Still not good. We need a new socket.
	sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
	if err != nil {
		return nil, err
	}
	// Authenticate the new socket.
	if err = s.socketLogin(sock); err != nil {
		sock.Release()
		return nil, err
	}
	// Keep track of the new socket, if necessary.
	// Note that, as a special case, if the Eventual session was
	// not refreshed (s.slaveSocket != nil), it means the developer
	// asked to preserve an existing reserved socket, so we'll
	// keep a master one around too before a Refresh happens.
	if s.consistency != Eventual || s.slaveSocket != nil {
		s.setSocket(sock)
	}
	// Switch over a Monotonic session to the master.
	if !slaveOk && s.consistency == Monotonic {
		s.slaveOk = false
	}
	return sock, nil
}

func (s *Session) setSocket(socket *mongoSocket) {
	info := socket.Acquire()
	if info.Master {
		if s.masterSocket != nil {
			panic("setSocket(master) with existing master socket reserved")
		}
		s.masterSocket = socket
	} else {
		if s.slaveSocket != nil {
			panic("setSocket(slave) with existing slave socket reserved")
		}
		s.slaveSocket = socket
	}
}
```

###### (2). cluster.AcquireSocket()

```go
func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
	var started time.Time
	var syncCount uint
	warnedLimit := false
	for {
		cluster.RLock()
		for {
			mastersLen := cluster.masters.Len()
			slavesLen := cluster.servers.Len() - mastersLen
			debugf("Cluster has %d known masters and %d known slaves.", mastersLen, slavesLen)
			if mastersLen > 0 && !(slaveOk && mode == Secondary) || slavesLen > 0 && slaveOk {
				break
			}
			if mastersLen > 0 && mode == Secondary && cluster.masters.HasMongos() {
				break
			}
			if started.IsZero() {
				// Initialize after fast path above.
				started = time.Now()
				syncCount = cluster.syncCount
			} else if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) || cluster.failFast && cluster.syncCount != syncCount {
				cluster.RUnlock()
				return nil, errors.New("no reachable servers")
			}
			log("Waiting for servers to synchronize...")
			cluster.syncServers()
			// Remember: this will release and reacquire the lock.
			cluster.serverSynced.Wait()
		}
		var server *mongoServer // note: a best-fit server is chosen here!
		if slaveOk {
			server = cluster.servers.BestFit(mode, serverTags)
		} else {
			server = cluster.masters.BestFit(mode, nil)
		}
		cluster.RUnlock()
		if server == nil {
			// Must have failed the requested tags. Sleep to avoid spinning.
			time.Sleep(1e8)
			continue
		}
		s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
		if err == errPoolLimit {
			if !warnedLimit {
				warnedLimit = true
				log("WARNING: Per-server connection limit reached.")
			}
			time.Sleep(100 * time.Millisecond)
			continue
		}
		if err != nil {
			cluster.removeServer(server)
			cluster.syncServers()
			continue
		}
		if abended && !slaveOk {
			var result isMasterResult
			err := cluster.isMaster(s, &result)
			if err != nil || !result.IsMaster {
				logf("Cannot confirm server %s as master (%v)", server.Addr, err)
				s.Release()
				cluster.syncServers()
				time.Sleep(100 * time.Millisecond)
				continue
			}
		}
		return s, nil
	}
	panic("unreached")
}
```

###### (3). server.AcquireSocket()

```go
// AcquireSocket returns a socket for communicating with the server.
// This will attempt to reuse an old connection, if one is available. Otherwise,
// it will establish a new one. The returned socket is owned by the call site,
// and will return to the cache when the socket has its Release method called
// the same number of times as AcquireSocket + Acquire were called for it.
// If the poolLimit argument is greater than zero and the number of sockets in
// use in this server is greater than the provided limit, errPoolLimit is
// returned.
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
	for {
		server.Lock()
		abended = server.abended
		if server.closed {
			server.Unlock()
			return nil, abended, errServerClosed
		}
		n := len(server.unusedSockets)
		if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit {
			server.Unlock()
			return nil, false, errPoolLimit
		}
		if n > 0 {
			socket = server.unusedSockets[n-1]
			server.unusedSockets[n-1] = nil // Help GC.
			server.unusedSockets = server.unusedSockets[:n-1]
			info := server.info
			server.Unlock()
			err = socket.InitialAcquire(info, timeout)
			if err != nil {
				continue
			}
		} else {
			server.Unlock()
			socket, err = server.Connect(timeout)
			if err == nil {
				server.Lock()
				// We've waited for the Connect, see if we got closed in the meantime
				if server.closed {
					server.Unlock()
					socket.Release()
					socket.Close()
					return nil, abended, errServerClosed
				}
				server.liveSockets = append(server.liveSockets, socket) // once a new socket is created, it is appended to server.liveSockets!
				server.Unlock()
			}
		}
		return
	}
	panic("unreachable")
}
```

###### (4). server.Connect()

```go
// Connect establishes a new connection to the server. This should generally be done through server.AcquireSocket().
func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error) {
	server.RLock()
	master := server.info.Master
	dial := server.dial
	server.RUnlock()
	logf("Establishing new connection to %s (timeout=%s)...", server.Addr, timeout)
	var conn net.Conn
	var err error
	switch {
	case !dial.isSet():
		// Cannot do this because it lacks timeout support. :-(
		//conn, err = net.DialTCP("tcp", nil, server.tcpaddr)
		conn, err = net.DialTimeout("tcp", server.ResolvedAddr, timeout) // 1. the actual dial: the connection is established here!
		if tcpconn, ok := conn.(*net.TCPConn); ok {
			tcpconn.SetKeepAlive(true) // 2. keep-alive is enabled right after the connection is made!
		} else if err == nil {
			panic("internal error: obtained TCP connection is not a *net.TCPConn!?")
		}
	case dial.old != nil:
		conn, err = dial.old(server.tcpaddr)
	case dial.new != nil:
		conn, err = dial.new(&ServerAddr{server.Addr, server.tcpaddr})
	default:
		panic("dialer is set, but both dial.old and dial.new are nil")
	}
	if err != nil {
		logf("Connection to %s failed: %v", server.Addr, err.Error())
		return nil, err
	}
	logf("Connection to %s established.", server.Addr)
	stats.conn(+1, master)
	return newSocket(server, conn, timeout), nil
}
```

###### (5). socket.newSocket()

After a conn has been established by the dial, a new socket struct is created and the conn is stored in it.

```go
func newSocket(server *mongoServer, conn net.Conn, timeout time.Duration) *mongoSocket {
	socket := &mongoSocket{
		conn:       conn, // the newly dialed conn is stored on the socket object here
		addr:       server.Addr,
		server:     server,
		replyFuncs: make(map[uint32]replyFunc),
	}
	socket.gotNonce.L = &socket.Mutex
	if err := socket.InitialAcquire(server.Info(), timeout); err != nil {
		panic("newSocket: InitialAcquire returned error: " + err.Error())
	}
	stats.socketsAlive(+1)
	debugf("Socket %p to %s: initialized", socket, socket.addr)
	socket.resetNonce()
	go socket.readLoop()
	return socket
}
```

###### (6). socket.InitialAcquire()

When a socket has just been created (nobody is using it yet, i.e. ref == 0), or when an idle socket (ref == 0) is taken out of the pool, InitialAcquire() must be called to bump ref by 1.

```go
// InitialAcquire obtains the first reference to the socket, either right after the connection is made
// or once a recycled socket is fetched from pool for reuse.
func (socket *mongoSocket) InitialAcquire(serverInfo *mongoServerInfo, timeout time.Duration) error {
	socket.Lock()
	if socket.references > 0 {
		panic("Socket acquired out of cache with references")
	}
	if socket.dead != nil {
		dead := socket.dead
		socket.Unlock()
		return dead
	}
	socket.references++
	socket.serverInfo = serverInfo
	socket.timeout = timeout
	stats.socketsInUse(+1)
	stats.socketRefs(+1)
	socket.Unlock()
	return nil
}
```

##### Closing connections

```
session.Close() -> session.unsetSocket() -> socket.Release()
  -> cluster.Release() -> for ... range servers -> server.Close()
  -> for ... range { liveSockets, unusedSockets } -> socket.Close() -> socket.kill()
```


###### (1). session.Close()

```go
func (s *Session) Close() {
	s.m.Lock()
	if s.cluster_ != nil {
		debugf("Closing session %p", s)
		s.unsetSocket()      // 1. ref--, log out credentials
		s.cluster_.Release() // 2. all liveSockets and unusedSockets held by the cluster's servers get close()d
		s.cluster_ = nil
	}
	s.m.Unlock()
}
```

###### (2). session.unsetSocket()

```go
func (s *Session) unsetSocket() {
	if s.masterSocket != nil {
		s.masterSocket.Release()
	}
	if s.slaveSocket != nil {
		s.slaveSocket.Release()
	}
	s.masterSocket = nil
	s.slaveSocket = nil
}
```

###### (3). socket.Release()

socket.Release() decrements the reference count; when it reaches 0 the socket is logged out and recycled back into the server's pool (calling Release() on a socket whose count is already 0 panics).

```go
func (socket *mongoSocket) Release() {
	socket.Lock()
	if socket.references == 0 {
		panic("socket.Release() with references == 0")
	}
	socket.references--
	stats.socketRefs(-1)
	if socket.references == 0 {
		stats.socketsInUse(-1)
		server := socket.server
		socket.Unlock()
		socket.LogoutAll()
		// If the socket is dead server is nil.
		if server != nil {
			server.RecycleSocket(socket)
		}
	} else {
		socket.Unlock()
	}
}
```

###### (4). cluster.Release()

```go
func (cluster *mongoCluster) Release() {
	cluster.Lock()
	if cluster.references == 0 {
		panic("cluster.Release() with references == 0")
	}
	cluster.references--
	debugf("Cluster %p released (refs=%d)", cluster, cluster.references)
	if cluster.references == 0 {
		for _, server := range cluster.servers.Slice() {
			server.Close() // note: all sockets held by each server are closed here
		}
		// Wake up the sync loop so it can die.
		cluster.syncServers()
		stats.cluster(-1)
	}
	cluster.Unlock()
}
```

###### (5). server.Close()

```go
func (server *mongoServer) Close() {
	server.Lock()
	server.closed = true
	liveSockets := server.liveSockets
	unusedSockets := server.unusedSockets
	server.liveSockets = nil
	server.unusedSockets = nil
	server.Unlock()
	logf("Connections to %s closing (%d live sockets).", server.Addr, len(liveSockets))
	for i, s := range liveSockets {
		s.Close() // note: the sockets are closed here
		liveSockets[i] = nil
	}
	for i := range unusedSockets {
		unusedSockets[i] = nil
	}
}
```

###### (6). socket.Close()

```go
func (socket *mongoSocket) Close() {
	socket.kill(errors.New("Closed explicitly"), false)
}
```

###### (7). socket.kill()

```go
func (socket *mongoSocket) kill(err error, abend bool) {
	socket.Lock()
	if socket.dead != nil {
		debugf("Socket %p to %s: killed again: %s (previously: %s)", socket, socket.addr, err.Error(), socket.dead.Error())
		socket.Unlock()
		return
	}
	logf("Socket %p to %s: closing: %s (abend=%v)", socket, socket.addr, err.Error(), abend)
	socket.dead = err
	socket.conn.Close() // note: this is where the connection is actually closed!
	stats.socketsAlive(-1)
	replyFuncs := socket.replyFuncs
	socket.replyFuncs = make(map[uint32]replyFunc)
	server := socket.server
	socket.server = nil
	socket.gotNonce.Broadcast()
	socket.Unlock()
	for _, replyFunc := range replyFuncs {
		logf("Socket %p to %s: notifying replyFunc of closed socket: %s", socket, socket.addr, err.Error())
		replyFunc(err, nil, -1, nil)
	}
	if abend {
		server.AbendSocket(socket)
	}
}
```

Note: only two places call conn.Close(), i.e. actually close a connection.

  • 1) When a UDP trick is used to resolve a TCP address: if the UDP connect succeeds we have the IP address, and since the only purpose was to obtain an IP, the connection is closed right away.
  • 2) socket.kill(), which calls conn.Close() to close the connection.

Conclusion: as long as the program is still running, an established conn is never closed automatically, and cluster.references is always at least 1.

The socket module manages all sockets, connected or idle, by reference counting. When a socket is taken from the pool (which is really just a []*mongoSocket slice), Acquire() is called and the reference count is incremented.

Calling Session.Copy() increments cluster.references; then, if session.masterSocket or session.slaveSocket is not nil, that socket is acquired and reused. Otherwise no socket is reserved, and one will be acquired from the pool when the next query runs.

Stress test

$ ab -n100  -c1000 localhost/

Checking the connection count

$ netstat -n| awk '/^tcp/{++S[$NF]}END{for (key in S) print key,S[key]}'

Recommendation: for database connections, the number of concurrent connections should be controlled by the application itself, rather than by relying on the pool size to cap them.
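
A minimal sketch of one way to do that, using a buffered channel as a semaphore in front of the session pool (the limit of 512, the query, and the names are only examples):

```go
package main

import (
	"log"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// sem caps the number of in-flight MongoDB operations at the application level,
// independently of mgo's per-server pool limit.
var sem = make(chan struct{}, 512)

func findPerson(root *mgo.Session, name string) (bson.M, error) {
	sem <- struct{}{}        // take a slot; blocks once 512 operations are in flight
	defer func() { <-sem }() // give the slot back

	sess := root.Clone()
	defer sess.Close()

	var doc bson.M
	err := sess.DB("test").C("person").Find(bson.M{"name": name}).One(&doc)
	return doc, err
}

func main() {
	root, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatal(err)
	}
	defer root.Close()

	doc, err := findPerson(root, "foo")
	log.Println(doc, err)
}
```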




References

https://github.com/go-mgo/mgo
http://www.cnblogs.com/shenguanpu/p/5318727.html