From e90852cc950a5845bff08705c8ef2ffb6ade0a3f Mon Sep 17 00:00:00 2001 From: nquidox Date: Sun, 26 Oct 2025 19:53:14 +0300 Subject: [PATCH] factor out tp methods --- internal/grpcService/handler.go | 92 +------------------------- internal/grpcService/taskProcessor.go | 95 +++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 90 deletions(-) create mode 100644 internal/grpcService/taskProcessor.go diff --git a/internal/grpcService/handler.go b/internal/grpcService/handler.go index 6af20cf..1ec9841 100644 --- a/internal/grpcService/handler.go +++ b/internal/grpcService/handler.go @@ -1,106 +1,18 @@ package grpcService import ( - log "github.com/sirupsen/logrus" "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/emptypb" - "io" "merch-parser-api/internal/interfaces" - "merch-parser-api/internal/shared" pb "merch-parser-api/proto/taskProcessor" - "time" ) -type repoServer struct { - pb.UnimplementedTaskProcessorServer - taskProvider interfaces.TaskProvider -} - func NewGrpcServer(taskProvider interfaces.TaskProvider) *grpc.Server { srv := grpc.NewServer() + repoSrv := &repoServer{ taskProvider: taskProvider, } - pb.RegisterTaskProcessorServer(srv, repoSrv) + return srv } - -func (r *repoServer) RequestTask(_ *emptypb.Empty, stream pb.TaskProcessor_RequestTaskServer) error { - tasks, err := r.taskProvider.PrepareTasks() - if err != nil { - log.WithField("err", err).Error("gRPC Server | Request task error") - return err - } - - for _, task := range tasks { - if err = stream.Send(&pb.Task{ - MerchUuid: task.MerchUuid, - OriginSurugayaLink: task.OriginSurugayaLink, - OriginMandarakeLink: task.OriginMandarakeLink, - }); err != nil { - log.WithField("err", err).Error("gRPC Server | Stream send error") - return err - } - } - return nil -} - -func (r *repoServer) SendResult(stream pb.TaskProcessor_SendResultServer) error { - saveInterval := time.Second * 2 - batch := make([]shared.TaskResult, 0) - - ticker := time.NewTicker(saveInterval) - defer ticker.Stop() - - done := make(chan struct{}) - - go func() { - for { - select { - case <-done: - return - case <-ticker.C: - if len(batch) > 0 { - err := r.taskProvider.InsertPrices(batch) - if err != nil { - log.WithField("err", err).Error("gRPC Server | Batch insert") - } - } - } - } - }() - - for { - response, err := stream.Recv() - if err == io.EOF { - log.Debug("gRPC EOF") - break - } - - if err != nil { - log.WithField("err", err).Error("gRPC Server | Receive") - return err - } - - entry := shared.TaskResult{ - MerchUuid: response.MerchUuid, - Origin: response.OriginName, - Price: response.Price, - } - - batch = append(batch, entry) - log.WithField("response", entry).Debug("gRPC Server | Receive success") - } - - close(done) - if len(batch) > 0 { - err := r.taskProvider.InsertPrices(batch) - if err != nil { - log.WithField("err", err).Error("gRPC Server | Last data batch insert") - return err - } - } - - return nil -} diff --git a/internal/grpcService/taskProcessor.go b/internal/grpcService/taskProcessor.go new file mode 100644 index 0000000..93986bb --- /dev/null +++ b/internal/grpcService/taskProcessor.go @@ -0,0 +1,95 @@ +package grpcService + +import ( + log "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/emptypb" + "io" + "merch-parser-api/internal/interfaces" + "merch-parser-api/internal/shared" + pb "merch-parser-api/proto/taskProcessor" + "time" +) + +type repoServer struct { + pb.UnimplementedTaskProcessorServer + taskProvider interfaces.TaskProvider +} + +func (r *repoServer) RequestTask(_ *emptypb.Empty, stream pb.TaskProcessor_RequestTaskServer) error { + tasks, err := r.taskProvider.PrepareTasks() + if err != nil { + log.WithField("err", err).Error("gRPC Server | Request task error") + return err + } + + for _, task := range tasks { + if err = stream.Send(&pb.Task{ + MerchUuid: task.MerchUuid, + OriginSurugayaLink: task.OriginSurugayaLink, + OriginMandarakeLink: task.OriginMandarakeLink, + }); err != nil { + log.WithField("err", err).Error("gRPC Server | Stream send error") + return err + } + } + return nil +} + +func (r *repoServer) SendResult(stream pb.TaskProcessor_SendResultServer) error { + saveInterval := time.Second * 2 + batch := make([]shared.TaskResult, 0) + + ticker := time.NewTicker(saveInterval) + defer ticker.Stop() + + done := make(chan struct{}) + + go func() { + for { + select { + case <-done: + return + case <-ticker.C: + if len(batch) > 0 { + err := r.taskProvider.InsertPrices(batch) + if err != nil { + log.WithField("err", err).Error("gRPC Server | Batch insert") + } + } + } + } + }() + + for { + response, err := stream.Recv() + if err == io.EOF { + log.Debug("gRPC EOF") + break + } + + if err != nil { + log.WithField("err", err).Error("gRPC Server | Receive") + return err + } + + entry := shared.TaskResult{ + MerchUuid: response.MerchUuid, + Origin: response.OriginName, + Price: response.Price, + } + + batch = append(batch, entry) + log.WithField("response", entry).Debug("gRPC Server | Receive success") + } + + close(done) + if len(batch) > 0 { + err := r.taskProvider.InsertPrices(batch) + if err != nil { + log.WithField("err", err).Error("gRPC Server | Last data batch insert") + return err + } + } + + return nil +}