From 71e2e1b7b140faa5614a2f30db996aededde1442 Mon Sep 17 00:00:00 2001 From: nquidox Date: Sat, 21 Feb 2026 16:57:42 +0300 Subject: [PATCH] package created --- internal/processor/handler.go | 42 +++++ internal/processor/interface.go | 10 ++ internal/processor/model.go | 6 + internal/processor/service.go | 157 ++++++++++++++++++ .../{taskAgent/struct.go => structs/tasks.go} | 2 +- 5 files changed, 216 insertions(+), 1 deletion(-) create mode 100644 internal/processor/handler.go create mode 100644 internal/processor/interface.go create mode 100644 internal/processor/model.go create mode 100644 internal/processor/service.go rename internal/{taskAgent/struct.go => structs/tasks.go} (89%) diff --git a/internal/processor/handler.go b/internal/processor/handler.go new file mode 100644 index 0000000..b008dba --- /dev/null +++ b/internal/processor/handler.go @@ -0,0 +1,42 @@ +package processor + +import ( + "context" + "fmt" + "net" + "task-processor/internal/taskAgent" +) + +const pkgLogHeader string = "Processor |" + +type handler struct { + *service +} + +type Addr struct { + Host string + Port string + User string + Pass string + Vhost string +} + +type Deps struct { + Ctx context.Context + TA taskAgent.TaskAgent + Addr Addr + ChanLen uint +} + +func NewHandler(deps Deps) Processor { + addr := makeAddr(deps.Addr) + + return &handler{ + service: newService(deps, addr), + } +} + +func makeAddr(addr Addr) string { + //"amqp://username:password@host:port/vhost" + return fmt.Sprintf("amqp://%v:%v@%v/%v", addr.User, addr.Pass, net.JoinHostPort(addr.Host, addr.Port), addr.Vhost) +} diff --git a/internal/processor/interface.go b/internal/processor/interface.go new file mode 100644 index 0000000..fc729d4 --- /dev/null +++ b/internal/processor/interface.go @@ -0,0 +1,10 @@ +package processor + +import ( + "context" +) + +type Processor interface { + ProcessTasks(ctx context.Context) error + SendResults(ctx context.Context, chanLen uint) error +} diff --git a/internal/processor/model.go b/internal/processor/model.go new file mode 100644 index 0000000..1b49219 --- /dev/null +++ b/internal/processor/model.go @@ -0,0 +1,6 @@ +package processor + +type task struct { + MerchUuid string `json:"merch_uuid"` + Link string `json:"link"` +} diff --git a/internal/processor/service.go b/internal/processor/service.go new file mode 100644 index 0000000..1a02ec8 --- /dev/null +++ b/internal/processor/service.go @@ -0,0 +1,157 @@ +package processor + +import ( + "context" + "encoding/json" + "fmt" + log "github.com/sirupsen/logrus" + rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit" + "task-processor/internal/structs" + "task-processor/internal/taskAgent" + "time" +) + +type service struct { + taskAgent taskAgent.TaskAgent + brokerAddr string + taskPublishers map[string]chan<- []byte +} + +func newService(deps Deps, addr string) *service { + + return &service{ + taskAgent: deps.TA, + brokerAddr: addr, + taskPublishers: makeTaskPublishers(deps.Ctx, addr, deps.ChanLen), + } +} + +func (s *service) ProcessTasks(ctx context.Context) error { + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + log.Infof("%v Processing tasks", pkgLogHeader) + + fetchTasks, err := s.taskAgent.FetchTasks(runCtx) + if err != nil { + log.WithError(err).Errorf("%v Failed to fetch tasks", pkgLogHeader) + return err + } + + time.Sleep(time.Second * 5) //wait for connect + + if err = s.sendTasks(fetchTasks); err != nil { + log.WithError(err).Errorf("%v Failed to send tasks", pkgLogHeader) + return err + } + + return nil +} + +func (s *service) SendResults(ctx context.Context, chanLen uint) error { + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + resultsConsumer := rabbit.NewConsumer(rabbit.NewClient(s.brokerAddr, "tasks-results")) + resultChan := resultsConsumer.Start(runCtx, chanLen) + + go func() { + sendTicker := time.NewTicker(2 * time.Second) + defer sendTicker.Stop() + + var sendResults []structs.Result + + for { + select { + case <-runCtx.Done(): + return + case result := <-resultChan: + r := s.convertResult(result) + if r == nil { + continue + } + + sendResults = append(sendResults, *r) + + case <-sendTicker.C: + l := len(sendResults) + if l > 0 { + log.Printf("%v Sending results: %v", pkgLogHeader, sendResults) + + if err := s.taskAgent.SendResults(runCtx, sendResults); err != nil { + log.WithError(err).Errorf("%v Failed to send results", pkgLogHeader) + } + + sendResults = sendResults[:0] + } + + } + } + }() + + return nil +} + +func (s *service) sendTasks(tasks []structs.Task) error { + for _, tsk := range tasks { + for origin, link := range tsk.Origins { + if origin == "surugaya" { + pushTask(s.taskPublishers["surugaya"], tsk.MerchUuid, link) + } + + if origin == "mandarake" { + pushTask(s.taskPublishers["mandarake"], tsk.MerchUuid, link) + } + + if origin == "amiami" { + pushTask(s.taskPublishers["amiami"], tsk.MerchUuid, link) + } + } + } + return nil +} + +func (s *service) convertResult(b []byte) *structs.Result { + var res *structs.Result + + if err := json.Unmarshal(b, res); err != nil { + log.WithError(err).Error("Failed to unmarshal result") + return nil + } + + return res +} + +func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[string]chan<- []byte { + origins := [...]string{ + "surugaya", + "mandarake", + "amiami", + } + + publishers := make(map[string]chan<- []byte) + + for _, origin := range origins { + qn := fmt.Sprintf("task-publisher-%s", origin) + publishers[origin] = rabbit.NewPublisher(rabbit.NewClient(addr, qn)).Start(ctx, chanLen) + log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn) + } + + return publishers +} + +func pushTask(pubChan chan<- []byte, m, l string) { + log.Debugf("%v Pushing task: %v", pkgLogHeader, m) + t := task{ + MerchUuid: m, + Link: l, + } + + bytes, err := json.Marshal(t) + if err != nil { + log.WithError(err).Errorf("%v Failed to marshal task", pkgLogHeader) + return + } + + pubChan <- bytes +} diff --git a/internal/taskAgent/struct.go b/internal/structs/tasks.go similarity index 89% rename from internal/taskAgent/struct.go rename to internal/structs/tasks.go index 441500f..bf24834 100644 --- a/internal/taskAgent/struct.go +++ b/internal/structs/tasks.go @@ -1,4 +1,4 @@ -package taskAgent +package structs type Task struct { MerchUuid string