diff --git a/internal/app/handler.go b/internal/app/handler.go index 6b09c72..00c3535 100644 --- a/internal/app/handler.go +++ b/internal/app/handler.go @@ -7,6 +7,7 @@ import ( log "github.com/sirupsen/logrus" "merch-api/config" "merch-api/internal/merch" + "merch-api/internal/task" "merch-api/internal/user" "merch-api/pkg/dbase" "merch-api/pkg/router" @@ -21,6 +22,7 @@ type App struct { router *router.Router modules []Module dbPool *pgxpool.Pool + tasker *task.Handler } func New(ctx context.Context, cfg config.Config) *App { @@ -34,6 +36,9 @@ func New(ctx context.Context, cfg config.Config) *App { Password: cfg.DBase.Password, DBName: cfg.DBase.DBName, }) + if err != nil { + log.WithError(err).Fatalf("%v failed to connect database", pkgLogHeader) + } //providers with deps userProv := user.New(user.Deps{ @@ -49,10 +54,6 @@ func New(ctx context.Context, cfg config.Config) *App { UserProvider: userProv, }) - if err != nil { - log.WithError(err).Fatalf("%v failed to connect database", pkgLogHeader) - } - //modules var modules []Module @@ -63,11 +64,17 @@ func New(ctx context.Context, cfg config.Config) *App { }) modules = append(modules, m) + tasker := task.New(task.Deps{ + Addr: "", + MerchProvider: m, + }) + return &App{ cfg: cfg, router: r, modules: modules, dbPool: dbPool, + tasker: tasker, } } @@ -85,6 +92,12 @@ func (app *App) Run(ctx context.Context) error { } }() + go func() { + if err := app.tasker.Serve(); err != nil { + errCh <- err + } + }() + select { case <-ctx.Done(): app.shutdown(ctx) @@ -106,6 +119,8 @@ func (app *App) shutdown(ctx context.Context) { log.WithError(err).Warnf("%v error shutting down application", pkgLogHeader) } + app.tasker.Shutdown() + log.Infof("%v shutdown complete", pkgLogHeader) } diff --git a/internal/task/handler.go b/internal/task/handler.go new file mode 100644 index 0000000..eae255c --- /dev/null +++ b/internal/task/handler.go @@ -0,0 +1,56 @@ +package task + +import ( + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "merch-api/internal/common" + taskTransportV1 "merch-api/pkg/taskTransport/v1" + "net" +) + +const pkgLogHeader string = "Tasker |" + +type Handler struct { + addr string + srv *grpc.Server + *service +} + +type Deps struct { + Addr string + MerchProvider common.MerchProvider +} + +func New(deps Deps) *Handler { + srv := grpc.NewServer() + + handler := &Handler{ + addr: deps.Addr, + service: newService(deps.MerchProvider), + } + + taskTransportV1.RegisterTaskProcessorServer(srv, handler) + + return handler +} + +func (h *Handler) Serve() error { + listener, err := net.Listen("tcp", h.addr) + if err != nil { + log.WithError(err).Errorf("%v Listner failure", pkgLogHeader) + return err + } + + if err = h.srv.Serve(listener); err != nil { + log.WithError(err).Errorf("%v Serve failure", pkgLogHeader) + return err + } + + log.Infof("%v gRPC server started on: %v", pkgLogHeader, h.addr) + return nil +} + +func (h *Handler) Shutdown() { + h.srv.GracefulStop() + log.Infof("%v gRPC server shutdown", pkgLogHeader) +} diff --git a/internal/task/service.go b/internal/task/service.go new file mode 100644 index 0000000..947aab6 --- /dev/null +++ b/internal/task/service.go @@ -0,0 +1,44 @@ +package task + +import ( + log "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/emptypb" + "merch-api/internal/common" + tt "merch-api/pkg/taskTransport/v1" +) + +type service struct { + tt.UnimplementedTaskProcessorServer + merchProvider common.MerchProvider +} + +func newService(mp common.MerchProvider) *service { + return &service{ + merchProvider: mp, + } +} + +func (s *service) RequestTasks(_ *emptypb.Empty, stream tt.TaskProcessor_RequestTasksServer) error { + tasks, err := s.merchProvider.GetTasks(stream.Context()) + if err != nil { + log.WithError(err).Errorf("%v Failed to get tasks", pkgLogHeader) + return err + } + + for _, task := range tasks { + if err = stream.Send(&tt.Task{ + MerchUuid: task.MerchUuid, + Origins: task.Origins, + }); err != nil { + log.WithError(err).Errorf("%v Failed to send tasks", pkgLogHeader) + return err + } + } + + return nil +} + +func (s *service) SendResults(stream tt.TaskProcessor_SendResultsServer) error { + //TODO + return nil +}