package processor

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	log "github.com/sirupsen/logrus"
	rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"

	"task-processor/internal/structs"
	"task-processor/internal/taskAgent"
)

// service fetches tasks from the task agent, hands them to per-origin broker
// publishers, and forwards results consumed from the broker back to the task
// agent.
type service struct {
	taskAgent  taskAgent.TaskAgent
	brokerAddr string
	// taskPublishers maps an origin name to the channel feeding that origin's
	// publisher. An origin whose publisher could not be created has no entry.
	taskPublishers map[string]chan<- []byte
}

// newService builds a service from deps and the broker address, creating the
// per-origin task publishers with deps.Ctx and deps.ChanLen.
func newService(deps Deps, addr string) *service {
	return &service{
		taskAgent:      deps.TA,
		brokerAddr:     addr,
		taskPublishers: makeTaskPublishers(deps.Ctx, addr, deps.ChanLen),
	}
}

// ProcessTasks fetches the current tasks from the task agent and pushes them to
// the origin publishers. It returns the first error reported by either step.
func (s *service) ProcessTasks(ctx context.Context) error {
	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	log.Infof("%v Processing tasks", pkgLogHeader)

	fetchTasks, err := s.taskAgent.FetchTasks(runCtx)
	if err != nil {
		log.WithError(err).Errorf("%v Failed to fetch tasks", pkgLogHeader)
		return err
	}

	if err = s.sendTasks(fetchTasks); err != nil {
		log.WithError(err).Errorf("%v Failed to send tasks", pkgLogHeader)
		return err
	}

	return nil
}

// SendResults starts a background goroutine that consumes raw results from the
// "tasks-results" queue, batches them, and forwards each non-empty batch to the
// task agent every two seconds.
//
// It returns an error only when the broker client cannot be created; after
// that it returns nil immediately and the goroutine runs until ctx is
// cancelled or the results channel is closed.
func (s *service) SendResults(ctx context.Context, chanLen uint) error {
	const flushInterval = 2 * time.Second

	log.Debugf("%v Results sender start", pkgLogHeader)

	consumerClient, err := rabbit.NewClient(s.brokerAddr, "tasks-results")
	if err != nil {
		return err
	}

	// The cancellable context is created only once the client exists, so the
	// error path above has nothing to release.
	runCtx, cancel := context.WithCancel(ctx)
	resultChan := rabbit.NewConsumer(consumerClient).Start(runCtx, chanLen)

	go func() {
		defer cancel()

		sendTicker := time.NewTicker(flushInterval)
		defer sendTicker.Stop()

		var pending []structs.Result

		// flush forwards the accumulated batch, if any. The batch is dropped
		// even when sending fails, so a failing agent cannot make it grow
		// without bound.
		// NOTE(review): the backing array is reused after the call; this
		// assumes taskAgent.SendResults does not retain the slice — confirm.
		flush := func() {
			if len(pending) == 0 {
				return
			}
			log.Debugf("%v Sending results: %v", pkgLogHeader, pending)
			if err := s.taskAgent.SendResults(runCtx, pending); err != nil {
				log.WithError(err).Errorf("%v Failed to send results", pkgLogHeader)
			}
			pending = pending[:0]
		}

		for {
			select {
			case <-runCtx.Done():
				return
			case raw, ok := <-resultChan:
				if !ok {
					// A closed channel is always ready to receive; without
					// this exit the loop would spin on nil payloads.
					if runCtx.Err() == nil {
						log.Warnf("%v Results channel closed, stopping results sender", pkgLogHeader)
						flush()
					}
					return
				}
				if r := s.convertResult(raw); r != nil {
					pending = append(pending, *r)
				}
			case <-sendTicker.C:
				flush()
			}
		}
	}()

	return nil
}

// sendTasks pushes one message per (task, origin) pair to the publisher
// registered for that origin. Origins without a publisher are skipped with a
// warning instead of blocking forever on a nil channel. It always returns nil.
func (s *service) sendTasks(tasks []structs.Task) error {
	for _, tsk := range tasks {
		for origin, link := range tsk.Origins {
			pub, ok := s.taskPublishers[origin]
			if !ok || pub == nil {
				log.Warnf("%v No publisher for origin %q, skipping task: %v", pkgLogHeader, origin, tsk.MerchUuid)
				continue
			}
			pushTask(pub, tsk.MerchUuid, link)
		}
	}
	return nil
}

// convertResult decodes a raw JSON payload into a structs.Result. It logs the
// failure and returns nil when the payload cannot be unmarshalled.
func (s *service) convertResult(b []byte) *structs.Result {
	var res structs.Result
	if err := json.Unmarshal(b, &res); err != nil {
		log.WithError(err).Error("Failed to unmarshal result")
		return nil
	}
	return &res
}

// makeTaskPublishers creates a broker client and starts a publisher for each
// supported origin, using the queue name "task-publisher-<origin>". An origin
// whose client cannot be created is logged and left out of the returned map.
func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[string]chan<- []byte {
	origins := [...]string{
		"surugaya",
		"mandarake",
		"amiami",
	}

	publishers := make(map[string]chan<- []byte, len(origins))

	for _, origin := range origins {
		qn := fmt.Sprintf("task-publisher-%s", origin)

		pubClient, err := rabbit.NewClient(addr, qn)
		if err != nil {
			log.WithError(err).WithField("queue", qn).Error("Failed to create publisher")
			continue
		}

		publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen)
		log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn)
	}

	return publishers
}

// pushTask marshals a task made of merch UUID m and link l to JSON and sends
// it on pubChan. A nil channel or a marshalling failure is logged and the task
// is dropped. The send itself blocks until the channel accepts the payload.
func pushTask(pubChan chan<- []byte, m, l string) {
	log.Debugf("%v Pushing task: %v", pkgLogHeader, m)

	// Sending on a nil channel blocks forever, so refuse it explicitly.
	if pubChan == nil {
		log.Errorf("%v No publisher channel, dropping task: %v", pkgLogHeader, m)
		return
	}

	payload, err := json.Marshal(task{
		MerchUuid: m,
		Link:      l,
	})
	if err != nil {
		log.WithError(err).Errorf("%v Failed to marshal task", pkgLogHeader)
		return
	}

	pubChan <- payload
}