package created
This commit is contained in:
parent
353dcd6fe6
commit
71e2e1b7b1
5 changed files with 216 additions and 1 deletions
42
internal/processor/handler.go
Normal file
42
internal/processor/handler.go
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
package processor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"task-processor/internal/taskAgent"
|
||||
)
|
||||
|
||||
const pkgLogHeader string = "Processor |"
|
||||
|
||||
type handler struct {
|
||||
*service
|
||||
}
|
||||
|
||||
type Addr struct {
|
||||
Host string
|
||||
Port string
|
||||
User string
|
||||
Pass string
|
||||
Vhost string
|
||||
}
|
||||
|
||||
type Deps struct {
|
||||
Ctx context.Context
|
||||
TA taskAgent.TaskAgent
|
||||
Addr Addr
|
||||
ChanLen uint
|
||||
}
|
||||
|
||||
func NewHandler(deps Deps) Processor {
|
||||
addr := makeAddr(deps.Addr)
|
||||
|
||||
return &handler{
|
||||
service: newService(deps, addr),
|
||||
}
|
||||
}
|
||||
|
||||
func makeAddr(addr Addr) string {
|
||||
//"amqp://username:password@host:port/vhost"
|
||||
return fmt.Sprintf("amqp://%v:%v@%v/%v", addr.User, addr.Pass, net.JoinHostPort(addr.Host, addr.Port), addr.Vhost)
|
||||
}
|
||||
10
internal/processor/interface.go
Normal file
10
internal/processor/interface.go
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
package processor
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type Processor interface {
|
||||
ProcessTasks(ctx context.Context) error
|
||||
SendResults(ctx context.Context, chanLen uint) error
|
||||
}
|
||||
6
internal/processor/model.go
Normal file
6
internal/processor/model.go
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
package processor
|
||||
|
||||
type task struct {
|
||||
MerchUuid string `json:"merch_uuid"`
|
||||
Link string `json:"link"`
|
||||
}
|
||||
157
internal/processor/service.go
Normal file
157
internal/processor/service.go
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
package processor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
log "github.com/sirupsen/logrus"
|
||||
rabbit "repo.nqws.ru/merch-tracker-v2/mt-rabbit"
|
||||
"task-processor/internal/structs"
|
||||
"task-processor/internal/taskAgent"
|
||||
"time"
|
||||
)
|
||||
|
||||
type service struct {
|
||||
taskAgent taskAgent.TaskAgent
|
||||
brokerAddr string
|
||||
taskPublishers map[string]chan<- []byte
|
||||
}
|
||||
|
||||
func newService(deps Deps, addr string) *service {
|
||||
|
||||
return &service{
|
||||
taskAgent: deps.TA,
|
||||
brokerAddr: addr,
|
||||
taskPublishers: makeTaskPublishers(deps.Ctx, addr, deps.ChanLen),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) ProcessTasks(ctx context.Context) error {
|
||||
runCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
log.Infof("%v Processing tasks", pkgLogHeader)
|
||||
|
||||
fetchTasks, err := s.taskAgent.FetchTasks(runCtx)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("%v Failed to fetch tasks", pkgLogHeader)
|
||||
return err
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 5) //wait for connect
|
||||
|
||||
if err = s.sendTasks(fetchTasks); err != nil {
|
||||
log.WithError(err).Errorf("%v Failed to send tasks", pkgLogHeader)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) SendResults(ctx context.Context, chanLen uint) error {
|
||||
runCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
resultsConsumer := rabbit.NewConsumer(rabbit.NewClient(s.brokerAddr, "tasks-results"))
|
||||
resultChan := resultsConsumer.Start(runCtx, chanLen)
|
||||
|
||||
go func() {
|
||||
sendTicker := time.NewTicker(2 * time.Second)
|
||||
defer sendTicker.Stop()
|
||||
|
||||
var sendResults []structs.Result
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-runCtx.Done():
|
||||
return
|
||||
case result := <-resultChan:
|
||||
r := s.convertResult(result)
|
||||
if r == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
sendResults = append(sendResults, *r)
|
||||
|
||||
case <-sendTicker.C:
|
||||
l := len(sendResults)
|
||||
if l > 0 {
|
||||
log.Printf("%v Sending results: %v", pkgLogHeader, sendResults)
|
||||
|
||||
if err := s.taskAgent.SendResults(runCtx, sendResults); err != nil {
|
||||
log.WithError(err).Errorf("%v Failed to send results", pkgLogHeader)
|
||||
}
|
||||
|
||||
sendResults = sendResults[:0]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) sendTasks(tasks []structs.Task) error {
|
||||
for _, tsk := range tasks {
|
||||
for origin, link := range tsk.Origins {
|
||||
if origin == "surugaya" {
|
||||
pushTask(s.taskPublishers["surugaya"], tsk.MerchUuid, link)
|
||||
}
|
||||
|
||||
if origin == "mandarake" {
|
||||
pushTask(s.taskPublishers["mandarake"], tsk.MerchUuid, link)
|
||||
}
|
||||
|
||||
if origin == "amiami" {
|
||||
pushTask(s.taskPublishers["amiami"], tsk.MerchUuid, link)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) convertResult(b []byte) *structs.Result {
|
||||
var res *structs.Result
|
||||
|
||||
if err := json.Unmarshal(b, res); err != nil {
|
||||
log.WithError(err).Error("Failed to unmarshal result")
|
||||
return nil
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func makeTaskPublishers(ctx context.Context, addr string, chanLen uint) map[string]chan<- []byte {
|
||||
origins := [...]string{
|
||||
"surugaya",
|
||||
"mandarake",
|
||||
"amiami",
|
||||
}
|
||||
|
||||
publishers := make(map[string]chan<- []byte)
|
||||
|
||||
for _, origin := range origins {
|
||||
qn := fmt.Sprintf("task-publisher-%s", origin)
|
||||
publishers[origin] = rabbit.NewPublisher(rabbit.NewClient(addr, qn)).Start(ctx, chanLen)
|
||||
log.Debugf("%v Publisher queue created: %v", pkgLogHeader, qn)
|
||||
}
|
||||
|
||||
return publishers
|
||||
}
|
||||
|
||||
func pushTask(pubChan chan<- []byte, m, l string) {
|
||||
log.Debugf("%v Pushing task: %v", pkgLogHeader, m)
|
||||
t := task{
|
||||
MerchUuid: m,
|
||||
Link: l,
|
||||
}
|
||||
|
||||
bytes, err := json.Marshal(t)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("%v Failed to marshal task", pkgLogHeader)
|
||||
return
|
||||
}
|
||||
|
||||
pubChan <- bytes
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue