diff --git a/api.env b/api.env index 8622ace..843383b 100644 --- a/api.env +++ b/api.env @@ -5,6 +5,9 @@ APP_API_PREFIX=/api/v2 APP_GIN_MODE=development APP_ALLOWED_ORIGINS=http://localhost:5173, +GRPC_SERVER_PORT=9050 +GRPC_CLIENT_PORT=9060 + DB_HOST= DB_PORT= DB_USER= diff --git a/cmd/main.go b/cmd/main.go index 16f80b1..f16c4e3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -8,6 +8,7 @@ import ( "merch-parser-api/internal/api/merch" "merch-parser-api/internal/api/user" "merch-parser-api/internal/app" + "merch-parser-api/internal/grpcService" "merch-parser-api/internal/interfaces" "merch-parser-api/internal/provider/auth" "merch-parser-api/internal/provider/token" @@ -66,6 +67,10 @@ func main() { }) log.Debug("Auth provider initialized") + tasksRepo := merch.NewTaskRepository(database) + tasksProvider := merch.NewTaskProvider(tasksRepo) + grpcServer := grpcService.NewGrpcServer(tasksProvider) + //register app modules users := user.NewHandler(user.Deps{ Auth: authProvider, @@ -87,11 +92,14 @@ func main() { //keep last appl := app.NewApp(app.Deps{ - Host: c.AppConf.Host, - Port: c.AppConf.Port, - ApiPrefix: c.AppConf.ApiPrefix, - RouterHandler: routerHandler, - Modules: modules, + Host: c.AppConf.Host, + Port: c.AppConf.Port, + ApiPrefix: c.AppConf.ApiPrefix, + RouterHandler: routerHandler, + Modules: modules, + GrpcServer: grpcServer, + GrpcServerPort: c.GrpcConf.GrpcServerPort, + GrpcClientPort: c.GrpcConf.GrpcClientPort, }) err = appl.Run(ctx) diff --git a/config/config.go b/config/config.go index 304e051..2646db5 100644 --- a/config/config.go +++ b/config/config.go @@ -3,9 +3,10 @@ package config import "strings" type Config struct { - AppConf AppConfig - DBConf DatabaseConfig - JWTConf JWTConfig + AppConf AppConfig + DBConf DatabaseConfig + JWTConf JWTConfig + GrpcConf GrpcConfig } type AppConfig struct { @@ -34,6 +35,11 @@ type JWTConfig struct { RefreshExpire string } +type GrpcConfig struct { + GrpcServerPort string + GrpcClientPort string +} + func NewConfig() *Config { return &Config{ AppConf: AppConfig{ @@ -61,5 +67,10 @@ func NewConfig() *Config { AccessExpire: getEnv("JWT_ACCESS_EXPIRE", ""), RefreshExpire: getEnv("JWT_REFRESH_EXPIRE", ""), }, + + GrpcConf: GrpcConfig{ + GrpcServerPort: getEnv("GRPC_SERVER_PORT", ""), + GrpcClientPort: getEnv("GRPC_CLIENT_PORT", ""), + }, } } diff --git a/internal/api/merch/provider.go b/internal/api/merch/provider.go new file mode 100644 index 0000000..f856fbd --- /dev/null +++ b/internal/api/merch/provider.go @@ -0,0 +1,93 @@ +package merch + +import ( + "gorm.io/gorm" + "merch-parser-api/internal/shared" +) + +type Link struct { + Surugaya []Surugaya + Mandarake []Mandarake +} + +type TaskProvider struct { + repo TaskRepository +} + +type TaskRepository interface { + GetLinks() (*Link, error) + InsertPrices() error +} + +type TaskRepo struct { + db *gorm.DB +} + +func NewTaskProvider(repo TaskRepository) *TaskProvider { + return &TaskProvider{ + repo: repo, + } +} + +func NewTaskRepository(db *gorm.DB) TaskRepository { + return &TaskRepo{db: db} +} + +func (p *TaskProvider) PrepareTasks() (map[string]shared.Task, error) { + getLinks, err := p.repo.GetLinks() + if err != nil { + return nil, err + } + + taskMap := make(map[string]shared.Task) + + for _, item := range getLinks.Surugaya { + if task, exists := taskMap[item.MerchUuid]; exists { + task.OriginSurugayaLink = item.Link + taskMap[item.MerchUuid] = task + } else { + taskMap[item.MerchUuid] = shared.Task{ + MerchUuid: item.MerchUuid, + OriginSurugayaLink: item.Link, + } + } + } + + for _, item := range getLinks.Mandarake { + if task, exists := taskMap[item.MerchUuid]; exists { + task.OriginMandarakeLink = item.Link + taskMap[item.MerchUuid] = task + } else { + taskMap[item.MerchUuid] = shared.Task{ + MerchUuid: item.MerchUuid, + OriginMandarakeLink: item.Link, + } + } + } + return taskMap, nil +} + +func (p *TaskProvider) InsertPrices([]shared.TaskResult) error { + return nil +} + +func (r *TaskRepo) GetLinks() (*Link, error) { + var surugayaList []Surugaya + if err := r.db.Model(&Surugaya{}).Find(&surugayaList).Error; err != nil { + return nil, err + } + + var mandarakeList []Mandarake + if err := r.db.Model(&Mandarake{}).Find(&mandarakeList).Error; err != nil { + return nil, err + } + + return &Link{ + Surugaya: surugayaList, + Mandarake: mandarakeList, + }, nil +} + +func (r *TaskRepo) InsertPrices() error { + return nil +} diff --git a/internal/app/handler.go b/internal/app/handler.go index 09de7e0..0164c13 100644 --- a/internal/app/handler.go +++ b/internal/app/handler.go @@ -4,33 +4,46 @@ import ( "context" "github.com/gin-gonic/gin" log "github.com/sirupsen/logrus" + "google.golang.org/grpc" "merch-parser-api/internal/interfaces" + "net" "net/http" "time" ) type App struct { - address string - apiPrefix string - modules []interfaces.Module - routerHandler interfaces.Router - router *gin.Engine + host string + address string + apiPrefix string + modules []interfaces.Module + routerHandler interfaces.Router + router *gin.Engine + grpcServer *grpc.Server + grpcServerPort string + grpcClientPort string } type Deps struct { - Host string - Port string - ApiPrefix string - Modules []interfaces.Module - RouterHandler interfaces.Router + Host string + Port string + ApiPrefix string + Modules []interfaces.Module + RouterHandler interfaces.Router + GrpcServer *grpc.Server + GrpcServerPort string + GrpcClientPort string } func NewApp(deps Deps) *App { app := &App{ - address: deps.Host + ":" + deps.Port, - apiPrefix: deps.ApiPrefix, - routerHandler: deps.RouterHandler, - modules: deps.Modules, + host: deps.Host, + address: deps.Host + ":" + deps.Port, + apiPrefix: deps.ApiPrefix, + routerHandler: deps.RouterHandler, + modules: deps.Modules, + grpcServer: deps.GrpcServer, + grpcServerPort: deps.GrpcServerPort, + grpcClientPort: deps.GrpcClientPort, } app.router = app.routerHandler.Set() @@ -62,6 +75,19 @@ func (a *App) Run(ctx context.Context) error { serverErr <- server.ListenAndServe() }() + go func() { + listener, err := net.Listen("tcp", net.JoinHostPort(a.host, a.grpcServerPort)) + if err != nil { + log.WithField("err", err).Fatal("gRPC Server | Listener") + } + + err = a.grpcServer.Serve(listener) + if err != nil { + log.WithField("err", err).Fatal("gRPC Server | Serve") + } + }() + log.Info("Starting gRPC server on port: ", a.grpcServerPort) + select { case <-ctx.Done(): log.Info("Shutting down server") diff --git a/internal/grpcService/handler.go b/internal/grpcService/handler.go new file mode 100644 index 0000000..6af20cf --- /dev/null +++ b/internal/grpcService/handler.go @@ -0,0 +1,106 @@ +package grpcService + +import ( + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + "io" + "merch-parser-api/internal/interfaces" + "merch-parser-api/internal/shared" + pb "merch-parser-api/proto/taskProcessor" + "time" +) + +type repoServer struct { + pb.UnimplementedTaskProcessorServer + taskProvider interfaces.TaskProvider +} + +func NewGrpcServer(taskProvider interfaces.TaskProvider) *grpc.Server { + srv := grpc.NewServer() + repoSrv := &repoServer{ + taskProvider: taskProvider, + } + + pb.RegisterTaskProcessorServer(srv, repoSrv) + return srv +} + +func (r *repoServer) RequestTask(_ *emptypb.Empty, stream pb.TaskProcessor_RequestTaskServer) error { + tasks, err := r.taskProvider.PrepareTasks() + if err != nil { + log.WithField("err", err).Error("gRPC Server | Request task error") + return err + } + + for _, task := range tasks { + if err = stream.Send(&pb.Task{ + MerchUuid: task.MerchUuid, + OriginSurugayaLink: task.OriginSurugayaLink, + OriginMandarakeLink: task.OriginMandarakeLink, + }); err != nil { + log.WithField("err", err).Error("gRPC Server | Stream send error") + return err + } + } + return nil +} + +func (r *repoServer) SendResult(stream pb.TaskProcessor_SendResultServer) error { + saveInterval := time.Second * 2 + batch := make([]shared.TaskResult, 0) + + ticker := time.NewTicker(saveInterval) + defer ticker.Stop() + + done := make(chan struct{}) + + go func() { + for { + select { + case <-done: + return + case <-ticker.C: + if len(batch) > 0 { + err := r.taskProvider.InsertPrices(batch) + if err != nil { + log.WithField("err", err).Error("gRPC Server | Batch insert") + } + } + } + } + }() + + for { + response, err := stream.Recv() + if err == io.EOF { + log.Debug("gRPC EOF") + break + } + + if err != nil { + log.WithField("err", err).Error("gRPC Server | Receive") + return err + } + + entry := shared.TaskResult{ + MerchUuid: response.MerchUuid, + Origin: response.OriginName, + Price: response.Price, + } + + batch = append(batch, entry) + log.WithField("response", entry).Debug("gRPC Server | Receive success") + } + + close(done) + if len(batch) > 0 { + err := r.taskProvider.InsertPrices(batch) + if err != nil { + log.WithField("err", err).Error("gRPC Server | Last data batch insert") + return err + } + } + + return nil +} diff --git a/internal/interfaces/task.go b/internal/interfaces/task.go new file mode 100644 index 0000000..dd8e299 --- /dev/null +++ b/internal/interfaces/task.go @@ -0,0 +1,8 @@ +package interfaces + +import "merch-parser-api/internal/shared" + +type TaskProvider interface { + PrepareTasks() (map[string]shared.Task, error) + InsertPrices([]shared.TaskResult) error +} diff --git a/internal/router/handler.go b/internal/router/handler.go index 70e882a..328ab93 100644 --- a/internal/router/handler.go +++ b/internal/router/handler.go @@ -7,17 +7,15 @@ import ( swaggerFiles "github.com/swaggo/files" ginSwagger "github.com/swaggo/gin-swagger" "merch-parser-api/internal/interfaces" - "merch-parser-api/internal/shared" "net/http" "time" ) type router struct { - apiPrefix string - engine *gin.Engine - ginMode string - excludeRoutes map[string]shared.ExcludeRoute - tokenProv interfaces.JWTProvider + apiPrefix string + engine *gin.Engine + ginMode string + tokenProv interfaces.JWTProvider } type Deps struct { diff --git a/internal/shared/task.go b/internal/shared/task.go new file mode 100644 index 0000000..119babd --- /dev/null +++ b/internal/shared/task.go @@ -0,0 +1,13 @@ +package shared + +type Task struct { + MerchUuid string + OriginSurugayaLink string + OriginMandarakeLink string +} + +type TaskResult struct { + MerchUuid string + Origin string + Price uint32 +}