From bf557255794d0213b8d4b9f1e8d04cd47a166d84 Mon Sep 17 00:00:00 2001 From: nquidox Date: Sun, 8 Mar 2026 16:53:13 +0300 Subject: [PATCH] SendResults impl --- internal/task/service.go | 57 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/internal/task/service.go b/internal/task/service.go index 947aab6..38da458 100644 --- a/internal/task/service.go +++ b/internal/task/service.go @@ -3,8 +3,10 @@ package task import ( log "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/emptypb" + "io" "merch-api/internal/common" tt "merch-api/pkg/taskTransport/v1" + "time" ) type service struct { @@ -39,6 +41,59 @@ func (s *service) RequestTasks(_ *emptypb.Empty, stream tt.TaskProcessor_Request } func (s *service) SendResults(stream tt.TaskProcessor_SendResultsServer) error { - //TODO + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + batch := make([]common.Result, 0) + done := make(chan struct{}) + + go func() { + for { + select { + case <-done: + return + case <-ticker.C: + if len(batch) > 0 { + err := s.merchProvider.InsertPrices(stream.Context(), batch) + if err != nil { + log.WithError(err).Errorf("%v Failed to batch insert result", pkgLogHeader) + } + } + } + } + }() + + for { + response, err := stream.Recv() + if err == io.EOF { + log.Debugf("%v gRPC EOF", pkgLogHeader) + break + } + + if err != nil { + log.WithError(err).Errorf("%v Failed to receive", pkgLogHeader) + return err + } + + batch = append(batch, common.Result{ + MerchUuid: response.MerchUuid, + OriginName: response.OriginName, + Price: response.Price, + }) + } + + close(done) + if len(batch) > 0 { + err := s.merchProvider.InsertPrices(stream.Context(), batch) + if err != nil { + log.WithError(err).Errorf("%v Failed to batch insert last data", pkgLogHeader) + return err + } + } + + if err := stream.SendAndClose(&emptypb.Empty{}); err != nil { + return err + } + return nil }