package task import ( "context" log "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/emptypb" "io" "merch-api/internal/common" tt "merch-api/pkg/taskTransport/v1" "sync" "time" ) type service struct { tt.UnimplementedTaskProcessorServer merchProvider common.MerchProvider } func newService(mp common.MerchProvider) *service { return &service{ merchProvider: mp, } } func (s *service) RequestTasks(_ *emptypb.Empty, stream tt.TaskProcessor_RequestTasksServer) error { tasks, err := s.merchProvider.GetTasks(stream.Context()) if err != nil { log.WithError(err).Errorf("%v Failed to get tasks", pkgLogHeader) return err } for _, task := range tasks { if err = stream.Send(&tt.Task{ MerchUuid: task.MerchUuid, Origins: task.Origins, }); err != nil { log.WithError(err).Errorf("%v Failed to send tasks", pkgLogHeader) return err } } return nil } func (s *service) SendResults(stream tt.TaskProcessor_SendResultsServer) error { ctx, cancel := context.WithCancel(context.Background()) ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() mu := &sync.Mutex{} batch := make([]common.Result, 0) stop := make(chan struct{}) var wg sync.WaitGroup wg.Add(1) go func() { for { select { case <-ctx.Done(): return case <-ticker.C: mu.Lock() if len(batch) == 0 { mu.Unlock() continue } toInsert := make([]common.Result, len(batch)) copy(toInsert, batch) batch = batch[:0] mu.Unlock() if err := s.merchProvider.InsertPrices(ctx, toInsert); err != nil { log.WithError(err).Errorf("%v Failed to batch insert result", pkgLogHeader) } } } }() for { response, err := stream.Recv() if err == io.EOF { log.Debugf("%v gRPC EOF", pkgLogHeader) break } if err != nil { log.WithError(err).Errorf("%v Failed to receive", pkgLogHeader) cancel() return err } mu.Lock() batch = append(batch, common.Result{ MerchUuid: response.MerchUuid, OriginName: response.OriginName, Price: response.Price, }) mu.Unlock() } close(stop) wg.Wait() mu.Lock() finalBatch := make([]common.Result, len(batch)) copy(finalBatch, batch) mu.Unlock() if len(finalBatch) > 0 { err := s.merchProvider.InsertPrices(ctx, finalBatch) if err != nil { log.WithError(err).Errorf("%v Failed to batch insert last data", pkgLogHeader) cancel() return err } } cancel() return stream.SendAndClose(&emptypb.Empty{}) }