Qi
go-kit服务注册发现与负载均衡

go-kit服务注册发现与负载均衡

作者: Qi 发表时间: 2024-02-22 82

服务注册

Go-Kit框架本身不提供服务注册中心的实现,但Go-Kit提供了集成第三方服务注册中心的支持。

Consul:Consul是一种开源的服务发现和配置工具。Go-Kit提供了consul包,用于与Consul集成,支持服务注册、服务发现、健康检查等功能。
etcd:etcd是一个分布式键值存储系统,也可以用于服务注册和服务发现。Go-Kit提供了etcd包,用于与etcd集成,支持服务注册、服务发现、健康检查等功能。
ZooKeeper:ZooKeeper是一个分布式协调服务,也可以用于服务注册和服务发现。Go-Kit提供了zookeeper包,用于与ZooKeeper集成,支持服务注册、服务发现、健康检查等功能。
Eureka:Eureka是Netflix开源的服务发现框架,Go-Kit提供了eureka包,用于与Eureka集成,支持服务注册、服务发现、健康检查等功能。
Nacos:Nacos是阿里巴巴开源的服务发现和配置管理平台,Go-Kit提供了nacos包,用于与Nacos集成,支持服务注册、服务发现、健康检查等功能。

Go-Kit还提供了sd包,该包提供了一个标准的接口,可以与不同的服务注册中心进行集成。因此,如果你使用的服务注册中心不在上述列表中,你也可以使用sd包进行集成。

1) go-kit集成etcd示例

register_service 代码

package register_service

import (
	"context"
	"fmt"
	// "github.com/go-kit/kit/sd/etcd"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/etcdv3"
	"os"
)

func RegisterETCD(etcdHost, etcdPort, svcHost, svcPort string, logger log.Logger) (registrar sd.Registrar) {

	// etcd的地址
	etcdServerAddr := []string{fmt.Sprintf("%s:%s", etcdHost, etcdPort)}

	// 服务的名称和端口
	serviceName := "my-service"

	ttl := 5 * time.Second
	options := etcdv3.ClientOptions{
		DialTimeout:   ttl,
		DialKeepAlive: ttl,
	}

	// 创建etcd客户端
	etcdClient, err := etcdv3.NewClient(context.Background(), etcdServerAddr, options)
	if err != nil {
		fmt.Printf("Failed to create etcd client: %v", err)
		os.Exit(1)
	}

	// 创建一个etcd实例
	registrar = etcdv3.NewRegistrar(etcdClient, etcdv3.Service{
		Key:   serviceName,
		Value: fmt.Sprintf("%s:%s", svcHost, svcPort),
		TTL:   etcdv3.NewTTLOption(10*time.Second, 5*time.Second),
	}, logger)

	return registrar
}

main.go

/**
 * @date: 2023/2/22
 * @desc:
 */

package main

import (
	"github.com/go-kit/kit/log/level"
	"github.com/go-kit/log"
	"google.golang.org/grpc"
	"my-kit-register-demo/endpoints"
	"my-kit-register-demo/middlewares"
	"my-kit-register-demo/proto"
	"my-kit-register-demo/register_service"
	"my-kit-register-demo/services"
	"my-kit-register-demo/transports"
	"net"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	logger := log.NewJSONLogger(os.Stdout)

	listener, err := net.Listen("tcp", "0.0.0.0:8080")
	if err != nil {
		level.Error(logger).Log("listen error: ", err.Error())
		os.Exit(1)
	}

	pingService := services.NewPingService()
	pingService = middlewares.LoggingPingServiceMiddleware(logger)(pingService)
	pingEndPoints := endpoints.NewPingEndpoint(pingService)
	pingGRPCServer := transports.NewPingGRPCServer(pingEndPoints, logger)

	helloEndPoint := endpoints.NewHelloEndpoint(services.NewHelloService())
	helloGRPCServer := transports.NewHelloGRPCServer(helloEndPoint, logger)

	grpcServer := grpc.NewServer()

	proto.RegisterHelloServer(grpcServer, helloGRPCServer)
	proto.RegisterPingServer(grpcServer, pingGRPCServer)
	// r := register_service.RegisterConsul("172.24.25.123", "8500", "172.24.25.123", "8080", logger)
	r := register_service.RegisterETCD("172.24.25.123", "2379", "172.24.25.123", "8080", logger)

	go func() {
		r.Register()
		err = grpcServer.Serve(listener)
		if err != nil {
			level.Error(logger).Log("grpc error:", err)
		}

	}()

	// 处理终止信号
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
	<-signalChan

	// 取消注册服务
	r.Deregister()
}

2)go-kit集成consul示例

register_service 代码

package register_service

import (
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/consul"
	"github.com/hashicorp/consul/api"
	"os"
	"strconv"
	"time"
)

func RegisterConsole(consulHost, consulPort, svcHost, svcPort string, logger log.Logger) (registrar sd.Registrar) {

	// 创建Consul客户端连接
	var client consul.Client
	{
		consulCfg := api.DefaultConfig()
		consulCfg.Address = consulHost + ":" + consulPort
		consulClient, err := api.NewClient(consulCfg)
		if err != nil {
			logger.Log("create consul client error:", err)
			os.Exit(1)
		}

		client = consul.NewClient(consulClient)
	}

	// 设置Consul对 http 服务健康检查的参数
	// check := api.AgentServiceCheck{
	// 	HTTP:     "http://" + svcHost + ":" + svcPort + "/health",  // 此接口需要自己实现
	// 	Interval: "10s",
	// 	Timeout:  "1s",
	// 	Notes:    "Consul check service health status.",
	// }
    
    // 设置Consul对 GRPC 服务健康检查的参数 
	check := api.AgentServiceCheck{
		GRPC:     svcHost + ":" + svcPort  ,  
		Interval: "5s",
		Timeout:  "1s",
		Notes:    "Consul check service health status.",
	}

	port, _ := strconv.Atoi(svcPort)

	// 设置微服务想Consul的注册信息
	reg := api.AgentServiceRegistration{
		ID:      "my-kit-demo" + time.Now().String(),
		Name:    "my-kit-demo",
		Address: svcHost,
		Port:    port,
		Tags:    []string{"my-kit", "register-demo"},
		// Check:   &check,
	}

	// 执行注册
	registrar = consul.NewRegistrar(client, &reg, logger)
	return
}

man.go

 * @date: 2023/2/22
 * @desc:
 */

package main

import (
	"github.com/go-kit/kit/log/level"
	"github.com/go-kit/log"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"
	"my-kit-register-demo/endpoints"
	"my-kit-register-demo/middlewares"
	"my-kit-register-demo/proto"
	"my-kit-register-demo/register_service"
	"my-kit-register-demo/services"
	"my-kit-register-demo/transports"
	"net"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	defer func() {
	}()
	logger := log.NewJSONLogger(os.Stdout)

	listener, err := net.Listen("tcp", "0.0.0.0:8080")
	if err != nil {
		level.Error(logger).Log("listen error: ", err.Error())
		os.Exit(1)
	}

	pingService := services.NewPingService()
	pingService = middlewares.LoggingPingServiceMiddleware(logger)(pingService)
	pingEndPoints := endpoints.NewPingEndpoint(pingService)
	pingGRPCServer := transports.NewPingGRPCServer(pingEndPoints, logger)

	helloEndPoint := endpoints.NewHelloEndpoint(services.NewHelloService())
	helloGRPCServer := transports.NewHelloGRPCServer(helloEndPoint, logger)

	grpcServer := grpc.NewServer()

	// 注册健康检查服务
	healthCheckServiceName := "my-grpc-service"
	healthServer := health.NewServer()
	healthServer.SetServingStatus(healthCheckServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
	grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)  // 注册健康检查服务


	proto.RegisterHelloServer(grpcServer, helloGRPCServer)
	proto.RegisterPingServer(grpcServer, pingGRPCServer)
	r := register_service.RegisterConsole("172.24.25.123", "8500", "172.24.25.123", "8080", logger)

	go func() {
		r.Register()
		err = grpcServer.Serve(listener)
		if err != nil {
			level.Error(logger).Log("grpc error:", err)
		}

	}()

	// 处理终止信号
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
	<-signalChan

	// 取消注册服务
	r.Deregister()

}

服务发现与负载均衡

go-kit 为不同的服务发现系统(eureka、zookeeper、consul、etcd等)提供适配器,Endpointer负责监听服务发现系统,并根据需要生成一组相同的endpoint

type Endpointer interface {
	Endpoints() ([]endpoint.Endpoint, error)
}

go-kit 提供了工厂函数——Factory, 它是一个将实例字符串(例如host:port)转换为特定端点的函数。提供多个端点的实例需要多个工厂函数。工厂函数还返回一个当实例消失并需要清理时调用的io.Closer。

type Factory func(instance string) (endpoint.Endpoint, io.Closer, error)

定义如下工厂函数,它会根据传入的实例地址创建一个 gRPC 客户端的 endpoint。

func factory(instance string) (endpoint.Endpoint, io.Closer, error) {
	conn, err := grpc.Dial(instance, grpc.WithInsecure())
	if err != nil {
		return nil, nil, err
	}
	e := makeEndpoint(conn)
	return e, conn, err
}

demo:

package register_service

import (
	"context"
	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/consul"
	"github.com/go-kit/kit/sd/lb"
	"google.golang.org/grpc"
	"io"
	"time"
)

func MakeConsulDiscoverEndpoint(ctx context.Context, client consul.Client, logger log.Logger) endpoint.Endpoint {
	serviceName := "my-kit-demo--grpc-check"
	tags := []string{"my-kit", "register-demo"}
	duration := 500 * time.Millisecond

	// 创建consul的连接实例
	instance := consul.NewInstancer(client, logger, serviceName, tags, true)

	// 使用consul连接实例(发现服务系统)、factory创建sd.Factory
	endpointer := sd.NewEndpointer(instance, factory, logger)

	// 创建RoundRibbon负载均衡器
	balancer := lb.NewRoundRobin(endpointer)

	// 为负载均衡器增加重试功能,同时该对象为endpoint.Endpoint
	retry := lb.Retry(1, duration, balancer)

	return retry
}

func factory(instance string) (endpoint endpoint.Endpoint, closer io.Closer, err error) {
	conn, err := grpc.Dial(instance, grpc.WithInsecure())
	if err != nil {
		return nil, nil, err
	}
	defer conn.Close()

	return NewEndpoint(), nil, nil
}
go-kit

本文作者: Qi

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

我也是有底线的哦!

联系我呀

技术支持

学习网站

博主寄诗

学习是一辈子的事情,保持好奇心和学习的态度。

面对生活的挑战,要保持乐观的心态和勇气,相信自己能够克服困难。