From 88db05604a96688407903b7725e3b9f57a50c734 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 19:06:33 +0300 Subject: [PATCH] tasks pkg --- internal/tasks/handler.go | 132 ++++++++++++++++++++++++++++++++++++ internal/tasks/interface.go | 11 +++ 2 files changed, 143 insertions(+) create mode 100644 internal/tasks/handler.go create mode 100644 internal/tasks/interface.go diff --git a/internal/tasks/handler.go b/internal/tasks/handler.go new file mode 100644 index 0000000..35ba365 --- /dev/null +++ b/internal/tasks/handler.go @@ -0,0 +1,132 @@ +package tasks + +import ( + "context" + "encoding/json" + log "github.com/sirupsen/logrus" + "parser-mandarake/internal/common" + rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit" +) + +const pkgLogHeader string = "Tasks client |" + +type Handler struct { + consumer rabbit.Consumer + publisher rabbit.Publisher + chanLen uint +} + +type Deps struct { + Username string + Password string + Host string + Port uint16 + Vhost string + TaskSourceQueue string + TaskResultQueue string + LoggingEnabled bool + ChanLen uint +} + +func New(deps Deps) *Handler { + var h Handler + + h.chanLen = deps.ChanLen + + if err := h.initConsumer(&deps); err != nil { + log.WithError(err).Fatalf("%v Failed to create consumer", pkgLogHeader) + panic(err) + } + + if err := h.initProducer(&deps); err != nil { + log.WithError(err).Fatalf("%v Failed to create publisher", pkgLogHeader) + panic(err) + } + + return &h +} + +func (h *Handler) initConsumer(deps *Deps) error { + client, err := rabbit.NewClient(rabbit.Address{ + Username: deps.Username, + Password: deps.Password, + Host: deps.Host, + Port: deps.Port, + Vhost: deps.Vhost, + }, deps.TaskSourceQueue, rabbit.WithLogging(deps.LoggingEnabled)) + if err != nil { + return err + } + + h.consumer = rabbit.NewConsumer(client) + return nil +} + +func (h *Handler) initProducer(deps *Deps) error { + client, err := rabbit.NewClient(rabbit.Address{ + Username: deps.Username, + Password: deps.Password, + Host: deps.Host, + Port: deps.Port, + Vhost: deps.Vhost, + }, deps.TaskResultQueue, rabbit.WithLogging(deps.LoggingEnabled)) + if err != nil { + return err + } + + h.publisher = rabbit.NewPublisher(client) + return nil +} + +func (h *Handler) GetTasks(ctx context.Context) <-chan common.Task { + getTasks := make(chan common.Task, 100) + + taskChan := h.consumer.Start(ctx, h.chanLen) + + go func() { + defer close(getTasks) + for { + select { + case <-ctx.Done(): + return + case bytes, ok := <-taskChan: + if !ok { + return + } + var task common.Task + if err := json.Unmarshal(bytes, &task); err != nil { + log.WithError(err).Errorf("%v Failed to unmarshal task", pkgLogHeader) + continue + } + + getTasks <- task + } + } + }() + + return getTasks +} + +func (h *Handler) SendResult(ctx context.Context, resultChan chan common.Result) { + pub := h.publisher.Start(ctx, h.chanLen) + + go func() { + defer close(resultChan) + for { + select { + case <-ctx.Done(): + return + case result, ok := <-resultChan: + if !ok { + return + } + bytes, err := json.Marshal(result) + if err != nil { + log.WithError(err).Errorf("%v Failed to marshal result", pkgLogHeader) + continue + } + pub <- bytes + } + } + }() +} diff --git a/internal/tasks/interface.go b/internal/tasks/interface.go new file mode 100644 index 0000000..2e0c116 --- /dev/null +++ b/internal/tasks/interface.go @@ -0,0 +1,11 @@ +package tasks + +import ( + "context" + "parser-mandarake/internal/common" +) + +type Tasker interface { + GetTasks(ctx context.Context) <-chan common.Task + SendResult(ctx context.Context, resultChan chan common.Result) +}