package processor

import (
	"context"
	"encoding/json"
	"time"

	log "github.com/sirupsen/logrus"
	rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"

	"task-processor/internal/structs"
	"task-processor/internal/taskAgent"
)

// service fetches tasks through the task agent, fans them out to per-origin
// broker publishers and forwards consumed results back to the task agent.
type service struct {
	taskAgent  taskAgent.TaskAgent
	brokerAddr rabbit.Address
	// taskPublishers maps an origin name to the channel of its publisher.
	// An origin whose publisher could not be created has no entry.
	taskPublishers       map[string]chan<- []byte
	rabbitLoggingEnabled bool
}

// newService builds a service from deps and starts the task publishers using
// deps.Ctx and deps.ChanLen.
func newService(deps Deps) *service {
	ba := rabbit.Address{
		Username: deps.Addr.User,
		Password: deps.Addr.Pass,
		Host:     deps.Addr.Host,
		Port:     deps.Addr.Port,
		Vhost:    deps.Addr.Vhost,
	}

	s := &service{
		taskAgent:            deps.TA,
		brokerAddr:           ba,
		rabbitLoggingEnabled: deps.LoggingEnabled,
	}
	s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen)

	return s
}

// ProcessTasks fetches tasks from the task agent and pushes them to the
// publishers. It returns the first error reported by either step.
func (s *service) ProcessTasks(ctx context.Context) error {
	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	log.Infof("%v Processing tasks", pkgLogHeader)

	fetchTasks, err := s.taskAgent.FetchTasks(runCtx)
	if err != nil {
		log.WithError(err).Errorf("%v Failed to fetch tasks", pkgLogHeader)
		return err
	}

	if err = s.sendTasks(fetchTasks); err != nil {
		log.WithError(err).Errorf("%v Failed to send tasks", pkgLogHeader)
		return err
	}

	return nil
}

// SendResults starts a consumer on the "tasks-results" queue and a background
// goroutine that batches the consumed results and hands each batch to the task
// agent on a fixed interval.
//
// It returns an error only when the broker client cannot be created; after
// that it returns nil immediately and the goroutine keeps running until ctx is
// cancelled or the results channel is closed.
func (s *service) SendResults(ctx context.Context, chanLen uint) error {
	const flushInterval = 2 * time.Second

	log.Debugf("%v Results sender start", pkgLogHeader)

	taskResults := rabbit.QueueOpts{
		QueueName:  "tasks-results",
		Durable:    false,
		AutoDelete: false,
		Exclusive:  false,
		NoWait:     false,
		Args:       nil,
	}

	consumerClient, err := rabbit.NewClient(s.brokerAddr, taskResults, rabbit.WithLogging(s.rabbitLoggingEnabled))
	if err != nil {
		return err
	}

	runCtx, cancel := context.WithCancel(ctx)

	resultsConsumer := rabbit.NewConsumer(consumerClient)
	resultChan := resultsConsumer.Start(runCtx, chanLen)
	log.Debugf("%v Results consumer started: %v", pkgLogHeader, taskResults.QueueName)

	go func() {
		defer cancel()

		sendTicker := time.NewTicker(flushInterval)
		defer sendTicker.Stop()

		var sendResults []structs.Result

		// flush hands the pending batch to the task agent. The batch is
		// cleared whether or not the call succeeds, so a failed batch is
		// logged and dropped rather than retried.
		flush := func() {
			if len(sendResults) == 0 {
				return
			}
			log.Debugf("%v Sending results: %v", pkgLogHeader, sendResults)
			if err := s.taskAgent.SendResults(runCtx, sendResults); err != nil {
				log.WithError(err).Errorf("%v Failed to send results", pkgLogHeader)
			}
			sendResults = sendResults[:0]
		}

		for {
			select {
			case <-runCtx.Done():
				return

			case result, ok := <-resultChan:
				if !ok {
					// A closed channel would otherwise yield nil payloads
					// forever and turn this loop into a busy spin.
					log.Warnf("%v Results channel closed, stopping results sender", pkgLogHeader)
					flush()
					return
				}
				if r := s.convertResult(result); r != nil {
					sendResults = append(sendResults, *r)
				}

			case <-sendTicker.C:
				flush()
			}
		}
	}()

	return nil
}

// sendTasks pushes every valid link of every task to the publisher registered
// for the link's origin. Links that fail linkIsValid and origins without a
// publisher are skipped. It currently always returns nil.
func (s *service) sendTasks(tasks []structs.Task) error {
	for _, tsk := range tasks {
		for origin, link := range tsk.Origins {
			if !linkIsValid(link) {
				continue
			}

			pub, ok := s.taskPublishers[origin]
			if !ok || pub == nil {
				// Sending on a nil channel blocks forever, so never forward
				// a task to a publisher that does not exist.
				log.Debugf("%v No publisher for origin %q, task skipped: %v", pkgLogHeader, origin, tsk.MerchUuid)
				continue
			}

			pushTask(pub, tsk.MerchUuid, link)
		}
	}

	return nil
}

// convertResult decodes a JSON payload into a structs.Result. It returns nil
// (after logging the error) when the payload cannot be unmarshalled.
func (s *service) convertResult(b []byte) *structs.Result {
	var res structs.Result
	if err := json.Unmarshal(b, &res); err != nil {
		log.WithError(err).Error("Failed to unmarshal result")
		return nil
	}
	return &res
}

// makeTaskPublishers creates one broker client and publisher per known origin
// and stores the resulting channels in s.taskPublishers.
//
// An origin whose client cannot be created is logged and left out of the map
// instead of starting a publisher on a client that failed to initialise.
//
// TODO refactor this later: get origins from merch api and remove ctx pass via deps
func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) {
	// Fields not listed in the queue options keep their zero values
	// (AutoDelete, Exclusive and NoWait false; Args nil).
	specs := []struct {
		origin string
		opts   rabbit.QueueOpts
	}{
		{origin: "surugaya", opts: rabbit.QueueOpts{QueueName: "task-publisher-surugaya", Durable: false}},
		{origin: "mandarake", opts: rabbit.QueueOpts{QueueName: "task-publisher-mandarake", Durable: false}},
		{origin: "amiami", opts: rabbit.QueueOpts{QueueName: "task-publisher-amiami", Durable: true}},
	}

	publishers := make(map[string]chan<- []byte, len(specs))

	for _, spec := range specs {
		client, err := rabbit.NewClient(addr, spec.opts, rabbit.WithLogging(s.rabbitLoggingEnabled))
		if err != nil {
			log.WithError(err).Error("Failed to create publisher")
			continue
		}

		publishers[spec.origin] = rabbit.NewPublisher(client).Start(ctx, chanLen)
		log.Debugf("%v Publisher queue created: %v", pkgLogHeader, spec.opts.QueueName)
	}

	s.taskPublishers = publishers
}

// pushTask marshals a task made of merch uuid m and link l to JSON and sends
// it on pubChan. The send blocks until the channel accepts the payload.
// A nil pubChan or a marshalling failure is logged and the task is dropped.
func pushTask(pubChan chan<- []byte, m, l string) {
	log.Debugf("%v Pushing task: %v", pkgLogHeader, m)

	if pubChan == nil {
		// A send on a nil channel never completes.
		log.Errorf("%v Publisher channel is nil, task dropped: %v", pkgLogHeader, m)
		return
	}

	t := task{
		MerchUuid: m,
		Link:      l,
	}

	payload, err := json.Marshal(t)
	if err != nil {
		log.WithError(err).Errorf("%v Failed to marshal task", pkgLogHeader)
		return
	}

	pubChan <- payload
}