Skip to content

Commit

Permalink
add examples
Browse files Browse the repository at this point in the history
add examples
  • Loading branch information
werbenhu committed Apr 7, 2023
1 parent 1329250 commit 1a9bb83
Show file tree
Hide file tree
Showing 16 changed files with 409 additions and 113 deletions.
65 changes: 64 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,65 @@
# srouter
A consistent hash routing server that supports automatic discovery.
一个用纯Go语言写的一致性哈希路由服务器,支持自动发现。

#### 什么是一致性哈希

> 百度百科:一种特殊的哈希算法,目的是解决分布式缓存的问题。在移除或者添加一个服务器时,能够尽可能小地改变已存在的服务请求与处理请求服务器之间的映射关系。一致性哈希解决了简单哈希算法在分布式哈希表中存在的动态伸缩等问题。
假设有1000万个用户,100个服务器node,请设计一种算法合理地将用户分配到这些服务器上。普通的哈希算法是将1000万个用户各自的userid计算出hash值,取余100,然后选择对应编号的服务器。由于该算法使用节点数取余的方法,强依赖node的数目,当node数发生变化的时候,比如100个服务器有5个宕机了,现在只剩95个。这时候每个用户都需要重新分配服务器,1000万个用户的id计算出hash值,取余95,大部分用户所属的服务都要变更。

> 一致性哈希主要解决的问题就是:当node数发生变化时,能够尽量少的移动数据。
#### 如何运行

##### 编译路由服务
```sh
cd cmd
go build -o srouter.exe
```

##### 启动路由服务器
``` sh
# 这里演示启动2个,启动数量可以自己根据实际情况定
# 启动第1个
./srouter.exe -addr=":7370" `
-advertise="172.16.3.3:7370" `
-id=router-service-1 `
-api-addr=":9000" `
-service="172.16.3.3:9000"

# 启动第2个
# 第2个多一个参数-routers="172.16.3.3:7370"
# 这里是需要将第2个注册到第1个去
./srouter.exe -addr=":7371" `
-advertise="172.16.3.3:7371" `
-id=router-service-2 `
-routers="172.16.3.3:7370" `
-api-addr=":9001" `
-service="172.16.3.3:9001"
```
##### 注册2个web服务
```sh
# 注册第1个web服务,
# 注册的服务组是webservice-group
# 注册的服务ID是webserver1
# 注册的服务地址是172.16.3.3:8000
cd examples/service1
go build -o webservice1.exe webservice1.go
./webservice1.exe
# 注册第2个web服务
# 注册的服务组是webservice-group
# 注册的服务ID是webserver2
# 注册的服务地址是172.16.3.3:8001
cd examples/service2
go build -o webservice2.exe webservice2.go
./webservice2.exe
```

##### 客户端使用一致性哈希选择服务
```sh
cd examples/client
go build -o client.exe main.go
./client.exe
```

5 changes: 5 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 werbenhu
// SPDX-FileContributor: werbenhu

package srouter

// 路由服务器提供查询服务的接口
type Api interface {
Start(addr string) error
Stop()
Expand Down
64 changes: 0 additions & 64 deletions client/client.go

This file was deleted.

27 changes: 0 additions & 27 deletions client/register.go

This file was deleted.

82 changes: 82 additions & 0 deletions client/rpcclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 werbenhu
// SPDX-FileContributor: werbenhu
package client

import (
"context"
"time"

"github.com/werbenhu/srouter"
"google.golang.org/grpc"
)

// Grpc客户端的封装,方便客户端做一致性哈希查询
// 客户端如果需要查询单个key的所在的服务,直接使用此对象就行
type RpcClient struct {
// 路由服务器的Grpc地址
Addr string

// grpc客户端连接对象
conn *grpc.ClientConn

// grpc客户端对象
router srouter.RouterClient
}

// 创建一个Grpc客户端对象
// 该对象会连接路由服务器的grpc服务端
// 如果连接失败,会返回错误
func NewRpcClient(router string) (*RpcClient, error) {
client := &RpcClient{Addr: router}
// 连接到grpc服务端的地址
conn, err := grpc.Dial(client.Addr, grpc.WithInsecure())
if err != nil {
return nil, err
}

client.conn = conn
client.router = srouter.NewRouterClient(conn)
return client, nil
}

// 关闭grpc客户端连接对象
func (c *RpcClient) Close() {
c.conn.Close()
}

// 匹配某个key对应的一致性哈希服务
// group是组名,比如有3个mysql服务器同属一个db组,3个web服务器同属一个web组
func (c *RpcClient) Match(group string, key string) (*srouter.Service, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

service, err := c.router.Match(ctx, &srouter.MatchRequest{
Group: group,
Key: key,
})
if err != nil {
return nil, err
}
//服务包含3个属性,服务ID、服务所属的组名以及服务的地址
return srouter.NewService(service.Id, service.Group, service.Addr), nil
}

// 列出某个组里所有的服务
func (c *RpcClient) Members(group string) ([]*srouter.Service, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

services := make([]*srouter.Service, 0)
members, err := c.router.Members(ctx, &srouter.MembersRequest{
Group: group,
})
if err != nil {
return services, err
}

for _, member := range members.Services {
services = append(services, srouter.NewService(member.Id, member.Group, member.Addr))
}
return services, nil
}
21 changes: 14 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 werbenhu
// SPDX-FileContributor: werbenhu
package main

import (
Expand All @@ -11,14 +14,18 @@ import (
)

func main() {
id := flag.String("id", "node1", "")
addr := flag.String("addr", ":7370", "")
advertise := flag.String("advertise", ":7370", "")
routers := flag.String("routers", "", "")
apiAddr := flag.String("api-addr", ":8080", "")
apiAdvertise := flag.String("api-advertise", "", "")
id := flag.String("id", "", "服务ID,不能为空")
addr := flag.String("addr", ":7370", "服务发现通信的地址")
advertise := flag.String("advertise", ":7370", "对外公布的服务发现通信的地址")
routers := flag.String("routers", "", "路由服务器地址,如果是第一个可以为空,多个用逗号隔开")
apiAddr := flag.String("api-addr", ":8080", "查询服务器的地址")
service := flag.String("service", "", "对外公布的查询服务器的地址")

flag.Parse()
if *id == "" {
log.Fatal(srouter.ErrMemberIdEmpty)
}

sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -33,7 +40,7 @@ func main() {
srouter.OptAdvertise(*advertise),
srouter.OptRouters(*routers),
srouter.OptApiAddr(*apiAddr),
srouter.OptService(*apiAdvertise),
srouter.OptService(*service),
})

err := router.Serve()
Expand Down
20 changes: 20 additions & 0 deletions discovery.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 werbenhu
// SPDX-FileContributor: werbenhu

package srouter

// 自动发现事件通知接口
type Handler interface {
// 当有新服务注册过来的时候会触发
OnMemberJoin(*Member) error

// 当有新服务离开的时候会触发
OnMemberLeave(*Member) error

// 当有新服务更新的时候会触发
OnMemberUpdate(*Member) error
}

// 自动发现接口
type Discovery interface {
// 设置服务发现事件处理接口
SetHandler(Handler)

//获取所有服务列表
Members() []*Member

//获取当前自身服务
LocalMember() *Member

//启动发现服务
Start() error

//停止服务
Stop()
}
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (e err) Error() string {
}

var (
ErrMemberIdEmpty = err{Code: 10000, Msg: "id can't be empty"}
ErrReplicasParam = err{Code: 10000, Msg: "member replicas param error"}
ErrGroupNameEmpty = err{Code: 10001, Msg: "member group name empty"}
)
38 changes: 38 additions & 0 deletions examples/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"fmt"
"log"

"github.com/werbenhu/srouter/client"
)

func main() {
// 路由服务器中任意选择一个都可以
routerService := "172.16.3.3:9001"
group := "webservice-group"

client, err := client.NewRpcClient(routerService)
if err != nil {
panic(err)
}

// 根据用户ID使用一致性哈希分配服务
for i := 0; i < 1000; i++ {
key := fmt.Sprintf("user-id-%d", i)
service, err := client.Match(group, key)

if err != nil {
log.Printf("[ERROR] match key%s err:%s\n", key, err)
continue
}
log.Printf("[INFO] match key:%s, serviceId:%s, serviceAddr:%s\n", key, service.Id, service.Addr)
}

// 获取webservice-group组所有的服务
allService, err := client.Members(group)
if err != nil {
log.Printf("[ERROR] get all service err:%s\n", err)
}
log.Printf("[INFO] all service:%+v\n", allService)
}
Loading

0 comments on commit 1a9bb83

Please sign in to comment.