Skip to content

Commit

Permalink
删除tcp
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyu mao committed Jul 23, 2023
1 parent d4d79da commit 3ee15ce
Show file tree
Hide file tree
Showing 81 changed files with 256 additions and 8,095 deletions.
20 changes: 15 additions & 5 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (

// AppInfo is application context value.
type AppInfo interface {
ID() string
Name() string
Version() string
Metadata() map[string]string
Endpoint() []string
ID() string // app id
Name() string // 应用名
Version() string // 应用版本
Metadata() map[string]string // meta信息
Endpoint() []string // 暴露端点
}

type App struct {
Expand Down Expand Up @@ -83,12 +83,16 @@ func (a *App) Run() error {
server := srv
eg.Go(func() error {
<-ctx.Done() // wait for stop signal , 收到信号后走Stop方法,然后把app的context cancel掉,那么它的子context就会关闭。
// 现在是收到信号后,里面进行关闭 todo 确认是否能这样。
stopCtx, cancel := context.WithTimeout(NewContext(a.opts.ctx, a), a.opts.stopTimeout)
defer cancel()
return server.Stop(stopCtx) //执行server的stop 下面的start 方法会停止阻塞。
})
wg.Add(1)

// 在go程中异步start
eg.Go(func() error {
// 服务启动 是不用一定一定先于注册, 注册中心自己会自己进行探测
wg.Done() // here is to ensure server start has begun running before register, so defer is not needed
return server.Start(sctx)
})
Expand All @@ -98,10 +102,13 @@ func (a *App) Run() error {
if a.opts.registrar != nil {
rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout)
defer rcancel()
// 进行注册
if err = a.opts.registrar.Register(rctx, instance); err != nil {
return err
}
}

// after start 钩子函数
for _, fn := range a.opts.afterStart {
if err = fn(sctx); err != nil {
return err
Expand All @@ -118,6 +125,8 @@ func (a *App) Run() error {
return a.Stop()
}
})

// 阻塞等待 直到有错误发生。 非canceld的 则是错的
if err = eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
Expand All @@ -133,6 +142,7 @@ func (a *App) Stop() (err error) {
for _, fn := range a.opts.beforeStop {
err = fn(sctx)
}
// 启动和关闭其实是异步的 ,加锁比较好。
a.locker.Lock()
instance := a.instance
a.locker.Unlock()
Expand Down
16 changes: 8 additions & 8 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ type options struct {
name string
version string
metadata map[string]string
endpoints []*url.URL
endpoints []*url.URL // 暴露的地址

ctx context.Context
sigs []os.Signal
ctx context.Context // 上下文,可以传入进来,一般是不需要的。应该就是background context 出发的
sigs []os.Signal // 注册信号

logger log.Logger
registrar registry.Registrar
registrarTimeout time.Duration
stopTimeout time.Duration
servers []transport.Server
registrar registry.Registrar // 服务注册
registrarTimeout time.Duration // 服务注册超时
stopTimeout time.Duration // 停止超时时间,可以给很多的值,看情况自己顶
servers []transport.Server // 有哪些server

// Before and After funcs
// Before and After funcs 钩子函数
beforeStart []func(context.Context) error
beforeStop []func(context.Context) error
afterStart []func(context.Context) error
Expand Down
19 changes: 10 additions & 9 deletions transport/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"github.com/cr-mao/lori/metric"
"github.com/cr-mao/lori/registry"
"github.com/cr-mao/lori/transport/grpc/resolver/discovery"
"google.golang.org/grpc/credentials"

grpcinsecure "google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -143,15 +144,15 @@ func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.Clien
grpc.WithChainStreamInterceptor(streamInts...),
}

//TODO 服务发现的选项
//if options.discovery != nil {
// grpcOpts = append(grpcOpts, grpc.WithResolvers(
// discovery.NewBuilder(
// options.discovery,
// discovery.WithInsecure(insecure),
// ),
// ))
//}
// 服务发现的选项
if options.discovery != nil {
grpcOpts = append(grpcOpts, grpc.WithResolvers(
discovery.NewBuilder(
options.discovery,
discovery.WithInsecure(insecure),
),
))
}
if options.tlsConf != nil {
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(options.tlsConf)))
}
Expand Down
85 changes: 85 additions & 0 deletions transport/grpc/resolver/discovery/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package discovery

import (
"context"
"errors"
"strings"
"time"

"github.com/cr-mao/lori/registry"
"google.golang.org/grpc/resolver"
)

const name = "discovery"

// Option is builder option.
type Option func(o *builder)

// WithTimeout with timeout option.
func WithTimeout(timeout time.Duration) Option {
return func(b *builder) {
b.timeout = timeout
}
}

// WithInsecure with isSecure option.
func WithInsecure(insecure bool) Option {
return func(b *builder) {
b.insecure = insecure
}
}

type builder struct {
discoverer registry.Discovery
timeout time.Duration
insecure bool
}

// NewBuilder creates a builder which is used to factory registry resolvers.
func NewBuilder(d registry.Discovery, opts ...Option) resolver.Builder {
b := &builder{
discoverer: d,
timeout: time.Second * 10,
insecure: false,
}
for _, o := range opts {
o(b)
}
return b
}

func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
var (
err error
w registry.Watcher
)
done := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())
go func() {
w, err = b.discoverer.Watch(ctx, strings.TrimPrefix(target.URL.Path, "/"))
close(done)
}()
select {
case <-done:
case <-time.After(b.timeout):
err = errors.New("discovery create watcher overtime")
}
if err != nil {
cancel()
return nil, err
}
r := &discoveryResolver{
w: w,
cc: cc,
ctx: ctx,
cancel: cancel,
insecure: b.insecure,
}
go r.watch()
return r, nil
}

// Scheme return scheme of discovery
func (*builder) Scheme() string {
return name
}
138 changes: 138 additions & 0 deletions transport/grpc/resolver/discovery/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package discovery

import (
"context"
"encoding/json"
"errors"
"net/url"
"strconv"
"time"

"github.com/cr-mao/lori/log"
"github.com/cr-mao/lori/registry"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)

type discoveryResolver struct {
w registry.Watcher
cc resolver.ClientConn

ctx context.Context
cancel context.CancelFunc

insecure bool
}

func (r *discoveryResolver) watch() {
for {
select {
case <-r.ctx.Done():
return
default:
}
ins, err := r.w.Next()
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Errorf("[resolver] Failed to watch discovery endpoint: %v", err)
time.Sleep(time.Second)
continue
}
r.update(ins)
}
}

func (r *discoveryResolver) update(ins []*registry.ServiceInstance) {
addrs := make([]resolver.Address, 0)
endpoints := make(map[string]struct{})
for _, in := range ins {
endpoint, err := ParseEndpoint(in.Endpoints, "grpc", !r.insecure)
if err != nil {
log.Errorf("[resolver] Failed to parse discovery endpoint: %v", err)
continue
}
if endpoint == "" {
continue
}
// filter redundant endpoints
if _, ok := endpoints[endpoint]; ok {
continue
}
endpoints[endpoint] = struct{}{}
addr := resolver.Address{
ServerName: in.Name,
Attributes: parseAttributes(in.Metadata),
Addr: endpoint,
}
addr.Attributes = addr.Attributes.WithValue("rawServiceInstance", in)
addrs = append(addrs, addr)
}
if len(addrs) == 0 {
log.Warnf("[resolver] Zero endpoint found,refused to write, instances: %v", ins)
return
}
err := r.cc.UpdateState(resolver.State{Addresses: addrs})
if err != nil {
log.Errorf("[resolver] failed to update state: %s", err)
}
b, _ := json.Marshal(ins)
log.Infof("[resolver] update instances: %s", b)
}

func (r *discoveryResolver) Close() {
r.cancel()
err := r.w.Stop()
if err != nil {
log.Errorf("[resolver] failed to watch top: %s", err)
}
}

func (r *discoveryResolver) ResolveNow(options resolver.ResolveNowOptions) {}

func parseAttributes(md map[string]string) *attributes.Attributes {
var a *attributes.Attributes
for k, v := range md {
if a == nil {
a = attributes.New(k, v)
} else {
a = a.WithValue(k, v)
}
}
return a
}

// NewEndpoint new an Endpoint URL.
func NewEndpoint(scheme, host string, isSecure bool) *url.URL {
var query string
if isSecure {
query = "isSecure=true"
}
return &url.URL{Scheme: scheme, Host: host, RawQuery: query}
}

// ParseEndpoint parses an Endpoint URL.
func ParseEndpoint(endpoints []string, scheme string, isSecure bool) (string, error) {
for _, e := range endpoints {
u, err := url.Parse(e)
if err != nil {
return "", err
}
if u.Scheme == scheme {
if IsSecure(u) == isSecure {
return u.Host, nil
}
}
}
return "", nil
}

// IsSecure parses isSecure for Endpoint URL.
func IsSecure(u *url.URL) bool {
ok, err := strconv.ParseBool(u.Query().Get("isSecure"))
if err != nil {
return false
}
return ok
}
14 changes: 0 additions & 14 deletions transport/network/README.md

This file was deleted.

12 changes: 0 additions & 12 deletions transport/network/client.go

This file was deleted.

Loading

0 comments on commit 3ee15ce

Please sign in to comment.