diff --git a/internal/app/app.go b/internal/app/app.go index 832e338..c7359e1 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -75,9 +75,6 @@ func (app *App) Run(ctx context.Context) { //main apiClient := newApiClient(app.config.GrpcCfg.ApiClientHost + ":" + app.config.GrpcCfg.ApiClientPort) - period := time.NewTicker(app.checkPeriod * time.Hour) - defer period.Stop() - sender := make(chan shared.TaskResult, app.numCPUs*10) defer close(sender) @@ -96,6 +93,7 @@ func (app *App) Run(ctx context.Context) { Enabled: app.config.OriginEnabled.Mandarake, ExternalBrowser: app.config.ExternalBrowser, GoroutinesNumber: app.numCPUs, + TaskTimeout: app.config.TasksCfg.TaskTimeout, }) } @@ -117,6 +115,9 @@ func (app *App) Run(ctx context.Context) { taskProcessor.StartWork(ctx, receivedTasks) } + period := time.NewTicker(app.checkPeriod * time.Hour) + defer period.Stop() + go func() { process() //immediate start for range period.C { diff --git a/internal/parsers/mandarake/handleTasks.go b/internal/parsers/mandarake/handleTasks.go index 1b566e7..1ddd10a 100644 --- a/internal/parsers/mandarake/handleTasks.go +++ b/internal/parsers/mandarake/handleTasks.go @@ -16,9 +16,6 @@ func (s *Parser) HandleTasks(ctx context.Context, tasks []shared.Task, sender ch allocCtx, allocCancel := chromedp.NewRemoteAllocator(ctx, s.externalBrowser) defer allocCancel() - sessionCtx, sessionCancel := chromedp.NewContext(allocCtx /* chromedp.WithLogf(log.Printf) */, chromedp.WithLogf(func(string, ...any) {})) - defer sessionCancel() - receiver := make(chan shared.Task, len(tasks)) for _, task := range tasks { receiver <- task @@ -31,7 +28,7 @@ func (s *Parser) HandleTasks(ctx context.Context, tasks []shared.Task, sender ch wg.Add(1) go func() { defer wg.Done() - s.worker(sessionCtx, receiver, sender) + s.worker(allocCtx, receiver, sender) }() } wg.Wait() @@ -42,14 +39,20 @@ func (s *Parser) HandleTasks(ctx context.Context, tasks []shared.Task, sender ch func (s *Parser) worker(ctx context.Context, receiver chan shared.Task, sender chan shared.TaskResult) { for task := range receiver { + taskCtx, taskCancel := chromedp.NewContext(ctx /* chromedp.WithLogf(log.Printf) */, chromedp.WithLogf(func(string, ...any) {})) + timeoutCtx, timeoutCancel := context.WithTimeout(taskCtx, s.taskTimeout) + log.WithField("task_uuid", task.MerchUuid).Infof("%v %v processing task", logHeader, logWorker) //price will be zeroPrice value in case of any error or if price not found - price := s.getMinimalPrice(ctx, task) + price := s.getMinimalPrice(timeoutCtx, task) sender <- shared.TaskResult{ MerchUuid: task.MerchUuid, Origin: task.Origin, Price: price, } + + timeoutCancel() + taskCancel() } } diff --git a/internal/parsers/mandarake/handler.go b/internal/parsers/mandarake/handler.go index cd6b272..0955206 100644 --- a/internal/parsers/mandarake/handler.go +++ b/internal/parsers/mandarake/handler.go @@ -2,6 +2,7 @@ package mandarake import ( log "github.com/sirupsen/logrus" + "time" ) const ( @@ -16,22 +17,25 @@ const ( type Parser struct { externalBrowser string goroutinesNumber int + taskTimeout time.Duration } type Deps struct { Enabled bool ExternalBrowser string GoroutinesNumber int + TaskTimeout int } func NewParser(deps Deps) *Parser { if !deps.Enabled { - log.Info(logHeader + "disabled") + log.Infof("%v disabled", logHeader) return nil } return &Parser{ externalBrowser: deps.ExternalBrowser, goroutinesNumber: deps.GoroutinesNumber, + taskTimeout: time.Minute * time.Duration(deps.TaskTimeout), } }