>

Zookeeper 简介

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 Hbase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

分布式服务简介

对于一个分布式服务,最基本的一项功能就是服务的注册和发现,我们利用 zookeeper 的 EPHEMERAL 节点,就可以很方便的实现该功能。EPHEMERAL 节点是临时节点,其生命周期是和客户端会话绑定的,当会话连接断开时,节点也会被自动删除。

服务的注册与发现原理

当我们想要提供分布式服务时,我们可以把提供服务的服务器地址(IP:Port)注册到 zookeeper 上(创建临时节点),一旦服务停止,就把临时节点删除。在客户端方面,客户端通过连接到 zookeeper,获取注册了的服务器地址从而可以连接到相应的服务器,使用其提供的服务。

下边我们就来实现一个简单的分布式服务的设计

分布式服务通常由多个服务器节点提供服务,比如有三台机器 Server1, Server2, Server3。我们可以先在 zookeeper 中创建一个根节点,在三台服务器分别启动时,然后把其相应的机器IP地址挂在这个根节点下,而且,随后的增减机器都可以在这个根节点下动态的变化。


##### 服务端 Server 创建 zookeeper 连接,创建一个根节点叫 GoServers。 每当一个服务器程序启动时,在 GoServers 节点下创建一个新节点,节点名为"IP:PORT",这样就完成了一个服务的注册 。当服务结束时(服务器断开连接),创建的节点会被 zookeeper 自动删除(有延迟,心跳检测间隔),这样,Client 就不再看得到此节点,也就不会连到该节点了。
##### 客户端 Client 先从 zookeeper 获取 GoServers 节点下所有子节点,这样就拿到了所有注册了的 Server。 客户端获取所有注册了的 Server 之后,还应该对这些节点进行监听以便在其中某个节点失效后或者服务停止之后可以自动响应(也即不再像那个节点请求服务),监听的方式可以避免每次都轮询去获取服务列表。 从 Servers 列表中选中一个节点(由于同一个根节点下的 Server 提供的都是相同的服务,随机选取一个即可,实际服务一般会提供多种策略),创建连接进行通信。

实现

下面将实现一个简单的分布式服务,并展示如何进行服务的注册与发现,为了简单起见,每次 Client 连接 Server,在获取 Server 发送的时间后就断开。

主要代码如下:

utils.go: 封装了一些 zk 的常用操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package zkutils

import (
"fmt"
"path"
"strings"
"time"

"github.com/samuel/go-zookeeper/zk"
)

const (
timeOut = 20
)

var hosts []string = []string{"101.199.97.62:2181"}

func Connect() (conn *zk.Conn, err error) {
conn, _, err = zk.Connect(hosts, timeOut*time.Second)
if err != nil {
fmt.Println(err)
}
return
}

func RegistServer(conn *zk.Conn, host string) (err error) {
fullpath := "/go_servers/" + host
tpath := ""
for _, str := range strings.Split(fullpath, "/")[1:] {
tpath = path.Join(tpath, "/", str)
fmt.Printf("create zookeeper path: %s\n", tpath)
_, err := conn.Create(tpath, []byte(""), 0, zk.WorldACL(zk.PermAll))
if err != nil {
if err == zk.ErrNodeExists {
fmt.Printf("zk.create(%s) exists\n", tpath)
} else {
fmt.Printf("zk.create(%s) Errorf(%v)\n", tpath, err)
return err
}
}
}
return
}

func GetServerList(conn *zk.Conn) (list []string, err error) {
list, _, err = conn.Children("/go_servers")
return
}

server.go: 服务端程序,把自己注册到 zookeeper 上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main

import (
"fmt"
"net"
"os"
"time"

"coding.net/go/basic/zkutils"
)

func startServer(host string) {
tcpAddr, err := net.ResolveTCPAddr("tcp4", host)
fmt.Println(tcpAddr)
checkError(err)

listener, err := net.ListenTCP("tcp", tcpAddr)
checkError(err)

// Register zk node
conn, err := zkutils.Connect()
if err != nil {
fmt.Printf("connect zk error: %v\n", err)
}
defer conn.Close()

err = zkutils.RegistServer(conn, host)
if err != nil {
fmt.Printf("register node error: %v\n", err)
}

for {
conn, err := listener.Accept()
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
continue
}
go handleClient(conn, host)
}

fmt.Println("done!")
}

func handleClient(conn net.Conn, host string) {
defer conn.Close()

daytime := time.Now().String()
conn.Write([]byte(host + ":" + daytime))
}

func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err)
os.Exit(1)
}
}

func main() {
go startServer("127.0.0.1:8897")
go startServer("127.0.0.1:8898")
go startServer("127.0.0.1:8899")

a := make(chan bool, 1)
<-a
}

client.go: 客户端程序,连接 zookeeper 并获取所有已注册到 zookeeper 上的服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package main

import (
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net"
"os"
"time"

"coding.net/go/basic/zkutils"
)

func getServerHost() (host string, err error) {
conn, err := zkutils.Connect()
if err != nil {
fmt.Printf(" connect zk error: %s \n ", err)
return
}
defer conn.Close()
serverList, err := zkutils.GetServerList(conn)
if err != nil {
fmt.Printf(" get server list error: %s \n", err)
return
}

count := len(serverList)
if count == 0 {
err = errors.New("server list is empty \n")
return
}

// 随机选中一个返回
r := rand.New(rand.NewSource(time.Now().UnixNano()))
host = serverList[r.Intn(3)]
return
}

func startClient() {
serverHost, err := getServerHost()
if err != nil {
fmt.Printf("get server host fail: %v \n", err)
return
}

fmt.Println("connect host: " + serverHost)
tcpAddr, err := net.ResolveTCPAddr("tcp4", serverHost)
checkError(err)
conn, err := net.DialTCP("tcp", nil, tcpAddr)
checkError(err)
defer conn.Close()

_, err = conn.Write([]byte("timestamp"))
checkError(err)

result, err := ioutil.ReadAll(conn)
checkError(err)
fmt.Println(string(result))

return
}

func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}

func main() {
for i := 0; i < 100; i++ {
startClient()
time.Sleep(1 * time.Second)
}
}

全文完!