.net core grpc consul 实现服务注册 服务发现 负载均衡(二)

.net core grpc Consul 服务注册 服务发现 负载均衡 Consul Api 依赖注入 切面编程 管道 2019-12-09 3814  

在上一篇[.net core grpc 实现通信(一)](http://www.snaill.net/post/1)中,我们实现的grpc通信在.net core中的可行性,但要在微服务中真正使用,还缺少 服务注册,服务发现及负载均衡等,本篇我们将在 .net core grpc 通信 的基础上加上 服务注册,服务发现,负载均衡。 如对.net core grpc 通信不太熟悉的,可以看上一篇[.net core grpc 实现通信(一)](http://www.snaill.net/post/1),然后再看本篇。 grpc([https://grpc.io/](https://grpc.io/))是google发布的一个开源、高性能、通用RPC(Remote Procedure Call)框架,使用HTTP/2协议,支持多路复用,并用ProtoBuf作为序列化工具,提供跨语言、跨平台支持。 Consul([https://www.consul.io](https://www.consul.io))是一个分布式,高可用、支持多数据中心的服务注册、发现、健康检查和配置共享的服务软件,由 HashiCorp 公司用 Go 语言开发。 本次服务注册、发现 通过 Consul Api 来实现,开发过程中结合.net core 依赖注入,切面管道思想等。 软件版本 .net core:2.0 grpc:1.11.0 Consul:1.1.0 Consul Nuget注册组件:0.7.2.5 #### 项目结构 #### .net core 代码部分: Snai.GrpcClient 客户端 .net core 2.0控制台程序 Snai.GrpcService.Hosting 服务端宿主,Api服务注册,asp.net core 2.0网站程序 Snai.GrpcService.Impl 协议方法实现 .net standard 2.0类库 Snai.GrpcService.Protocol 生成协议方法 .net standard 2.0类库 ![Image](/sitedata/image/dotnet_2_1.png) Consul: conf 配置目录,本次用api注册服务,可以删除 data 缓存数据目录,可清空里面内容 dist Consul UI目录,本次用默认的UI,可以删除 consul.exe 注册软件 startup.bat 执行脚本 ![Image](/sitedata/image/dotnet_2_2.png) #### 项目实现 #### **一、服务端** 服务端主要包括Grpc服务端,Consul Api服务注册、健康检查等。 新建Snai.GrpcService解决方案,由于这次加入了 Consul Api 服务注册,所以我们先从 Api 服务注册开始。 1、实现 Consul Api 服务注册 新建 Snai.GrpcService.Hosting 基于Asp.net Core 2.0空网站,在 依赖项 右击 管理NuGet程序包 浏览 找到 Consul 版本0.7.2.5安装,用于Api服务注册使用 新建 appsettings.json 配置文件,配置 GrpcService Grpc服务端IP和端口,HealthService健康检测名称、IP和地址,ConsulService Consul的IP和端口,代码如下 ``` { "GrpcService": { "IP": "localhost", "Port": "5031" }, "HealthService": { "Name": "GrpcService", "IP": "localhost", "Port": "5021" }, "ConsulService": { "IP": "localhost", "Port": "8500" } } ``` 新建Consul目录,用于放Api注册相关代码 在Consul目录下新建Entity目录,在Entity目录下新建HealthService.cs,ConsulService.cs类,分别对应HealthService,ConsulService两个配置项,代码如下 HealthService.cs ``` using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace Snai.GrpcService.Hosting.Consul.Entity { public class HealthService { public string Name { get; set; } public string IP { get; set; } public int Port { get; set; } } } ``` ConsulService.cs ``` using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace Snai.GrpcService.Hosting.Consul.Entity { public class ConsulService { public string IP { get; set; } public int Port { get; set; } } } ``` 在 Consul 目录下新建 AppRregister.cs 类,添加 IApplicationBuilder 扩展方法 RegisterConsul,来调用 Consul Api 实现服务注册,代码如下 ``` using Consul; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Options; using Snai.GrpcService.Hosting.Consul.Entity; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace Snai.GrpcService.Hosting.Consul { public static class AppRregister { // 服务注册 public static IApplicationBuilder RegisterConsul(this IApplicationBuilder app, IApplicationLifetime lifetime, IOptions<HealthService> healthService, IOptions<ConsulService> consulService) { var consulClient = new ConsulClient(x => x.Address = new Uri($"http://{consulService.Value.IP}:{consulService.Value.Port}"));//请求注册的 Consul 地址 var httpCheck = new AgentServiceCheck() { DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),//服务启动多久后注册 Interval = TimeSpan.FromSeconds(10),//健康检查时间间隔,或者称为心跳间隔 HTTP = $"http://{healthService.Value.IP}:{healthService.Value.Port}/health",//健康检查地址 Timeout = TimeSpan.FromSeconds(5) }; // Register service with consul var registration = new AgentServiceRegistration() { Checks = new[] { httpCheck }, ID = healthService.Value.Name + "_" + healthService.Value.Port, Name = healthService.Value.Name, Address = healthService.Value.IP, Port = healthService.Value.Port, Tags = new[] { $"urlprefix-/{healthService.Value.Name}" }//添加 urlprefix-/servicename 格式的 tag 标签,以便 Fabio 识别 }; consulClient.Agent.ServiceRegister(registration).Wait();//服务启动时注册,内部实现其实就是使用 Consul API 进行注册(HttpClient发起) lifetime.ApplicationStopping.Register(() => { consulClient.Agent.ServiceDeregister(registration.ID).Wait();//服务停止时取消注册 }); return app; } } } ``` 修改 Startup.cs 代码 加入 Startup(IConfiguration configuration) 构造函数,实现配置注入,如果建的是Web Api或MVC网站,默认是有的 修改 ConfigureServices(IServiceCollection services) 方法,注册全局配置 修改 Configure() 方法,添加健康检查路由地址 app.Map("/health", HealthMap),调用 RegisterConsul 扩展方法实现服务注册 添加 HealthMap(IApplicationBuilder app) 实现health路由。由于只有一个健康检查地址,所以没有建Web Api网站,只建了个空网站 代码如下,注册配置GrpcService 、 注册Rpc服务、启动Rpc服务 后面用到等下讲 ``` using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Snai.GrpcService.Hosting.Consul; using Snai.GrpcService.Hosting.Consul.Entity; using Snai.GrpcService.Impl; namespace Snai.GrpcService.Hosting { public class Startup { public Startup(IConfiguration configuration) { Configuration = configuration; } public IConfiguration Configuration { get; } // This method gets called by the runtime. Use this method to add services to the container. // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940 public void ConfigureServices(IServiceCollection services) { //注册全局配置 services.AddOptions(); services.Configure<Impl.Entity.GrpcService>(Configuration.GetSection(nameof(Impl.Entity.GrpcService))); services.Configure<HealthService>(Configuration.GetSection(nameof(HealthService))); services.Configure<ConsulService>(Configuration.GetSection(nameof(ConsulService))); //注册Rpc服务 services.AddSingleton<IRpcConfig, RpcConfig>(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime, IOptions<HealthService> healthService, IOptions<ConsulService> consulService, IRpcConfig rpc) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } // 添加健康检查路由地址 app.Map("/health", HealthMap); // 服务注册 app.RegisterConsul(lifetime, healthService, consulService); // 启动Rpc服务 rpc.Start(); } private static void HealthMap(IApplicationBuilder app) { app.Run(async context => { await context.Response.WriteAsync("OK"); }); } } } ``` 修改 Program.cs 代码,调置网站地址为 .UseUrls("http://localhost:5021"),代码如下 ``` using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; namespace Snai.GrpcService.Hosting { public class Program { public static void Main(string[] args) { BuildWebHost(args).Run(); } public static IWebHost BuildWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) .UseUrls("http://localhost:5021") .UseStartup<Startup>() .Build(); } } ``` 到此 Consul Api 服务注册 已完成,最终项目结构如下: ![Image](/sitedata/image/dotnet_2_3.png) 2、协议编写,将协议生成C#代码 由于在上一篇[.net core grpc 实现通信(一)](http://www.snaill.net/post/1)有过介绍,这里就简单说下 新建 Snai.GrpcService.Protocol协议类库项目,在 依赖项 右击 管理NuGet程序包 浏览 找到 Grpc.Core 版本1.11.0,Google.Protobuf 版本3.5.1 包下载安装 在根目录下新建msg.proto 文件,编写基于proto3语言的协议代码,用于生成各语言协议,msg.proto 代码如下 ``` syntax = "proto3"; package Snai.GrpcService.Protocol; service MsgService{ rpc GetSum(GetMsgNumRequest) returns (GetMsgSumReply){} } message GetMsgNumRequest { int32 Num1 = 1; int32 Num2 = 2; } message GetMsgSumReply { int32 Sum = 1; } ``` 新建.net framework 项目类库,引用安装 Grpc.Tools、Google.Protobuf.Tools 组件程序包,分别得到 grpc_csharp_plugin.exe、protoc.exe 工具 到package目录下,找到与系统相应的grpc_csharp_plugin.exe、protoc.exe工具,拷到 Snai.GrpcService.Protocol 项目下 在Snai.GrpcService.Protocol根目录下新建 ProtocGenerate.cmd 文件,在其中输入以下指令 ``` protoc -I . --csharp_out . --grpc_out . --plugin=protoc-gen-grpc=grpc_csharp_plugin.exe msg.proto ``` 然后直接双击运行,项目下生成了“Msg.cs”和“MsgGrpc.cs”两个文件,这样协议部分的所有工作就完成了,最终项目结构如下: ![Image](/sitedata/image/dotnet_2_4.png) 3、编写协议实现代码 新建 Snai.GrpcService.Impl 实现类库项目,在 依赖项 下载安装Grpc.Core 包,项目引用 Snai.GrpcService.Protocol 新建 Entity 目录,在Entity目录下新建 GrpcService.cs 类,对应 Snai.GrpcService.Hosting 项目下 appsettings.json 配置文件的 GrpcService 配置项,代码如下 ``` using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace Snai.GrpcService.Impl.Entity { public class GrpcService { public string IP { get; set; } public int Port { get; set; } } } ``` 在根目录下新建 RpcService 目录,在 RpcService 目录下新建 MsgServiceImpl.cs 类,继承 MsgService.MsgServiceBase 协议类,实现服务方法,代码如下 ``` using Grpc.Core; using Snai.GrpcService.Protocol; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace Snai.GrpcService.Impl.RpcService { public class MsgServiceImpl : MsgService.MsgServiceBase { public override async Task<GetMsgSumReply> GetSum(GetMsgNumRequest request, ServerCallContext context) { var result = new GetMsgSumReply(); result.Sum = request.Num1 + request.Num2; Console.WriteLine(request.Num1 + "+" + request.Num2 + "=" + result.Sum); return result; } } } ``` 在根目录下新建IRpcConfig.cs接口,定义 Start() 用于Rpc启动基方法,代码如下 ``` using System; using System.Collections.Generic; using System.Text; namespace Snai.GrpcService.Impl { public interface IRpcConfig { void Start(); } } ``` 在根目录下新建 RpcConfig.cs 类,用于实现 IRpcConfig.cs 接口,启动Rpc服务,代码如下 ``` using Grpc.Core; using Microsoft.Extensions.Options; using Snai.GrpcService.Impl.RpcService; using Snai.GrpcService.Protocol; using System; using System.Collections.Generic; using System.Text; namespace Snai.GrpcService.Impl { public class RpcConfig: IRpcConfig { private static Server _server; static IOptions<Entity.GrpcService> GrpcSettings; public RpcConfig(IOptions<Entity.GrpcService> grpcSettings) { GrpcSettings = grpcSettings; } public void Start() { _server = new Server { Services = { MsgService.BindService(new MsgServiceImpl()) }, Ports = { new ServerPort(GrpcSettings.Value.IP, GrpcSettings.Value.Port, ServerCredentials.Insecure) } }; _server.Start(); Console.WriteLine($"Grpc ServerListening On Port {GrpcSettings.Value.Port}"); } } } ``` 在回到Snai.GrpcService.Hosting项目中,在 Startup.cs 中 ConfigureServices 中注册 GrpcService 配置、注册Rpc服务,在 Configure 中 启动Rpc服务 就是上面说到的,如图 ![Image](/sitedata/image/dotnet_2_5.png) 最终项目结构如下: ![Image](/sitedata/image/dotnet_2_6.png) 到此服务端的代码实现已完成,下面我们启动Consul和服务端,验证 Api 注册和Grpc启动。 **二、Consul和服务端启动** 启动Consul,启动Grpc服务、注册服务到Consul 1、启动Consul 首先下载Consul:[https://www.consul.io/downloads.html](https://www.consul.io/downloads.html) ,本项目是windows下进行测试,得到consul.exe ![Image](/sitedata/image/dotnet_2_7.png) 由于本次用Api注册,用Consul默认自带UI,所以conf和dist可删除 清除Consul/data 内容,新建startup.bat文件,输入下面代码,双击启动Consul,本项目测试时一台机器,所以把 本机IP 改成 127.0.0.1 ``` consul agent -server -datacenter=grpc-consul -bootstrap -data-dir ./data -ui -node=grpc-consul1 -bind 本机IP -client=0.0.0.0 ``` 再在Consul目录下启动另一个cmd命令行窗口,输入命令:consul operator raft list-peers 查看状态查看状态,结果如下 ![Image](/sitedata/image/dotnet_2_8.png) 打开Consul UI:http://localhost:8500 查看情况 ![Image](/sitedata/image/dotnet_2_9.png) Consul 启动成功。 在 [.net core Ocelot Consul 实现API网关 服务注册 服务发现 负载均衡](http://www.snaill.net/post/5) 中后面 Consul 部分,有 Consul 集群搭建等其他介绍,可以去参考看下。 2、启动服务端,启动Grpc服务、注册服务到Consul 由于客户端要实现负载,所以把 Snai.GrpcService.Hosting 项目生成两次,启动两个一样的服务端,只是端口不同 服务5021 地址为5021: .UseUrls("http://localhost:5021"),GrpcService:5031,如下图 ![Image](/sitedata/image/dotnet_2_10.png) ![Image](/sitedata/image/dotnet_2_11.png) 服务5022 修改地址为5022: .UseUrls("http://localhost:5022"),GrpcService:5032,如下图 ![Image](/sitedata/image/dotnet_2_12.png) ![Image](/sitedata/image/dotnet_2_13.png) 启动 服务5021和服务5022两个服务端,如下面 ![Image](/sitedata/image/dotnet_2_14.png) ![Image](/sitedata/image/dotnet_2_15.png) ![Image](/sitedata/image/dotnet_2_16.png) 看到 Grpc ServerListening On Port 5031,Grpc ServerListening On Port 5032 说明 Grpc 服务端启动成功 看到 Request starting HTTP/1.1 GET http://localhost:5021/health 说明 Consul 健康检查成功 打开Consul服务查看地址 http://localhost:8500/ui/#/grpc-consul/services/GrpcService 查看,两个GrpcService注册成功,健康检查状态正常 ![Image](/sitedata/image/dotnet_2_17.png) **三、客户端** 客户端主要包括Grpc客户端,Consul Api服务发现、负载均衡等。 新建Snai.GrpcClient 控制台程序,在 依赖项 下载安装Grpc.Core 包,项目引用Snai.GrpcService.Protocol,在依赖项下载安装下面工具组件包 用于读取 json配置:Microsoft.Extensions.Configuration,Microsoft.Extensions.Configuration.Json 用于依赖注入:Microsoft.Extensions.DependencyInjection 用于注入全局配置:Microsoft.Extensions.Options,Microsoft.Extensions.Options.ConfigurationExtensions 在项目根目录下新建 Utils 目录,在 Utils 目录下新建 HttpHelper.cs 类,用于程序内发送http请求,代码如下 ``` using System; using System.Collections.Generic; using System.Net.Http; using System.Text; using System.Threading.Tasks; /* * 参考 pudefu https://www.cnblogs.com/pudefu/p/7581956.html ,在此表示感谢 */ namespace Snai.GrpcClient.Utils { public class HttpHelper { /// <summary> /// 同步GET请求 /// </summary> /// <param name="url"></param> /// <param name="headers"></param> /// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param> /// <returns></returns> public static string HttpGet(string url, Dictionary<string, string> headers = null, int timeout = 0) { using (HttpClient client = new HttpClient()) { if (headers != null) { foreach (KeyValuePair<string, string> header in headers) { client.DefaultRequestHeaders.Add(header.Key, header.Value); } } if (timeout > 0) { client.Timeout = new TimeSpan(0, 0, timeout); } Byte[] resultBytes = client.GetByteArrayAsync(url).Result; return Encoding.UTF8.GetString(resultBytes); } } /// <summary> /// 异步GET请求 /// </summary> /// <param name="url"></param> /// <param name="headers"></param> /// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param> /// <returns></returns> public static async Task<string> HttpGetAsync(string url, Dictionary<string, string> headers = null, int timeout = 0) { using (HttpClient client = new HttpClient()) { if (headers != null) { foreach (KeyValuePair<string, string> header in headers) { client.DefaultRequestHeaders.Add(header.Key, header.Value); } } if (timeout > 0) { client.Timeout = new TimeSpan(0, 0, timeout); } Byte[] resultBytes = await client.GetByteArrayAsync(url); return Encoding.Default.GetString(resultBytes); } } /// <summary> /// 同步POST请求 /// </summary> /// <param name="url"></param> /// <param name="postData"></param> /// <param name="headers"></param> /// <param name="contentType"></param> /// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param> /// <param name="encoding">默认UTF8</param> /// <returns></returns> public static string HttpPost(string url, string postData, Dictionary<string, string> headers = null, string contentType = null, int timeout = 0, Encoding encoding = null) { using (HttpClient client = new HttpClient()) { if (headers != null) { foreach (KeyValuePair<string, string> header in headers) { client.DefaultRequestHeaders.Add(header.Key, header.Value); } } if (timeout > 0) { client.Timeout = new TimeSpan(0, 0, timeout); } using (HttpContent content = new StringContent(postData ?? "", encoding ?? Encoding.UTF8)) { if (contentType != null) { content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue(contentType); } using (HttpResponseMessage responseMessage = client.PostAsync(url, content).Result) { Byte[] resultBytes = responseMessage.Content.ReadAsByteArrayAsync().Result; return Encoding.UTF8.GetString(resultBytes); } } } } /// <summary> /// 异步POST请求 /// </summary> /// <param name="url"></param> /// <param name="postData"></param> /// <param name="headers"></param> /// <param name="contentType"></param> /// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param> /// <param name="encoding">默认UTF8</param> /// <returns></returns> public static async Task<string> HttpPostAsync(string url, string postData, Dictionary<string, string> headers = null, string contentType = null, int timeout = 0, Encoding encoding = null) { using (HttpClient client = new HttpClient()) { if (headers != null) { foreach (KeyValuePair<string, string> header in headers) { client.DefaultRequestHeaders.Add(header.Key, header.Value); } } if (timeout > 0) { client.Timeout = new TimeSpan(0, 0, timeout); } using (HttpContent content = new StringContent(postData ?? "", encoding ?? Encoding.UTF8)) { if (contentType != null) { content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue(contentType); } using (HttpResponseMessage responseMessage = await client.PostAsync(url, content)) { Byte[] resultBytes = await responseMessage.Content.ReadAsByteArrayAsync(); return Encoding.UTF8.GetString(resultBytes); } } } } } } ``` 在项目根目录下新建 Consul 目录,在 Consul 目录下新建 Entity 目录,在 Entity 目录下新建 HealthCheck.cs 类,用于接收 Consul Api发现的信息实体,代码如下 ``` using System; using System.Collections.Generic; using System.Text; namespace Snai.GrpcClient.Consul.Entity { public class HealthCheck { public string Node { get; set; } public string CheckID { get; set; } public string Name { get; set; } public string Status { get; set; } public string Notes { get; set; } public string Output { get; set; } public string ServiceID { get; set; } public string ServiceName { get; set; } public string[] ServiceTags { get; set; } public dynamic Definition { get; set; } public int CreateIndex { get; set; } public int ModifyIndex { get; set; } } } ``` 在 Consul 目录下新建 IAppFind.cs 接口,定义 FindConsul() 用于 Consul 服务发现基方法,代码如下 ``` using System; using System.Collections.Generic; using System.Text; namespace Snai.GrpcClient.Consul { public interface IAppFind { IEnumerable<string> FindConsul(string ServiceName); } } ``` 在 Consul 目录下新建 AppFind.cs 类,用于实现 IAppFind.cs 接口,实现 Consul 服务发现方法,代码如下 ``` using Microsoft.Extensions.Options; using Newtonsoft.Json; using Snai.GrpcClient.Consul.Entity; using Snai.GrpcClient.Framework.Entity; using Snai.GrpcClient.Utils; using System; using System.Collections.Generic; using System.Linq; using System.Text; namespace Snai.GrpcClient.Consul { /* * 服务发现 * (服务和健康信息)http://localhost:8500/v1/health/service/GrpcService * (健康信息)http://localhost:8500/v1/health/checks/GrpcService */ public class AppFind: IAppFind { static IOptions<GrpcServiceSettings> GrpcSettings; static IOptions<ConsulService> ConsulSettings; public AppFind(IOptions<GrpcServiceSettings> grpcSettings, IOptions<ConsulService> consulSettings) { GrpcSettings = grpcSettings; ConsulSettings = consulSettings; } public IEnumerable<string> FindConsul(string ServiceName) { Dictionary<string, string> headers = new Dictionary<string, string>(); var consul = ConsulSettings.Value; string findUrl = $"http://{consul.IP}:{consul.Port}/v1/health/checks/{ServiceName}"; string findResult = HttpHelper.HttpGet(findUrl, headers, 5); if (findResult.Equals("")) { var grpcServices = GrpcSettings.Value.GrpcServices; return grpcServices.Where(g=>g.ServiceName.Equals(ServiceName,StringComparison.CurrentCultureIgnoreCase)).Select(s => s.ServiceID); } var findCheck = JsonConvert.DeserializeObject<List<HealthCheck>>(findResult); return findCheck.Where(g => g.Status.Equals("passing", StringComparison.CurrentCultureIgnoreCase)).Select(g => g.ServiceID); } } } ``` 在项目根目录下新建 LoadBalance 目录,在 LoadBalance 目录下新建 ILoadBalance.cs 接口,定义 GetGrpcService() 用于负载均衡基方法,代码如下 ``` using Snai.GrpcClient.Framework.Entity; using System; using System.Collections.Generic; using System.Text; namespace Snai.GrpcClient.LoadBalance { /* * 负载均衡接口 */ public interface ILoadBalance { string GetGrpcService(string ServiceName); } } ``` 在 LoadBalance 目录下新建 WeightRoundBalance.cs 类,用于实现 ILoadBalance.cs 接口,实现 GetGrpcService() 负载均衡方法,本次负载均衡实现权重轮询算法,代码如下 ``` using Snai.GrpcClient.Consul; using Snai.GrpcClient.Utils; using System; using System.Collections.Generic; using System.Text; using System.Linq; using Snai.GrpcClient.Framework.Entity; using Microsoft.Extensions.Options; namespace Snai.GrpcClient.LoadBalance { /* * 权重轮询 */ public class WeightRoundBalance : ILoadBalance { int Balance; IOptions<GrpcServiceSettings> GrpcSettings; IAppFind AppFind; public WeightRoundBalance(IOptions<GrpcServiceSettings> grpcSettings, IAppFind appFind) { Balance = 0; GrpcSettings = grpcSettings; AppFind = appFind; } public string GetGrpcService(string ServiceName) { var grpcServices = GrpcSettings.Value.GrpcServices; var healthServiceID = AppFind.FindConsul(ServiceName); if (grpcServices == null || grpcServices.Count() == 0 || healthServiceID == null || healthServiceID.Count() == 0) { return ""; } //健康的服务 var healthServices = new List<Framework.Entity.GrpcService>(); foreach (var service in grpcServices) { foreach (var health in healthServiceID) { if (service.ServiceID.Equals(health, StringComparison.CurrentCultureIgnoreCase)) { healthServices.Add(service); break; } } } if (healthServices == null || healthServices.Count() == 0) { return ""; } //加权轮询 var services = new List<string>(); foreach (var service in healthServices) { services.AddRange(Enumerable.Repeat(service.IP + ":" + service.Port, service.Weight)); } var servicesArray = services.ToArray(); Balance = Balance % servicesArray.Length; var grpcUrl = servicesArray[Balance]; Balance = Balance + 1; return grpcUrl; } } } ``` 在项目根目录下新建 RpcClient 目录,在 RpcClient 目录下新建 IMsgClient.cs 接口,定义 GetSum() 用于Grpc客户端调用基方法,代码如下 ``` using System; using System.Collections.Generic; using System.Text; namespace Snai.GrpcClient.RpcClient { public interface IMsgClient { void GetSum(int num1, int num2); } } ``` 在 RpcClient 目录下新建 MsgClient.cs 类,用于实现 IMsgClient.cs 接口,实现 GetSum() 方法用于Grpc客户端调用,代码如下 ``` using Grpc.Core; using Microsoft.Extensions.DependencyInjection; using Snai.GrpcClient.LoadBalance; using Snai.GrpcService.Protocol; using System; using System.Collections.Generic; using System.Text; namespace Snai.GrpcClient.RpcClient { public class MsgClient: IMsgClient { ILoadBalance LoadBalance; Channel GrpcChannel; MsgService.MsgServiceClient GrpcClient; public MsgClient(ILoadBalance loadBalance) { LoadBalance = loadBalance; var grpcUrl = LoadBalance.GetGrpcService("GrpcService"); if (!grpcUrl.Equals("")) { Console.WriteLine($"Grpc Service:{grpcUrl}"); GrpcChannel = new Channel(grpcUrl, ChannelCredentials.Insecure); GrpcClient = new MsgService.MsgServiceClient(GrpcChannel); } } public void GetSum(int num1, int num2) { if (GrpcClient != null) { GetMsgSumReply msgSum = GrpcClient.GetSum(new GetMsgNumRequest { Num1 = num1, Num2 = num2 }); Console.WriteLine("Grpc Client Call GetSum():" + msgSum.Sum); } else { Console.WriteLine("所有负载都挂掉了!"); } } } } ``` 在项目根目录下新建 Framework 目录,在 Framework 目录下新建 Entity 目录,在 Entity 目录下新建 ConsulService.cs 和 GrpcServiceSettings.cs 类,分别对应配置appsettings.json的 ConsulService,GrpcServiceSettings 两个配置项,代码如下 ConsulService.cs ``` using System; using System.Collections.Generic; using System.Text; namespace Snai.GrpcClient.Framework.Entity { public class ConsulService { public string IP { get; set; } public int Port { get; set; } } } ``` GrpcServiceSettings.cs ``` using System; using System.Collections.Generic; using System.Text; namespace Snai.GrpcClient.Framework.Entity { public class GrpcServiceSettings { public List<GrpcService> GrpcServices { get; set; } } public class GrpcService { public string ServiceName { get; set; } public string ServiceID { get; set; } public string IP { get; set; } public int Port { get; set; } public int Weight { get; set; } } } ``` 在 Framework 目录下新建 DependencyInitialize.cs 类,定义 AddImplement() 方法用于注册全局配置和类到容器,实现依赖注入,代码如下 ``` using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Snai.GrpcClient.Consul; using Snai.GrpcClient.Framework.Entity; using Snai.GrpcClient.LoadBalance; using Snai.GrpcClient.RpcClient; using System; using System.Collections.Generic; using System.IO; using System.Text; namespace Snai.GrpcClient.Framework { /* * IServiceCollection 依赖注入生命周期 * AddTransient 每次都是全新的 * AddScoped 在一个范围之内只有同一个实例(同一个线程,同一个浏览器请求只有一个实例) * AddSingleton 单例 */ public static class DependencyInitialize { /// <summary> /// 注册对象 /// </summary> /// <param name="services">The services.</param> /* * IAppFind AppFind; * 构造函数注入使用 IAppFind appFind * AppFind = appFind; */ public static void AddImplement(this IServiceCollection services) { //添加 json 文件路径 var builder = new ConfigurationBuilder().SetBasePath(Directory.GetCurrentDirectory()).AddJsonFile("appsettings.json"); //创建配置根对象 var configurationRoot = builder.Build(); //注册全局配置 services.AddConfigImplement(configurationRoot); //注册服务发现 services.AddScoped<IAppFind, AppFind>(); //注册负载均衡 if (configurationRoot["LoadBalancer"].Equals("WeightRound", StringComparison.CurrentCultureIgnoreCase)) { services.AddSingleton<ILoadBalance, WeightRoundBalance>(); } //注册Rpc客户端 services.AddTransient<IMsgClient, MsgClient>(); } /// <summary> /// 注册全局配置 /// </summary> /// <param name="services">The services.</param> /// <param name="configurationRoot">The configurationRoot.</param> /* * IOptions<GrpcServiceSettings> GrpcSettings; * 构造函数注入使用 IOptions<GrpcServiceSettings> grpcSettings * GrpcSettings = grpcSettings; */ public static void AddConfigImplement(this IServiceCollection services, IConfigurationRoot configurationRoot) { //注册配置对象 services.AddOptions(); services.Configure<GrpcServiceSettings>(configurationRoot.GetSection(nameof(GrpcServiceSettings))); services.Configure<ConsulService>(configurationRoot.GetSection(nameof(ConsulService))); } } } ``` 在根目录下新建 appsettings.json 配置文件,配置 GrpcServiceSettings 的 GrpcServices 为服务端发布的两个服务5021和5022,LoadBalancer 负载均衡为 WeightRound 权重轮询(如实现其他负载方法可做相应配置,注册负载均衡时也做相应修改),ConsulService Consul的IP和端口,代码如下 ``` { "GrpcServiceSettings": { "GrpcServices": [ { "ServiceName": "GrpcService", "ServiceID": "GrpcService_5021", "IP": "localhost", "Port": "5031", "Weight": "2" }, { "ServiceName": "GrpcService", "ServiceID": "GrpcService_5022", "IP": "localhost", "Port": "5032", "Weight": "1" } ] }, "LoadBalancer": "WeightRound", "ConsulService": { "IP": "localhost", "Port": "8500" } } ``` GrpcServices Grpc服务列表   ServiceName:服务名称,负载同一服务名称相同   ServiceID:服务ID,保持唯一   IP:服务IP   Port:端口   Weight:服务权重 修改 Program.cs 的 Main() 方法,调用 AddImplement(),注册全局配置和类到容器,注入使用 MsgClient 类的 GetSum() 方法,实现 Grpc 调用,代码如下 ``` using Microsoft.Extensions.DependencyInjection; using Snai.GrpcClient.Framework; using Snai.GrpcClient.RpcClient; using System; namespace Snai.GrpcClient { class Program { static void Main(string[] args) { IServiceCollection service = new ServiceCollection(); //注册对象 service.AddImplement(); //注入使用对象 var provider = service.BuildServiceProvider(); string exeArg = string.Empty; Console.WriteLine("Grpc调用!"); Console.WriteLine("-c\t调用Grpc服务;"); Console.WriteLine("-q\t退出服务;"); while (true) { exeArg = Console.ReadKey().KeyChar.ToString(); Console.WriteLine(); if (exeArg.ToLower().Equals("c", StringComparison.CurrentCultureIgnoreCase)) { //调用服务 var rpcClient = provider.GetService<IMsgClient>(); rpcClient.GetSum(10, 2); } else if (exeArg.ToLower().Equals("q", StringComparison.CurrentCultureIgnoreCase)) { break; } else { Console.WriteLine("参数异常!"); } } } } } ``` 右击项目生成,最终项目结构如下: ![Image](/sitedata/image/dotnet_2_18.png) 到此客户端的代码实现已完成,下面运行测试 Grpc+Consul 服务注册、服务发现和负载均衡。 **四、运行测试 Grpc+Consul 服务注册、服务发现和负载均衡** 双击 startup.bat 启动 Consul,再启动服务5021和5022,启动成功打开 http://localhost:8500/ui/#/grpc-consul/services/GrpcService 查看服务情况 ![Image](/sitedata/image/dotnet_2_19.png) ![Image](/sitedata/image/dotnet_2_20.png) ![Image](/sitedata/image/dotnet_2_21.png) 启动 Snai.GrpcClient 客户端 ![Image](/sitedata/image/dotnet_2_22.png) 输入 c 调用Grpc服务,调用3次,5031调用2次,5032调用1次,成功实现负载均衡 ![Image](/sitedata/image/dotnet_2_23.png) 关掉服务5022,等10秒左右(因为设置健康检查时间间隔10秒),再输入 c 调用Grpc服务,只调用5031 ![Image](/sitedata/image/dotnet_2_24.png) 打开 http://localhost:8500/ui/#/grpc-consul/services/GrpcService 查看,5022 状态失败,或消失 ![Image](/sitedata/image/dotnet_2_25.png) Grpc+Consul实现服务注册、服务发现、健康检查和负载均衡已完成 Github源码地址:[https://github.com/Liu-Alan/Grpc-Consul](https://github.com/Liu-Alan/Grpc-Consul)