package main import ( "context" "flag" "fmt" "github.com/go-kit/kit/log" kitprometheus "github.com/go-kit/kit/metrics/prometheus" kitzipkin "github.com/go-kit/kit/tracing/zipkin" "github.com/openzipkin/zipkin-go" zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http" stdprometheus "github.com/prometheus/client_golang/prometheus" "github.com/tal-tech/go-zero/core/logx" "gorm.io/gorm" "job_risk_third/config" "job_risk_third/discover" "job_risk_third/internal/endpoints" "job_risk_third/internal/services" "job_risk_third/internal/transports" "job_risk_third/loggers" "job_risk_third/model" "net/http" "os" "os/signal" "syscall" ) var configFile = flag.String("f", "config/job_risk_third.json", "the config file") func main() { flag.Parse() errChan := make(chan error) ctx := context.Background() config.Cfg = config.LoadConfig(*configFile) logConf := logx.LogConf{ ServiceName: config.Cfg.Get("name").(string), Mode: config.Cfg.Get("log.mode").(string), Path: config.Cfg.Get("log.path").(string), Level: config.Cfg.Get("log.level").(string), Compress: config.Cfg.Get("log.compress").(bool), KeepDays: config.Cfg.GetInt("log.keep_days"), StackCooldownMillis: config.Cfg.GetInt("log.stack_cooldown_millis"), } zipkinConf := config.ZipkinConfig{ HostConfig: config.HostConfig{ Address: config.Cfg.Get("zipkin.host.address").(string), Port: config.Cfg.GetInt("zipkin.host.port"), }, ZipkinURL: config.Cfg.Get("zipkin.url").(string), } consulConf := config.ConsulConfig{ HostConfig: config.HostConfig{ Address: config.Cfg.Get("consul.host.address").(string), Port: config.Cfg.GetInt("consul.host.port"), }, ServiceId: config.Cfg.Get("consul.service_id").(string), } logx.MustSetup(logConf) var logger = loggers.New() dbConf := &gorm.Config{ SkipDefaultTransaction: false, Logger: logger, } //初始化数据库和连接缓存 model.New(config.Cfg.GetString("connstring"), dbConf) fieldKeys := []string{"method"} requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: config.Cfg.Get("name").(string), Subsystem: config.Cfg.Get("name").(string) + "_service", Name: "request_count", Help: "收到的请求数", }, fieldKeys) requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ Namespace: config.Cfg.Get("name").(string), Subsystem: config.Cfg.Get("name").(string) + "_service", Name: "request_latency", Help: "请求的总持续时间(以微秒为单位)", }, fieldKeys) var zipkinTracer *zipkin.Tracer { var ( err error hostPort = fmt.Sprintf("%s:%d", zipkinConf.Address, zipkinConf.Port) serviceName = config.Cfg.Get("name").(string) + "_service" useNoopTracer = zipkinConf.ZipkinURL == "" reporters = zipkinhttp.NewReporter(zipkinConf.ZipkinURL) ) defer reporters.Close() zEP, _ := zipkin.NewEndpoint(serviceName, hostPort) zipkinTracer, err = zipkin.NewTracer( reporters, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(useNoopTracer), ) if err != nil { logx.Error(err) os.Exit(1) } logx.Slow("Zipkin", "type", "Native", "URL", zipkinConf.ZipkinURL) } go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) errChan <- fmt.Errorf("%s", <-c) }() var svr services.JobRiskThirdServices svr = services.JobRiskThirdService{} svr = services.Metrics(requestCount, requestLatency)(svr) pushFeatureDealEndpoint := endpoints.MakePushFeatureDealEndpoint(svr) pushFeatureDealEndpoint = kitzipkin.TraceEndpoint(zipkinTracer, "push_feature_deal")(pushFeatureDealEndpoint) pushUserInfoEndpoint := endpoints.MakePushUserInfoEndpoint(svr) pushUserInfoEndpoint = kitzipkin.TraceEndpoint(zipkinTracer, "push_user_info")(pushUserInfoEndpoint) pushFeatureInfoEndpoint := endpoints.MakePushFeatureInfoEndpoint(svr) pushFeatureInfoEndpoint = kitzipkin.TraceEndpoint(zipkinTracer, "push_feature_info")(pushFeatureInfoEndpoint) //创建健康检查的Endpoint healthEndpoint := endpoints.MakeHealthCheckEndpoint(svr) healthEndpoint = kitzipkin.TraceEndpoint(zipkinTracer, "third_health_endpoint")(healthEndpoint) endpts := endpoints.JobRiskThirdEndpoint{ PushFeatureDealEndpoint: pushFeatureDealEndpoint, PushUserInfoEndpoint: pushUserInfoEndpoint, PushFeatureInfoEndpoint: pushFeatureInfoEndpoint, HealthCheckEndpoint: healthEndpoint, } r := transports.MakeHttpHandler(ctx, endpts, zipkinTracer) var l log.Logger { l = log.NewLogfmtLogger(os.Stderr) l = log.With(l, "ts", log.DefaultTimestampUTC) l = log.With(l, "caller", log.DefaultCaller) } //创建注册对象 registar := discover.Register(consulConf.Address, fmt.Sprintf("%d", consulConf.Port), config.Cfg.Get("http.domain.address").(string), fmt.Sprintf("%d", config.Cfg.GetInt("http.domain.port")), consulConf.ServiceId, config.Cfg.Get("name").(string), l) go func() { fmt.Printf("Http Server start at port %.f \n", config.Cfg.Get("http.host.port")) //启动前执行注册 registar.Register() errChan <- http.ListenAndServe(fmt.Sprintf(":%.f", config.Cfg.Get("http.host.port")), r) }() err := <-errChan //服务退出取消注册 registar.Deregister() logx.Error(err) }