task handler
This commit is contained in:
parent
03bcda8eab
commit
338802339c
3 changed files with 119 additions and 4 deletions
|
|
@ -7,6 +7,7 @@ import (
|
|||
log "github.com/sirupsen/logrus"
|
||||
"merch-api/config"
|
||||
"merch-api/internal/merch"
|
||||
"merch-api/internal/task"
|
||||
"merch-api/internal/user"
|
||||
"merch-api/pkg/dbase"
|
||||
"merch-api/pkg/router"
|
||||
|
|
@ -21,6 +22,7 @@ type App struct {
|
|||
router *router.Router
|
||||
modules []Module
|
||||
dbPool *pgxpool.Pool
|
||||
tasker *task.Handler
|
||||
}
|
||||
|
||||
func New(ctx context.Context, cfg config.Config) *App {
|
||||
|
|
@ -34,6 +36,9 @@ func New(ctx context.Context, cfg config.Config) *App {
|
|||
Password: cfg.DBase.Password,
|
||||
DBName: cfg.DBase.DBName,
|
||||
})
|
||||
if err != nil {
|
||||
log.WithError(err).Fatalf("%v failed to connect database", pkgLogHeader)
|
||||
}
|
||||
|
||||
//providers with deps
|
||||
userProv := user.New(user.Deps{
|
||||
|
|
@ -49,10 +54,6 @@ func New(ctx context.Context, cfg config.Config) *App {
|
|||
UserProvider: userProv,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Fatalf("%v failed to connect database", pkgLogHeader)
|
||||
}
|
||||
|
||||
//modules
|
||||
var modules []Module
|
||||
|
||||
|
|
@ -63,11 +64,17 @@ func New(ctx context.Context, cfg config.Config) *App {
|
|||
})
|
||||
modules = append(modules, m)
|
||||
|
||||
tasker := task.New(task.Deps{
|
||||
Addr: "",
|
||||
MerchProvider: m,
|
||||
})
|
||||
|
||||
return &App{
|
||||
cfg: cfg,
|
||||
router: r,
|
||||
modules: modules,
|
||||
dbPool: dbPool,
|
||||
tasker: tasker,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -85,6 +92,12 @@ func (app *App) Run(ctx context.Context) error {
|
|||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
if err := app.tasker.Serve(); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
app.shutdown(ctx)
|
||||
|
|
@ -106,6 +119,8 @@ func (app *App) shutdown(ctx context.Context) {
|
|||
log.WithError(err).Warnf("%v error shutting down application", pkgLogHeader)
|
||||
}
|
||||
|
||||
app.tasker.Shutdown()
|
||||
|
||||
log.Infof("%v shutdown complete", pkgLogHeader)
|
||||
}
|
||||
|
||||
|
|
|
|||
56
internal/task/handler.go
Normal file
56
internal/task/handler.go
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
package task
|
||||
|
||||
import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
"merch-api/internal/common"
|
||||
taskTransportV1 "merch-api/pkg/taskTransport/v1"
|
||||
"net"
|
||||
)
|
||||
|
||||
const pkgLogHeader string = "Tasker |"
|
||||
|
||||
type Handler struct {
|
||||
addr string
|
||||
srv *grpc.Server
|
||||
*service
|
||||
}
|
||||
|
||||
type Deps struct {
|
||||
Addr string
|
||||
MerchProvider common.MerchProvider
|
||||
}
|
||||
|
||||
func New(deps Deps) *Handler {
|
||||
srv := grpc.NewServer()
|
||||
|
||||
handler := &Handler{
|
||||
addr: deps.Addr,
|
||||
service: newService(deps.MerchProvider),
|
||||
}
|
||||
|
||||
taskTransportV1.RegisterTaskProcessorServer(srv, handler)
|
||||
|
||||
return handler
|
||||
}
|
||||
|
||||
func (h *Handler) Serve() error {
|
||||
listener, err := net.Listen("tcp", h.addr)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("%v Listner failure", pkgLogHeader)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = h.srv.Serve(listener); err != nil {
|
||||
log.WithError(err).Errorf("%v Serve failure", pkgLogHeader)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("%v gRPC server started on: %v", pkgLogHeader, h.addr)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handler) Shutdown() {
|
||||
h.srv.GracefulStop()
|
||||
log.Infof("%v gRPC server shutdown", pkgLogHeader)
|
||||
}
|
||||
44
internal/task/service.go
Normal file
44
internal/task/service.go
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
package task
|
||||
|
||||
import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
"merch-api/internal/common"
|
||||
tt "merch-api/pkg/taskTransport/v1"
|
||||
)
|
||||
|
||||
type service struct {
|
||||
tt.UnimplementedTaskProcessorServer
|
||||
merchProvider common.MerchProvider
|
||||
}
|
||||
|
||||
func newService(mp common.MerchProvider) *service {
|
||||
return &service{
|
||||
merchProvider: mp,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) RequestTasks(_ *emptypb.Empty, stream tt.TaskProcessor_RequestTasksServer) error {
|
||||
tasks, err := s.merchProvider.GetTasks(stream.Context())
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("%v Failed to get tasks", pkgLogHeader)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, task := range tasks {
|
||||
if err = stream.Send(&tt.Task{
|
||||
MerchUuid: task.MerchUuid,
|
||||
Origins: task.Origins,
|
||||
}); err != nil {
|
||||
log.WithError(err).Errorf("%v Failed to send tasks", pkgLogHeader)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) SendResults(stream tt.TaskProcessor_SendResultsServer) error {
|
||||
//TODO
|
||||
return nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue