Compare commits
4 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9361319f0a | ||
|
|
c8a384efcb | ||
|
|
98bb6a5bc0 | ||
|
|
b29e60b712 |
8 changed files with 108 additions and 47 deletions
|
|
@ -19,3 +19,4 @@ RABBIT_PORT=
|
|||
RABBIT_USER=
|
||||
RABBIT_PASS=
|
||||
RABBIT_VHOST=
|
||||
RABBIT_LOGGING_ENABLED=
|
||||
|
|
@ -33,6 +33,7 @@ type Rabbit struct {
|
|||
User string
|
||||
Pass string
|
||||
Vhost string
|
||||
LoggingEnabled bool
|
||||
}
|
||||
|
||||
func NewConfig() Config {
|
||||
|
|
@ -50,8 +51,10 @@ func NewConfig() Config {
|
|||
},
|
||||
|
||||
TasksSource: TasksSource{
|
||||
Host: getEnv("TASK_SOURCE_HOST", "127.0.0.1"),
|
||||
Port: getEnv("TASK_SOURCE_PORT", "61000"),
|
||||
//Host: getEnv("TASK_SOURCE_HOST", "127.0.0.1"),
|
||||
//Port: getEnv("TASK_SOURCE_PORT", "61000"),
|
||||
Host: getEnv("TASK_SOURCE_HOST", "10.0.0.1"),
|
||||
Port: getEnv("TASK_SOURCE_PORT", "9099"),
|
||||
Timeout: getEnvSeconds("TASK_SOURCE_TIMEOUT_SECONDS", 60),
|
||||
},
|
||||
|
||||
|
|
@ -61,6 +64,7 @@ func NewConfig() Config {
|
|||
User: getEnv("RABBIT_USER", "taskProcessorDev"),
|
||||
Pass: getEnv("RABBIT_PASS", "pass1234"),
|
||||
Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"),
|
||||
LoggingEnabled: getEnvBool("RABBIT_LOGGING_ENABLED", true),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,3 +50,14 @@ func getEnvPort(key string, fallback uint16) uint16 {
|
|||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
func getEnvBool(key string, fallback bool) bool {
|
||||
if value, ok := os.LookupEnv(key); ok {
|
||||
val, err := strconv.ParseBool(value)
|
||||
if err != nil {
|
||||
return fallback
|
||||
}
|
||||
return val
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
|
|
|||
4
go.mod
4
go.mod
|
|
@ -6,7 +6,7 @@ require (
|
|||
github.com/sirupsen/logrus v1.9.4
|
||||
google.golang.org/grpc v1.80.0
|
||||
google.golang.org/protobuf v1.36.11
|
||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16
|
||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
@ -15,5 +15,5 @@ require (
|
|||
golang.org/x/sys v0.42.0 // indirect
|
||||
golang.org/x/text v0.35.0 // indirect
|
||||
golang.org/x/time v0.15.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d // indirect
|
||||
)
|
||||
|
|
|
|||
8
go.sum
8
go.sum
|
|
@ -44,13 +44,13 @@ golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
|
|||
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
|
||||
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
|
||||
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d h1:wT2n40TBqFY6wiwazVK9/iTWbsQrgk5ZfCSVFLO9LQA=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
|
||||
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
|
||||
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16 h1:bqoQZr4kblRyGQFjqBItakGm/p2PP0+68U12lLnuIYM=
|
||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo=
|
||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 h1:IKxMEU0Awn8cnC+x9Ptw7t2lsK8WLr4MKSKq2RN/hJY=
|
||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo=
|
||||
|
|
|
|||
|
|
@ -33,8 +33,8 @@ func NewApp(ctx context.Context, cfg config.Config) *App {
|
|||
Pass: cfg.Rabbit.Pass,
|
||||
Vhost: cfg.Rabbit.Vhost,
|
||||
},
|
||||
LoggingEnabled: cfg.Rabbit.LoggingEnabled,
|
||||
ChanLen: cfg.App.ProcChanLen,
|
||||
LoggerEnabled: false,
|
||||
})
|
||||
|
||||
return &App{
|
||||
|
|
@ -53,11 +53,13 @@ func (app *App) Run(ctx context.Context) error {
|
|||
defer mainLoop.Stop()
|
||||
|
||||
go func() {
|
||||
log.Info("Process tasks after start")
|
||||
if err := app.processor.ProcessTasks(ctx); err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
|
||||
for range mainLoop.C {
|
||||
log.WithField("period", app.config.App.CheckPeriod).Info("Repeated process tasks")
|
||||
if err := app.processor.ProcessTasks(ctx); err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ type Deps struct {
|
|||
TA taskAgent.TaskAgent
|
||||
Addr Addr
|
||||
ChanLen uint
|
||||
LoggerEnabled bool
|
||||
LoggingEnabled bool
|
||||
}
|
||||
|
||||
func NewHandler(deps Deps) Processor {
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ package processor
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
log "github.com/sirupsen/logrus"
|
||||
rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
|
||||
"task-processor/internal/structs"
|
||||
|
|
@ -30,7 +29,7 @@ func newService(deps Deps) *service {
|
|||
s := &service{
|
||||
taskAgent: deps.TA,
|
||||
brokerAddr: ba,
|
||||
rabbitLoggingEnabled: deps.LoggerEnabled,
|
||||
rabbitLoggingEnabled: deps.LoggingEnabled,
|
||||
}
|
||||
|
||||
s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen)
|
||||
|
|
@ -62,8 +61,16 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
|
|||
log.Debugf("%v Results sender start", pkgLogHeader)
|
||||
runCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
qn := "tasks-results"
|
||||
consumerClient, err := rabbit.NewClient(s.brokerAddr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
||||
taskResults := rabbit.QueueOpts{
|
||||
QueueName: "tasks-results",
|
||||
Durable: false,
|
||||
AutoDelete: false,
|
||||
Exclusive: false,
|
||||
NoWait: false,
|
||||
Args: nil,
|
||||
}
|
||||
|
||||
consumerClient, err := rabbit.NewClient(s.brokerAddr, taskResults, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
|
|
@ -71,7 +78,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
|
|||
|
||||
resultsConsumer := rabbit.NewConsumer(consumerClient)
|
||||
resultChan := resultsConsumer.Start(runCtx, chanLen)
|
||||
log.Debugf("%v Results consumer started: %v", pkgLogHeader, qn)
|
||||
log.Debugf("%v Results consumer started: %v", pkgLogHeader, taskResults.QueueName)
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
|
|
@ -145,26 +152,62 @@ func (s *service) convertResult(b []byte) *structs.Result {
|
|||
|
||||
// TODO refactor this later: get origins from merch api and remove ctx pass via deps
|
||||
func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) {
|
||||
origins := [...]string{
|
||||
"surugaya",
|
||||
"mandarake",
|
||||
"amiami",
|
||||
}
|
||||
|
||||
publishers := make(map[string]chan<- []byte)
|
||||
|
||||
for _, origin := range origins {
|
||||
qn := fmt.Sprintf("task-publisher-%s", origin)
|
||||
pubClient, err := rabbit.NewClient(addr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to create publisher")
|
||||
continue
|
||||
// surugaya
|
||||
surugayaOpts := rabbit.QueueOpts{
|
||||
QueueName: "task-publisher-surugaya",
|
||||
Durable: false,
|
||||
AutoDelete: false,
|
||||
Exclusive: false,
|
||||
NoWait: false,
|
||||
Args: nil,
|
||||
}
|
||||
|
||||
publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen)
|
||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn)
|
||||
surugayaClient, err := rabbit.NewClient(addr, surugayaOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to create publisher")
|
||||
}
|
||||
|
||||
publishers["surugaya"] = rabbit.NewPublisher(surugayaClient).Start(ctx, chanLen)
|
||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, surugayaOpts.QueueName)
|
||||
|
||||
//mandarake
|
||||
mandarakeOpts := rabbit.QueueOpts{
|
||||
QueueName: "task-publisher-mandarake",
|
||||
Durable: false,
|
||||
AutoDelete: false,
|
||||
Exclusive: false,
|
||||
NoWait: false,
|
||||
Args: nil,
|
||||
}
|
||||
|
||||
mandarakeClient, err := rabbit.NewClient(addr, mandarakeOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to create publisher")
|
||||
}
|
||||
|
||||
publishers["mandarake"] = rabbit.NewPublisher(mandarakeClient).Start(ctx, chanLen)
|
||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, mandarakeOpts.QueueName)
|
||||
|
||||
//amiami
|
||||
amiamiOpts := rabbit.QueueOpts{
|
||||
QueueName: "task-publisher-amiami",
|
||||
Durable: true,
|
||||
AutoDelete: false,
|
||||
Exclusive: false,
|
||||
NoWait: false,
|
||||
Args: nil,
|
||||
}
|
||||
|
||||
amiamiClient, err := rabbit.NewClient(addr, amiamiOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to create publisher")
|
||||
}
|
||||
|
||||
publishers["amiami"] = rabbit.NewPublisher(amiamiClient).Start(ctx, chanLen)
|
||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, amiamiOpts.QueueName)
|
||||
|
||||
s.taskPublishers = publishers
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue