diff --git a/internal/task/service.go b/internal/task/service.go index 38da458..39cba20 100644 --- a/internal/task/service.go +++ b/internal/task/service.go @@ -1,11 +1,13 @@ package task import ( + "context" log "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/emptypb" "io" "merch-api/internal/common" tt "merch-api/pkg/taskTransport/v1" + "sync" "time" ) @@ -41,23 +43,37 @@ func (s *service) RequestTasks(_ *emptypb.Empty, stream tt.TaskProcessor_Request } func (s *service) SendResults(stream tt.TaskProcessor_SendResultsServer) error { + ctx, cancel := context.WithCancel(context.Background()) + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() + mu := &sync.Mutex{} batch := make([]common.Result, 0) - done := make(chan struct{}) + + stop := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) go func() { for { select { - case <-done: + case <-ctx.Done(): return case <-ticker.C: - if len(batch) > 0 { - err := s.merchProvider.InsertPrices(stream.Context(), batch) - if err != nil { - log.WithError(err).Errorf("%v Failed to batch insert result", pkgLogHeader) - } + mu.Lock() + if len(batch) == 0 { + mu.Unlock() + continue + } + + toInsert := make([]common.Result, len(batch)) + copy(toInsert, batch) + batch = batch[:0] + mu.Unlock() + + if err := s.merchProvider.InsertPrices(ctx, toInsert); err != nil { + log.WithError(err).Errorf("%v Failed to batch insert result", pkgLogHeader) } } } @@ -72,28 +88,36 @@ func (s *service) SendResults(stream tt.TaskProcessor_SendResultsServer) error { if err != nil { log.WithError(err).Errorf("%v Failed to receive", pkgLogHeader) + cancel() return err } + mu.Lock() batch = append(batch, common.Result{ MerchUuid: response.MerchUuid, OriginName: response.OriginName, Price: response.Price, }) + mu.Unlock() } - close(done) - if len(batch) > 0 { - err := s.merchProvider.InsertPrices(stream.Context(), batch) + close(stop) + wg.Wait() + + mu.Lock() + finalBatch := make([]common.Result, len(batch)) + copy(finalBatch, batch) + mu.Unlock() + + if len(finalBatch) > 0 { + err := s.merchProvider.InsertPrices(ctx, finalBatch) if err != nil { log.WithError(err).Errorf("%v Failed to batch insert last data", pkgLogHeader) + cancel() return err } } - if err := stream.SendAndClose(&emptypb.Empty{}); err != nil { - return err - } - - return nil + cancel() + return stream.SendAndClose(&emptypb.Empty{}) }