address format and deps change + logger option
This commit is contained in:
parent
f70d53c75c
commit
5fe38af035
2 changed files with 35 additions and 28 deletions
|
|
@ -2,8 +2,6 @@ package processor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"task-processor/internal/taskAgent"
|
"task-processor/internal/taskAgent"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -15,28 +13,22 @@ type handler struct {
|
||||||
|
|
||||||
type Addr struct {
|
type Addr struct {
|
||||||
Host string
|
Host string
|
||||||
Port string
|
Port uint16
|
||||||
User string
|
User string
|
||||||
Pass string
|
Pass string
|
||||||
Vhost string
|
Vhost string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Deps struct {
|
type Deps struct {
|
||||||
Ctx context.Context
|
Ctx context.Context
|
||||||
TA taskAgent.TaskAgent
|
TA taskAgent.TaskAgent
|
||||||
Addr Addr
|
Addr Addr
|
||||||
ChanLen uint
|
ChanLen uint
|
||||||
|
LoggerEnabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHandler(deps Deps) Processor {
|
func NewHandler(deps Deps) Processor {
|
||||||
addr := makeAddr(deps.Addr)
|
|
||||||
|
|
||||||
return &handler{
|
return &handler{
|
||||||
service: newService(deps, addr),
|
service: newService(deps),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeAddr(addr Addr) string {
|
|
||||||
//"amqp://username:password@host:port/vhost"
|
|
||||||
return fmt.Sprintf("amqp://%v:%v@%v/%v", addr.User, addr.Pass, net.JoinHostPort(addr.Host, addr.Port), addr.Vhost)
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -12,18 +12,30 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
taskAgent taskAgent.TaskAgent
|
taskAgent taskAgent.TaskAgent
|
||||||
brokerAddr string
|
brokerAddr rabbit.Address
|
||||||
taskPublishers map[string]chan<- []byte
|
taskPublishers map[string]chan<- []byte
|
||||||
|
rabbitLoggingEnabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newService(deps Deps, addr string) *service {
|
func newService(deps Deps) *service {
|
||||||
|
ba := rabbit.Address{
|
||||||
return &service{
|
Username: deps.Addr.User,
|
||||||
taskAgent: deps.TA,
|
Password: deps.Addr.Pass,
|
||||||
brokerAddr: addr,
|
Host: deps.Addr.Host,
|
||||||
taskPublishers: makeTaskPublishers(deps.Ctx, addr, deps.ChanLen),
|
Port: deps.Addr.Port,
|
||||||
|
Vhost: deps.Addr.Vhost,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s := &service{
|
||||||
|
taskAgent: deps.TA,
|
||||||
|
brokerAddr: ba,
|
||||||
|
rabbitLoggingEnabled: deps.LoggerEnabled,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen)
|
||||||
|
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) ProcessTasks(ctx context.Context) error {
|
func (s *service) ProcessTasks(ctx context.Context) error {
|
||||||
|
|
@ -50,7 +62,8 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
|
||||||
log.Debugf("%v Results sender start", pkgLogHeader)
|
log.Debugf("%v Results sender start", pkgLogHeader)
|
||||||
runCtx, cancel := context.WithCancel(ctx)
|
runCtx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
consumerClient, err := rabbit.NewClient(s.brokerAddr, "tasks-results")
|
qn := "tasks-results"
|
||||||
|
consumerClient, err := rabbit.NewClient(s.brokerAddr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
return err
|
return err
|
||||||
|
|
@ -58,6 +71,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
|
||||||
|
|
||||||
resultsConsumer := rabbit.NewConsumer(consumerClient)
|
resultsConsumer := rabbit.NewConsumer(consumerClient)
|
||||||
resultChan := resultsConsumer.Start(runCtx, chanLen)
|
resultChan := resultsConsumer.Start(runCtx, chanLen)
|
||||||
|
log.Debugf("%v Results consumer started: %v", pkgLogHeader, qn)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
@ -125,7 +139,8 @@ func (s *service) convertResult(b []byte) *structs.Result {
|
||||||
return &res
|
return &res
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[string]chan<- []byte {
|
// TODO refactor this later: get origins from merch api and remove ctx pass via deps
|
||||||
|
func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) {
|
||||||
origins := [...]string{
|
origins := [...]string{
|
||||||
"surugaya",
|
"surugaya",
|
||||||
"mandarake",
|
"mandarake",
|
||||||
|
|
@ -136,7 +151,7 @@ func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[stri
|
||||||
|
|
||||||
for _, origin := range origins {
|
for _, origin := range origins {
|
||||||
qn := fmt.Sprintf("task-publisher-%s", origin)
|
qn := fmt.Sprintf("task-publisher-%s", origin)
|
||||||
pubClient, err := rabbit.NewClient(addr, qn)
|
pubClient, err := rabbit.NewClient(addr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Failed to create publisher")
|
log.WithError(err).Error("Failed to create publisher")
|
||||||
continue
|
continue
|
||||||
|
|
@ -146,7 +161,7 @@ func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[stri
|
||||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn)
|
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn)
|
||||||
}
|
}
|
||||||
|
|
||||||
return publishers
|
s.taskPublishers = publishers
|
||||||
}
|
}
|
||||||
|
|
||||||
func pushTask(pubChan chan<- []byte, m, l string) {
|
func pushTask(pubChan chan<- []byte, m, l string) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue