From e48160dfa396613dd2711863535434fb1d001961 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 26 Dec 2025 16:19:09 +0300 Subject: [PATCH] mandarake parser rework --- config.env | 7 +- config/config.go | 43 ++- internal/app/app.go | 10 +- internal/parsers/mandarake.go | 378 ---------------------- internal/parsers/mandarake/handleTasks.go | 60 ++++ internal/parsers/mandarake/handler.go | 40 +++ internal/parsers/mandarake/service.go | 90 ++++++ internal/processor/service.go | 5 +- 8 files changed, 243 insertions(+), 390 deletions(-) delete mode 100644 internal/parsers/mandarake.go create mode 100644 internal/parsers/mandarake/handleTasks.go create mode 100644 internal/parsers/mandarake/handler.go create mode 100644 internal/parsers/mandarake/service.go diff --git a/config.env b/config.env index cf0a214..3937a61 100644 --- a/config.env +++ b/config.env @@ -1,6 +1,7 @@ APP_LOG_LEVEL=error APP_NUMCPUS=-1 APP_CHECK_PERIOD=6 +EXTERNAL_BROWSER= GRPC_SERVER_HOST=0.0.0.0 GRPC_SERVER_PORT=9060 @@ -12,4 +13,8 @@ GRPC_SURUGAYA_SCRAPPER_HOST=0.0.0.0 GRPC_SURUGAYA_SCRAPPER_PORT=9070 TASK_RETRY_COUNT=3 -TASK_RETRY_MINUTES=5 \ No newline at end of file +TASK_RETRY_MINUTES=5 + +ORIGIN_SURUGAYA_ENABLED=false +ORIGIN_MANDARAKE_ENABLED=false +ORIGIN_AMIAMI_ENABLED=false diff --git a/config/config.go b/config/config.go index 12b7a76..8b5babe 100644 --- a/config/config.go +++ b/config/config.go @@ -7,11 +7,13 @@ import ( ) type Config struct { - LogLevel string - NumCPUs int - CheckPeriod int - TasksCfg TasksConfig - GrpcCfg GrpcConfig + LogLevel string + NumCPUs int + CheckPeriod int + TasksCfg TasksConfig + GrpcCfg GrpcConfig + OriginEnabled OriginEnabled + ExternalBrowser string } type GrpcConfig struct { @@ -28,11 +30,18 @@ type TasksConfig struct { RetryMinutes int } +type OriginEnabled struct { + Surugaya bool + Mandarake bool + Amiami bool +} + func NewConfig() *Config { return &Config{ - LogLevel: getEnv("APP_LOG_LEVEL", "debug"), - NumCPUs: getEnvInt("APP_NUMCPUS", -1), - CheckPeriod: getEnvInt("APP_CHECK_PERIOD", 6), + LogLevel: getEnv("APP_LOG_LEVEL", "debug"), + NumCPUs: getEnvInt("APP_NUMCPUS", -1), + CheckPeriod: getEnvInt("APP_CHECK_PERIOD", 6), + ExternalBrowser: getEnv("EXTERNAL_BROWSER", ""), GrpcCfg: GrpcConfig{ ServerHost: getEnv("GRPC_SERVER_HOST", "0.0.0.0"), @@ -47,6 +56,12 @@ func NewConfig() *Config { RetryCount: getEnvInt("TASK_RETRY_COUNT", 3), RetryMinutes: getEnvInt("TASK_RETRY_MINUTES", 5), }, + + OriginEnabled: OriginEnabled{ + Surugaya: getEnvBool("ORIGIN_SURUGAYA_ENABLED", false), + Mandarake: getEnvBool("ORIGIN_MANDARAKE_ENABLED", false), + Amiami: getEnvBool("ORIGIN_AMIAMI_ENABLED", false), + }, } } @@ -68,3 +83,15 @@ func getEnvInt(key string, fallback int) int { } return fallback } + +func getEnvBool(key string, fallback bool) bool { + if value, ok := os.LookupEnv(key); ok { + val, err := strconv.ParseBool(value) + if err != nil { + log.WithField("default", false).Warn("Config | Can't parse value as bool") + return fallback + } + return val + } + return fallback +} diff --git a/internal/app/app.go b/internal/app/app.go index d886e71..2bd5f0f 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -11,6 +11,7 @@ import ( "task-processor/config" "task-processor/internal/appState" "task-processor/internal/parsers" + "task-processor/internal/parsers/mandarake" "task-processor/internal/processor" "task-processor/internal/remote" "task-processor/internal/shared" @@ -71,8 +72,13 @@ func (app *App) Run() { //task processor handlers := map[string]parsers.TaskHandler{ - shared.OriginSurugaya: parsers.NewSurugayaParser(ctx, surugayaScrapper), - shared.OriginMandarake: parsers.NewMandarakeParser(app.numCPUs), + shared.OriginSurugaya: parsers.NewSurugayaParser(ctx, surugayaScrapper), + + shared.OriginMandarake: mandarake.NewParser(mandarake.ParserDeps{ + Enabled: app.config.OriginEnabled.Mandarake, + ExternalBrowser: app.config.ExternalBrowser, + GoroutinesNumber: app.numCPUs, + }), } taskProcessor := processor.New(processor.Deps{ diff --git a/internal/parsers/mandarake.go b/internal/parsers/mandarake.go deleted file mode 100644 index 219e032..0000000 --- a/internal/parsers/mandarake.go +++ /dev/null @@ -1,378 +0,0 @@ -package parsers - -import ( - "context" - "fmt" - "github.com/chromedp/cdproto/network" - "github.com/chromedp/chromedp" - log "github.com/sirupsen/logrus" - "golang.org/x/net/html" - "net/http" - "net/http/cookiejar" - "net/url" - "regexp" - "slices" - "strconv" - "strings" - "sync" - "task-processor/internal/appState" - "task-processor/internal/shared" - "time" -) - -type MandarakeParser struct { - goroutinesNumber int - parseParams parseParams - client *http.Client -} - -type parseParams struct { - userAgent string - cookieUrl string - single price - ranged price - taxMult float64 -} - -type price struct { - tag string - attrKey string - attrVal string - subTag string - substring string -} - -func NewMandarakeParser(goroutinesNumber int) *MandarakeParser { - p := parseParams{ - userAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", - cookieUrl: "https://www.mandarake.co.jp/", - single: price{ - tag: "div", - attrKey: "class", - attrVal: "price", - subTag: "p", - substring: "円", - }, - ranged: price{ - tag: "div", - attrKey: "class", - attrVal: "price_range", - subTag: "p", - substring: "円"}, - taxMult: 1.1, - } - - return &MandarakeParser{ - goroutinesNumber: goroutinesNumber, - parseParams: p, - } -} - -func (s *MandarakeParser) HandleTasks(tasks []shared.Task, sender chan shared.TaskResult, state *appState.State) { - log.Debug("Handling Mandarake tasks") - - if err := s.initClient2(); err != nil { - log.WithError(err).Error("Mandarake handler | Error initializing client") - return - } - - receiver := make(chan shared.Task, len(tasks)) - for _, task := range tasks { - receiver <- task - } - close(receiver) - - wg := sync.WaitGroup{} - for i := 0; i < s.goroutinesNumber; i++ { - wg.Add(1) - go func() { - defer wg.Done() - s.worker(receiver, sender, state) - }() - } - wg.Wait() - log.Debug("Finished handling Mandarake tasks") -} - -func (s *MandarakeParser) worker(receiver chan shared.Task, sender chan shared.TaskResult, state *appState.State) { - for task := range receiver { - log.WithField("task id", task.MerchUuid).Debug("Mandarake worker | Processing task") - - page, err := s.getPage(task.Link) - if err != nil { - log.WithError(err).Error("Mandarake worker | Error getting page for task") - continue - } - - if page == nil { - log.Debug("Mandarake worker | Page for task is nil") - continue - } - - p := int32(s.getMinPrice(page)) - - sender <- shared.TaskResult{ - MerchUuid: task.MerchUuid, - Origin: task.Origin, - Price: p, - } - } -} - -// Deprecated: use initClient2 instead. -func (s *MandarakeParser) initClient() error { - //preload cookies for client - req, err := http.NewRequest("GET", s.parseParams.cookieUrl, nil) - if err != nil { - return err - } - - //TODO сделать один клиент с одним джаром - client := http.Client{} - req.Header.Set("User-Agent", s.parseParams.userAgent) - - result, err := client.Do(req) - if err != nil { - return err - } - defer result.Body.Close() - - c := result.Cookies() - log.WithField("cookies", c).Debug("Mandarake handler | Get cookies") - - //make client - jar, err := cookiejar.New(nil) - if err != nil { - log.WithError(err).Error("Mandarake | Cookie jar") - return err - } - - u, err := url.Parse(s.parseParams.cookieUrl) - if err != nil { - log.WithError(err).Error("Mandarake | Parse cookie URL") - return err - } - - jar.SetCookies(u, c) - - taskClient := &http.Client{ - Timeout: time.Second * 30, - Jar: jar, - } - - s.client = taskClient - return nil -} - -func (s *MandarakeParser) getPage(url string) (*html.Node, error) { - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, err - } - req.Header.Set("User-Agent", s.parseParams.userAgent) - - result, err := s.client.Do(req) - if err != nil { - return nil, err - } - - doc, err := html.Parse(result.Body) - if err != nil { - return nil, err - } - - return doc, nil -} - -func (s *MandarakeParser) getMinPrice(page *html.Node) int { - singlePriceNode := s.findNode(page, s.parseParams.single) - if singlePriceNode == nil { - return 0 - } - singlePriceStr := s.findData(singlePriceNode, s.parseParams.single) - if singlePriceStr == nil { - return 0 - } - - var prices []int - prices = append(prices, s.getPrice(singlePriceStr)) - - priceRangeNode := s.findNode(page, s.parseParams.ranged) - if priceRangeNode != nil { - priceFromRange := s.findData(priceRangeNode, s.parseParams.ranged) - if priceFromRange != nil { - withTax := int(float64(s.getPrice(priceFromRange)) * s.parseParams.taxMult) - prices = append(prices, withTax) - } - } - return slices.Min(prices) -} - -func (s *MandarakeParser) findNode(doc *html.Node, params price) *html.Node { - if doc == nil { - return nil - } - - var ( - crawler func(*html.Node) - result *html.Node - ) - - crawler = func(node *html.Node) { - if result != nil { - return - } - - if node.Type == html.ElementNode && node.Data == params.tag { - for _, attr := range node.Attr { - if attr.Key == params.attrKey && attr.Val == params.attrVal { - result = node - return - } - } - } - for child := node.FirstChild; child != nil; child = child.NextSibling { - crawler(child) - } - } - crawler(doc) - - return result -} - -func (s *MandarakeParser) findData(doc *html.Node, params price) []string { - if doc == nil { - return nil - } - - var ( - crawler func(*html.Node) - values []string - getText func(*html.Node) string - ) - - getText = func(n *html.Node) string { - if n.Type == html.TextNode { - return n.Data - } - var result strings.Builder - for c := n.FirstChild; c != nil; c = c.NextSibling { - result.WriteString(getText(c)) - } - return result.String() - } - - crawler = func(node *html.Node) { - if node.Type == html.ElementNode && node.Data == params.subTag { - text := strings.TrimSpace(getText(node)) - if strings.Contains(text, params.substring) { - values = append(values, text) - } - } - for child := node.FirstChild; child != nil; child = child.NextSibling { - crawler(child) - } - } - crawler(doc) - return values -} - -func (s *MandarakeParser) getPrice(rawStr []string) int { - re := regexp.MustCompile(`\([^)]*?([0-9,]+)[^)]*?\)`) - - for _, str := range rawStr { - matches := re.FindStringSubmatch(str) - if len(matches) > 1 { - priceStr := strings.ReplaceAll(matches[1], ",", "") - price, err := strconv.Atoi(priceStr) - if err != nil { - fmt.Println(err) - return 0 - } - return price - } - } - return 0 -} - -//new client - -func (s *MandarakeParser) initClient2() error { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - allocCtx, allocCancel := chromedp.NewRemoteAllocator(ctx, "ws://headless-shell:9222") - defer allocCancel() - - ctx, _ = chromedp.NewContext(allocCtx) - - if err := chromedp.Run(ctx, - chromedp.Navigate(s.parseParams.cookieUrl), - chromedp.WaitVisible("body", chromedp.ByQuery), - ); err != nil { - return fmt.Errorf("failed to navigate: %w", err) - } - - var cookies []*network.Cookie - err := chromedp.Run(ctx, chromedp.ActionFunc(func(ctx context.Context) error { - var err error - cookies, err = network.GetCookies().Do(ctx) - return err - })) - if err != nil { - return fmt.Errorf("failed to get cookies: %w", err) - } - - var httpCookies []*http.Cookie - for _, c := range cookies { - httpCookies = append(httpCookies, &http.Cookie{ - Name: c.Name, - Value: c.Value, - Path: c.Path, - Domain: c.Domain, - Expires: float64ToTime(c.Expires), - Secure: c.Secure, - HttpOnly: c.HTTPOnly, - SameSite: convertSameSite(c.SameSite), - }) - } - - jar, err := cookiejar.New(nil) - if err != nil { - return err - } - - u, err := url.Parse(s.parseParams.cookieUrl) - if err != nil { - return err - } - - jar.SetCookies(u, httpCookies) - - client := &http.Client{ - Jar: jar, - Timeout: 30 * time.Second, - } - - s.client = client - return nil -} - -func convertSameSite(s network.CookieSameSite) http.SameSite { - switch s { - case network.CookieSameSiteStrict: - return http.SameSiteStrictMode - case network.CookieSameSiteLax: - return http.SameSiteLaxMode - case network.CookieSameSiteNone: - return http.SameSiteNoneMode - default: - return http.SameSiteDefaultMode - } -} - -func float64ToTime(unixFloat float64) time.Time { - sec := int64(unixFloat) - nsec := int64((unixFloat - float64(sec)) * 1e9) - return time.Unix(sec, nsec) -} diff --git a/internal/parsers/mandarake/handleTasks.go b/internal/parsers/mandarake/handleTasks.go new file mode 100644 index 0000000..c31a54c --- /dev/null +++ b/internal/parsers/mandarake/handleTasks.go @@ -0,0 +1,60 @@ +package mandarake + +import ( + "context" + "github.com/chromedp/chromedp" + log "github.com/sirupsen/logrus" + "sync" + "task-processor/internal/appState" + "task-processor/internal/shared" +) + +func (s *Parser) HandleTasks(tasks []shared.Task, sender chan shared.TaskResult, state *appState.State) { + log.Debug(logHeader + logWorker + "handling tasks") + + allocCtx, allocCancel := chromedp.NewRemoteAllocator(s.baseCtx, s.externalBrowser) + + receiver := make(chan shared.Task, len(tasks)) + for _, task := range tasks { + receiver <- task + } + close(receiver) + + wg := sync.WaitGroup{} + for i := 0; i < s.goroutinesNumber; i++ { + wg.Add(1) + go func() { + defer wg.Done() + s.worker(allocCtx, receiver, sender, state) + }() + } + wg.Wait() + allocCancel() + log.Debug(logHeader + logWorker + "finished handling tasks") +} + +func (s *Parser) worker(ctx context.Context, receiver chan shared.Task, sender chan shared.TaskResult, state *appState.State) { + pageCtx, pageCancel := chromedp.NewContext(ctx, chromedp.WithLogf(log.Printf)) + defer pageCancel() + + for task := range receiver { + log.WithField("task_uuid", task.MerchUuid).Debug(logHeader + logWorker + "processing task") + + price, err := s.getPrice(pageCtx, task) + if err != nil { + log.WithField("task_uuid", task.MerchUuid).Warn(logHeader + logWorker + logTaskWarning + "failed to process, zero price") + sender <- shared.TaskResult{ + MerchUuid: task.MerchUuid, + Origin: task.Origin, + Price: zeroPrice, + } + continue + } + + sender <- shared.TaskResult{ + MerchUuid: task.MerchUuid, + Origin: task.Origin, + Price: price, + } + } +} diff --git a/internal/parsers/mandarake/handler.go b/internal/parsers/mandarake/handler.go new file mode 100644 index 0000000..92e24dd --- /dev/null +++ b/internal/parsers/mandarake/handler.go @@ -0,0 +1,40 @@ +package mandarake + +import ( + "context" + log "github.com/sirupsen/logrus" +) + +const ( + zeroPrice int32 = 0 + taxMultiplier float64 = 1.1 + logHeader = "Mandarake parser | " + logWorker = "worker: " + logTaskWarning = "task warning: " + logGetPrice = "get price: " +) + +type Parser struct { + baseCtx context.Context + externalBrowser string + goroutinesNumber int +} + +type ParserDeps struct { + Enabled bool + ExternalBrowser string + GoroutinesNumber int +} + +func NewParser(deps ParserDeps) *Parser { + if !deps.Enabled { + log.Info(logHeader + "disabled") + return nil + } + + return &Parser{ + baseCtx: context.Background(), + externalBrowser: deps.ExternalBrowser, + goroutinesNumber: deps.GoroutinesNumber, + } +} diff --git a/internal/parsers/mandarake/service.go b/internal/parsers/mandarake/service.go new file mode 100644 index 0000000..f3e191a --- /dev/null +++ b/internal/parsers/mandarake/service.go @@ -0,0 +1,90 @@ +package mandarake + +import ( + "context" + "github.com/chromedp/chromedp" + log "github.com/sirupsen/logrus" + "regexp" + "slices" + "strconv" + "strings" + "task-processor/internal/shared" +) + +func (s *Parser) getPrice(ctx context.Context, task shared.Task) (int32, error) { + var ( + singlePrice string + rangedPrice string + prices []int32 + ) + + //get single price + if err := chromedp.Run(ctx, + chromedp.Navigate(task.Link), + chromedp.WaitReady("body"), + chromedp.WaitVisible(`div.price`, chromedp.ByQuery), + chromedp.Text(`div.price`, &singlePrice, chromedp.ByQuery), + ); err != nil { + log.WithError(err).Error(logHeader + logGetPrice + "failed to get single price tag") + return zeroPrice, err + } + singlePrice = strings.TrimSpace(singlePrice) + prices = append(prices, s.getSinglePriceWithTax(singlePrice)) + + //get price range + if err := chromedp.Run(ctx, + chromedp.Navigate(task.Link), + chromedp.WaitReady("body"), + chromedp.WaitVisible(`price_range`, chromedp.ByQuery), + chromedp.Text(`price_range`, &rangedPrice, chromedp.ByQuery), + ); err != nil { + log.WithError(err).Warn(logHeader + logGetPrice + "failed to get ranged price tag") + } + + rangedPrice = strings.TrimSpace(rangedPrice) + + if rangedPrice != "" { + prices = append(prices, s.getMinimalPriceFromRangeWithTax(rangedPrice)) + } + + //get minimal price + minimal := slices.Min(prices) + log.Infof(logHeader+"uuid: %s, price: %d", task.MerchUuid, minimal) + + return minimal, nil +} + +func (s *Parser) getSinglePriceWithTax(rawPrice string) int32 { + re := regexp.MustCompile(`(\d+)\s*円`) + matches := re.FindStringSubmatch(rawPrice) + if len(matches) < 2 { + log.Error("Mandarake | Single price not found, returning zero price") + return zeroPrice + } + + priceStr := matches[1] + price, err := strconv.Atoi(priceStr) + if err != nil { + log.Error("Mandarake | Failed to convert single price, returning zero price") + return zeroPrice + } + return int32(price) +} + +func (s *Parser) getMinimalPriceFromRangeWithTax(priceRange string) int32 { + re := regexp.MustCompile(`他([\d,]+)円`) + matches := re.FindStringSubmatch(priceRange) + if len(matches) < 2 { + log.Error("Price not found in range, returning zero price") + return zeroPrice + } + + priceStr := strings.ReplaceAll(matches[1], ",", "") + price, err := strconv.Atoi(priceStr) + if err != nil { + log.Error("Failed to convert minimal price in range, returning zero price") + return zeroPrice + } + + return int32(float64(price) * taxMultiplier) +} diff --git a/internal/processor/service.go b/internal/processor/service.go index ca2827c..f2b2d82 100644 --- a/internal/processor/service.go +++ b/internal/processor/service.go @@ -26,7 +26,10 @@ func (p *Processor) StartWork(receivedTasks []shared.TaskResponse) { wg.Add(1) go func(origin string, tasks []shared.Task) { defer wg.Done() - p.handlers[origin].HandleTasks(tasks, p.out, p.state) + if p.handlers[origin] != nil { + log.Info("Running task handler for origin: ", origin) + p.handlers[origin].HandleTasks(tasks, p.out, p.state) + } }(origin, tasks) } wg.Wait()