Compare commits
No commits in common. "main" and "v0.2.8" have entirely different histories.
3 changed files with 23 additions and 66 deletions
2
go.mod
2
go.mod
|
|
@ -6,7 +6,7 @@ require (
|
||||||
github.com/sirupsen/logrus v1.9.4
|
github.com/sirupsen/logrus v1.9.4
|
||||||
google.golang.org/grpc v1.80.0
|
google.golang.org/grpc v1.80.0
|
||||||
google.golang.org/protobuf v1.36.11
|
google.golang.org/protobuf v1.36.11
|
||||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19
|
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -52,5 +52,5 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN
|
||||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 h1:IKxMEU0Awn8cnC+x9Ptw7t2lsK8WLr4MKSKq2RN/hJY=
|
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18 h1:y2oQOoQApDXUdZc/9T5fA80pkq2O+0aw3AKbSNrY1aE=
|
||||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo=
|
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo=
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package processor
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
|
rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
|
||||||
"task-processor/internal/structs"
|
"task-processor/internal/structs"
|
||||||
|
|
@ -61,16 +62,8 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
|
||||||
log.Debugf("%v Results sender start", pkgLogHeader)
|
log.Debugf("%v Results sender start", pkgLogHeader)
|
||||||
runCtx, cancel := context.WithCancel(ctx)
|
runCtx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
taskResults := rabbit.QueueOpts{
|
qn := "tasks-results"
|
||||||
QueueName: "tasks-results",
|
consumerClient, err := rabbit.NewClient(s.brokerAddr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
||||||
Durable: false,
|
|
||||||
AutoDelete: false,
|
|
||||||
Exclusive: false,
|
|
||||||
NoWait: false,
|
|
||||||
Args: nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
consumerClient, err := rabbit.NewClient(s.brokerAddr, taskResults, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
return err
|
return err
|
||||||
|
|
@ -78,7 +71,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
|
||||||
|
|
||||||
resultsConsumer := rabbit.NewConsumer(consumerClient)
|
resultsConsumer := rabbit.NewConsumer(consumerClient)
|
||||||
resultChan := resultsConsumer.Start(runCtx, chanLen)
|
resultChan := resultsConsumer.Start(runCtx, chanLen)
|
||||||
log.Debugf("%v Results consumer started: %v", pkgLogHeader, taskResults.QueueName)
|
log.Debugf("%v Results consumer started: %v", pkgLogHeader, qn)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
@ -152,62 +145,26 @@ func (s *service) convertResult(b []byte) *structs.Result {
|
||||||
|
|
||||||
// TODO refactor this later: get origins from merch api and remove ctx pass via deps
|
// TODO refactor this later: get origins from merch api and remove ctx pass via deps
|
||||||
func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) {
|
func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) {
|
||||||
|
origins := [...]string{
|
||||||
|
"surugaya",
|
||||||
|
"mandarake",
|
||||||
|
"amiami",
|
||||||
|
}
|
||||||
|
|
||||||
publishers := make(map[string]chan<- []byte)
|
publishers := make(map[string]chan<- []byte)
|
||||||
|
|
||||||
// surugaya
|
for _, origin := range origins {
|
||||||
surugayaOpts := rabbit.QueueOpts{
|
qn := fmt.Sprintf("task-publisher-%s", origin)
|
||||||
QueueName: "task-publisher-surugaya",
|
pubClient, err := rabbit.NewClient(addr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
||||||
Durable: false,
|
if err != nil {
|
||||||
AutoDelete: false,
|
log.WithError(err).Error("Failed to create publisher")
|
||||||
Exclusive: false,
|
continue
|
||||||
NoWait: false,
|
}
|
||||||
Args: nil,
|
|
||||||
|
publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen)
|
||||||
|
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn)
|
||||||
}
|
}
|
||||||
|
|
||||||
surugayaClient, err := rabbit.NewClient(addr, surugayaOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("Failed to create publisher")
|
|
||||||
}
|
|
||||||
|
|
||||||
publishers["surugaya"] = rabbit.NewPublisher(surugayaClient).Start(ctx, chanLen)
|
|
||||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, surugayaOpts.QueueName)
|
|
||||||
|
|
||||||
//mandarake
|
|
||||||
mandarakeOpts := rabbit.QueueOpts{
|
|
||||||
QueueName: "task-publisher-mandarake",
|
|
||||||
Durable: false,
|
|
||||||
AutoDelete: false,
|
|
||||||
Exclusive: false,
|
|
||||||
NoWait: false,
|
|
||||||
Args: nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
mandarakeClient, err := rabbit.NewClient(addr, mandarakeOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("Failed to create publisher")
|
|
||||||
}
|
|
||||||
|
|
||||||
publishers["mandarake"] = rabbit.NewPublisher(mandarakeClient).Start(ctx, chanLen)
|
|
||||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, mandarakeOpts.QueueName)
|
|
||||||
|
|
||||||
//amiami
|
|
||||||
amiamiOpts := rabbit.QueueOpts{
|
|
||||||
QueueName: "task-publisher-amiami",
|
|
||||||
Durable: true,
|
|
||||||
AutoDelete: false,
|
|
||||||
Exclusive: false,
|
|
||||||
NoWait: false,
|
|
||||||
Args: nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
amiamiClient, err := rabbit.NewClient(addr, amiamiOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("Failed to create publisher")
|
|
||||||
}
|
|
||||||
|
|
||||||
publishers["amiami"] = rabbit.NewPublisher(amiamiClient).Start(ctx, chanLen)
|
|
||||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, amiamiOpts.QueueName)
|
|
||||||
|
|
||||||
s.taskPublishers = publishers
|
s.taskPublishers = publishers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue