-
Notifications
You must be signed in to change notification settings - Fork 0
/
resolver.go
100 lines (88 loc) · 2.06 KB
/
resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package goutube
import (
"context"
"fmt"
"sync"
streaming_api "github.com/Brijeshlakkad/goutube/api/streaming/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
// Resolver serves as both the gRPC resolver.Builder and the resolver.Resolver
// for the TestName scheme. Build populates its fields and returns the same
// value as the resolver, so a single instance carries all state.
type Resolver struct {
// mu is held for the full duration of ResolveNow.
mu sync.Mutex
// clientConn is the gRPC client connection passed to Build; ResolveNow
// reports the resolved addresses and service config to it via UpdateState.
clientConn resolver.ClientConn
// resolverConn is the connection dialed in Build to the target endpoint.
// ResolveNow issues the GetServers RPC over it and Close shuts it down.
resolverConn *grpc.ClientConn
// serviceConfig is the service config parsed in Build and attached to every
// state update sent from ResolveNow.
serviceConfig *serviceconfig.ParseResult
// logger is the global zap logger named "resolver", assigned in Build.
logger *zap.Logger
}
// Compile-time assertion that *Resolver implements resolver.Builder.
var _ resolver.Builder = (*Resolver)(nil)
// TestName is the string returned by Scheme and the policy name that Build
// places in the service config it hands to gRPC.
const TestName = "goutube"
// Build implements resolver.Builder. It records cc as the connection that
// resolved state is reported to, parses the service config that selects the
// load-balancing policy named TestName, dials target.Endpoint (the connection
// ResolveNow later uses for the GetServers RPC), and performs an initial
// resolution before returning.
//
// When opts.DialCreds is set, the same transport credentials are used for the
// resolver's own connection.
//
// It returns the receiver itself as the resolver.Resolver, or the error from
// grpc.Dial when dialing fails.
func (r *Resolver) Build(
	target resolver.Target,
	cc resolver.ClientConn,
	opts resolver.BuildOptions,
) (
	resolver.Resolver,
	error,
) {
	r.logger = zap.L().Named("resolver")
	r.clientConn = cc

	var dialOpts []grpc.DialOption
	if opts.DialCreds != nil {
		dialOpts = append(
			dialOpts,
			grpc.WithTransportCredentials(opts.DialCreds),
		)
	}

	// The key must be spelled "loadBalancingConfig"; a misspelled key is not
	// recognized as the load-balancing setting, so the policy named by
	// TestName would never be selected.
	r.serviceConfig = r.clientConn.ParseServiceConfig(
		fmt.Sprintf(`{"loadBalancingConfig": [{"%s": {}}]}`, TestName),
	)

	var err error
	r.resolverConn, err = grpc.Dial(target.Endpoint, dialOpts...)
	if err != nil {
		return nil, err
	}

	// Push an initial address list to the client connection right away.
	r.ResolveNow(resolver.ResolveNowOptions{})
	return r, nil
}
// Scheme implements resolver.Builder and returns TestName, the scheme this
// builder is registered under.
func (r *Resolver) Scheme() string {
return TestName
}
// init registers a zero-value Resolver with gRPC's resolver registry when the
// package is loaded.
func init() {
resolver.Register(&Resolver{})
}
// Compile-time assertion that *Resolver implements resolver.Resolver.
var _ resolver.Resolver = (*Resolver)(nil)
// ResolveNow implements resolver.Resolver. While holding r.mu it calls
// GetServers over the resolver's own connection, converts each returned
// server's RpcAddr into a resolver.Address, and pushes the resulting address
// list together with the stored service config to the client connection.
//
// If the RPC fails, the error is logged and the client connection's state is
// left untouched.
func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Fetch the cluster's servers, then publish them on the client connection.
	helper := streaming_api.NewLBResolverHelperClient(r.resolverConn)
	resp, err := helper.GetServers(
		context.Background(),
		&streaming_api.GetServersRequest{},
	)
	if err != nil {
		r.logger.Error("failed to resolve server", zap.Error(err))
		return
	}

	// Left nil when no servers are returned, so the published state carries a
	// nil address slice in that case.
	var resolved []resolver.Address
	for i := range resp.Servers {
		resolved = append(resolved, resolver.Address{Addr: resp.Servers[i].RpcAddr})
	}

	state := resolver.State{
		Addresses:     resolved,
		ServiceConfig: r.serviceConfig,
	}
	r.clientConn.UpdateState(state)
}
// Close implements resolver.Resolver. It closes the connection dialed in
// Build; a close failure is logged and not propagated.
func (r *Resolver) Close() {
	err := r.resolverConn.Close()
	if err == nil {
		return
	}
	r.logger.Error("failed to close conn", zap.Error(err))
}