package processor import ( "context" "encoding/json" "fmt" log "github.com/sirupsen/logrus" rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit" "task-processor/internal/structs" "task-processor/internal/taskAgent" "time" ) type service struct { taskAgent taskAgent.TaskAgent brokerAddr rabbit.Address taskPublishers map[string]chan<- []byte rabbitLoggingEnabled bool } func newService(deps Deps) *service { ba := rabbit.Address{ Username: deps.Addr.User, Password: deps.Addr.Pass, Host: deps.Addr.Host, Port: deps.Addr.Port, Vhost: deps.Addr.Vhost, } s := &service{ taskAgent: deps.TA, brokerAddr: ba, rabbitLoggingEnabled: deps.LoggerEnabled, } s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen) return s } func (s *service) ProcessTasks(ctx context.Context) error { runCtx, cancel := context.WithCancel(ctx) defer cancel() log.Infof("%v Processing tasks", pkgLogHeader) fetchTasks, err := s.taskAgent.FetchTasks(runCtx) if err != nil { log.WithError(err).Errorf("%v Failed to fetch tasks", pkgLogHeader) return err } if err = s.sendTasks(fetchTasks); err != nil { log.WithError(err).Errorf("%v Failed to send tasks", pkgLogHeader) return err } return nil } func (s *service) SendResults(ctx context.Context, chanLen uint) error { log.Debugf("%v Results sender start", pkgLogHeader) runCtx, cancel := context.WithCancel(ctx) qn := "tasks-results" consumerClient, err := rabbit.NewClient(s.brokerAddr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled)) if err != nil { cancel() return err } resultsConsumer := rabbit.NewConsumer(consumerClient) resultChan := resultsConsumer.Start(runCtx, chanLen) log.Debugf("%v Results consumer started: %v", pkgLogHeader, qn) go func() { defer cancel() sendTicker := time.NewTicker(2 * time.Second) defer sendTicker.Stop() var sendResults []structs.Result for { select { case <-runCtx.Done(): return case result := <-resultChan: r := s.convertResult(result) if r == nil { continue } sendResults = append(sendResults, *r) case <-sendTicker.C: l := len(sendResults) if l > 0 { log.Debugf("%v Sending results: %v", pkgLogHeader, sendResults) if err := s.taskAgent.SendResults(runCtx, sendResults); err != nil { log.WithError(err).Errorf("%v Failed to send results", pkgLogHeader) } sendResults = sendResults[:0] } } } }() return nil } func (s *service) sendTasks(tasks []structs.Task) error { for _, tsk := range tasks { for origin, link := range tsk.Origins { if !linkIsValid(link) { continue } if origin == "surugaya" { pushTask(s.taskPublishers["surugaya"], tsk.MerchUuid, link) } if origin == "mandarake" { pushTask(s.taskPublishers["mandarake"], tsk.MerchUuid, link) } if origin == "amiami" { pushTask(s.taskPublishers["amiami"], tsk.MerchUuid, link) } } } return nil } func (s *service) convertResult(b []byte) *structs.Result { var res structs.Result if err := json.Unmarshal(b, &res); err != nil { log.WithError(err).Error("Failed to unmarshal result") return nil } return &res } // TODO refactor this later: get origins from merch api and remove ctx pass via deps func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) { origins := [...]string{ "surugaya", "mandarake", "amiami", } publishers := make(map[string]chan<- []byte) for _, origin := range origins { qn := fmt.Sprintf("task-publisher-%s", origin) pubClient, err := rabbit.NewClient(addr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled)) if err != nil { log.WithError(err).Error("Failed to create publisher") continue } publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen) log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn) } s.taskPublishers = publishers } func pushTask(pubChan chan<- []byte, m, l string) { log.Debugf("%v Pushing task: %v", pkgLogHeader, m) t := task{ MerchUuid: m, Link: l, } bytes, err := json.Marshal(t) if err != nil { log.WithError(err).Errorf("%v Failed to marshal task", pkgLogHeader) return } pubChan <- bytes }