factor out tp methods
This commit is contained in:
parent
dae627f4ad
commit
e90852cc95
2 changed files with 97 additions and 90 deletions
|
|
@ -1,106 +1,18 @@
|
|||
package grpcService
|
||||
|
||||
import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
"io"
|
||||
"merch-parser-api/internal/interfaces"
|
||||
"merch-parser-api/internal/shared"
|
||||
pb "merch-parser-api/proto/taskProcessor"
|
||||
"time"
|
||||
)
|
||||
|
||||
type repoServer struct {
|
||||
pb.UnimplementedTaskProcessorServer
|
||||
taskProvider interfaces.TaskProvider
|
||||
}
|
||||
|
||||
func NewGrpcServer(taskProvider interfaces.TaskProvider) *grpc.Server {
|
||||
srv := grpc.NewServer()
|
||||
|
||||
repoSrv := &repoServer{
|
||||
taskProvider: taskProvider,
|
||||
}
|
||||
|
||||
pb.RegisterTaskProcessorServer(srv, repoSrv)
|
||||
|
||||
return srv
|
||||
}
|
||||
|
||||
func (r *repoServer) RequestTask(_ *emptypb.Empty, stream pb.TaskProcessor_RequestTaskServer) error {
|
||||
tasks, err := r.taskProvider.PrepareTasks()
|
||||
if err != nil {
|
||||
log.WithField("err", err).Error("gRPC Server | Request task error")
|
||||
return err
|
||||
}
|
||||
|
||||
for _, task := range tasks {
|
||||
if err = stream.Send(&pb.Task{
|
||||
MerchUuid: task.MerchUuid,
|
||||
OriginSurugayaLink: task.OriginSurugayaLink,
|
||||
OriginMandarakeLink: task.OriginMandarakeLink,
|
||||
}); err != nil {
|
||||
log.WithField("err", err).Error("gRPC Server | Stream send error")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *repoServer) SendResult(stream pb.TaskProcessor_SendResultServer) error {
|
||||
saveInterval := time.Second * 2
|
||||
batch := make([]shared.TaskResult, 0)
|
||||
|
||||
ticker := time.NewTicker(saveInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if len(batch) > 0 {
|
||||
err := r.taskProvider.InsertPrices(batch)
|
||||
if err != nil {
|
||||
log.WithField("err", err).Error("gRPC Server | Batch insert")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
response, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
log.Debug("gRPC EOF")
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.WithField("err", err).Error("gRPC Server | Receive")
|
||||
return err
|
||||
}
|
||||
|
||||
entry := shared.TaskResult{
|
||||
MerchUuid: response.MerchUuid,
|
||||
Origin: response.OriginName,
|
||||
Price: response.Price,
|
||||
}
|
||||
|
||||
batch = append(batch, entry)
|
||||
log.WithField("response", entry).Debug("gRPC Server | Receive success")
|
||||
}
|
||||
|
||||
close(done)
|
||||
if len(batch) > 0 {
|
||||
err := r.taskProvider.InsertPrices(batch)
|
||||
if err != nil {
|
||||
log.WithField("err", err).Error("gRPC Server | Last data batch insert")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
95
internal/grpcService/taskProcessor.go
Normal file
95
internal/grpcService/taskProcessor.go
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
package grpcService
|
||||
|
||||
import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
"io"
|
||||
"merch-parser-api/internal/interfaces"
|
||||
"merch-parser-api/internal/shared"
|
||||
pb "merch-parser-api/proto/taskProcessor"
|
||||
"time"
|
||||
)
|
||||
|
||||
type repoServer struct {
|
||||
pb.UnimplementedTaskProcessorServer
|
||||
taskProvider interfaces.TaskProvider
|
||||
}
|
||||
|
||||
func (r *repoServer) RequestTask(_ *emptypb.Empty, stream pb.TaskProcessor_RequestTaskServer) error {
|
||||
tasks, err := r.taskProvider.PrepareTasks()
|
||||
if err != nil {
|
||||
log.WithField("err", err).Error("gRPC Server | Request task error")
|
||||
return err
|
||||
}
|
||||
|
||||
for _, task := range tasks {
|
||||
if err = stream.Send(&pb.Task{
|
||||
MerchUuid: task.MerchUuid,
|
||||
OriginSurugayaLink: task.OriginSurugayaLink,
|
||||
OriginMandarakeLink: task.OriginMandarakeLink,
|
||||
}); err != nil {
|
||||
log.WithField("err", err).Error("gRPC Server | Stream send error")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *repoServer) SendResult(stream pb.TaskProcessor_SendResultServer) error {
|
||||
saveInterval := time.Second * 2
|
||||
batch := make([]shared.TaskResult, 0)
|
||||
|
||||
ticker := time.NewTicker(saveInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if len(batch) > 0 {
|
||||
err := r.taskProvider.InsertPrices(batch)
|
||||
if err != nil {
|
||||
log.WithField("err", err).Error("gRPC Server | Batch insert")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
response, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
log.Debug("gRPC EOF")
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.WithField("err", err).Error("gRPC Server | Receive")
|
||||
return err
|
||||
}
|
||||
|
||||
entry := shared.TaskResult{
|
||||
MerchUuid: response.MerchUuid,
|
||||
Origin: response.OriginName,
|
||||
Price: response.Price,
|
||||
}
|
||||
|
||||
batch = append(batch, entry)
|
||||
log.WithField("response", entry).Debug("gRPC Server | Receive success")
|
||||
}
|
||||
|
||||
close(done)
|
||||
if len(batch) > 0 {
|
||||
err := r.taskProvider.InsertPrices(batch)
|
||||
if err != nil {
|
||||
log.WithField("err", err).Error("gRPC Server | Last data batch insert")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue