From c8a384efcbfeb1395792af85d650c55e5b8132df Mon Sep 17 00:00:00 2001 From: nquidox Date: Wed, 8 Apr 2026 12:28:56 +0300 Subject: [PATCH 1/2] update --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d16b4dc..d25d4a7 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/sirupsen/logrus v1.9.4 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 - repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.17 + repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18 ) require ( diff --git a/go.sum b/go.sum index 23b2483..80b33d5 100644 --- a/go.sum +++ b/go.sum @@ -52,5 +52,5 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.17 h1:f+oJRW0t894F9TjqG4UJAHWLcg1RffoHe5olsmYoltQ= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.17/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18 h1:y2oQOoQApDXUdZc/9T5fA80pkq2O+0aw3AKbSNrY1aE= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= From 9361319f0a334b0f3d28de953222124cf1a23e2a Mon Sep 17 00:00:00 2001 From: nquidox Date: Mon, 27 Apr 2026 00:18:39 +0300 Subject: [PATCH 2/2] declare queue with opts quick fix --- go.mod | 2 +- go.sum | 4 +- internal/processor/service.go | 83 ++++++++++++++++++++++++++--------- 3 files changed, 66 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index d25d4a7..9bfcfe1 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/sirupsen/logrus v1.9.4 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 - repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18 + repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 ) require ( diff --git a/go.sum b/go.sum index 80b33d5..0ad66c6 100644 --- a/go.sum +++ b/go.sum @@ -52,5 +52,5 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18 h1:y2oQOoQApDXUdZc/9T5fA80pkq2O+0aw3AKbSNrY1aE= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 h1:IKxMEU0Awn8cnC+x9Ptw7t2lsK8WLr4MKSKq2RN/hJY= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= diff --git a/internal/processor/service.go b/internal/processor/service.go index 1cebf06..c7ea017 100644 --- a/internal/processor/service.go +++ b/internal/processor/service.go @@ -3,7 +3,6 @@ package processor import ( "context" "encoding/json" - "fmt" log "github.com/sirupsen/logrus" rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit" "task-processor/internal/structs" @@ -62,8 +61,16 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { log.Debugf("%v Results sender start", pkgLogHeader) runCtx, cancel := context.WithCancel(ctx) - qn := "tasks-results" - consumerClient, err := rabbit.NewClient(s.brokerAddr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled)) + taskResults := rabbit.QueueOpts{ + QueueName: "tasks-results", + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: nil, + } + + consumerClient, err := rabbit.NewClient(s.brokerAddr, taskResults, rabbit.WithLogging(s.rabbitLoggingEnabled)) if err != nil { cancel() return err @@ -71,7 +78,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { resultsConsumer := rabbit.NewConsumer(consumerClient) resultChan := resultsConsumer.Start(runCtx, chanLen) - log.Debugf("%v Results consumer started: %v", pkgLogHeader, qn) + log.Debugf("%v Results consumer started: %v", pkgLogHeader, taskResults.QueueName) go func() { defer cancel() @@ -145,26 +152,62 @@ func (s *service) convertResult(b []byte) *structs.Result { // TODO refactor this later: get origins from merch api and remove ctx pass via deps func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) { - origins := [...]string{ - "surugaya", - "mandarake", - "amiami", - } - publishers := make(map[string]chan<- []byte) - for _, origin := range origins { - qn := fmt.Sprintf("task-publisher-%s", origin) - pubClient, err := rabbit.NewClient(addr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled)) - if err != nil { - log.WithError(err).Error("Failed to create publisher") - continue - } - - publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen) - log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn) + // surugaya + surugayaOpts := rabbit.QueueOpts{ + QueueName: "task-publisher-surugaya", + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: nil, } + surugayaClient, err := rabbit.NewClient(addr, surugayaOpts, rabbit.WithLogging(s.rabbitLoggingEnabled)) + if err != nil { + log.WithError(err).Error("Failed to create publisher") + } + + publishers["surugaya"] = rabbit.NewPublisher(surugayaClient).Start(ctx, chanLen) + log.Debugf("%v Publisher queue created: %v", pkgLogHeader, surugayaOpts.QueueName) + + //mandarake + mandarakeOpts := rabbit.QueueOpts{ + QueueName: "task-publisher-mandarake", + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: nil, + } + + mandarakeClient, err := rabbit.NewClient(addr, mandarakeOpts, rabbit.WithLogging(s.rabbitLoggingEnabled)) + if err != nil { + log.WithError(err).Error("Failed to create publisher") + } + + publishers["mandarake"] = rabbit.NewPublisher(mandarakeClient).Start(ctx, chanLen) + log.Debugf("%v Publisher queue created: %v", pkgLogHeader, mandarakeOpts.QueueName) + + //amiami + amiamiOpts := rabbit.QueueOpts{ + QueueName: "task-publisher-amiami", + Durable: true, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: nil, + } + + amiamiClient, err := rabbit.NewClient(addr, amiamiOpts, rabbit.WithLogging(s.rabbitLoggingEnabled)) + if err != nil { + log.WithError(err).Error("Failed to create publisher") + } + + publishers["amiami"] = rabbit.NewPublisher(amiamiClient).Start(ctx, chanLen) + log.Debugf("%v Publisher queue created: %v", pkgLogHeader, amiamiOpts.QueueName) + s.taskPublishers = publishers }