package task import ( log "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/emptypb" "io" "merch-api/internal/common" tt "merch-api/pkg/taskTransport/v1" "time" ) type service struct { tt.UnimplementedTaskProcessorServer merchProvider common.MerchProvider } func newService(mp common.MerchProvider) *service { return &service{ merchProvider: mp, } } func (s *service) RequestTasks(_ *emptypb.Empty, stream tt.TaskProcessor_RequestTasksServer) error { tasks, err := s.merchProvider.GetTasks(stream.Context()) if err != nil { log.WithError(err).Errorf("%v Failed to get tasks", pkgLogHeader) return err } for _, task := range tasks { if err = stream.Send(&tt.Task{ MerchUuid: task.MerchUuid, Origins: task.Origins, }); err != nil { log.WithError(err).Errorf("%v Failed to send tasks", pkgLogHeader) return err } } return nil } func (s *service) SendResults(stream tt.TaskProcessor_SendResultsServer) error { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() batch := make([]common.Result, 0) done := make(chan struct{}) go func() { for { select { case <-done: return case <-ticker.C: if len(batch) > 0 { err := s.merchProvider.InsertPrices(stream.Context(), batch) if err != nil { log.WithError(err).Errorf("%v Failed to batch insert result", pkgLogHeader) } } } } }() for { response, err := stream.Recv() if err == io.EOF { log.Debugf("%v gRPC EOF", pkgLogHeader) break } if err != nil { log.WithError(err).Errorf("%v Failed to receive", pkgLogHeader) return err } batch = append(batch, common.Result{ MerchUuid: response.MerchUuid, OriginName: response.OriginName, Price: response.Price, }) } close(done) if len(batch) > 0 { err := s.merchProvider.InsertPrices(stream.Context(), batch) if err != nil { log.WithError(err).Errorf("%v Failed to batch insert last data", pkgLogHeader) return err } } if err := stream.SendAndClose(&emptypb.Empty{}); err != nil { return err } return nil }