2026-04-03 19:06:33 +03:00
|
|
|
package tasks
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
|
rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
|
2026-04-03 20:53:54 +03:00
|
|
|
"scrapper-mandarake/internal/common"
|
2026-04-03 19:06:33 +03:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// pkgLogHeader is the prefix placed in front of every log message emitted by this package.
const pkgLogHeader string = "Tasks client |"
|
|
|
|
|
|
|
|
|
|
type Handler struct {
|
|
|
|
|
consumer rabbit.Consumer
|
|
|
|
|
publisher rabbit.Publisher
|
|
|
|
|
chanLen uint
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Deps carries the settings New needs to connect to the broker and to size
// the internal channels.
type Deps struct {
	// Username and Password are the broker credentials.
	Username string
	Password string
	// Host and Port identify the broker endpoint.
	Host string
	Port uint16
	// Vhost is forwarded to rabbit.Address as the virtual host.
	Vhost string
	// TaskSourceQueue is the queue the consumer client is created for.
	TaskSourceQueue string
	// TaskResultQueue is the queue the publisher client is created for.
	TaskResultQueue string
	// LoggingEnabled is forwarded to rabbit.WithLogging for both clients.
	LoggingEnabled bool
	// ChanLen is copied into Handler.chanLen.
	ChanLen uint
}
|
|
|
|
|
|
2026-04-03 20:53:54 +03:00
|
|
|
func New(deps Deps) TaskTransport {
|
2026-04-03 19:06:33 +03:00
|
|
|
var h Handler
|
|
|
|
|
|
|
|
|
|
h.chanLen = deps.ChanLen
|
|
|
|
|
|
|
|
|
|
if err := h.initConsumer(&deps); err != nil {
|
|
|
|
|
log.WithError(err).Fatalf("%v Failed to create consumer", pkgLogHeader)
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := h.initProducer(&deps); err != nil {
|
|
|
|
|
log.WithError(err).Fatalf("%v Failed to create publisher", pkgLogHeader)
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &h
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *Handler) initConsumer(deps *Deps) error {
|
|
|
|
|
client, err := rabbit.NewClient(rabbit.Address{
|
|
|
|
|
Username: deps.Username,
|
|
|
|
|
Password: deps.Password,
|
|
|
|
|
Host: deps.Host,
|
|
|
|
|
Port: deps.Port,
|
|
|
|
|
Vhost: deps.Vhost,
|
|
|
|
|
}, deps.TaskSourceQueue, rabbit.WithLogging(deps.LoggingEnabled))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
h.consumer = rabbit.NewConsumer(client)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *Handler) initProducer(deps *Deps) error {
|
|
|
|
|
client, err := rabbit.NewClient(rabbit.Address{
|
|
|
|
|
Username: deps.Username,
|
|
|
|
|
Password: deps.Password,
|
|
|
|
|
Host: deps.Host,
|
|
|
|
|
Port: deps.Port,
|
|
|
|
|
Vhost: deps.Vhost,
|
|
|
|
|
}, deps.TaskResultQueue, rabbit.WithLogging(deps.LoggingEnabled))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
h.publisher = rabbit.NewPublisher(client)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *Handler) GetTasks(ctx context.Context) <-chan common.Task {
|
|
|
|
|
getTasks := make(chan common.Task, 100)
|
|
|
|
|
|
|
|
|
|
taskChan := h.consumer.Start(ctx, h.chanLen)
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
defer close(getTasks)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case bytes, ok := <-taskChan:
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var task common.Task
|
|
|
|
|
if err := json.Unmarshal(bytes, &task); err != nil {
|
|
|
|
|
log.WithError(err).Errorf("%v Failed to unmarshal task", pkgLogHeader)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
getTasks <- task
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
return getTasks
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-03 20:53:54 +03:00
|
|
|
func (h *Handler) SendResult(ctx context.Context) chan common.Result {
|
2026-04-03 19:06:33 +03:00
|
|
|
pub := h.publisher.Start(ctx, h.chanLen)
|
2026-04-03 20:53:54 +03:00
|
|
|
resultChan := make(chan common.Result, h.chanLen)
|
2026-04-03 19:06:33 +03:00
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
defer close(resultChan)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case result, ok := <-resultChan:
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
bytes, err := json.Marshal(result)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.WithError(err).Errorf("%v Failed to marshal result", pkgLogHeader)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
pub <- bytes
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
2026-04-03 20:53:54 +03:00
|
|
|
return resultChan
|
2026-04-03 19:06:33 +03:00
|
|
|
}
|