package app import ( "context" log "github.com/sirupsen/logrus" "net" "os" "os/signal" "parsing-service/config" "parsing-service/internal/appState" "parsing-service/internal/network" "parsing-service/internal/parsers" "parsing-service/internal/processor" "parsing-service/internal/shared" "runtime" "syscall" "time" ) type App struct { ClientAddress string ServerAddress string NumCPUs int CheckPeriod time.Duration StartTime time.Time RetryCount int RetryMinutes int State *appState.State Network *network.Network } func New(c *config.Config) *App { numCPUs := c.NumCPUs if numCPUs < 1 { numCPUs = runtime.NumCPU() } st := appState.NewState(numCPUs, c.CheckPeriod, c.TasksConfig.RetryCount, c.TasksConfig.RetryMinutes) return &App{ ClientAddress: c.Host + ":" + c.ClientPort, ServerAddress: c.Host + ":" + c.ServerPort, NumCPUs: numCPUs, CheckPeriod: time.Duration(c.CheckPeriod), StartTime: time.Now(), RetryCount: c.TasksConfig.RetryCount, RetryMinutes: c.TasksConfig.RetryMinutes, State: st, Network: network.NewHandler(), } } func (app *App) Run() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() log.Info("Application start") log.WithFields(log.Fields{ "ClientAddress": app.ClientAddress, "Number of CPUs": app.NumCPUs, }).Debug("App settings") server := newServer(app) client := newClient(app) period := time.NewTicker(app.CheckPeriod * time.Hour) defer period.Stop() sender := make(chan shared.TaskResult, app.NumCPUs*10) //task processor handlers := map[string]parsers.TaskHandler{ shared.OriginSurugaya: parsers.NewSurugayaParser(), shared.OriginMandarake: parsers.NewMandarakeParser(), } taskProcessor := processor.New(processor.Deps{ Handlers: handlers, Out: sender, State: app.State, Ctx: ctx, Client: client, NumCPUs: app.NumCPUs, }) process := func() { app.State.SetStatus(appState.StatusRequestTasks) log.Info("Requesting data for parsing") receivedTasks := app.Network.RequestTasks(ctx, client) log.WithField("length", len(receivedTasks)).Debug("End receiving") taskProcessor.StartWork(receivedTasks) } go func() { process() //immediate start for range period.C { process() } }() //done tasks sender go func() { ticker := time.NewTicker(time.Second * 2) defer ticker.Stop() var sendData []shared.TaskResult for { select { case task := <-sender: sendData = append(sendData, task) case <-ticker.C: l := len(sendData) if l > 0 { log.WithField("length", l).Debug("Sending parsed data") app.Network.SendResult(client, sendData) sendData = sendData[:0] } } } }() //gRPC Server for status response go func() { listener, err := net.Listen("tcp", app.ServerAddress) if err != nil { log.Fatalf("failed to listen: %v", err) } log.Infof("gRPC Server listening at %v", app.ServerAddress) if err := server.Serve(listener); err != nil { log.Fatalf("failed to serve: %v", err) } }() go func() { sigint := make(chan os.Signal, 1) signal.Notify(sigint, os.Interrupt, syscall.SIGTERM) <-sigint log.Info("Shutting down...") period.Stop() server.GracefulStop() cancel() }() <-ctx.Done() }