From c44709373ed723d3bdc0c264e521ea8223f66afc Mon Sep 17 00:00:00 2001 From: nquidox Date: Sat, 21 Feb 2026 17:00:18 +0300 Subject: [PATCH] run processor --- cmd/main.go | 7 ++--- internal/app/handler.go | 61 +++++++++++++++++++++++------------------ 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 361aeaf..cb377f0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,12 +17,9 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() - appl := app.NewApp(app.Deps{ - Config: c, - RootCtx: ctx, - }) + appl := app.NewApp(ctx, c) - if err := appl.Run(); err != nil { + if err := appl.Run(ctx); err != nil { log.WithError(err).Fatal("Application run failed") } } diff --git a/internal/app/handler.go b/internal/app/handler.go index ed26e55..676bb81 100644 --- a/internal/app/handler.go +++ b/internal/app/handler.go @@ -5,75 +5,82 @@ import ( log "github.com/sirupsen/logrus" "net" "task-processor/config" + "task-processor/internal/processor" ta "task-processor/internal/taskAgent" "time" ) type App struct { - config config.Config - rootCtx context.Context - taskAgent ta.TaskAgent + config config.Config + taskAgent ta.TaskAgent + processor processor.Processor + sendChanLen uint } -type Deps struct { - Config config.Config - RootCtx context.Context -} - -func NewApp(deps Deps) *App { - cfg := deps.Config - +func NewApp(ctx context.Context, cfg config.Config) *App { taskAgent := ta.NewHandler(ta.Deps{ Addr: net.JoinHostPort(cfg.TasksSource.Host, cfg.TasksSource.Port), Timeout: cfg.TasksSource.Timeout, }) + proc := processor.NewHandler(processor.Deps{ + Ctx: ctx, + TA: taskAgent, + Addr: processor.Addr{ + Host: cfg.Rabbit.Host, + Port: cfg.Rabbit.Port, + User: cfg.Rabbit.User, + Pass: cfg.Rabbit.Pass, + Vhost: cfg.Rabbit.Vhost, + }, + ChanLen: cfg.App.ProcChanLen, + }) + return &App{ - config: cfg, - rootCtx: deps.RootCtx, - taskAgent: taskAgent, + config: cfg, + taskAgent: taskAgent, + processor: proc, + sendChanLen: cfg.App.ProcChanLen, } } -func (app *App) Run() error { +func (app *App) Run(ctx context.Context) error { log.Info("App started") - errChan := make(chan error, 3) mainLoop := time.NewTicker(app.config.App.CheckPeriod) defer mainLoop.Stop() go func() { - if err := app.processTasks(); err != nil { + if err := app.processor.ProcessTasks(ctx); err != nil { errChan <- err } for range mainLoop.C { - if err := app.processTasks(); err != nil { + if err := app.processor.ProcessTasks(ctx); err != nil { errChan <- err } } }() + if err := app.processor.SendResults(ctx, app.sendChanLen); err != nil { + errChan <- err + } + select { - case <-app.rootCtx.Done(): - return app.Shutdown() + case <-ctx.Done(): + return app.Shutdown(ctx) case err := <-errChan: return err } } -func (app *App) Shutdown() error { +func (app *App) Shutdown(ctx context.Context) error { log.Info("App shutting down") - ctx, cancel := context.WithTimeout(app.rootCtx, time.Second*10) + ctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() _ = ctx return nil } - -func (app *App) processTasks() error { - log.Info("Processing tasks") - return nil -}