From 979c7c4a4fb090c81de9b58325fae37270db93f1 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:44:44 +0300 Subject: [PATCH 01/13] update --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 1bfc9f5..d3f6288 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/sirupsen/logrus v1.9.4 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 - repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15 + repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16 ) require ( diff --git a/go.sum b/go.sum index dfd472e..07fdb4a 100644 --- a/go.sum +++ b/go.sum @@ -52,5 +52,5 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15 h1:w+jvbwEnpGXxYI37iJvfTEYYNJ9ROUjBwlEOY7LJGxM= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16 h1:bqoQZr4kblRyGQFjqBItakGm/p2PP0+68U12lLnuIYM= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= From f70d53c75cfc429cca5878d43fb808d005a121a5 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:45:22 +0300 Subject: [PATCH 02/13] uint16 type for port + env getter --- config/config.go | 6 +++--- config/helper.go | 11 +++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index a4f88e4..79d9d1f 100644 --- a/config/config.go +++ b/config/config.go @@ -29,7 +29,7 @@ type TasksSource struct { type Rabbit struct { Host string - Port string + Port uint16 User string Pass string Vhost string @@ -40,7 +40,7 @@ func NewConfig() Config { App: App{ Mode: getEnv("APP_MODE", "dev"), LogLvl: getEnv("APP_LOG_LVL", "debug"), - CheckPeriod: getEnvSeconds("APP_CHECK_PERIOD_SECONDS", 20), + CheckPeriod: getEnvSeconds("APP_CHECK_PERIOD_SECONDS", 60*60*6), ProcChanLen: getEnvUint("APP_PROCESSOR_CHANNEL_SIZE", 100), }, @@ -57,7 +57,7 @@ func NewConfig() Config { Rabbit: Rabbit{ Host: getEnv("RABBIT_HOST", "10.0.0.4"), - Port: getEnv("RABBIT_PORT", "5672"), + Port: getEnvPort("RABBIT_PORT", 5672), User: getEnv("RABBIT_USER", "taskProcessorDev"), Pass: getEnv("RABBIT_PASS", "pass1234"), Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"), diff --git a/config/helper.go b/config/helper.go index e17dc02..1771dbe 100644 --- a/config/helper.go +++ b/config/helper.go @@ -39,3 +39,14 @@ func getEnvUint(key string, fallback uint) uint { } return fallback } + +func getEnvPort(key string, fallback uint16) uint16 { + if value, ok := os.LookupEnv(key); ok { + num, err := strconv.ParseUint(value, 10, 16) + if err != nil { + return fallback + } + return uint16(num) + } + return fallback +} From 5fe38af03591edd13bbeb1cd3ba3ab905a918f61 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:46:29 +0300 Subject: [PATCH 03/13] address format and deps change + logger option --- internal/processor/handler.go | 22 ++++++------------- internal/processor/service.go | 41 ++++++++++++++++++++++++----------- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/internal/processor/handler.go b/internal/processor/handler.go index b008dba..3252672 100644 --- a/internal/processor/handler.go +++ b/internal/processor/handler.go @@ -2,8 +2,6 @@ package processor import ( "context" - "fmt" - "net" "task-processor/internal/taskAgent" ) @@ -15,28 +13,22 @@ type handler struct { type Addr struct { Host string - Port string + Port uint16 User string Pass string Vhost string } type Deps struct { - Ctx context.Context - TA taskAgent.TaskAgent - Addr Addr - ChanLen uint + Ctx context.Context + TA taskAgent.TaskAgent + Addr Addr + ChanLen uint + LoggerEnabled bool } func NewHandler(deps Deps) Processor { - addr := makeAddr(deps.Addr) - return &handler{ - service: newService(deps, addr), + service: newService(deps), } } - -func makeAddr(addr Addr) string { - //"amqp://username:password@host:port/vhost" - return fmt.Sprintf("amqp://%v:%v@%v/%v", addr.User, addr.Pass, net.JoinHostPort(addr.Host, addr.Port), addr.Vhost) -} diff --git a/internal/processor/service.go b/internal/processor/service.go index 80224e0..01e4ef8 100644 --- a/internal/processor/service.go +++ b/internal/processor/service.go @@ -12,18 +12,30 @@ import ( ) type service struct { - taskAgent taskAgent.TaskAgent - brokerAddr string - taskPublishers map[string]chan<- []byte + taskAgent taskAgent.TaskAgent + brokerAddr rabbit.Address + taskPublishers map[string]chan<- []byte + rabbitLoggingEnabled bool } -func newService(deps Deps, addr string) *service { - - return &service{ - taskAgent: deps.TA, - brokerAddr: addr, - taskPublishers: makeTaskPublishers(deps.Ctx, addr, deps.ChanLen), +func newService(deps Deps) *service { + ba := rabbit.Address{ + Username: deps.Addr.User, + Password: deps.Addr.Pass, + Host: deps.Addr.Host, + Port: deps.Addr.Port, + Vhost: deps.Addr.Vhost, } + + s := &service{ + taskAgent: deps.TA, + brokerAddr: ba, + rabbitLoggingEnabled: deps.LoggerEnabled, + } + + s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen) + + return s } func (s *service) ProcessTasks(ctx context.Context) error { @@ -50,7 +62,8 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { log.Debugf("%v Results sender start", pkgLogHeader) runCtx, cancel := context.WithCancel(ctx) - consumerClient, err := rabbit.NewClient(s.brokerAddr, "tasks-results") + qn := "tasks-results" + consumerClient, err := rabbit.NewClient(s.brokerAddr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled)) if err != nil { cancel() return err @@ -58,6 +71,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { resultsConsumer := rabbit.NewConsumer(consumerClient) resultChan := resultsConsumer.Start(runCtx, chanLen) + log.Debugf("%v Results consumer started: %v", pkgLogHeader, qn) go func() { defer cancel() @@ -125,7 +139,8 @@ func (s *service) convertResult(b []byte) *structs.Result { return &res } -func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[string]chan<- []byte { +// TODO refactor this later: get origins from merch api and remove ctx pass via deps +func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) { origins := [...]string{ "surugaya", "mandarake", @@ -136,7 +151,7 @@ func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[stri for _, origin := range origins { qn := fmt.Sprintf("task-publisher-%s", origin) - pubClient, err := rabbit.NewClient(addr, qn) + pubClient, err := rabbit.NewClient(addr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled)) if err != nil { log.WithError(err).Error("Failed to create publisher") continue @@ -146,7 +161,7 @@ func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[stri log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn) } - return publishers + s.taskPublishers = publishers } func pushTask(pubChan chan<- []byte, m, l string) { From 2a3ba0e7977d17fd683659cf8dcec120312c02a4 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:46:39 +0300 Subject: [PATCH 04/13] deps --- internal/app/handler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/app/handler.go b/internal/app/handler.go index 676bb81..5f050f6 100644 --- a/internal/app/handler.go +++ b/internal/app/handler.go @@ -33,7 +33,8 @@ func NewApp(ctx context.Context, cfg config.Config) *App { Pass: cfg.Rabbit.Pass, Vhost: cfg.Rabbit.Vhost, }, - ChanLen: cfg.App.ProcChanLen, + ChanLen: cfg.App.ProcChanLen, + LoggerEnabled: false, }) return &App{ From a5aec1fca3e2d4050e1e3ac300c49f3d7ebf09af Mon Sep 17 00:00:00 2001 From: nquidox Date: Sun, 5 Apr 2026 20:16:44 +0300 Subject: [PATCH 05/13] build files --- .forgejo/workflows/make-image.yml | 2 ++ Dockerfile | 31 +++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 Dockerfile diff --git a/.forgejo/workflows/make-image.yml b/.forgejo/workflows/make-image.yml index 392037e..087e050 100644 --- a/.forgejo/workflows/make-image.yml +++ b/.forgejo/workflows/make-image.yml @@ -33,8 +33,10 @@ jobs: echo "VERSION=${VERSION}" >> $GITHUB_ENV - name: Make image + GIT_CREDENTIALS: https://${{ secrets.MAINTAINER_USERNAME }}:${{ secrets.MAINTAINER_TOKEN }}@repo.nqws.ru/ run: | docker buildx build --platform linux/amd64 \ + --secret id=git_creds,env=GIT_CREDENTIALS \ --tag repo.nqws.ru/${{ github.repository }}:latest \ --tag repo.nqws.ru/${{ github.repository }}:${{ env.VERSION }} \ --push . diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..c8fe906 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,31 @@ +FROM golang:1.26.1-alpine3.23 AS builder + +RUN apk add --no-cache git + +ENV GOPRIVATE=repo.nqws.ru/* +ENV GONOSUMDB=repo.nqws.ru/* + +WORKDIR /app + +COPY go.mod go.sum ./ + +RUN --mount=type=secret,id=netrc,dst=/root/.netrc,mode=0600 \ + git config --global credential.helper 'store --file=/root/.netrc' && \ + go mod download + +COPY . . +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags="-s -w" -o main "./cmd" + + +FROM alpine:3.23 + +RUN apk add --no-cache \ + tzdata \ + ca-certificates + +COPY --from=builder /app/main /usr/local/bin/app + + +RUN chmod +x /usr/local/bin/app + +ENTRYPOINT ["app"] From 8cf8ff70d728d774cf2d893e93e73c114159e901 Mon Sep 17 00:00:00 2001 From: nquidox Date: Sun, 5 Apr 2026 20:21:16 +0300 Subject: [PATCH 06/13] fix --- .forgejo/workflows/make-image.yml | 3 ++- Dockerfile | 6 ++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/.forgejo/workflows/make-image.yml b/.forgejo/workflows/make-image.yml index 087e050..842c851 100644 --- a/.forgejo/workflows/make-image.yml +++ b/.forgejo/workflows/make-image.yml @@ -33,7 +33,8 @@ jobs: echo "VERSION=${VERSION}" >> $GITHUB_ENV - name: Make image - GIT_CREDENTIALS: https://${{ secrets.MAINTAINER_USERNAME }}:${{ secrets.MAINTAINER_TOKEN }}@repo.nqws.ru/ + env: + GIT_CREDENTIALS: https://${{ secrets.MAINTAINER_USERNAME }}:${{ secrets.MAINTAINER_TOKEN }}@repo.nqws.ru/ run: | docker buildx build --platform linux/amd64 \ --secret id=git_creds,env=GIT_CREDENTIALS \ diff --git a/Dockerfile b/Dockerfile index c8fe906..0254a2b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,17 +6,15 @@ ENV GOPRIVATE=repo.nqws.ru/* ENV GONOSUMDB=repo.nqws.ru/* WORKDIR /app - COPY go.mod go.sum ./ -RUN --mount=type=secret,id=netrc,dst=/root/.netrc,mode=0600 \ - git config --global credential.helper 'store --file=/root/.netrc' && \ +RUN --mount=type=secret,id=git_creds,dst=/root/.git-credentials,mode=0600 \ + git config --global credential.helper store && \ go mod download COPY . . RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags="-s -w" -o main "./cmd" - FROM alpine:3.23 RUN apk add --no-cache \ From 047b61c9a45cb6118ba574d7150d7939b12d1d1c Mon Sep 17 00:00:00 2001 From: nquidox Date: Mon, 6 Apr 2026 12:03:03 +0300 Subject: [PATCH 07/13] update --- Dockerfile | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/Dockerfile b/Dockerfile index 0254a2b..2fafaf6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,20 +1,12 @@ FROM golang:1.26.1-alpine3.23 AS builder -RUN apk add --no-cache git - -ENV GOPRIVATE=repo.nqws.ru/* -ENV GONOSUMDB=repo.nqws.ru/* - WORKDIR /app COPY go.mod go.sum ./ - -RUN --mount=type=secret,id=git_creds,dst=/root/.git-credentials,mode=0600 \ - git config --global credential.helper store && \ - go mod download - +RUN go mod download COPY . . RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags="-s -w" -o main "./cmd" + FROM alpine:3.23 RUN apk add --no-cache \ @@ -26,4 +18,4 @@ COPY --from=builder /app/main /usr/local/bin/app RUN chmod +x /usr/local/bin/app -ENTRYPOINT ["app"] +ENTRYPOINT ["app"] \ No newline at end of file From af1710c3fe00afd7ddc6c0f969580c37ebc5d248 Mon Sep 17 00:00:00 2001 From: nquidox Date: Mon, 6 Apr 2026 12:15:32 +0300 Subject: [PATCH 08/13] env name fix --- config/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index 79d9d1f..d0528ba 100644 --- a/config/config.go +++ b/config/config.go @@ -50,8 +50,8 @@ func NewConfig() Config { }, TasksSource: TasksSource{ - Host: getEnv("TASK_API_HOST", "127.0.0.1"), - Port: getEnv("TASK_API_PORT", "61000"), + Host: getEnv("TASK_SOURCE_HOST", "127.0.0.1"), + Port: getEnv("TASK_SOURCE_PORT", "61000"), Timeout: getEnvSeconds("TASK_SOURCE_TIMEOUT_SECONDS", 60), }, From c5013c03ff331c60bd6208a7da8af3578fb58516 Mon Sep 17 00:00:00 2001 From: nquidox Date: Mon, 6 Apr 2026 17:05:00 +0300 Subject: [PATCH 09/13] links validator --- internal/processor/helper.go | 10 ++++++++++ internal/processor/service.go | 4 ++++ 2 files changed, 14 insertions(+) create mode 100644 internal/processor/helper.go diff --git a/internal/processor/helper.go b/internal/processor/helper.go new file mode 100644 index 0000000..cb46a12 --- /dev/null +++ b/internal/processor/helper.go @@ -0,0 +1,10 @@ +package processor + +import "strings" + +func linkIsValid(link string) bool { + if strings.HasPrefix(link, "http://") || strings.HasPrefix(link, "https://") { + return true + } + return false +} diff --git a/internal/processor/service.go b/internal/processor/service.go index 01e4ef8..4544487 100644 --- a/internal/processor/service.go +++ b/internal/processor/service.go @@ -112,6 +112,10 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { func (s *service) sendTasks(tasks []structs.Task) error { for _, tsk := range tasks { for origin, link := range tsk.Origins { + if !linkIsValid(link) { + continue + } + if origin == "surugaya" { pushTask(s.taskPublishers["surugaya"], tsk.MerchUuid, link) } From b29e60b7123822d556e654f823e5b70bd79602cd Mon Sep 17 00:00:00 2001 From: nquidox Date: Wed, 8 Apr 2026 12:15:24 +0300 Subject: [PATCH 10/13] log enabled dep --- config.env | 3 ++- config/config.go | 28 ++++++++++++++++------------ config/helper.go | 11 +++++++++++ internal/app/handler.go | 6 ++++-- internal/processor/handler.go | 10 +++++----- internal/processor/service.go | 2 +- 6 files changed, 39 insertions(+), 21 deletions(-) diff --git a/config.env b/config.env index 42f4f76..a5a9766 100644 --- a/config.env +++ b/config.env @@ -18,4 +18,5 @@ RABBIT_HOST= RABBIT_PORT= RABBIT_USER= RABBIT_PASS= -RABBIT_VHOST= \ No newline at end of file +RABBIT_VHOST= +RABBIT_LOGGING_ENABLED= \ No newline at end of file diff --git a/config/config.go b/config/config.go index d0528ba..00e5411 100644 --- a/config/config.go +++ b/config/config.go @@ -28,11 +28,12 @@ type TasksSource struct { } type Rabbit struct { - Host string - Port uint16 - User string - Pass string - Vhost string + Host string + Port uint16 + User string + Pass string + Vhost string + LoggingEnabled bool } func NewConfig() Config { @@ -50,17 +51,20 @@ func NewConfig() Config { }, TasksSource: TasksSource{ - Host: getEnv("TASK_SOURCE_HOST", "127.0.0.1"), - Port: getEnv("TASK_SOURCE_PORT", "61000"), + //Host: getEnv("TASK_SOURCE_HOST", "127.0.0.1"), + //Port: getEnv("TASK_SOURCE_PORT", "61000"), + Host: getEnv("TASK_SOURCE_HOST", "10.0.0.1"), + Port: getEnv("TASK_SOURCE_PORT", "9099"), Timeout: getEnvSeconds("TASK_SOURCE_TIMEOUT_SECONDS", 60), }, Rabbit: Rabbit{ - Host: getEnv("RABBIT_HOST", "10.0.0.4"), - Port: getEnvPort("RABBIT_PORT", 5672), - User: getEnv("RABBIT_USER", "taskProcessorDev"), - Pass: getEnv("RABBIT_PASS", "pass1234"), - Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"), + Host: getEnv("RABBIT_HOST", "10.0.0.4"), + Port: getEnvPort("RABBIT_PORT", 5672), + User: getEnv("RABBIT_USER", "taskProcessorDev"), + Pass: getEnv("RABBIT_PASS", "pass1234"), + Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"), + LoggingEnabled: getEnvBool("RABBIT_LOGGING_ENABLED", true), }, } } diff --git a/config/helper.go b/config/helper.go index 1771dbe..5ff6a49 100644 --- a/config/helper.go +++ b/config/helper.go @@ -50,3 +50,14 @@ func getEnvPort(key string, fallback uint16) uint16 { } return fallback } + +func getEnvBool(key string, fallback bool) bool { + if value, ok := os.LookupEnv(key); ok { + val, err := strconv.ParseBool(value) + if err != nil { + return fallback + } + return val + } + return fallback +} diff --git a/internal/app/handler.go b/internal/app/handler.go index 5f050f6..f895073 100644 --- a/internal/app/handler.go +++ b/internal/app/handler.go @@ -33,8 +33,8 @@ func NewApp(ctx context.Context, cfg config.Config) *App { Pass: cfg.Rabbit.Pass, Vhost: cfg.Rabbit.Vhost, }, - ChanLen: cfg.App.ProcChanLen, - LoggerEnabled: false, + LoggingEnabled: cfg.Rabbit.LoggingEnabled, + ChanLen: cfg.App.ProcChanLen, }) return &App{ @@ -53,11 +53,13 @@ func (app *App) Run(ctx context.Context) error { defer mainLoop.Stop() go func() { + log.Info("Process tasks after start") if err := app.processor.ProcessTasks(ctx); err != nil { errChan <- err } for range mainLoop.C { + log.WithField("period", app.config.App.CheckPeriod).Info("Repeated process tasks") if err := app.processor.ProcessTasks(ctx); err != nil { errChan <- err } diff --git a/internal/processor/handler.go b/internal/processor/handler.go index 3252672..164cb44 100644 --- a/internal/processor/handler.go +++ b/internal/processor/handler.go @@ -20,11 +20,11 @@ type Addr struct { } type Deps struct { - Ctx context.Context - TA taskAgent.TaskAgent - Addr Addr - ChanLen uint - LoggerEnabled bool + Ctx context.Context + TA taskAgent.TaskAgent + Addr Addr + ChanLen uint + LoggingEnabled bool } func NewHandler(deps Deps) Processor { diff --git a/internal/processor/service.go b/internal/processor/service.go index 4544487..1cebf06 100644 --- a/internal/processor/service.go +++ b/internal/processor/service.go @@ -30,7 +30,7 @@ func newService(deps Deps) *service { s := &service{ taskAgent: deps.TA, brokerAddr: ba, - rabbitLoggingEnabled: deps.LoggerEnabled, + rabbitLoggingEnabled: deps.LoggingEnabled, } s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen) From 98bb6a5bc077781bd5b9027c38f1efc2154e58e7 Mon Sep 17 00:00:00 2001 From: nquidox Date: Wed, 8 Apr 2026 12:16:08 +0300 Subject: [PATCH 11/13] update --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index d3f6288..d16b4dc 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/sirupsen/logrus v1.9.4 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 - repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16 + repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.17 ) require ( @@ -15,5 +15,5 @@ require ( golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.35.0 // indirect golang.org/x/time v0.15.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d // indirect ) diff --git a/go.sum b/go.sum index 07fdb4a..23b2483 100644 --- a/go.sum +++ b/go.sum @@ -44,13 +44,13 @@ golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d h1:wT2n40TBqFY6wiwazVK9/iTWbsQrgk5ZfCSVFLO9LQA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16 h1:bqoQZr4kblRyGQFjqBItakGm/p2PP0+68U12lLnuIYM= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.16/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.17 h1:f+oJRW0t894F9TjqG4UJAHWLcg1RffoHe5olsmYoltQ= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.17/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= From c8a384efcbfeb1395792af85d650c55e5b8132df Mon Sep 17 00:00:00 2001 From: nquidox Date: Wed, 8 Apr 2026 12:28:56 +0300 Subject: [PATCH 12/13] update --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d16b4dc..d25d4a7 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/sirupsen/logrus v1.9.4 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 - repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.17 + repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18 ) require ( diff --git a/go.sum b/go.sum index 23b2483..80b33d5 100644 --- a/go.sum +++ b/go.sum @@ -52,5 +52,5 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.17 h1:f+oJRW0t894F9TjqG4UJAHWLcg1RffoHe5olsmYoltQ= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.17/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18 h1:y2oQOoQApDXUdZc/9T5fA80pkq2O+0aw3AKbSNrY1aE= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= From 9361319f0a334b0f3d28de953222124cf1a23e2a Mon Sep 17 00:00:00 2001 From: nquidox Date: Mon, 27 Apr 2026 00:18:39 +0300 Subject: [PATCH 13/13] declare queue with opts quick fix --- go.mod | 2 +- go.sum | 4 +- internal/processor/service.go | 83 ++++++++++++++++++++++++++--------- 3 files changed, 66 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index d25d4a7..9bfcfe1 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/sirupsen/logrus v1.9.4 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 - repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18 + repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 ) require ( diff --git a/go.sum b/go.sum index 80b33d5..0ad66c6 100644 --- a/go.sum +++ b/go.sum @@ -52,5 +52,5 @@ google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBN google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18 h1:y2oQOoQApDXUdZc/9T5fA80pkq2O+0aw3AKbSNrY1aE= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.18/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 h1:IKxMEU0Awn8cnC+x9Ptw7t2lsK8WLr4MKSKq2RN/hJY= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= diff --git a/internal/processor/service.go b/internal/processor/service.go index 1cebf06..c7ea017 100644 --- a/internal/processor/service.go +++ b/internal/processor/service.go @@ -3,7 +3,6 @@ package processor import ( "context" "encoding/json" - "fmt" log "github.com/sirupsen/logrus" rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit" "task-processor/internal/structs" @@ -62,8 +61,16 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { log.Debugf("%v Results sender start", pkgLogHeader) runCtx, cancel := context.WithCancel(ctx) - qn := "tasks-results" - consumerClient, err := rabbit.NewClient(s.brokerAddr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled)) + taskResults := rabbit.QueueOpts{ + QueueName: "tasks-results", + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: nil, + } + + consumerClient, err := rabbit.NewClient(s.brokerAddr, taskResults, rabbit.WithLogging(s.rabbitLoggingEnabled)) if err != nil { cancel() return err @@ -71,7 +78,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { resultsConsumer := rabbit.NewConsumer(consumerClient) resultChan := resultsConsumer.Start(runCtx, chanLen) - log.Debugf("%v Results consumer started: %v", pkgLogHeader, qn) + log.Debugf("%v Results consumer started: %v", pkgLogHeader, taskResults.QueueName) go func() { defer cancel() @@ -145,26 +152,62 @@ func (s *service) convertResult(b []byte) *structs.Result { // TODO refactor this later: get origins from merch api and remove ctx pass via deps func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) { - origins := [...]string{ - "surugaya", - "mandarake", - "amiami", - } - publishers := make(map[string]chan<- []byte) - for _, origin := range origins { - qn := fmt.Sprintf("task-publisher-%s", origin) - pubClient, err := rabbit.NewClient(addr, qn, rabbit.WithLogging(s.rabbitLoggingEnabled)) - if err != nil { - log.WithError(err).Error("Failed to create publisher") - continue - } - - publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen) - log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn) + // surugaya + surugayaOpts := rabbit.QueueOpts{ + QueueName: "task-publisher-surugaya", + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: nil, } + surugayaClient, err := rabbit.NewClient(addr, surugayaOpts, rabbit.WithLogging(s.rabbitLoggingEnabled)) + if err != nil { + log.WithError(err).Error("Failed to create publisher") + } + + publishers["surugaya"] = rabbit.NewPublisher(surugayaClient).Start(ctx, chanLen) + log.Debugf("%v Publisher queue created: %v", pkgLogHeader, surugayaOpts.QueueName) + + //mandarake + mandarakeOpts := rabbit.QueueOpts{ + QueueName: "task-publisher-mandarake", + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: nil, + } + + mandarakeClient, err := rabbit.NewClient(addr, mandarakeOpts, rabbit.WithLogging(s.rabbitLoggingEnabled)) + if err != nil { + log.WithError(err).Error("Failed to create publisher") + } + + publishers["mandarake"] = rabbit.NewPublisher(mandarakeClient).Start(ctx, chanLen) + log.Debugf("%v Publisher queue created: %v", pkgLogHeader, mandarakeOpts.QueueName) + + //amiami + amiamiOpts := rabbit.QueueOpts{ + QueueName: "task-publisher-amiami", + Durable: true, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: nil, + } + + amiamiClient, err := rabbit.NewClient(addr, amiamiOpts, rabbit.WithLogging(s.rabbitLoggingEnabled)) + if err != nil { + log.WithError(err).Error("Failed to create publisher") + } + + publishers["amiami"] = rabbit.NewPublisher(amiamiClient).Start(ctx, chanLen) + log.Debugf("%v Publisher queue created: %v", pkgLogHeader, amiamiOpts.QueueName) + s.taskPublishers = publishers }