diff --git a/internal/processor/handler.go b/internal/processor/handler.go index b008dba..3252672 100644 --- a/internal/processor/handler.go +++ b/internal/processor/handler.go @@ -2,8 +2,6 @@ package processor import ( "context" - "fmt" - "net" "task-processor/internal/taskAgent" ) @@ -15,28 +13,22 @@ type handler struct { type Addr struct { Host string - Port string + Port uint16 User string Pass string Vhost string } type Deps struct { - Ctx context.Context - TA taskAgent.TaskAgent - Addr Addr - ChanLen uint + Ctx context.Context + TA taskAgent.TaskAgent + Addr Addr + ChanLen uint + LoggerEnabled bool } func NewHandler(deps Deps) Processor { - addr := makeAddr(deps.Addr) - return &handler{ - service: newService(deps, addr), + service: newService(deps), } } - -func makeAddr(addr Addr) string { - //"amqp://username:password@host:port/vhost" - return fmt.Sprintf("amqp://%v:%v@%v/%v", addr.User, addr.Pass, net.JoinHostPort(addr.Host, addr.Port), addr.Vhost) -} diff --git a/internal/processor/service.go b/internal/processor/service.go index 80224e0..01e4ef8 100644 --- a/internal/processor/service.go +++ b/internal/processor/service.go @@ -12,18 +12,30 @@ import ( ) type service struct { - taskAgent taskAgent.TaskAgent - brokerAddr string - taskPublishers map[string]chan<- []byte + taskAgent taskAgent.TaskAgent + brokerAddr rabbit.Address + taskPublishers map[string]chan<- []byte + rabbitLoggingEnabled bool } -func newService(deps Deps, addr string) *service { - - return &service{ - taskAgent: deps.TA, - brokerAddr: addr, - taskPublishers: makeTaskPublishers(deps.Ctx, addr, deps.ChanLen), +func newService(deps Deps) *service { + ba := rabbit.Address{ + Username: deps.Addr.User, + Password: deps.Addr.Pass, + Host: deps.Addr.Host, + Port: deps.Addr.Port, + Vhost: deps.Addr.Vhost, } + + s := &service{ + taskAgent: deps.TA, + brokerAddr: ba, + rabbitLoggingEnabled: deps.LoggerEnabled, + } + + s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen) + + return s } func (s *service) ProcessTasks(ctx context.Context) error { @@ -50,7 +62,8 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { log.Debugf("%v Results sender start", pkgLogHeader) runCtx, cancel := context.WithCancel(ctx) - consumerClient, err := rabbit.NewClient(s.brokerAddr, "tasks-results") + qn := "tasks-results" + consumerClient, err := rabbit.NewClient(s.brokerAddr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled)) if err != nil { cancel() return err @@ -58,6 +71,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { resultsConsumer := rabbit.NewConsumer(consumerClient) resultChan := resultsConsumer.Start(runCtx, chanLen) + log.Debugf("%v Results consumer started: %v", pkgLogHeader, qn) go func() { defer cancel() @@ -125,7 +139,8 @@ func (s *service) convertResult(b []byte) *structs.Result { return &res } -func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[string]chan<- []byte { +// TODO refactor this later: get origins from merch api and remove ctx pass via deps +func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) { origins := [...]string{ "surugaya", "mandarake", @@ -136,7 +151,7 @@ func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[stri for _, origin := range origins { qn := fmt.Sprintf("task-publisher-%s", origin) - pubClient, err := rabbit.NewClient(addr, qn) + pubClient, err := rabbit.NewClient(addr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled)) if err != nil { log.WithError(err).Error("Failed to create publisher") continue @@ -146,7 +161,7 @@ func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[stri log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn) } - return publishers + s.taskPublishers = publishers } func pushTask(pubChan chan<- []byte, m, l string) {