// Package tasks wires the parser to the message broker through the mt-rabbit
// client: it decodes incoming JSON messages into common.Task values and
// encodes common.Result values to JSON for publishing.
package tasks

import (
	"context"
	"encoding/json"

	log "github.com/sirupsen/logrus"
	"parser-mandarake/internal/common"
	rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
)

const (
	// pkgLogHeader prefixes every log line emitted by this package.
	pkgLogHeader string = "Tasks client |"

	// taskBufferSize is the capacity of the channel returned by GetTasks.
	taskBufferSize = 100
)

// Handler owns one consumer (task source queue) and one publisher (task
// result queue) and exposes them as typed channels.
type Handler struct {
	consumer  rabbit.Consumer
	publisher rabbit.Publisher
	chanLen   uint // passed to consumer.Start / publisher.Start
}

// Deps carries the connection settings and queue names needed by New.
type Deps struct {
	Username        string
	Password        string
	Host            string
	Port            uint16
	Vhost           string
	TaskSourceQueue string // queue the consumer is bound to
	TaskResultQueue string // queue the publisher is bound to
	LoggingEnabled  bool   // forwarded to rabbit.WithLogging
	ChanLen         uint   // forwarded to consumer.Start / publisher.Start
}

// New builds a Handler with a consumer and a publisher created from deps.
//
// Any client creation error is logged with Fatalf, which normally terminates
// the process. The panic that follows is only reached if the logger's exit
// function has been replaced with one that returns.
func New(deps Deps) *Handler {
	h := &Handler{chanLen: deps.ChanLen}

	if err := h.initConsumer(&deps); err != nil {
		log.WithError(err).Fatalf("%v Failed to create consumer", pkgLogHeader)
		panic(err)
	}

	if err := h.initProducer(&deps); err != nil {
		log.WithError(err).Fatalf("%v Failed to create publisher", pkgLogHeader)
		panic(err)
	}

	return h
}

// addressFrom extracts the broker address fields shared by the consumer and
// the publisher from deps.
func addressFrom(deps *Deps) rabbit.Address {
	return rabbit.Address{
		Username: deps.Username,
		Password: deps.Password,
		Host:     deps.Host,
		Port:     deps.Port,
		Vhost:    deps.Vhost,
	}
}

// initConsumer creates a client for deps.TaskSourceQueue and stores a
// consumer built on it in h. It returns the client creation error, if any.
func (h *Handler) initConsumer(deps *Deps) error {
	client, err := rabbit.NewClient(
		addressFrom(deps),
		deps.TaskSourceQueue,
		rabbit.WithLogging(deps.LoggingEnabled),
	)
	if err != nil {
		return err
	}

	h.consumer = rabbit.NewConsumer(client)
	return nil
}

// initProducer creates a client for deps.TaskResultQueue and stores a
// publisher built on it in h. It returns the client creation error, if any.
func (h *Handler) initProducer(deps *Deps) error {
	client, err := rabbit.NewClient(
		addressFrom(deps),
		deps.TaskResultQueue,
		rabbit.WithLogging(deps.LoggingEnabled),
	)
	if err != nil {
		return err
	}

	h.publisher = rabbit.NewPublisher(client)
	return nil
}

// GetTasks starts the consumer and returns a channel of decoded tasks.
//
// A background goroutine reads raw messages from the consumer, unmarshals
// each one from JSON and forwards it. Messages that fail to unmarshal are
// logged and skipped. The returned channel is closed when ctx is cancelled or
// when the consumer's channel is closed.
func (h *Handler) GetTasks(ctx context.Context) <-chan common.Task {
	out := make(chan common.Task, taskBufferSize)
	incoming := h.consumer.Start(ctx, h.chanLen)

	go func() {
		defer close(out)

		for {
			select {
			case <-ctx.Done():
				return
			case payload, ok := <-incoming:
				if !ok {
					return
				}

				var task common.Task
				if err := json.Unmarshal(payload, &task); err != nil {
					log.WithError(err).Errorf("%v Failed to unmarshal task", pkgLogHeader)
					continue
				}

				// Honour cancellation while sending so the goroutine cannot
				// leak when the buffer is full and the reader has gone away.
				select {
				case out <- task:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return out
}

// SendResult starts the publisher and forwards every value received on
// resultChan to it as JSON.
//
// The forwarding goroutine stops when ctx is cancelled or when the caller
// closes resultChan. Results that fail to marshal are logged and skipped.
//
// resultChan is owned by the caller and is never closed here: closing it from
// the receiving side would panic once the caller has already closed it, and
// would make any in-flight send by the caller panic after cancellation.
func (h *Handler) SendResult(ctx context.Context, resultChan chan common.Result) {
	pub := h.publisher.Start(ctx, h.chanLen)

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case result, ok := <-resultChan:
				if !ok {
					return
				}

				payload, err := json.Marshal(result)
				if err != nil {
					log.WithError(err).Errorf("%v Failed to marshal result", pkgLogHeader)
					continue
				}

				// Honour cancellation while sending so the goroutine cannot
				// block forever if the publisher stops draining its channel.
				select {
				case pub <- payload:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
}