scrapper-mandarake/internal/tasks/handler.go

137 lines
2.8 KiB
Go
Raw Permalink Normal View History

2026-04-03 19:06:33 +03:00
package tasks
import (
"context"
"encoding/json"
log "github.com/sirupsen/logrus"
rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
2026-04-03 20:53:54 +03:00
"scrapper-mandarake/internal/common"
2026-04-03 19:06:33 +03:00
)
// pkgLogHeader is the prefix placed at the start of every log message
// emitted by this package.
const pkgLogHeader = "Tasks client |"
// Handler bundles the RabbitMQ consumer and publisher used by this package
// to receive tasks and to send results back.
type Handler struct {
// consumer is assigned in initConsumer and started by GetTasks.
consumer rabbit.Consumer
// publisher is assigned in initProducer and started by SendResult.
publisher rabbit.Publisher
// chanLen is copied from Deps.ChanLen in New. It is passed to the
// consumer's and publisher's Start calls and is also the buffer size of
// the channel returned by SendResult.
chanLen uint
}
// Deps carries everything New needs to build a Handler: the broker address
// parts, the names of the two queues, and a couple of tuning switches.
type Deps struct {
	// Username, Password, Host, Port and Vhost are copied verbatim into the
	// rabbit.Address used for both the consumer and the publisher client.
	Username, Password, Host string
	Port                     uint16
	Vhost                    string

	// TaskSourceQueue is the queue name given to the consumer client;
	// TaskResultQueue is the queue name given to the publisher client.
	TaskSourceQueue, TaskResultQueue string

	// LoggingEnabled is forwarded to rabbit.WithLogging for both clients.
	LoggingEnabled bool

	// ChanLen is stored on the Handler and later handed to the consumer's
	// and publisher's Start calls.
	ChanLen uint
}
2026-04-03 20:53:54 +03:00
func New(deps Deps) TaskTransport {
2026-04-03 19:06:33 +03:00
var h Handler
h.chanLen = deps.ChanLen
if err := h.initConsumer(&deps); err != nil {
log.WithError(err).Fatalf("%v Failed to create consumer", pkgLogHeader)
panic(err)
}
if err := h.initProducer(&deps); err != nil {
log.WithError(err).Fatalf("%v Failed to create publisher", pkgLogHeader)
panic(err)
}
return &h
}
func (h *Handler) initConsumer(deps *Deps) error {
2026-04-08 12:17:25 +03:00
log.WithField("enabled", deps.LoggingEnabled).Debugf("%v rabbit mq logger", pkgLogHeader)
2026-04-03 19:06:33 +03:00
client, err := rabbit.NewClient(rabbit.Address{
Username: deps.Username,
Password: deps.Password,
Host: deps.Host,
Port: deps.Port,
Vhost: deps.Vhost,
}, deps.TaskSourceQueue, rabbit.WithLogging(deps.LoggingEnabled))
if err != nil {
return err
}
h.consumer = rabbit.NewConsumer(client)
return nil
}
// initProducer creates a rabbit client bound to deps.TaskResultQueue and
// stores a publisher built on top of it in h.publisher.
//
// The error from rabbit.NewClient is returned unchanged; on error
// h.publisher is left untouched.
func (h *Handler) initProducer(deps *Deps) error {
	addr := rabbit.Address{
		Username: deps.Username,
		Password: deps.Password,
		Host:     deps.Host,
		Port:     deps.Port,
		Vhost:    deps.Vhost,
	}
	loggingOpt := rabbit.WithLogging(deps.LoggingEnabled)

	resultClient, err := rabbit.NewClient(addr, deps.TaskResultQueue, loggingOpt)
	if err != nil {
		return err
	}

	h.publisher = rabbit.NewPublisher(resultClient)
	return nil
}
// GetTasks starts the consumer and returns a channel on which decoded tasks
// are delivered.
//
// A background goroutine reads raw messages from the consumer, unmarshals
// each one from JSON into a common.Task and forwards it to the returned
// channel. Messages that fail to unmarshal are logged and skipped.
//
// The goroutine stops, and the returned channel is closed, when ctx is
// cancelled or when the consumer's channel is closed. The returned channel
// has a fixed buffer of 100 tasks.
func (h *Handler) GetTasks(ctx context.Context) <-chan common.Task {
	getTasks := make(chan common.Task, 100)
	taskChan := h.consumer.Start(ctx, h.chanLen)

	go func() {
		// This goroutine is the only sender on getTasks, so it owns the close.
		defer close(getTasks)

		for {
			select {
			case <-ctx.Done():
				return
			case payload, ok := <-taskChan:
				if !ok {
					return
				}

				var task common.Task
				if err := json.Unmarshal(payload, &task); err != nil {
					log.WithError(err).Errorf("%v Failed to unmarshal task", pkgLogHeader)
					continue
				}

				// Forward the task, but give up if ctx is cancelled while the
				// buffer is full; otherwise a reader that stopped after
				// cancellation would leave this goroutine blocked forever.
				select {
				case getTasks <- task:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return getTasks
}
2026-04-03 20:53:54 +03:00
func (h *Handler) SendResult(ctx context.Context) chan common.Result {
2026-04-03 19:06:33 +03:00
pub := h.publisher.Start(ctx, h.chanLen)
2026-04-03 20:53:54 +03:00
resultChan := make(chan common.Result, h.chanLen)
2026-04-03 19:06:33 +03:00
go func() {
defer close(resultChan)
for {
select {
case <-ctx.Done():
return
case result, ok := <-resultChan:
if !ok {
return
}
bytes, err := json.Marshal(result)
if err != nil {
log.WithError(err).Errorf("%v Failed to marshal result", pkgLogHeader)
continue
}
pub <- bytes
}
}
}()
2026-04-03 20:53:54 +03:00
return resultChan
2026-04-03 19:06:33 +03:00
}