Compare commits
No commits in common. "main" and "v0.2.0" have entirely different histories.
11 changed files with 65 additions and 183 deletions
|
|
@ -33,11 +33,8 @@ jobs:
|
||||||
echo "VERSION=${VERSION}" >> $GITHUB_ENV
|
echo "VERSION=${VERSION}" >> $GITHUB_ENV
|
||||||
|
|
||||||
- name: Make image
|
- name: Make image
|
||||||
env:
|
|
||||||
GIT_CREDENTIALS: https://${{ secrets.MAINTAINER_USERNAME }}:${{ secrets.MAINTAINER_TOKEN }}@repo.nqws.ru/
|
|
||||||
run: |
|
run: |
|
||||||
docker buildx build --platform linux/amd64 \
|
docker buildx build --platform linux/amd64 \
|
||||||
--secret id=git_creds,env=GIT_CREDENTIALS \
|
|
||||||
--tag repo.nqws.ru/${{ github.repository }}:latest \
|
--tag repo.nqws.ru/${{ github.repository }}:latest \
|
||||||
--tag repo.nqws.ru/${{ github.repository }}:${{ env.VERSION }} \
|
--tag repo.nqws.ru/${{ github.repository }}:${{ env.VERSION }} \
|
||||||
--push .
|
--push .
|
||||||
|
|
|
||||||
21
Dockerfile
21
Dockerfile
|
|
@ -1,21 +0,0 @@
|
||||||
FROM golang:1.26.1-alpine3.23 AS builder
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
COPY go.mod go.sum ./
|
|
||||||
RUN go mod download
|
|
||||||
COPY . .
|
|
||||||
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags="-s -w" -o main "./cmd"
|
|
||||||
|
|
||||||
|
|
||||||
FROM alpine:3.23
|
|
||||||
|
|
||||||
RUN apk add --no-cache \
|
|
||||||
tzdata \
|
|
||||||
ca-certificates
|
|
||||||
|
|
||||||
COPY --from=builder /app/main /usr/local/bin/app
|
|
||||||
|
|
||||||
|
|
||||||
RUN chmod +x /usr/local/bin/app
|
|
||||||
|
|
||||||
ENTRYPOINT ["app"]
|
|
||||||
|
|
@ -19,4 +19,3 @@ RABBIT_PORT=
|
||||||
RABBIT_USER=
|
RABBIT_USER=
|
||||||
RABBIT_PASS=
|
RABBIT_PASS=
|
||||||
RABBIT_VHOST=
|
RABBIT_VHOST=
|
||||||
RABBIT_LOGGING_ENABLED=
|
|
||||||
|
|
@ -29,11 +29,10 @@ type TasksSource struct {
|
||||||
|
|
||||||
type Rabbit struct {
|
type Rabbit struct {
|
||||||
Host string
|
Host string
|
||||||
Port uint16
|
Port string
|
||||||
User string
|
User string
|
||||||
Pass string
|
Pass string
|
||||||
Vhost string
|
Vhost string
|
||||||
LoggingEnabled bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConfig() Config {
|
func NewConfig() Config {
|
||||||
|
|
@ -41,7 +40,7 @@ func NewConfig() Config {
|
||||||
App: App{
|
App: App{
|
||||||
Mode: getEnv("APP_MODE", "dev"),
|
Mode: getEnv("APP_MODE", "dev"),
|
||||||
LogLvl: getEnv("APP_LOG_LVL", "debug"),
|
LogLvl: getEnv("APP_LOG_LVL", "debug"),
|
||||||
CheckPeriod: getEnvSeconds("APP_CHECK_PERIOD_SECONDS", 60*60*6),
|
CheckPeriod: getEnvSeconds("APP_CHECK_PERIOD_SECONDS", 20),
|
||||||
ProcChanLen: getEnvUint("APP_PROCESSOR_CHANNEL_SIZE", 100),
|
ProcChanLen: getEnvUint("APP_PROCESSOR_CHANNEL_SIZE", 100),
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|
@ -51,20 +50,17 @@ func NewConfig() Config {
|
||||||
},
|
},
|
||||||
|
|
||||||
TasksSource: TasksSource{
|
TasksSource: TasksSource{
|
||||||
//Host: getEnv("TASK_SOURCE_HOST", "127.0.0.1"),
|
Host: getEnv("TASK_API_HOST", "127.0.0.1"),
|
||||||
//Port: getEnv("TASK_SOURCE_PORT", "61000"),
|
Port: getEnv("TASK_API_PORT", "61000"),
|
||||||
Host: getEnv("TASK_SOURCE_HOST", "10.0.0.1"),
|
|
||||||
Port: getEnv("TASK_SOURCE_PORT", "9099"),
|
|
||||||
Timeout: getEnvSeconds("TASK_SOURCE_TIMEOUT_SECONDS", 60),
|
Timeout: getEnvSeconds("TASK_SOURCE_TIMEOUT_SECONDS", 60),
|
||||||
},
|
},
|
||||||
|
|
||||||
Rabbit: Rabbit{
|
Rabbit: Rabbit{
|
||||||
Host: getEnv("RABBIT_HOST", "10.0.0.4"),
|
Host: getEnv("RABBIT_HOST", "10.0.0.4"),
|
||||||
Port: getEnvPort("RABBIT_PORT", 5672),
|
Port: getEnv("RABBIT_PORT", "5672"),
|
||||||
User: getEnv("RABBIT_USER", "taskProcessorDev"),
|
User: getEnv("RABBIT_USER", "taskProcessorDev"),
|
||||||
Pass: getEnv("RABBIT_PASS", "pass1234"),
|
Pass: getEnv("RABBIT_PASS", "pass1234"),
|
||||||
Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"),
|
Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"),
|
||||||
LoggingEnabled: getEnvBool("RABBIT_LOGGING_ENABLED", true),
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -39,25 +39,3 @@ func getEnvUint(key string, fallback uint) uint {
|
||||||
}
|
}
|
||||||
return fallback
|
return fallback
|
||||||
}
|
}
|
||||||
|
|
||||||
func getEnvPort(key string, fallback uint16) uint16 {
|
|
||||||
if value, ok := os.LookupEnv(key); ok {
|
|
||||||
num, err := strconv.ParseUint(value, 10, 16)
|
|
||||||
if err != nil {
|
|
||||||
return fallback
|
|
||||||
}
|
|
||||||
return uint16(num)
|
|
||||||
}
|
|
||||||
return fallback
|
|
||||||
}
|
|
||||||
|
|
||||||
func getEnvBool(key string, fallback bool) bool {
|
|
||||||
if value, ok := os.LookupEnv(key); ok {
|
|
||||||
val, err := strconv.ParseBool(value)
|
|
||||||
if err != nil {
|
|
||||||
return fallback
|
|
||||||
}
|
|
||||||
return val
|
|
||||||
}
|
|
||||||
return fallback
|
|
||||||
}
|
|
||||||
|
|
|
||||||
4
go.mod
4
go.mod
|
|
@ -6,7 +6,7 @@ require (
|
||||||
github.com/sirupsen/logrus v1.9.4
|
github.com/sirupsen/logrus v1.9.4
|
||||||
google.golang.org/grpc v1.80.0
|
google.golang.org/grpc v1.80.0
|
||||||
google.golang.org/protobuf v1.36.11
|
google.golang.org/protobuf v1.36.11
|
||||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19
|
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
|
@ -15,5 +15,5 @@ require (
|
||||||
golang.org/x/sys v0.42.0 // indirect
|
golang.org/x/sys v0.42.0 // indirect
|
||||||
golang.org/x/text v0.35.0 // indirect
|
golang.org/x/text v0.35.0 // indirect
|
||||||
golang.org/x/time v0.15.0 // indirect
|
golang.org/x/time v0.15.0 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
|
||||||
)
|
)
|
||||||
|
|
|
||||||
8
go.sum
8
go.sum
|
|
@ -44,13 +44,13 @@ golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
|
||||||
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
|
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
|
||||||
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
|
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
|
||||||
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
|
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d h1:wT2n40TBqFY6wiwazVK9/iTWbsQrgk5ZfCSVFLO9LQA=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
|
||||||
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
|
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
|
||||||
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
|
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
|
||||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 h1:IKxMEU0Awn8cnC+x9Ptw7t2lsK8WLr4MKSKq2RN/hJY=
|
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15 h1:w+jvbwEnpGXxYI37iJvfTEYYNJ9ROUjBwlEOY7LJGxM=
|
||||||
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo=
|
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo=
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,6 @@ func NewApp(ctx context.Context, cfg config.Config) *App {
|
||||||
Pass: cfg.Rabbit.Pass,
|
Pass: cfg.Rabbit.Pass,
|
||||||
Vhost: cfg.Rabbit.Vhost,
|
Vhost: cfg.Rabbit.Vhost,
|
||||||
},
|
},
|
||||||
LoggingEnabled: cfg.Rabbit.LoggingEnabled,
|
|
||||||
ChanLen: cfg.App.ProcChanLen,
|
ChanLen: cfg.App.ProcChanLen,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -53,13 +52,11 @@ func (app *App) Run(ctx context.Context) error {
|
||||||
defer mainLoop.Stop()
|
defer mainLoop.Stop()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
log.Info("Process tasks after start")
|
|
||||||
if err := app.processor.ProcessTasks(ctx); err != nil {
|
if err := app.processor.ProcessTasks(ctx); err != nil {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
}
|
}
|
||||||
|
|
||||||
for range mainLoop.C {
|
for range mainLoop.C {
|
||||||
log.WithField("period", app.config.App.CheckPeriod).Info("Repeated process tasks")
|
|
||||||
if err := app.processor.ProcessTasks(ctx); err != nil {
|
if err := app.processor.ProcessTasks(ctx); err != nil {
|
||||||
errChan <- err
|
errChan <- err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,8 @@ package processor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
"task-processor/internal/taskAgent"
|
"task-processor/internal/taskAgent"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -13,7 +15,7 @@ type handler struct {
|
||||||
|
|
||||||
type Addr struct {
|
type Addr struct {
|
||||||
Host string
|
Host string
|
||||||
Port uint16
|
Port string
|
||||||
User string
|
User string
|
||||||
Pass string
|
Pass string
|
||||||
Vhost string
|
Vhost string
|
||||||
|
|
@ -24,11 +26,17 @@ type Deps struct {
|
||||||
TA taskAgent.TaskAgent
|
TA taskAgent.TaskAgent
|
||||||
Addr Addr
|
Addr Addr
|
||||||
ChanLen uint
|
ChanLen uint
|
||||||
LoggingEnabled bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHandler(deps Deps) Processor {
|
func NewHandler(deps Deps) Processor {
|
||||||
|
addr := makeAddr(deps.Addr)
|
||||||
|
|
||||||
return &handler{
|
return &handler{
|
||||||
service: newService(deps),
|
service: newService(deps, addr),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeAddr(addr Addr) string {
|
||||||
|
//"amqp://username:password@host:port/vhost"
|
||||||
|
return fmt.Sprintf("amqp://%v:%v@%v/%v", addr.User, addr.Pass, net.JoinHostPort(addr.Host, addr.Port), addr.Vhost)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,10 +0,0 @@
|
||||||
package processor
|
|
||||||
|
|
||||||
import "strings"
|
|
||||||
|
|
||||||
func linkIsValid(link string) bool {
|
|
||||||
if strings.HasPrefix(link, "http://") || strings.HasPrefix(link, "https://") {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
@ -3,6 +3,7 @@ package processor
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
|
rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
|
||||||
"task-processor/internal/structs"
|
"task-processor/internal/structs"
|
||||||
|
|
@ -12,29 +13,17 @@ import (
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
taskAgent taskAgent.TaskAgent
|
taskAgent taskAgent.TaskAgent
|
||||||
brokerAddr rabbit.Address
|
brokerAddr string
|
||||||
taskPublishers map[string]chan<- []byte
|
taskPublishers map[string]chan<- []byte
|
||||||
rabbitLoggingEnabled bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newService(deps Deps) *service {
|
func newService(deps Deps, addr string) *service {
|
||||||
ba := rabbit.Address{
|
|
||||||
Username: deps.Addr.User,
|
|
||||||
Password: deps.Addr.Pass,
|
|
||||||
Host: deps.Addr.Host,
|
|
||||||
Port: deps.Addr.Port,
|
|
||||||
Vhost: deps.Addr.Vhost,
|
|
||||||
}
|
|
||||||
|
|
||||||
s := &service{
|
return &service{
|
||||||
taskAgent: deps.TA,
|
taskAgent: deps.TA,
|
||||||
brokerAddr: ba,
|
brokerAddr: addr,
|
||||||
rabbitLoggingEnabled: deps.LoggingEnabled,
|
taskPublishers: makeTaskPublishers(deps.Ctx, addr, deps.ChanLen),
|
||||||
}
|
}
|
||||||
|
|
||||||
s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen)
|
|
||||||
|
|
||||||
return s
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) ProcessTasks(ctx context.Context) error {
|
func (s *service) ProcessTasks(ctx context.Context) error {
|
||||||
|
|
@ -61,16 +50,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
|
||||||
log.Debugf("%v Results sender start", pkgLogHeader)
|
log.Debugf("%v Results sender start", pkgLogHeader)
|
||||||
runCtx, cancel := context.WithCancel(ctx)
|
runCtx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
taskResults := rabbit.QueueOpts{
|
consumerClient, err := rabbit.NewClient(s.brokerAddr, "tasks-results")
|
||||||
QueueName: "tasks-results",
|
|
||||||
Durable: false,
|
|
||||||
AutoDelete: false,
|
|
||||||
Exclusive: false,
|
|
||||||
NoWait: false,
|
|
||||||
Args: nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
consumerClient, err := rabbit.NewClient(s.brokerAddr, taskResults, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
return err
|
return err
|
||||||
|
|
@ -78,7 +58,6 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
|
||||||
|
|
||||||
resultsConsumer := rabbit.NewConsumer(consumerClient)
|
resultsConsumer := rabbit.NewConsumer(consumerClient)
|
||||||
resultChan := resultsConsumer.Start(runCtx, chanLen)
|
resultChan := resultsConsumer.Start(runCtx, chanLen)
|
||||||
log.Debugf("%v Results consumer started: %v", pkgLogHeader, taskResults.QueueName)
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
@ -119,10 +98,6 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
|
||||||
func (s *service) sendTasks(tasks []structs.Task) error {
|
func (s *service) sendTasks(tasks []structs.Task) error {
|
||||||
for _, tsk := range tasks {
|
for _, tsk := range tasks {
|
||||||
for origin, link := range tsk.Origins {
|
for origin, link := range tsk.Origins {
|
||||||
if !linkIsValid(link) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if origin == "surugaya" {
|
if origin == "surugaya" {
|
||||||
pushTask(s.taskPublishers["surugaya"], tsk.MerchUuid, link)
|
pushTask(s.taskPublishers["surugaya"], tsk.MerchUuid, link)
|
||||||
}
|
}
|
||||||
|
|
@ -150,65 +125,28 @@ func (s *service) convertResult(b []byte) *structs.Result {
|
||||||
return &res
|
return &res
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO refactor this later: get origins from merch api and remove ctx pass via deps
|
func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[string]chan<- []byte {
|
||||||
func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) {
|
origins := [...]string{
|
||||||
|
"surugaya",
|
||||||
|
"mandarake",
|
||||||
|
"amiami",
|
||||||
|
}
|
||||||
|
|
||||||
publishers := make(map[string]chan<- []byte)
|
publishers := make(map[string]chan<- []byte)
|
||||||
|
|
||||||
// surugaya
|
for _, origin := range origins {
|
||||||
surugayaOpts := rabbit.QueueOpts{
|
qn := fmt.Sprintf("task-publisher-%s", origin)
|
||||||
QueueName: "task-publisher-surugaya",
|
pubClient, err := rabbit.NewClient(addr, qn)
|
||||||
Durable: false,
|
|
||||||
AutoDelete: false,
|
|
||||||
Exclusive: false,
|
|
||||||
NoWait: false,
|
|
||||||
Args: nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
surugayaClient, err := rabbit.NewClient(addr, surugayaOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Failed to create publisher")
|
log.WithError(err).Error("Failed to create publisher")
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
publishers["surugaya"] = rabbit.NewPublisher(surugayaClient).Start(ctx, chanLen)
|
publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen)
|
||||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, surugayaOpts.QueueName)
|
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn)
|
||||||
|
|
||||||
//mandarake
|
|
||||||
mandarakeOpts := rabbit.QueueOpts{
|
|
||||||
QueueName: "task-publisher-mandarake",
|
|
||||||
Durable: false,
|
|
||||||
AutoDelete: false,
|
|
||||||
Exclusive: false,
|
|
||||||
NoWait: false,
|
|
||||||
Args: nil,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mandarakeClient, err := rabbit.NewClient(addr, mandarakeOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
return publishers
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("Failed to create publisher")
|
|
||||||
}
|
|
||||||
|
|
||||||
publishers["mandarake"] = rabbit.NewPublisher(mandarakeClient).Start(ctx, chanLen)
|
|
||||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, mandarakeOpts.QueueName)
|
|
||||||
|
|
||||||
//amiami
|
|
||||||
amiamiOpts := rabbit.QueueOpts{
|
|
||||||
QueueName: "task-publisher-amiami",
|
|
||||||
Durable: true,
|
|
||||||
AutoDelete: false,
|
|
||||||
Exclusive: false,
|
|
||||||
NoWait: false,
|
|
||||||
Args: nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
amiamiClient, err := rabbit.NewClient(addr, amiamiOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("Failed to create publisher")
|
|
||||||
}
|
|
||||||
|
|
||||||
publishers["amiami"] = rabbit.NewPublisher(amiamiClient).Start(ctx, chanLen)
|
|
||||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, amiamiOpts.QueueName)
|
|
||||||
|
|
||||||
s.taskPublishers = publishers
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func pushTask(pubChan chan<- []byte, m, l string) {
|
func pushTask(pubChan chan<- []byte, m, l string) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue