diff --git a/internal/taskAgent/handler.go b/internal/taskAgent/handler.go new file mode 100644 index 0000000..2bb2bfb --- /dev/null +++ b/internal/taskAgent/handler.go @@ -0,0 +1,40 @@ +package taskAgent + +import ( + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + tt "task-processor/pkg/taskTransport/v1" + "time" +) + +const pkgLogHeader string = "Task agent |" + +type Handler struct { + client tt.TaskProcessorClient + *service +} + +type Deps struct { + Addr string + Timeout time.Duration +} + +func NewHandler(deps Deps) *Handler { + var opts []grpc.DialOption + insec := grpc.WithTransportCredentials(insecure.NewCredentials()) + opts = append(opts, insec) + + conn, err := grpc.NewClient(deps.Addr, opts...) + if err != nil { + log.WithError(err).Fatalf("%v grpc connection failed", pkgLogHeader) + } + + client := tt.NewTaskProcessorClient(conn) + log.WithField("address", deps.Addr).Debugf("%v client", pkgLogHeader) + + return &Handler{ + client: client, + service: newService(client, deps.Timeout), + } +} diff --git a/internal/taskAgent/interface.go b/internal/taskAgent/interface.go new file mode 100644 index 0000000..37160cb --- /dev/null +++ b/internal/taskAgent/interface.go @@ -0,0 +1,8 @@ +package taskAgent + +import "context" + +type TaskAgent interface { + FetchTasks(ctx context.Context) ([]Task, error) + SendResults(ctx context.Context, results []Result) error +} diff --git a/internal/taskAgent/service.go b/internal/taskAgent/service.go new file mode 100644 index 0000000..bd435ec --- /dev/null +++ b/internal/taskAgent/service.go @@ -0,0 +1,84 @@ +package taskAgent + +import ( + "context" + log "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/emptypb" + "io" + tt "task-processor/pkg/taskTransport/v1" + "time" +) + +type service struct { + client tt.TaskProcessorClient + timeout time.Duration +} + +func newService(c tt.TaskProcessorClient, timeout time.Duration) *service { + return &service{ + client: c, + timeout: timeout, + } +} + +func (s *service) FetchTasks(ctx context.Context) (received []Task, err error) { + fetchCtx, cancel := context.WithTimeout(ctx, s.timeout) + defer cancel() + + stream, err := s.client.RequestTasks(fetchCtx, &emptypb.Empty{}) + if err != nil { + log.WithError(err).Errorf("%v failed to open stream", pkgLogHeader) + return nil, err + } + + for { + log.Infof("%v Receiving tasks", pkgLogHeader) + task, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.WithError(err).Errorf("%v failed to receive task", pkgLogHeader) + return nil, err + } + + received = append(received, Task{ + MerchUuid: task.MerchUuid, + Origins: task.Origins, + }) + log.WithField("Count", len(received)).Infof("%v End receiving tasks", pkgLogHeader) + } + + return received, nil +} + +func (s *service) SendResults(ctx context.Context, results []Result) error { + sendCtx, cancel := context.WithTimeout(ctx, s.timeout) + defer cancel() + + stream, err := s.client.SendResults(sendCtx) + if err != nil { + log.WithError(err).Errorf("%v Failed to send results", pkgLogHeader) + return err + } + + for _, result := range results { + toSend := tt.Result{ + MerchUuid: result.MerchUuid, + OriginName: result.OriginName, + Price: result.Price, + } + + if err = stream.Send(&toSend); err != nil { + log.WithError(err).Errorf("%v Failed to send results", pkgLogHeader) + return err + } + } + + if err = stream.CloseSend(); err != nil { + log.WithError(err).Errorf("%v Failed to close send stream", pkgLogHeader) + return err + } + + return nil +} diff --git a/internal/taskAgent/struct.go b/internal/taskAgent/struct.go new file mode 100644 index 0000000..441500f --- /dev/null +++ b/internal/taskAgent/struct.go @@ -0,0 +1,12 @@ +package taskAgent + +type Task struct { + MerchUuid string + Origins map[string]string +} + +type Result struct { + MerchUuid string + OriginName string + Price int32 +}