96 lines
2 KiB
Go
96 lines
2 KiB
Go
|
|
package grpcService
|
||
|
|
|
||
|
|
import (
	"io"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
	"google.golang.org/protobuf/types/known/emptypb"

	"merch-parser-api/internal/interfaces"
	"merch-parser-api/internal/shared"
	pb "merch-parser-api/proto/taskProcessor"
)
|
||
|
|
|
||
|
|
type repoServer struct {
|
||
|
|
pb.UnimplementedTaskProcessorServer
|
||
|
|
taskProvider interfaces.TaskProvider
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *repoServer) RequestTask(_ *emptypb.Empty, stream pb.TaskProcessor_RequestTaskServer) error {
|
||
|
|
tasks, err := r.taskProvider.PrepareTasks()
|
||
|
|
if err != nil {
|
||
|
|
log.WithField("err", err).Error("gRPC Server | Request task error")
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
for _, task := range tasks {
|
||
|
|
if err = stream.Send(&pb.Task{
|
||
|
|
MerchUuid: task.MerchUuid,
|
||
|
|
OriginSurugayaLink: task.OriginSurugayaLink,
|
||
|
|
OriginMandarakeLink: task.OriginMandarakeLink,
|
||
|
|
}); err != nil {
|
||
|
|
log.WithField("err", err).Error("gRPC Server | Stream send error")
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (r *repoServer) SendResult(stream pb.TaskProcessor_SendResultServer) error {
|
||
|
|
saveInterval := time.Second * 2
|
||
|
|
batch := make([]shared.TaskResult, 0)
|
||
|
|
|
||
|
|
ticker := time.NewTicker(saveInterval)
|
||
|
|
defer ticker.Stop()
|
||
|
|
|
||
|
|
done := make(chan struct{})
|
||
|
|
|
||
|
|
go func() {
|
||
|
|
for {
|
||
|
|
select {
|
||
|
|
case <-done:
|
||
|
|
return
|
||
|
|
case <-ticker.C:
|
||
|
|
if len(batch) > 0 {
|
||
|
|
err := r.taskProvider.InsertPrices(batch)
|
||
|
|
if err != nil {
|
||
|
|
log.WithField("err", err).Error("gRPC Server | Batch insert")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
|
||
|
|
for {
|
||
|
|
response, err := stream.Recv()
|
||
|
|
if err == io.EOF {
|
||
|
|
log.Debug("gRPC EOF")
|
||
|
|
break
|
||
|
|
}
|
||
|
|
|
||
|
|
if err != nil {
|
||
|
|
log.WithField("err", err).Error("gRPC Server | Receive")
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
entry := shared.TaskResult{
|
||
|
|
MerchUuid: response.MerchUuid,
|
||
|
|
Origin: response.OriginName,
|
||
|
|
Price: response.Price,
|
||
|
|
}
|
||
|
|
|
||
|
|
batch = append(batch, entry)
|
||
|
|
log.WithField("response", entry).Debug("gRPC Server | Receive success")
|
||
|
|
}
|
||
|
|
|
||
|
|
close(done)
|
||
|
|
if len(batch) > 0 {
|
||
|
|
err := r.taskProvider.InsertPrices(batch)
|
||
|
|
if err != nil {
|
||
|
|
log.WithField("err", err).Error("gRPC Server | Last data batch insert")
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|