Compare commits

..

13 commits
v0.2.0 ... main

Author SHA1 Message Date
nquidox
9361319f0a declare queue with opts quick fix
All checks were successful
/ Make image (push) Successful in 44s
2026-04-27 00:18:39 +03:00
nquidox
c8a384efcb update
All checks were successful
/ Make image (push) Successful in 39s
2026-04-08 12:28:56 +03:00
nquidox
98bb6a5bc0 update
Some checks failed
/ Make image (push) Failing after 1m53s
2026-04-08 12:16:08 +03:00
nquidox
b29e60b712 log enabled dep 2026-04-08 12:15:24 +03:00
nquidox
c5013c03ff links validator
All checks were successful
/ Make image (push) Successful in 37s
2026-04-06 17:05:00 +03:00
nquidox
af1710c3fe env name fix
All checks were successful
/ Make image (push) Successful in 37s
2026-04-06 12:15:32 +03:00
nquidox
047b61c9a4 update
All checks were successful
/ Make image (push) Successful in 44s
2026-04-06 12:03:03 +03:00
nquidox
8cf8ff70d7 fix
Some checks failed
/ Make image (push) Failing after 23s
2026-04-05 20:21:16 +03:00
nquidox
a5aec1fca3 build files
Some checks failed
/ Make image (push) Failing after 23s
2026-04-05 20:16:44 +03:00
nquidox
2a3ba0e797 deps
Some checks failed
/ Make image (push) Failing after 13s
2026-04-03 11:46:39 +03:00
nquidox
5fe38af035 address format and deps change + logger option 2026-04-03 11:46:29 +03:00
nquidox
f70d53c75c uint16 type for port + env getter 2026-04-03 11:45:22 +03:00
nquidox
979c7c4a4f update 2026-04-03 11:44:44 +03:00
11 changed files with 183 additions and 65 deletions

View file

@ -33,8 +33,11 @@ jobs:
echo "VERSION=${VERSION}" >> $GITHUB_ENV
- name: Make image
env:
GIT_CREDENTIALS: https://${{ secrets.MAINTAINER_USERNAME }}:${{ secrets.MAINTAINER_TOKEN }}@repo.nqws.ru/
run: |
docker buildx build --platform linux/amd64 \
--secret id=git_creds,env=GIT_CREDENTIALS \
--tag repo.nqws.ru/${{ github.repository }}:latest \
--tag repo.nqws.ru/${{ github.repository }}:${{ env.VERSION }} \
--push .

21
Dockerfile Normal file
View file

@ -0,0 +1,21 @@
FROM golang:1.26.1-alpine3.23 AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags="-s -w" -o main "./cmd"
FROM alpine:3.23
RUN apk add --no-cache \
tzdata \
ca-certificates
COPY --from=builder /app/main /usr/local/bin/app
RUN chmod +x /usr/local/bin/app
ENTRYPOINT ["app"]

View file

@ -19,3 +19,4 @@ RABBIT_PORT=
RABBIT_USER=
RABBIT_PASS=
RABBIT_VHOST=
RABBIT_LOGGING_ENABLED=

View file

@ -28,11 +28,12 @@ type TasksSource struct {
}
type Rabbit struct {
Host string
Port string
User string
Pass string
Vhost string
Host string
Port uint16
User string
Pass string
Vhost string
LoggingEnabled bool
}
func NewConfig() Config {
@ -40,7 +41,7 @@ func NewConfig() Config {
App: App{
Mode: getEnv("APP_MODE", "dev"),
LogLvl: getEnv("APP_LOG_LVL", "debug"),
CheckPeriod: getEnvSeconds("APP_CHECK_PERIOD_SECONDS", 20),
CheckPeriod: getEnvSeconds("APP_CHECK_PERIOD_SECONDS", 60*60*6),
ProcChanLen: getEnvUint("APP_PROCESSOR_CHANNEL_SIZE", 100),
},
@ -50,17 +51,20 @@ func NewConfig() Config {
},
TasksSource: TasksSource{
Host: getEnv("TASK_API_HOST", "127.0.0.1"),
Port: getEnv("TASK_API_PORT", "61000"),
//Host: getEnv("TASK_SOURCE_HOST", "127.0.0.1"),
//Port: getEnv("TASK_SOURCE_PORT", "61000"),
Host: getEnv("TASK_SOURCE_HOST", "10.0.0.1"),
Port: getEnv("TASK_SOURCE_PORT", "9099"),
Timeout: getEnvSeconds("TASK_SOURCE_TIMEOUT_SECONDS", 60),
},
Rabbit: Rabbit{
Host: getEnv("RABBIT_HOST", "10.0.0.4"),
Port: getEnv("RABBIT_PORT", "5672"),
User: getEnv("RABBIT_USER", "taskProcessorDev"),
Pass: getEnv("RABBIT_PASS", "pass1234"),
Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"),
Host: getEnv("RABBIT_HOST", "10.0.0.4"),
Port: getEnvPort("RABBIT_PORT", 5672),
User: getEnv("RABBIT_USER", "taskProcessorDev"),
Pass: getEnv("RABBIT_PASS", "pass1234"),
Vhost: getEnv("RABBIT_VHOST", "taskProcessorDevHost"),
LoggingEnabled: getEnvBool("RABBIT_LOGGING_ENABLED", true),
},
}
}

View file

@ -39,3 +39,25 @@ func getEnvUint(key string, fallback uint) uint {
}
return fallback
}
func getEnvPort(key string, fallback uint16) uint16 {
if value, ok := os.LookupEnv(key); ok {
num, err := strconv.ParseUint(value, 10, 16)
if err != nil {
return fallback
}
return uint16(num)
}
return fallback
}
func getEnvBool(key string, fallback bool) bool {
if value, ok := os.LookupEnv(key); ok {
val, err := strconv.ParseBool(value)
if err != nil {
return fallback
}
return val
}
return fallback
}

4
go.mod
View file

@ -6,7 +6,7 @@ require (
github.com/sirupsen/logrus v1.9.4
google.golang.org/grpc v1.80.0
google.golang.org/protobuf v1.36.11
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19
)
require (
@ -15,5 +15,5 @@ require (
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0 // indirect
golang.org/x/time v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d // indirect
)

8
go.sum
View file

@ -44,13 +44,13 @@ golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d h1:wT2n40TBqFY6wiwazVK9/iTWbsQrgk5ZfCSVFLO9LQA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15 h1:w+jvbwEnpGXxYI37iJvfTEYYNJ9ROUjBwlEOY7LJGxM=
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.15/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo=
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19 h1:IKxMEU0Awn8cnC+x9Ptw7t2lsK8WLr4MKSKq2RN/hJY=
repo.nqws.ru/merch-tracker-v2/mt-rabbit v0.1.19/go.mod h1:8PREIIYfA3UPigQNF+Hx+778/twVxJzI8bI8fcpVXEo=

View file

@ -33,7 +33,8 @@ func NewApp(ctx context.Context, cfg config.Config) *App {
Pass: cfg.Rabbit.Pass,
Vhost: cfg.Rabbit.Vhost,
},
ChanLen: cfg.App.ProcChanLen,
LoggingEnabled: cfg.Rabbit.LoggingEnabled,
ChanLen: cfg.App.ProcChanLen,
})
return &App{
@ -52,11 +53,13 @@ func (app *App) Run(ctx context.Context) error {
defer mainLoop.Stop()
go func() {
log.Info("Process tasks after start")
if err := app.processor.ProcessTasks(ctx); err != nil {
errChan <- err
}
for range mainLoop.C {
log.WithField("period", app.config.App.CheckPeriod).Info("Repeated process tasks")
if err := app.processor.ProcessTasks(ctx); err != nil {
errChan <- err
}

View file

@ -2,8 +2,6 @@ package processor
import (
"context"
"fmt"
"net"
"task-processor/internal/taskAgent"
)
@ -15,28 +13,22 @@ type handler struct {
type Addr struct {
Host string
Port string
Port uint16
User string
Pass string
Vhost string
}
type Deps struct {
Ctx context.Context
TA taskAgent.TaskAgent
Addr Addr
ChanLen uint
Ctx context.Context
TA taskAgent.TaskAgent
Addr Addr
ChanLen uint
LoggingEnabled bool
}
func NewHandler(deps Deps) Processor {
addr := makeAddr(deps.Addr)
return &handler{
service: newService(deps, addr),
service: newService(deps),
}
}
func makeAddr(addr Addr) string {
//"amqp://username:password@host:port/vhost"
return fmt.Sprintf("amqp://%v:%v@%v/%v", addr.User, addr.Pass, net.JoinHostPort(addr.Host, addr.Port), addr.Vhost)
}

View file

@ -0,0 +1,10 @@
package processor
import "strings"
func linkIsValid(link string) bool {
if strings.HasPrefix(link, "http://") || strings.HasPrefix(link, "https://") {
return true
}
return false
}

View file

@ -3,7 +3,6 @@ package processor
import (
"context"
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
"task-processor/internal/structs"
@ -12,18 +11,30 @@ import (
)
type service struct {
taskAgent taskAgent.TaskAgent
brokerAddr string
taskPublishers map[string]chan<- []byte
taskAgent taskAgent.TaskAgent
brokerAddr rabbit.Address
taskPublishers map[string]chan<- []byte
rabbitLoggingEnabled bool
}
func newService(deps Deps, addr string) *service {
return &service{
taskAgent: deps.TA,
brokerAddr: addr,
taskPublishers: makeTaskPublishers(deps.Ctx, addr, deps.ChanLen),
func newService(deps Deps) *service {
ba := rabbit.Address{
Username: deps.Addr.User,
Password: deps.Addr.Pass,
Host: deps.Addr.Host,
Port: deps.Addr.Port,
Vhost: deps.Addr.Vhost,
}
s := &service{
taskAgent: deps.TA,
brokerAddr: ba,
rabbitLoggingEnabled: deps.LoggingEnabled,
}
s.makeTaskPublishers(deps.Ctx, ba, deps.ChanLen)
return s
}
func (s *service) ProcessTasks(ctx context.Context) error {
@ -50,7 +61,16 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
log.Debugf("%v Results sender start", pkgLogHeader)
runCtx, cancel := context.WithCancel(ctx)
consumerClient, err := rabbit.NewClient(s.brokerAddr, "tasks-results")
taskResults := rabbit.QueueOpts{
QueueName: "tasks-results",
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
}
consumerClient, err := rabbit.NewClient(s.brokerAddr, taskResults, rabbit.WithLogging(s.rabbitLoggingEnabled))
if err != nil {
cancel()
return err
@ -58,6 +78,7 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
resultsConsumer := rabbit.NewConsumer(consumerClient)
resultChan := resultsConsumer.Start(runCtx, chanLen)
log.Debugf("%v Results consumer started: %v", pkgLogHeader, taskResults.QueueName)
go func() {
defer cancel()
@ -98,6 +119,10 @@ func (s *service) SendResults(ctx context.Context, chanLen uint) error {
func (s *service) sendTasks(tasks []structs.Task) error {
for _, tsk := range tasks {
for origin, link := range tsk.Origins {
if !linkIsValid(link) {
continue
}
if origin == "surugaya" {
pushTask(s.taskPublishers["surugaya"], tsk.MerchUuid, link)
}
@ -125,28 +150,65 @@ func (s *service) convertResult(b []byte) *structs.Result {
return &res
}
func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[string]chan<- []byte {
origins := [...]string{
"surugaya",
"mandarake",
"amiami",
}
// TODO refactor this later: get origins from merch api and remove ctx pass via deps
func (s *service) makeTaskPublishers(ctx context.Context, addr rabbit.Address, chanLen uint) {
publishers := make(map[string]chan<- []byte)
for _, origin := range origins {
qn := fmt.Sprintf("task-publisher-%s", origin)
pubClient, err := rabbit.NewClient(addr, qn)
if err != nil {
log.WithError(err).Error("Failed to create publisher")
continue
}
publishers[origin] = rabbit.NewPublisher(pubClient).Start(ctx, chanLen)
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn)
// surugaya
surugayaOpts := rabbit.QueueOpts{
QueueName: "task-publisher-surugaya",
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
}
return publishers
surugayaClient, err := rabbit.NewClient(addr, surugayaOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
if err != nil {
log.WithError(err).Error("Failed to create publisher")
}
publishers["surugaya"] = rabbit.NewPublisher(surugayaClient).Start(ctx, chanLen)
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, surugayaOpts.QueueName)
//mandarake
mandarakeOpts := rabbit.QueueOpts{
QueueName: "task-publisher-mandarake",
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
}
mandarakeClient, err := rabbit.NewClient(addr, mandarakeOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
if err != nil {
log.WithError(err).Error("Failed to create publisher")
}
publishers["mandarake"] = rabbit.NewPublisher(mandarakeClient).Start(ctx, chanLen)
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, mandarakeOpts.QueueName)
//amiami
amiamiOpts := rabbit.QueueOpts{
QueueName: "task-publisher-amiami",
Durable: true,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
}
amiamiClient, err := rabbit.NewClient(addr, amiamiOpts, rabbit.WithLogging(s.rabbitLoggingEnabled))
if err != nil {
log.WithError(err).Error("Failed to create publisher")
}
publishers["amiami"] = rabbit.NewPublisher(amiamiClient).Start(ctx, chanLen)
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, amiamiOpts.QueueName)
s.taskPublishers = publishers
}
func pushTask(pubChan chan<- []byte, m, l string) {