// Package grpcService exposes the TaskProcessor gRPC service, backed by an
// interfaces.TaskProvider.
package grpcService

import (
	"io"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/emptypb"

	"merch-parser-api/internal/interfaces"
	"merch-parser-api/internal/shared"
	pb "merch-parser-api/proto/taskProcessor"
)

// repoServer implements pb.TaskProcessorServer on top of a TaskProvider.
type repoServer struct {
	pb.UnimplementedTaskProcessorServer
	taskProvider interfaces.TaskProvider
}

// NewGrpcServer creates a gRPC server and registers a TaskProcessor service
// that delegates to taskProvider. The returned server is not yet serving.
func NewGrpcServer(taskProvider interfaces.TaskProvider) *grpc.Server {
	srv := grpc.NewServer()
	repoSrv := &repoServer{
		taskProvider: taskProvider,
	}

	pb.RegisterTaskProcessorServer(srv, repoSrv)
	return srv
}

// RequestTask asks the task provider to prepare tasks and sends each of them
// to the client over stream. It returns the first error reported by either
// PrepareTasks or stream.Send.
func (r *repoServer) RequestTask(_ *emptypb.Empty, stream pb.TaskProcessor_RequestTaskServer) error {
	tasks, err := r.taskProvider.PrepareTasks()
	if err != nil {
		log.WithField("err", err).Error("gRPC Server | Request task error")
		return err
	}

	for _, task := range tasks {
		if err := stream.Send(&pb.Task{
			MerchUuid:           task.MerchUuid,
			OriginSurugayaLink:  task.OriginSurugayaLink,
			OriginMandarakeLink: task.OriginMandarakeLink,
		}); err != nil {
			log.WithField("err", err).Error("gRPC Server | Stream send error")
			return err
		}
	}

	return nil
}

// SendResult receives task results from the client stream and stores them in
// batches through the task provider.
//
// Received results are buffered and flushed every two seconds by a background
// goroutine; whatever is still buffered when the client closes the stream
// (io.EOF) is flushed once more before returning. A failed periodic flush is
// only logged and the results stay buffered for the next attempt; a failed
// final flush is returned to the caller. If stream.Recv fails, the error is
// returned and results still buffered at that point are not inserted.
func (r *repoServer) SendResult(stream pb.TaskProcessor_SendResultServer) error {
	const saveInterval = 2 * time.Second

	var (
		mu    sync.Mutex // guards batch, shared with the flusher goroutine
		batch []shared.TaskResult
	)

	// flush inserts everything buffered so far and empties the buffer on
	// success. The lock is held across the insert so that a concurrent flush
	// can never observe an empty buffer while items are still in flight.
	flush := func() error {
		mu.Lock()
		defer mu.Unlock()

		if len(batch) == 0 {
			return nil
		}
		if err := r.taskProvider.InsertPrices(batch); err != nil {
			return err
		}
		// Drop the flushed items so they are not inserted again on the next
		// tick; a fresh slice avoids aliasing memory the provider may retain.
		batch = nil
		return nil
	}

	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()

		ticker := time.NewTicker(saveInterval)
		defer ticker.Stop()

		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				if err := flush(); err != nil {
					log.WithField("err", err).Error("gRPC Server | Batch insert")
				}
			}
		}
	}()
	// Stop the flusher on every return path (including receive errors) and
	// wait for it, so the goroutine never outlives the RPC.
	defer func() {
		close(done)
		wg.Wait()
	}()

	for {
		response, err := stream.Recv()
		if err == io.EOF {
			log.Debug("gRPC EOF")
			break
		}
		if err != nil {
			log.WithField("err", err).Error("gRPC Server | Receive")
			return err
		}

		entry := shared.TaskResult{
			MerchUuid: response.MerchUuid,
			Origin:    response.OriginName,
			Price:     response.Price,
		}

		mu.Lock()
		batch = append(batch, entry)
		mu.Unlock()

		log.WithField("response", entry).Debug("gRPC Server | Receive success")
	}

	if err := flush(); err != nil {
		log.WithField("err", err).Error("gRPC Server | Last data batch insert")
		return err
	}

	return nil
}