// Package app wires the task processor together: it requests tasks on a
// schedule, dispatches them to the per-origin parsers, sends results back
// through the API client and serves status over gRPC.
package app

import (
	"context"
	"net"
	"os"
	"os/signal"
	"runtime"
	"syscall"
	"time"

	log "github.com/sirupsen/logrus"

	"task-processor/config"
	"task-processor/internal/appState"
	"task-processor/internal/network"
	"task-processor/internal/parsers"
	"task-processor/internal/processor"
	"task-processor/internal/shared"
)

// App holds the configuration and the long-lived collaborators used by Run.
type App struct {
	config *config.Config
	// checkPeriod is the raw configured value converted to time.Duration;
	// Run multiplies it by time.Hour, so it is effectively a count of hours.
	checkPeriod  time.Duration
	startTime    time.Time
	retryCount   int
	retryMinutes int
	state        *appState.State
	network      *network.Network
	numCPUs      int
}

// New builds an App from c. When c.NumCPUs is less than 1 the value reported
// by runtime.NumCPU is used instead.
func New(c *config.Config) *App {
	numCPUs := c.NumCPUs
	if numCPUs < 1 {
		numCPUs = runtime.NumCPU()
	}

	st := appState.NewState(numCPUs, c.CheckPeriod, c.TasksCfg.RetryCount, c.TasksCfg.RetryMinutes)

	return &App{
		config:       c,
		checkPeriod:  time.Duration(c.CheckPeriod),
		startTime:    time.Now(),
		retryCount:   c.TasksCfg.RetryCount,
		retryMinutes: c.TasksCfg.RetryMinutes,
		state:        st,
		network:      network.NewHandler(),
		numCPUs:      numCPUs,
	}
}

// Run starts the background workers (task scheduler, result sender, gRPC
// status server, signal handler) and blocks until the run context is
// cancelled, which happens after SIGINT or SIGTERM is received.
//
// A failure to listen on, or serve from, the configured gRPC address ends the
// process through log.Fatalf.
func (app *App) Run() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	serverAddr := app.config.GrpcCfg.ServerHost + ":" + app.config.GrpcCfg.ServerPort

	log.Info("Application start")
	log.WithFields(log.Fields{
		"Service address": serverAddr,
		"Number of CPUs":  app.numCPUs,
	}).Debug("App settings")

	server := newServer(app)
	apiClient := newApiClient(app.config.GrpcCfg.ApiClientHost + ":" + app.config.GrpcCfg.ApiClientPort)

	// NOTE(review): time.NewTicker panics on a non-positive interval, so a
	// configured check period of 0 (or less) crashes here — confirm the
	// config is validated upstream.
	period := time.NewTicker(app.checkPeriod * time.Hour)
	defer period.Stop()

	sender := make(chan shared.TaskResult, app.numCPUs*10)

	// external scraper
	surugayaScrapper := newSurugayaScrapperClient(app.config.GrpcCfg.SurugayaScrapperHost + ":" + app.config.GrpcCfg.SurugayaScrapperPort)

	// task processor
	handlers := map[string]parsers.TaskHandler{
		shared.OriginSurugaya:  parsers.NewSurugayaParser(ctx, surugayaScrapper),
		shared.OriginMandarake: parsers.NewMandarakeParser(app.numCPUs),
	}

	taskProcessor := processor.New(processor.Deps{
		Handlers: handlers,
		Out:      sender,
		State:    app.state,
		Ctx:      ctx,
		NumCPUs:  app.numCPUs,
	})

	// process performs one request/dispatch cycle.
	process := func() {
		app.state.SetStatus(appState.StatusRequestTasks)
		log.Info("Requesting data for parsing")
		receivedTasks := app.network.RequestTasks(ctx, apiClient)
		log.WithField("length", len(receivedTasks)).Debug("End receiving")
		taskProcessor.StartWork(receivedTasks)
	}

	// Scheduler: one cycle immediately, then one per tick.
	go func() {
		process() // immediate start
		for {
			// Ticker.Stop does not close period.C, so ranging over the
			// channel would block forever after shutdown; watch ctx instead.
			select {
			case <-ctx.Done():
				return
			case <-period.C:
				process()
			}
		}
	}()

	// Done-tasks sender: batches results and flushes them every two seconds.
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()

		var sendData []shared.TaskResult
		for {
			select {
			case <-ctx.Done():
				// Results still buffered at cancellation are not sent.
				return
			case task := <-sender:
				sendData = append(sendData, task)
			case <-ticker.C:
				if len(sendData) == 0 {
					continue
				}
				log.WithField("length", len(sendData)).Debug("Sending parsed data")
				app.network.SendResult(apiClient, sendData)
				// Keep the backing array for the next batch.
				sendData = sendData[:0]
			}
		}
	}()

	// gRPC server for status responses.
	go func() {
		listener, err := net.Listen("tcp", serverAddr)
		if err != nil {
			log.Fatalf("failed to listen: %v", err)
		}
		log.Infof("gRPC Server listening at %v", serverAddr)
		if err := server.Serve(listener); err != nil {
			log.Fatalf("failed to serve: %v", err)
		}
	}()

	// Signal handler: the first SIGINT/SIGTERM triggers a graceful shutdown.
	go func() {
		sigint := make(chan os.Signal, 1)
		signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)
		<-sigint
		// Restore default signal handling so that a second signal can
		// terminate the process if the graceful stop below hangs.
		signal.Stop(sigint)

		log.Info("Shutting down...")
		period.Stop()
		server.GracefulStop()
		cancel()
	}()

	<-ctx.Done()
}