diff --git a/.forgejo/workflows/make-image.yml b/.forgejo/workflows/make-image.yml index 842c851..392037e 100644 --- a/.forgejo/workflows/make-image.yml +++ b/.forgejo/workflows/make-image.yml @@ -33,11 +33,8 @@ jobs: echo "VERSION=${VERSION}" >> $GITHUB_ENV - name: Make image - env: - GIT_CREDENTIALS: https://${{ secrets.MAINTAINER_USERNAME }}:${{ secrets.MAINTAINER_TOKEN }}@repo.nqws.ru/ run: | docker buildx build --platform linux/amd64 \ - --secret id=git_creds,env=GIT_CREDENTIALS \ --tag repo.nqws.ru/${{ github.repository }}:latest \ --tag repo.nqws.ru/${{ github.repository }}:${{ env.VERSION }} \ --push . diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 2fafaf6..0000000 --- a/Dockerfile +++ /dev/null @@ -1,21 +0,0 @@ -FROM golang:1.26.1-alpine3.23 AS builder - -WORKDIR /app -COPY go.mod go.sum ./ -RUN go mod download -COPY . . -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags="-s -w" -o main "./cmd" - - -FROM alpine:3.23 - -RUN apk add --no-cache \ - tzdata \ - ca-certificates - -COPY --from=builder /app/main /usr/local/bin/app - - -RUN chmod +x /usr/local/bin/app - -ENTRYPOINT ["app"] \ No newline at end of file diff --git a/config.env b/config.env index a5a9766..42f4f76 100644 --- a/config.env +++ b/config.env @@ -18,5 +18,4 @@ RABBIT_HOST= RABBIT_PORT= RABBIT_USER= RABBIT_PASS= -RABBIT_VHOST= -RABBIT_LOGGING_ENABLED= \ No newline at end of file +RABBIT_VHOST= \ No newline at end of file diff --git a/config/config.go b/config/config.go index 00e5411..a4f88e4 100644 --- a/config/config.go +++ b/config/config.go @@ -28,12 +28,11 @@ type TasksSource struct { } type Rabbit struct { - Host string - Port uint16 - User string - Pass string - Vhost string - LoggingEnabled bool + Host string + Port string + User string + Pass string + Vhost string } func NewConfig() Config { @@ -41,7 +40,7 @@ func NewConfig() Config { App: App{ Mode: getEnv("APP_MODE", "dev"), LogLvl: getEnv("APP_LOG_LVL", "debug"), - CheckPeriod: getEnvSeconds("APP_CHECK_PERIOD_SECONDS", 60*60*6), + CheckPeriod: getEnvSeconds("APP_CHECK_PERIOD_SECONDS", 20), ProcChanLen: getEnvUint("APP_PROCESSOR_CHANNEL_SIZE", 100), }, @@ -51,20 +50,17 @@ func NewConfig() Config { }, TasksSource: TasksSource{ - //Host: getEnv("TASK_SOURCE_HOST", "127.0.0.1"), - //Port: getEnv("TASK_SOURCE_PORT", "61000"), - Host: getEnv("TASK_SOURCE_HOST", "10.0.0.1"), - Port: getEnv("TASK_SOURCE_PORT", "9099"), + Host: getEnv("TASK_API_HOST", "127.0.0.1"), + Port: getEnv("TASK_API_PORT", "61000"), Timeout: getEnvSeconds("TASK_SOURCE_TIMEOUT_SECONDS", 60), }, Rabbit: Rabbit{ - Host: getEnv("RABBIT_HOST", "10.0.0.4"), - Port: getEnvPort("RABBIT_PORT", 5672), - User: getEnv("RABBIT_USER", "taskProcessorDev"), - Pass: getEnv("RABBIT_PASS", "pass1234"), - Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"), - LoggingEnabled: getEnvBool("RABBIT_LOGGING_ENABLED", true), + Host: getEnv("RABBIT_HOST", "10.0.0.4"), + Port: getEnv("RABBIT_PORT", "5672"), + User: getEnv("RABBIT_USER", "taskProcessorDev"), + Pass: getEnv("RABBIT_PASS", "pass1234"), + Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"), }, } } diff --git a/config/helper.go b/config/helper.go index 5ff6a49..e17dc02 100644 --- a/config/helper.go +++ b/config/helper.go @@ -39,25 +39,3 @@ func getEnvUint(key string, fallback uint) uint { } return fallback } - -func getEnvPort(key string, fallback uint16) uint16 { - if value, ok := os.LookupEnv(key); ok { - num, err := strconv.ParseUint(value, 10, 16) - if err != nil { - return fallback - } - return uint16(num) - } - return fallback -} - -func getEnvBool(key string, fallback bool) bool { - if value, ok := os.LookupEnv(key); ok { - val, err := strconv.ParseBool(value) - if err != nil { - return fallback - } - return val - } - return fallback -} diff --git a/go.mod b/go.mod index 9bfcfe1..1bfc9f5 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/sirupsen/logrus v1.9.4 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 - repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 + repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15 ) require ( @@ -15,5 +15,5 @@ require ( golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.35.0 // indirect golang.org/x/time v0.15.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect ) diff --git a/go.sum b/go.sum index 0ad66c6..dfd472e 100644 --- a/go.sum +++ b/go.sum @@ -44,13 +44,13 @@ golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d h1:wT2n40TBqFY6wiwazVK9/iTWbsQrgk5ZfCSVFLO9LQA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 h1:IKxMEU0Awn8cnC+x9Ptw7t2lsK8WLr4MKSKq2RN/hJY= -repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15 h1:w+jvbwEnpGXxYI37iJvfTEYYNJ9ROUjBwlEOY7LJGxM= +repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo= diff --git a/internal/app/handler.go b/internal/app/handler.go index f895073..676bb81 100644 --- a/internal/app/handler.go +++ b/internal/app/handler.go @@ -33,8 +33,7 @@ func NewApp(ctx context.Context, cfg config.Config) *App { Pass: cfg.Rabbit.Pass, Vhost: cfg.Rabbit.Vhost, }, - LoggingEnabled: cfg.Rabbit.LoggingEnabled, - ChanLen: cfg.App.ProcChanLen, + ChanLen: cfg.App.ProcChanLen, }) return &App{ @@ -53,13 +52,11 @@ func (app *App) Run(ctx context.Context) error { defer mainLoop.Stop() go func() { - log.Info("Process tasks after start") if err := app.processor.ProcessTasks(ctx); err != nil { errChan <- err } for range mainLoop.C { - log.WithField("period", app.config.App.CheckPeriod).Info("Repeated process tasks") if err := app.processor.ProcessTasks(ctx); err != nil { errChan <- err } diff --git a/internal/processor/handler.go b/internal/processor/handler.go index 164cb44..b008dba 100644 --- a/internal/processor/handler.go +++ b/internal/processor/handler.go @@ -2,6 +2,8 @@ package processor import ( "context" + "fmt" + "net" "task-processor/internal/taskAgent" ) @@ -13,22 +15,28 @@ type handler struct { type Addr struct { Host string - Port uint16 + Port string User string Pass string Vhost string } type Deps struct { - Ctx context.Context - TA taskAgent.TaskAgent - Addr Addr - ChanLen uint - LoggingEnabled bool + Ctx context.Context + TA taskAgent.TaskAgent + Addr Addr + ChanLen uint } func NewHandler(deps Deps) Processor { + addr := makeAddr(deps.Addr) + return &handler{ - service: newService(deps), + service: newService(deps, addr), } } + +func makeAddr(addr Addr) string { + //"amqp://username:password@host:port/vhost" + return fmt.Sprintf("amqp://%v:%v@%v/%v", addr.User, addr.Pass, net.JoinHostPort(addr.Host, addr.Port), addr.Vhost) +} diff --git a/internal/processor/helper.go b/internal/processor/helper.go deleted file mode 100644 index cb46a12..0000000 --- a/internal/processor/helper.go +++ /dev/null @@ -1,10 +0,0 @@ -package processor - -import "strings" - -func linkIsValid(link string) bool { - if strings.HasPrefix(link, "http://") || strings.HasPrefix(link, "https://") { - return true - } - return false -} diff --git a/internal/processor/service.go b/internal/processor/service.go index c7ea017..80224e0 100644 --- a/internal/processor/service.go +++ b/internal/processor/service.go @@ -3,6 +3,7 @@ package processor import ( "context" "encoding/json" + "fmt" log "github.com/sirupsen/logrus" rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit" "task-processor/internal/structs" @@ -11,30 +12,18 @@ import ( ) type service struct { - taskAgent taskAgent.TaskAgent - brokerAddr rabbit.Address - taskPublishers map[string]chan<- []byte - rabbitLoggingEnabled bool + taskAgent taskAgent.TaskAgent + brokerAddr string + taskPublishers map[string]chan<- []byte } -func newService(deps Deps) *service { - ba := rabbit.Address{ - Username: deps.Addr.User, - Password: deps.Addr.Pass, - Host: deps.Addr.Host, - Port: deps.Addr.Port, - Vhost: deps.Addr.Vhost, +func newService(deps Deps, addr string) *service { + + return &service{ + taskAgent: deps.TA, + brokerAddr: addr, + taskPublishers: makeTaskPublishers(deps.Ctx, addr, deps.ChanLen), } - - s := &service{ - taskAgent: deps.TA, - brokerAddr: ba, - rabbitLoggingEnabled: deps.LoggingEnabled, - } - - s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen) - - return s } func (s *service) ProcessTasks(ctx context.Context) error { @@ -61,16 +50,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { log.Debugf("%v Results sender start", pkgLogHeader) runCtx, cancel := context.WithCancel(ctx) - taskResults := rabbit.QueueOpts{ - QueueName: "tasks-results", - Durable: false, - AutoDelete: false, - Exclusive: false, - NoWait: false, - Args: nil, - } - - consumerClient, err := rabbit.NewClient(s.brokerAddr, taskResults, rabbit.WithLogging(s.rabbitLoggingEnabled)) + consumerClient, err := rabbit.NewClient(s.brokerAddr, "tasks-results") if err != nil { cancel() return err @@ -78,7 +58,6 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { resultsConsumer := rabbit.NewConsumer(consumerClient) resultChan := resultsConsumer.Start(runCtx, chanLen) - log.Debugf("%v Results consumer started: %v", pkgLogHeader, taskResults.QueueName) go func() { defer cancel() @@ -119,10 +98,6 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error { func (s *service) sendTasks(tasks []structs.Task) error { for _, tsk := range tasks { for origin, link := range tsk.Origins { - if !linkIsValid(link) { - continue - } - if origin == "surugaya" { pushTask(s.taskPublishers["surugaya"], tsk.MerchUuid, link) } @@ -150,65 +125,28 @@ func (s *service) convertResult(b []byte) *structs.Result { return &res } -// TODO refactor this later: get origins from merch api and remove ctx pass via deps -func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) { +func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[string]chan<- []byte { + origins := [...]string{ + "surugaya", + "mandarake", + "amiami", + } + publishers := make(map[string]chan<- []byte) - // surugaya - surugayaOpts := rabbit.QueueOpts{ - QueueName: "task-publisher-surugaya", - Durable: false, - AutoDelete: false, - Exclusive: false, - NoWait: false, - Args: nil, + for _, origin := range origins { + qn := fmt.Sprintf("task-publisher-%s", origin) + pubClient, err := rabbit.NewClient(addr, qn) + if err != nil { + log.WithError(err).Error("Failed to create publisher") + continue + } + + publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen) + log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn) } - surugayaClient, err := rabbit.NewClient(addr, surugayaOpts, rabbit.WithLogging(s.rabbitLoggingEnabled)) - if err != nil { - log.WithError(err).Error("Failed to create publisher") - } - - publishers["surugaya"] = rabbit.NewPublisher(surugayaClient).Start(ctx, chanLen) - log.Debugf("%v Publisher queue created: %v", pkgLogHeader, surugayaOpts.QueueName) - - //mandarake - mandarakeOpts := rabbit.QueueOpts{ - QueueName: "task-publisher-mandarake", - Durable: false, - AutoDelete: false, - Exclusive: false, - NoWait: false, - Args: nil, - } - - mandarakeClient, err := rabbit.NewClient(addr, mandarakeOpts, rabbit.WithLogging(s.rabbitLoggingEnabled)) - if err != nil { - log.WithError(err).Error("Failed to create publisher") - } - - publishers["mandarake"] = rabbit.NewPublisher(mandarakeClient).Start(ctx, chanLen) - log.Debugf("%v Publisher queue created: %v", pkgLogHeader, mandarakeOpts.QueueName) - - //amiami - amiamiOpts := rabbit.QueueOpts{ - QueueName: "task-publisher-amiami", - Durable: true, - AutoDelete: false, - Exclusive: false, - NoWait: false, - Args: nil, - } - - amiamiClient, err := rabbit.NewClient(addr, amiamiOpts, rabbit.WithLogging(s.rabbitLoggingEnabled)) - if err != nil { - log.WithError(err).Error("Failed to create publisher") - } - - publishers["amiami"] = rabbit.NewPublisher(amiamiClient).Start(ctx, chanLen) - log.Debugf("%v Publisher queue created: %v", pkgLogHeader, amiamiOpts.QueueName) - - s.taskPublishers = publishers + return publishers } func pushTask(pubChan chan<- []byte, m, l string) {