diff --git a/internal/app/app.go b/internal/app/app.go index 6353e7c..832e338 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -3,11 +3,9 @@ package app import ( "context" log "github.com/sirupsen/logrus" + "google.golang.org/grpc" "net" - "os" - "os/signal" "runtime" - "syscall" "task-processor/config" "task-processor/internal/appState" "task-processor/internal/parsers" @@ -28,6 +26,8 @@ type App struct { state *appState.State network *remote.Network numCPUs int + metricsSrv *router.Handler + taskApiSrv *grpc.Server } func New(c *config.Config) *App { @@ -38,6 +38,14 @@ func New(c *config.Config) *App { st := appState.NewState(numCPUs, c.CheckPeriod, c.TasksCfg.RetryCount, c.TasksCfg.RetryMinutes) + server := newServer(st) + + //metrics + mSrv := router.NewHandler(router.Deps{ + Addr: net.JoinHostPort(c.Metrics.Host, c.Metrics.Port), + GinMode: c.Metrics.GinMode, + }) + return &App{ config: c, checkPeriod: time.Duration(c.CheckPeriod), @@ -47,33 +55,31 @@ func New(c *config.Config) *App { state: st, network: remote.NewHandler(), numCPUs: numCPUs, + metricsSrv: mSrv, + taskApiSrv: server, } } -func (app *App) Run() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - +func (app *App) Run(ctx context.Context) { log.Info("Application start") + + addr := net.JoinHostPort(app.config.GrpcCfg.ServerHost, app.config.GrpcCfg.ServerPort) + log.WithFields(log.Fields{ - "Service address": app.config.GrpcCfg.ServerHost + ":" + app.config.GrpcCfg.ServerPort, + "Service address": addr, "Number of CPUs": app.numCPUs, }).Debug("App settings") - //metrics - mSrv := router.NewHandler(router.Deps{ - Addr: net.JoinHostPort(app.config.Metrics.Host, app.config.Metrics.Port), - GinMode: app.config.Metrics.GinMode, - }) + errChan := make(chan error, 16) //main - server := newServer(app) apiClient := newApiClient(app.config.GrpcCfg.ApiClientHost + ":" + app.config.GrpcCfg.ApiClientPort) period := time.NewTicker(app.checkPeriod * time.Hour) defer period.Stop() sender := make(chan shared.TaskResult, app.numCPUs*10) + defer close(sender) // external scrapper surugayaScrapper := newSurugayaScrapperClient(app.config.GrpcCfg.SurugayaScrapperHost + ":" + app.config.GrpcCfg.SurugayaScrapperPort) @@ -82,11 +88,11 @@ func (app *App) Run() { handlers := make(map[string]parsers.TaskHandler) if app.config.OriginEnabled.Surugaya { - handlers[shared.OriginSurugaya] = parsers.NewSurugayaParser(ctx, surugayaScrapper) + handlers[shared.OriginSurugaya] = parsers.NewSurugayaParser(surugayaScrapper) } if app.config.OriginEnabled.Mandarake { - handlers[shared.OriginMandarake] = mandarake.NewParser(mandarake.ParserDeps{ + handlers[shared.OriginMandarake] = mandarake.NewParser(mandarake.Deps{ Enabled: app.config.OriginEnabled.Mandarake, ExternalBrowser: app.config.ExternalBrowser, GoroutinesNumber: app.numCPUs, @@ -108,7 +114,7 @@ func (app *App) Run() { receivedTasks := app.network.RequestTasks(ctx, apiClient) log.WithField("length", len(receivedTasks)).Debug("End receiving") - taskProcessor.StartWork(receivedTasks) + taskProcessor.StartWork(ctx, receivedTasks) } go func() { @@ -143,37 +149,38 @@ func (app *App) Run() { //start metrics server go func() { - if err := mSrv.Run(); err != nil { - log.WithError(err).Error("Metrics server run failed") + if err := app.metricsSrv.Run(); err != nil { + errChan <- err } }() //gRPC Server for status response go func() { - listener, err := net.Listen("tcp", app.config.GrpcCfg.ServerHost+":"+app.config.GrpcCfg.ServerPort) + listener, err := net.Listen("tcp", addr) if err != nil { - log.Fatalf("failed to listen: %v", err) + errChan <- err } - log.Infof("gRPC Server listening at %v", app.config.GrpcCfg.ServerHost+":"+app.config.GrpcCfg.ServerPort) - if err := server.Serve(listener); err != nil { - log.Fatalf("failed to serve: %v", err) + log.Infof("gRPC Server listening at %v", addr) + if err = app.taskApiSrv.Serve(listener); err != nil { + errChan <- err } }() - go func() { - sigint := make(chan os.Signal, 1) - signal.Notify(sigint, os.Interrupt, syscall.SIGTERM) - <-sigint - log.Info("Shutting down...") - - period.Stop() - server.GracefulStop() - cancel() - if err := mSrv.Shutdown(ctx); err != nil { - log.WithError(err).Error("Failed to shutdown server") - } - }() - - <-ctx.Done() + select { + case <-ctx.Done(): + app.shutdown(ctx) + case err := <-errChan: + log.WithError(err).Fatal("Application run error") + } +} + +func (app *App) shutdown(ctx context.Context) { + log.Info("Shutting down...") + + app.taskApiSrv.GracefulStop() + + if err := app.metricsSrv.Shutdown(ctx); err != nil { + log.WithError(err).Error("Failed to shutdown server") + } } diff --git a/internal/app/server.go b/internal/app/server.go index f837430..9c9dadc 100644 --- a/internal/app/server.go +++ b/internal/app/server.go @@ -12,10 +12,10 @@ type Server struct { state *appState.State } -func newServer(app *App) *grpc.Server { +func newServer(state *appState.State) *grpc.Server { s := grpc.NewServer() srv := &Server{ - state: app.state, + state: state, } pb.RegisterTaskProcessorServer(s, srv) return s