From 1ad63677847683ec9461ed09dd61189ffd7e1ddb Mon Sep 17 00:00:00 2001 From: nquidox Date: Thu, 2 Apr 2026 18:01:36 +0300 Subject: [PATCH] new clinety init --- internal/processor/service.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/internal/processor/service.go b/internal/processor/service.go index 1a02ec8..80224e0 100644 --- a/internal/processor/service.go +++ b/internal/processor/service.go @@ -38,8 +38,6 @@ func (s *service) ProcessTasks(ctx context.Context) error { return err } - time.Sleep(time.Second * 5) //wait for connect - if err = s.sendTasks(fetchTasks); err != nil { log.WithError(err).Errorf("%v Failed to send tasks", pkgLogHeader) return err @@ -49,13 +47,20 @@ func (s *service) ProcessTasks(ctx context.Context) error { } func (s *service) SendResults(ctx context.Context, chanLen uint) error { + log.Debugf("%v Results sender start", pkgLogHeader) runCtx, cancel := context.WithCancel(ctx) - defer cancel() - resultsConsumer := rabbit.NewConsumer(rabbit.NewClient(s.brokerAddr, "tasks-results")) + consumerClient, err := rabbit.NewClient(s.brokerAddr, "tasks-results") + if err != nil { + cancel() + return err + } + + resultsConsumer := rabbit.NewConsumer(consumerClient) resultChan := resultsConsumer.Start(runCtx, chanLen) go func() { + defer cancel() sendTicker := time.NewTicker(2 * time.Second) defer sendTicker.Stop() @@ -70,13 +75,12 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { if r == nil { continue } - sendResults = append(sendResults, *r) case <-sendTicker.C: l := len(sendResults) if l > 0 { - log.Printf("%v Sending results: %v", pkgLogHeader, sendResults) + log.Debugf("%v Sending results: %v", pkgLogHeader, sendResults) if err := s.taskAgent.SendResults(runCtx, sendResults); err != nil { log.WithError(err).Errorf("%v Failed to send results", pkgLogHeader) @@ -84,7 +88,6 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { sendResults = sendResults[:0] } - } } }() @@ -112,14 +115,14 @@ func (s *service) sendTasks(tasks []structs.Task) error { } func (s *service) convertResult(b []byte) *structs.Result { - var res *structs.Result + var res structs.Result - if err := json.Unmarshal(b, res); err != nil { + if err := json.Unmarshal(b, &res); err != nil { log.WithError(err).Error("Failed to unmarshal result") return nil } - return res + return &res } func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[string]chan<- []byte { @@ -133,7 +136,13 @@ func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[stri for _, origin := range origins { qn := fmt.Sprintf("task-publisher-%s", origin) - publishers[origin] = rabbit.NewPublisher(rabbit.NewClient(addr, qn)).Start(ctx, chanLen) + pubClient, err := rabbit.NewClient(addr, qn) + if err != nil { + log.WithError(err).Error("Failed to create publisher") + continue + } + + publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen) log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn) }