Compare commits

..

No commits in common. "main" and "v0.2.6" have entirely different histories.
main ... v0.2.6

8 changed files with 47 additions and 108 deletions

View file

@ -19,4 +19,3 @@ RABBIT_PORT=
RABBIT_USER= RABBIT_USER=
RABBIT_PASS= RABBIT_PASS=
RABBIT_VHOST= RABBIT_VHOST=
RABBIT_LOGGING_ENABLED=

View file

@ -33,7 +33,6 @@ type Rabbit struct {
User string User string
Pass string Pass string
Vhost string Vhost string
LoggingEnabled bool
} }
func NewConfig() Config { func NewConfig() Config {
@ -51,10 +50,8 @@ func NewConfig() Config {
}, },
TasksSource: TasksSource{ TasksSource: TasksSource{
//Host: getEnv("TASK_SOURCE_HOST", "127.0.0.1"), Host: getEnv("TASK_SOURCE_HOST", "127.0.0.1"),
//Port: getEnv("TASK_SOURCE_PORT", "61000"), Port: getEnv("TASK_SOURCE_PORT", "61000"),
Host: getEnv("TASK_SOURCE_HOST", "10.0.0.1"),
Port: getEnv("TASK_SOURCE_PORT", "9099"),
Timeout: getEnvSeconds("TASK_SOURCE_TIMEOUT_SECONDS", 60), Timeout: getEnvSeconds("TASK_SOURCE_TIMEOUT_SECONDS", 60),
}, },
@ -64,7 +61,6 @@ func NewConfig() Config {
User: getEnv("RABBIT_USER", "taskProcessorDev"), User: getEnv("RABBIT_USER", "taskProcessorDev"),
Pass: getEnv("RABBIT_PASS", "pass1234"), Pass: getEnv("RABBIT_PASS", "pass1234"),
Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"), Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"),
LoggingEnabled: getEnvBool("RABBIT_LOGGING_ENABLED", true),
}, },
} }
} }

View file

@ -50,14 +50,3 @@ func getEnvPort(key string, fallback uint16) uint16 {
} }
return fallback return fallback
} }
func getEnvBool(key string, fallback bool) bool {
if value, ok := os.LookupEnv(key); ok {
val, err := strconv.ParseBool(value)
if err != nil {
return fallback
}
return val
}
return fallback
}

4
go.mod
View file

@ -6,7 +6,7 @@ require (
github.com/sirupsen/logrus v1.9.4 github.com/sirupsen/logrus v1.9.4
google.golang.org/grpc v1.80.0 google.golang.org/grpc v1.80.0
google.golang.org/protobuf v1.36.11 google.golang.org/protobuf v1.36.11
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16
) )
require ( require (
@ -15,5 +15,5 @@ require (
golang.org/x/sys v0.42.0 // indirect golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0 // indirect golang.org/x/text v0.35.0 // indirect
golang.org/x/time v0.15.0 // indirect golang.org/x/time v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
) )

8
go.sum
View file

@ -44,13 +44,13 @@ golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d h1:wT2n40TBqFY6wiwazVK9/iTWbsQrgk5ZfCSVFLO9LQA= google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 h1:IKxMEU0Awn8cnC+x9Ptw7t2lsK8WLr4MKSKq2RN/hJY= repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16 h1:bqoQZr4kblRyGQFjqBItakGm/p2PP0+68U12lLnuIYM=
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo=

View file

@ -33,8 +33,8 @@ func NewApp(ctx context.Context, cfg config.Config) *App {
Pass: cfg.Rabbit.Pass, Pass: cfg.Rabbit.Pass,
Vhost: cfg.Rabbit.Vhost, Vhost: cfg.Rabbit.Vhost,
}, },
LoggingEnabled: cfg.Rabbit.LoggingEnabled,
ChanLen: cfg.App.ProcChanLen, ChanLen: cfg.App.ProcChanLen,
LoggerEnabled: false,
}) })
return &App{ return &App{
@ -53,13 +53,11 @@ func (app *App) Run(ctx context.Context) error {
defer mainLoop.Stop() defer mainLoop.Stop()
go func() { go func() {
log.Info("Process tasks after start")
if err := app.processor.ProcessTasks(ctx); err != nil { if err := app.processor.ProcessTasks(ctx); err != nil {
errChan <- err errChan <- err
} }
for range mainLoop.C { for range mainLoop.C {
log.WithField("period", app.config.App.CheckPeriod).Info("Repeated process tasks")
if err := app.processor.ProcessTasks(ctx); err != nil { if err := app.processor.ProcessTasks(ctx); err != nil {
errChan <- err errChan <- err
} }

View file

@ -24,7 +24,7 @@ type Deps struct {
TA taskAgent.TaskAgent TA taskAgent.TaskAgent
Addr Addr Addr Addr
ChanLen uint ChanLen uint
LoggingEnabled bool LoggerEnabled bool
} }
func NewHandler(deps Deps) Processor { func NewHandler(deps Deps) Processor {

View file

@ -3,6 +3,7 @@ package processor
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit" rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
"task-processor/internal/structs" "task-processor/internal/structs"
@ -29,7 +30,7 @@ func newService(deps Deps) *service {
s := &service{ s := &service{
taskAgent: deps.TA, taskAgent: deps.TA,
brokerAddr: ba, brokerAddr: ba,
rabbitLoggingEnabled: deps.LoggingEnabled, rabbitLoggingEnabled: deps.LoggerEnabled,
} }
s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen) s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen)
@ -61,16 +62,8 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
log.Debugf("%v Results sender start", pkgLogHeader) log.Debugf("%v Results sender start", pkgLogHeader)
runCtx, cancel := context.WithCancel(ctx) runCtx, cancel := context.WithCancel(ctx)
taskResults := rabbit.QueueOpts{ qn := "tasks-results"
QueueName: "tasks-results", consumerClient, err := rabbit.NewClient(s.brokerAddr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled))
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
}
consumerClient, err := rabbit.NewClient(s.brokerAddr, taskResults, rabbit.WithLogging(s.rabbitLoggingEnabled))
if err != nil { if err != nil {
cancel() cancel()
return err return err
@ -78,7 +71,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
resultsConsumer := rabbit.NewConsumer(consumerClient) resultsConsumer := rabbit.NewConsumer(consumerClient)
resultChan := resultsConsumer.Start(runCtx, chanLen) resultChan := resultsConsumer.Start(runCtx, chanLen)
log.Debugf("%v Results consumer started: %v", pkgLogHeader, taskResults.QueueName) log.Debugf("%v Results consumer started: %v", pkgLogHeader, qn)
go func() { go func() {
defer cancel() defer cancel()
@ -152,62 +145,26 @@ func (s *service) convertResult(b []byte) *structs.Result {
// TODO refactor this later: get origins from merch api and remove ctx pass via deps // TODO refactor this later: get origins from merch api and remove ctx pass via deps
func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) { func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) {
origins := [...]string{
"surugaya",
"mandarake",
"amiami",
}
publishers := make(map[string]chan<- []byte) publishers := make(map[string]chan<- []byte)
// surugaya for _, origin := range origins {
surugayaOpts := rabbit.QueueOpts{ qn := fmt.Sprintf("task-publisher-%s", origin)
QueueName: "task-publisher-surugaya", pubClient, err := rabbit.NewClient(addr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled))
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
}
surugayaClient, err := rabbit.NewClient(addr, surugayaOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
if err != nil { if err != nil {
log.WithError(err).Error("Failed to create publisher") log.WithError(err).Error("Failed to create publisher")
continue
} }
publishers["surugaya"] = rabbit.NewPublisher(surugayaClient).Start(ctx, chanLen) publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen)
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, surugayaOpts.QueueName) log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn)
//mandarake
mandarakeOpts := rabbit.QueueOpts{
QueueName: "task-publisher-mandarake",
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
} }
mandarakeClient, err := rabbit.NewClient(addr, mandarakeOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
if err != nil {
log.WithError(err).Error("Failed to create publisher")
}
publishers["mandarake"] = rabbit.NewPublisher(mandarakeClient).Start(ctx, chanLen)
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, mandarakeOpts.QueueName)
//amiami
amiamiOpts := rabbit.QueueOpts{
QueueName: "task-publisher-amiami",
Durable: true,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
}
amiamiClient, err := rabbit.NewClient(addr, amiamiOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
if err != nil {
log.WithError(err).Error("Failed to create publisher")
}
publishers["amiami"] = rabbit.NewPublisher(amiamiClient).Start(ctx, chanLen)
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, amiamiOpts.QueueName)
s.taskPublishers = publishers s.taskPublishers = publishers
} }