本文介绍: 主流的注册中心 etcd、zookeeper 等功能强大,与这类注册中心的对接代码量是比较大的,需要实现的接口也很多。另外,也要提供 Heartbeat 方法,便于服务启动时定时向注册中心发送心跳(也是通过HTTP),默认周期比注册中心设置的过期时间少 1 min。首先定义 GeeRegistry 结构体,默认超时时间设置为 5 min,也就是说,任何注册的服务超过 5 min,即视为不可用状态。而现在我们实现了注册中心,那这一节的服务发现就可以继承上一节的,并添加与注册中心相关的细节。

0.前言

这一节的内容只能解决只有一个服务的情况。要是有多个服务(即是多个结构体)这种就解决不了,也即是没有服务ip地址和服务实例的映射关系。

1.为什么需要注册中心

在上一节中,客户端想要找到服务实例的ip,需要硬编码把ip写到代码中。这时可能会出问题,要是该服务实例ip改变了呢,该服务实例下线宕机了呢?这时如何是好。

// 调用单个服务实例
func clientCall(addr1, addr2 string) {
	d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()
	//省略其他......
}

这时,注册中心的重要性就出来了。

注册中心主要有三种角色:

  • 服务提供者(RPC Server):在启动时,向 Registry 注册自身服务,并向 Registry 定期发送心跳汇报存活状态。
  • 服务消费者(RPC Client):在启动时,向 Registry 订阅服务,把 Registry 返回的服务节点列表缓存在本地内存中,并与 RPC Sever 建立连接。
  • 服务注册中心(Registry):用于保存 RPC Server 的注册信息,当 RPC Server 节点发生变更时,Registry 会同步变更,RPC Client 感知后会刷新本地 内存中缓存的服务节点列表。

最后,RPC Client 从本地缓存的服务节点列表中,基于负载均衡算法选择一台 RPC Sever 发起调用。

 当然注册中心的功能还有很多,比如配置的动态同步、通知机制等。比较常用的注册中心有 etcdzookeeperconsul,一般比较出名的微服务或者 RPC 框架,这些主流的注册中心都是支持的。

2.Gee Registry

主流的注册中心 etcd、zookeeper 等功能强大,与这类注册中心的对接代码量是比较大的,需要实现的接口也很多。所以这里我们选择自己实现一个简单的支持心跳保活的注册中心。

GeeRegistry 的代码独立放置在子目录 registry 中。

首先定义 GeeRegistry 结构体,默认超时时间设置为 5 min,也就是说,超过5min没有收到该注册的服务的心跳,即视其为不可用状态。

//registry.go
type ServerItem struct {
	Addr  string
	start time.Time //用于心跳时间计算
}

// GeeRegistry is a simple register center
type GeeRegistry struct {
	timeout time.Duration
	mutex   sync.Mutex //protcect servers
	servers map[string]*ServerItem
}

const (
	defaultPath    = "/_rpc_/registry"
	defaultTimeout = time.Minute * 5
)

func New(timeout time.Duration) *GeeRegistry {
	return &GeeRegistry{
		servers: make(map[string]*ServerItem),
		timeout: timeout,
	}
}

var DefalultGeeRegister = New(defaultTimeout)

然后,为 GeeRegistry 实现添加服务实例和返回服务列表的方法。

  • putServer:添加服务实例,如果服务已经存在,则更新 start。
  • aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。
func (r *GeeRegistry) putServer(addr string) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	s := r.servers[addr]
	if s == nil {
		r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}
	} else {
		s.start = time.Now() // if exists, update start time to keep alive
	}
}

func (r *GeeRegistry) aliveServers() []string {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	var alive []string
	for addr, s := range r.servers {
		if r.timeout == 0 || s.start.Add(r.timeout).After(time.Now()) {
			alive = append(alive, addr)
		} else {
			delete(r.servers, addr)
		}
	}
	sort.Strings(alive)
	return alive
}

 为了简单,那么rpc客户端通过HTTP去访问注册中心,且所有的有用信息都承载在 HTTP Header 中。

  • Get:返回所有可用的服务列表,通过自定义字段 X-rpc-Servers 承载。
  • Post:添加服务实例或发送心跳,通过自定义字段 X-rpc-Server 承载。
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	switch req.Method {
	case "GET":
		w.Header().Set("X-rpc-Servers", strings.Join(r.aliveServers(), ","))
	case "POST":
		addr := req.Header.Get("X-rpc-Servers")
		if addr == "" {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		r.putServer(addr) //更新保存在注册中心的服务实例
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
	}
}

func (r *GeeRegistry) HandleHTTP(registryPath string) {
	http.Handle(registryPath, r)
}

func HandleHTTP() {
	DefalultGeeRegister.HandleHTTP(defaultPath)
}

另外,也要提供 Heartbeat 方法,便于服务启动时定时向注册中心发送心跳(也是通过HTTP),默认周期比注册中心设置的过期时间少 1 min。

// only send once
func sendHeartbeat(registryURL, addr string) error {
	httpClient := &http.Client{Timeout: time.Second * 10}
	req, _ := http.NewRequest("POST", registryURL, nil)
	req.Header.Set("X-rpc-Servers", addr)
	resp, err := httpClient.Do(req)
	if err != nil {
		fmt.Println("rpc server: heart beat err:", err)
		return err
	}
	defer resp.Body.Close()
	return nil
}

// Heartbeat send a heartbeat message every once in a while
func Heartbeat(registryURL, addr string, duration time.Duration) {
	if duration == 0 {
		duration = defaultTimeout - time.Duration(1)*time.Minute
	}

	err := sendHeartbeat(registryURL, addr)
	go func() {
		//创建一个定时器
		t := time.NewTicker(duration)
		for err == nil {
			<-t.C
			err = sendHeartbeat(registryURL, addr)
		}
	}()
}

3.需要注册中心的服务发现

上一节我们实现了一个不需要注册中心,服务列表由手工维护的服务发现的结构体MultiServersDiscovery。

而现在我们实现了注册中心,那这一节的服务发现就可以继承上一节的,并添加与注册中心相关的细节

type GeeRegistryDiscovery struct {
	*MultiServerDiscovery
	registryAddr string
	timeout      time.Duration //服务列表的过期时间
	lastUpdate   time.Time
}

const defaultUpdateTimeout = time.Second * 10

func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {
	if timeout == 0 {
		timeout = defaultUpdateTimeout
	}
	return &GeeRegistryDiscovery{
		MultiServerDiscovery: NewMultiServerDiscovery(make([]string, 0)),
		registryAddr:         registerAddr,
		timeout:              timeout,
	}
}
  • GeeRegistryDiscovery 嵌套了 MultiServersDiscovery,很多能力可以复用。
  • registryAddr 即注册中心的地址
  • timeout 服务列表的过期时间
  • lastUpdate 是代表最后从注册中心更新服务列表的时间,默认 10s 过期,即 10s 之后,需要从注册中心更新新的列表。

实现 Update 和 Refresh 方法,超时重新获取的逻辑在 Refresh 中实现: 

func (d *GeeRegistryDiscovery) Update(servers []string) error {
	d.rwMutex.Lock()
	defer d.rwMutex.Unlock()
	d.servers = servers
	d.lastUpdate = time.Now()
	return nil
}

// 刷新,有了注册中心,在客户端每次获取服务实例时候,需要刷新注册中心的保存的服务实例
func (d *GeeRegistryDiscovery) Refresh() error {
	d.rwMutex.Lock()
	defer d.rwMutex.Unlock()
    //注册中心保存的服务实例还没超时,不用更新
	if d.lastUpdate.Add(d.timeout).After(time.Now()) {
		return nil
	}
	httpClient := http.Client{Timeout: time.Second * 10} //http客户端最好有个超时
	resp, err := httpClient.Get(d.registryAddr)
	if err != nil {
		fmt.Println("rpc registry refresh err:", err)
		return err
	}

	defer resp.Body.Close()
	servers := strings.Split(resp.Header.Get("X-rpc-Servers"), ",")
	d.servers = make([]string, 0, len(servers))
	for _, server := range servers {
		//返回一个string类型,并将最前面和最后面的ASCII定义的空格去掉,中间的空格不会去掉
		s := strings.TrimSpace(server)
		if s != "" {
			d.servers = append(d.servers, s)
		}
	}

	d.lastUpdate = time.Now()
	return nil
}

 Get 和 GetAll 与 MultiServersDiscovery 相似,唯一的不同在于,GeeRegistryDiscovery 需要先调用 Refresh 确保服务列表没有过期

func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {
	if err := d.Refresh(); err != nil {
		return "", err
	}
	//d.Get(mode) 表示调用的是(GeeRegistryDiscovery).Get
	return d.MultiServerDiscovery.Get(mode) //d.MultiServerDiscovery是调用MultiServerDiscovery的Get()
}

func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {
	if err := d.Refresh(); err != nil {
		return nil, err
	}
	return d.MultiServerDiscovery.GetAll()
}

4.测试

添加函数 startRegistry,之后需要稍微修改 startServer,定期向注册中心发送心跳保活(Heartbeat)。

这里使用sync.WaitGroup是为了等待该操作执行完毕才会往后执行,因为这些函数都是新开协程运行。

func startServer(registryAddr string, wg *sync.WaitGroup) {
	var myServie My

	l, _ := net.Listen("tcp", "localhost:0") //端口是0表示端口随机
	server := geerpc.NewServer()
	//这里一定要用&myServie,因为前面Sum方法的接受者是*My;若接受者是My,myServie或者&myServie都可以
	server.Register(&myServie)
	registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)  //定时发送心跳
	wg.Done()
	server.Accept(l)
}

func startRegistry(wg *sync.WaitGroup) {
	l, _ := net.Listen("tcp", "localhost:9999")
	registry.HandleHTTP()
	wg.Done()
	http.Serve(l, nil)
}

接下来,将 call 和 broadcast 的 MultiServersDiscovery 替换为 GeeRegistryDiscovery,不再需要硬编码服务列表。 

这里就重点对比下NewGeeRegistryDiscovery方法和之前的不同之处。

// 调用单个服务实例
func clientCall(registryAddr string) {
	// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()

	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var reply int = 1324
			if err := xc.Call(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {
				log.Println("call Foo.Sum error:", err)
			}
			fmt.Println("reply: ", reply)
		}(i)
	}
	wg.Wait()
}

func broadcast(registryAddr string) {
	// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()

	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var reply int = 1324
			if err := xc.Broadcast(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {
				fmt.Println("Broadcast call Foo.Sum error:", err)
			}
			fmt.Println("Broadcast reply: ", reply)

			ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
			defer cancel()
			var replyTimeout int = 1324
			if err := xc.Broadcast(ctx, "My.Sleep", &Args{Num1: i, Num2: i * i}, &replyTimeout); err != nil {
				fmt.Println("Broadcast call Foo.Sum error:", err)
			}
			fmt.Println("timeout Broadcast reply: ", replyTimeout)

		}(i)
	}
	wg.Wait()
}

最后是main函数。

确保注册中心启动后,再启动 RPC 服务端,最后客户端远程调用。

func main() {
	registryAddr := "http://localhost:9999/_rpc_/registry"

	var wg sync.WaitGroup
	wg.Add(1)
	go startRegistry(&wg) //开启注册中心服务
	wg.Wait()
	time.Sleep(time.Second)

	wg.Add(2)
	go startServer(registryAddr, &wg)
	go startServer(registryAddr, &wg)
	wg.Wait()

	time.Sleep(time.Second)
	clientCall(registryAddr)
	broadcast(registryAddr)
}

运行结果:

代码: https://github.com/liwook/Go-projects/tree/main/geerpc/7-registry

原文地址:https://blog.csdn.net/m0_57408211/article/details/135630144

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_64227.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注