package app import ( "context" log "github.com/sirupsen/logrus" "google.golang.org/grpc" "net" "runtime" "task-processor/config" "task-processor/internal/appState" "task-processor/internal/parsers" "task-processor/internal/parsers/mandarake" "task-processor/internal/processor" "task-processor/internal/remote" "task-processor/internal/shared" "task-processor/pkg/router" "time" ) type App struct { config *config.Config checkPeriod time.Duration startTime time.Time retryCount int retryMinutes int state *appState.State network *remote.Network numCPUs int metricsSrv *router.Handler taskApiSrv *grpc.Server } func New(c *config.Config) *App { numCPUs := c.NumCPUs if numCPUs < 1 { numCPUs = runtime.NumCPU() } st := appState.NewState(numCPUs, c.CheckPeriod, c.TasksCfg.RetryCount, c.TasksCfg.RetryMinutes) server := newServer(st) //metrics mSrv := router.NewHandler(router.Deps{ Addr: net.JoinHostPort(c.Metrics.Host, c.Metrics.Port), GinMode: c.Metrics.GinMode, }) return &App{ config: c, checkPeriod: time.Duration(c.CheckPeriod), startTime: time.Now(), retryCount: c.TasksCfg.RetryCount, retryMinutes: c.TasksCfg.RetryMinutes, state: st, network: remote.NewHandler(), numCPUs: numCPUs, metricsSrv: mSrv, taskApiSrv: server, } } func (app *App) Run(ctx context.Context) { log.Info("Application start") addr := net.JoinHostPort(app.config.GrpcCfg.ServerHost, app.config.GrpcCfg.ServerPort) log.WithFields(log.Fields{ "Service address": addr, "Number of CPUs": app.numCPUs, }).Debug("App settings") errChan := make(chan error, 16) //main apiClient := newApiClient(app.config.GrpcCfg.ApiClientHost + ":" + app.config.GrpcCfg.ApiClientPort) sender := make(chan shared.TaskResult, app.numCPUs*10) defer close(sender) // external scrapper surugayaScrapper := newSurugayaScrapperClient(app.config.GrpcCfg.SurugayaScrapperHost + ":" + app.config.GrpcCfg.SurugayaScrapperPort) //task processor handlers := make(map[string]parsers.TaskHandler) if app.config.OriginEnabled.Surugaya { handlers[shared.OriginSurugaya] = parsers.NewSurugayaParser(surugayaScrapper) } if app.config.OriginEnabled.Mandarake { handlers[shared.OriginMandarake] = mandarake.NewParser(mandarake.Deps{ Enabled: app.config.OriginEnabled.Mandarake, ExternalBrowser: app.config.ExternalBrowser, GoroutinesNumber: app.numCPUs, TaskTimeout: app.config.TasksCfg.TaskTimeout, }) } taskProcessor := processor.New(processor.Deps{ Handlers: handlers, Out: sender, State: app.state, Ctx: ctx, NumCPUs: app.numCPUs, }) process := func() { app.state.SetStatus(appState.StatusRequestTasks) log.Info("Requesting data for parsing") receivedTasks := app.network.RequestTasks(ctx, apiClient) log.WithField("length", len(receivedTasks)).Debug("End receiving") taskProcessor.StartWork(ctx, receivedTasks) } period := time.NewTicker(app.checkPeriod * time.Hour) defer period.Stop() go func() { process() //immediate start for range period.C { process() } }() //done tasks sender go func() { ticker := time.NewTicker(time.Second * 2) defer ticker.Stop() var sendData []shared.TaskResult for { select { case task := <-sender: sendData = append(sendData, task) case <-ticker.C: l := len(sendData) if l > 0 { log.WithField("length", l).Debug("Sending parsed data") app.network.SendResult(ctx, apiClient, sendData) sendData = sendData[:0] } } } }() //start metrics server go func() { if err := app.metricsSrv.Run(); err != nil { errChan <- err } }() //gRPC Server for status response go func() { listener, err := net.Listen("tcp", addr) if err != nil { errChan <- err } log.Infof("gRPC Server listening at %v", addr) if err = app.taskApiSrv.Serve(listener); err != nil { errChan <- err } }() select { case <-ctx.Done(): app.shutdown(ctx) case err := <-errChan: log.WithError(err).Fatal("Application run error") } } func (app *App) shutdown(ctx context.Context) { log.Info("Shutting down...") app.taskApiSrv.GracefulStop() if err := app.metricsSrv.Shutdown(ctx); err != nil { log.WithError(err).Error("Failed to shutdown server") } }