From 56309cafb914254b42e741b10bd588e8f2d6f9ba Mon Sep 17 00:00:00 2001 From: nquidox Date: Sat, 28 Feb 2026 23:32:47 +0300 Subject: [PATCH 1/3] env added --- config.env | 1 + config/config.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/config.env b/config.env index 7633de5..b22e762 100644 --- a/config.env +++ b/config.env @@ -14,6 +14,7 @@ GRPC_SURUGAYA_SCRAPPER_PORT=9070 TASK_RETRY_COUNT=3 TASK_RETRY_MINUTES=5 +TASK_TIMEOUT_MINUTES=5 ORIGIN_SURUGAYA_ENABLED=false ORIGIN_MANDARAKE_ENABLED=false diff --git a/config/config.go b/config/config.go index b19ffe0..f2ebaa4 100644 --- a/config/config.go +++ b/config/config.go @@ -29,6 +29,7 @@ type GrpcConfig struct { type TasksConfig struct { RetryCount int RetryMinutes int + TaskTimeout int } type OriginEnabled struct { @@ -62,6 +63,7 @@ func NewConfig() *Config { TasksCfg: TasksConfig{ RetryCount: getEnvInt("TASK_RETRY_COUNT", 3), RetryMinutes: getEnvInt("TASK_RETRY_MINUTES", 5), + TaskTimeout: getEnvInt("TASK_TIMEOUT_MINUTES", 5), }, OriginEnabled: OriginEnabled{ From f4665665056ba1f984998735a5abb4253a10caa2 Mon Sep 17 00:00:00 2001 From: nquidox Date: Sat, 28 Feb 2026 23:33:22 +0300 Subject: [PATCH 2/3] page timeout added --- internal/app/app.go | 7 ++++--- internal/parsers/mandarake/handleTasks.go | 13 ++++++++----- internal/parsers/mandarake/handler.go | 6 +++++- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 832e338..c7359e1 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -75,9 +75,6 @@ func (app *App) Run(ctx context.Context) { //main apiClient := newApiClient(app.config.GrpcCfg.ApiClientHost + ":" + app.config.GrpcCfg.ApiClientPort) - period := time.NewTicker(app.checkPeriod * time.Hour) - defer period.Stop() - sender := make(chan shared.TaskResult, app.numCPUs*10) defer close(sender) @@ -96,6 +93,7 @@ func (app *App) Run(ctx context.Context) { Enabled: app.config.OriginEnabled.Mandarake, ExternalBrowser: app.config.ExternalBrowser, GoroutinesNumber: app.numCPUs, + TaskTimeout: app.config.TasksCfg.TaskTimeout, }) } @@ -117,6 +115,9 @@ func (app *App) Run(ctx context.Context) { taskProcessor.StartWork(ctx, receivedTasks) } + period := time.NewTicker(app.checkPeriod * time.Hour) + defer period.Stop() + go func() { process() //immediate start for range period.C { diff --git a/internal/parsers/mandarake/handleTasks.go b/internal/parsers/mandarake/handleTasks.go index 1b566e7..1ddd10a 100644 --- a/internal/parsers/mandarake/handleTasks.go +++ b/internal/parsers/mandarake/handleTasks.go @@ -16,9 +16,6 @@ func (s *Parser) HandleTasks(ctx context.Context, tasks []shared.Task, sender ch allocCtx, allocCancel := chromedp.NewRemoteAllocator(ctx, s.externalBrowser) defer allocCancel() - sessionCtx, sessionCancel := chromedp.NewContext(allocCtx /* chromedp.WithLogf(log.Printf) */, chromedp.WithLogf(func(string, ...any) {})) - defer sessionCancel() - receiver := make(chan shared.Task, len(tasks)) for _, task := range tasks { receiver <- task @@ -31,7 +28,7 @@ func (s *Parser) HandleTasks(ctx context.Context, tasks []shared.Task, sender ch wg.Add(1) go func() { defer wg.Done() - s.worker(sessionCtx, receiver, sender) + s.worker(allocCtx, receiver, sender) }() } wg.Wait() @@ -42,14 +39,20 @@ func (s *Parser) HandleTasks(ctx context.Context, tasks []shared.Task, sender ch func (s *Parser) worker(ctx context.Context, receiver chan shared.Task, sender chan shared.TaskResult) { for task := range receiver { + taskCtx, taskCancel := chromedp.NewContext(ctx /* chromedp.WithLogf(log.Printf) */, chromedp.WithLogf(func(string, ...any) {})) + timeoutCtx, timeoutCancel := context.WithTimeout(taskCtx, s.taskTimeout) + log.WithField("task_uuid", task.MerchUuid).Infof("%v %v processing task", logHeader, logWorker) //price will be zeroPrice value in case of any error or if price not found - price := s.getMinimalPrice(ctx, task) + price := s.getMinimalPrice(timeoutCtx, task) sender <- shared.TaskResult{ MerchUuid: task.MerchUuid, Origin: task.Origin, Price: price, } + + timeoutCancel() + taskCancel() } } diff --git a/internal/parsers/mandarake/handler.go b/internal/parsers/mandarake/handler.go index cd6b272..0955206 100644 --- a/internal/parsers/mandarake/handler.go +++ b/internal/parsers/mandarake/handler.go @@ -2,6 +2,7 @@ package mandarake import ( log "github.com/sirupsen/logrus" + "time" ) const ( @@ -16,22 +17,25 @@ const ( type Parser struct { externalBrowser string goroutinesNumber int + taskTimeout time.Duration } type Deps struct { Enabled bool ExternalBrowser string GoroutinesNumber int + TaskTimeout int } func NewParser(deps Deps) *Parser { if !deps.Enabled { - log.Info(logHeader + "disabled") + log.Infof("%v disabled", logHeader) return nil } return &Parser{ externalBrowser: deps.ExternalBrowser, goroutinesNumber: deps.GoroutinesNumber, + taskTimeout: time.Minute * time.Duration(deps.TaskTimeout), } } From 8395cf71b4a7d751987569c1e78c1eee97b1561c Mon Sep 17 00:00:00 2001 From: nquidox Date: Sat, 28 Feb 2026 23:33:37 +0300 Subject: [PATCH 3/3] pprof --- cmd/main.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cmd/main.go b/cmd/main.go index 92ae164..409b141 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,6 +2,9 @@ package main import ( "context" + log "github.com/sirupsen/logrus" + "net/http" + _ "net/http/pprof" "os" "os/signal" "syscall" @@ -18,6 +21,12 @@ func main() { logging.LogSetup(c.LogLevel) + if c.Metrics.GinMode != "release" { + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + } + appl := app.New(c) appl.Run(ctx)