package taskAgent

import (
	"context"
	"io"
	"time"

	log "github.com/sirupsen/logrus"
	"google.golang.org/protobuf/types/known/emptypb"

	"task-processor/internal/structs"
	tt "task-processor/pkg/taskTransport/v1"
)

// service wraps a TaskProcessor gRPC client and applies a fixed timeout to
// every streaming call it makes.
type service struct {
	client  tt.TaskProcessorClient
	timeout time.Duration
}

// newService returns a service that issues its calls through c. Each call is
// run under a context derived from the caller's context with the given timeout.
func newService(c tt.TaskProcessorClient, timeout time.Duration) *service {
	return &service{
		client:  c,
		timeout: timeout,
	}
}

// FetchTasks opens the RequestTasks stream and reads it until the server
// signals end of stream with io.EOF, converting every received message into a
// structs.Task.
//
// The timeout configured on the service covers the whole exchange (opening the
// stream and all receives), not each individual message. On any failure the
// tasks collected so far are discarded and a nil slice is returned together
// with the error. If the stream ends without delivering any task, the returned
// slice is nil and the error is nil.
func (s *service) FetchTasks(ctx context.Context) (received []structs.Task, err error) {
	fetchCtx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	stream, err := s.client.RequestTasks(fetchCtx, &emptypb.Empty{})
	if err != nil {
		log.WithError(err).Errorf("%v failed to open stream", pkgLogHeader)
		return nil, err
	}

	log.Infof("%v Receiving tasks", pkgLogHeader)
	for {
		// A distinct name keeps the named result err from being shadowed.
		task, recvErr := stream.Recv()
		if recvErr == io.EOF {
			// io.EOF is the normal end-of-stream marker, not a failure.
			break
		}
		if recvErr != nil {
			log.WithError(recvErr).Errorf("%v failed to receive task", pkgLogHeader)
			return nil, recvErr
		}

		received = append(received, structs.Task{
			MerchUuid: task.MerchUuid,
			Origins:   task.Origins,
		})
	}

	log.WithField("Count", len(received)).Infof("%v End receiving tasks", pkgLogHeader)
	return received, nil
}

// SendResults streams every element of results to the server over the
// SendResults client stream, then half-closes the stream and waits for the
// server's final response.
//
// The timeout configured on the service covers the whole exchange. The first
// error encountered is logged and returned; results after a failed send are
// not transmitted.
func (s *service) SendResults(ctx context.Context, results []structs.Result) error {
	sendCtx, cancel := context.WithTimeout(ctx, s.timeout)
	defer cancel()

	stream, err := s.client.SendResults(sendCtx)
	if err != nil {
		log.WithError(err).Errorf("%v Failed to open results stream", pkgLogHeader)
		return err
	}

	for _, result := range results {
		toSend := &tt.Result{
			MerchUuid:  result.MerchUuid,
			OriginName: result.OriginName,
			Price:      result.Price,
		}
		if err = stream.Send(toSend); err != nil {
			if err == io.EOF {
				// An io.EOF from Send means the server ended the stream; the
				// actual status is only available from the receive side.
				// Keep io.EOF if the receive side reports no error, so a
				// partially delivered batch is never reported as success.
				if _, recvErr := stream.CloseAndRecv(); recvErr != nil {
					err = recvErr
				}
			}
			log.WithError(err).Errorf("%v Failed to send results", pkgLogHeader)
			return err
		}
	}

	// CloseAndRecv half-closes the send direction itself before waiting for
	// the response, so no separate CloseSend call is needed.
	if _, err = stream.CloseAndRecv(); err != nil {
		log.WithError(err).Errorf("%v Failed to receive server response", pkgLogHeader)
		return err
	}

	log.Debugf("%v End send results", pkgLogHeader)
	return nil
}