Franta – Občasník malého ajťáka

Domény, Hosting, Cestování

Go & Crawler – update

Ja a Crawler, to je taky dlouha story 😀 Prvni verzi svyho crawlera jsem mel nekdy v letech kolem roku 2012 v kombinaci PHP+MySQL .. o par mesicu pozdeji pak PHP+Cassandra … az nakonec jsem dosel k zaveru ze se na cely Crawler vykaslu a budu analyzovat CommonCrawl data … viz Crawler ? Zatim ne z roku 2019 🙂 Tam je hodne zajimavych dat za hubicku, ale pri jejich celkem narocnem zpracovani nakonec clovek zjisti, ze tam je stejne jen zlomek internetu a neni tam vse co potrebuje v pouzitelny forme jak potrebuje. Proto jsem presto zkusil opet vyvinout vlastni clawler, tentokrat Go + ElasticSearch – Go & Crawler z roku 2020.

Cas postupne plyne, ja nabiram zkusenosti z jinych projektu a tak nejak z mnoha duvodu jsem se dostal opet k vyvoji Crawlera. V zakladu vychazim porad z te puvodni Go verze … jen je samozrejme vyrazne updatovana – navic sbiram nejenom odkazy, ale i RSS zdroje, scripts, meta tagy, title, chybovky (4xx, 5xx, timeout, atd) a obrazky. U obrazku navic rovnou stahuji hlavicky pro zjisteni, jestli obrazek obsahuje Geo informace v EXIFu, a pokud ano, pak stahuji rovnou cely obrazek pro ktery delam nahledovy. Samozrejmosti je, ze rovnou ukladam bokem domeny u kterych koncim na chybe DNS prekladu, tedy domeny bud nefunkcni, nebo volne 🙂

Suma sumarum beznym crawlerem jsem schopny za jeden den najit pres 2 miliardy linku, a stovky milionu ostatnich dat. A to je problem!

Doposud jsem pouzival reseni, ze jsem z Elastiku nacetl hromadu odkazu, ktere jsem pak prochazel, a po jejich projiti updatnul zaznamy o done=1. Abych se vyhnul konkurenci mezi crawler nodama, tak jich je v provozu presne 16, protoze pro URL adresy pouzivam hashe – tedy prvni server si vybiral odkazy s hashem zacinajicim na 0, posledni server s hashem zacinajicim na f. To reseni sice bylo funkcni ale z hlediska optimalizace naprosto nevhodny. V situaci kdy do ES zapisovalo nekolik nodu update done=1 do indexu obsahujicim miliardu zaznamu, tak zbyvajici nebyly schopni v radech minut ziskat novy data pro crawlovani.

Zabredl jsem se tedy vice do studia jak toto resit – a spousta veci me porad smerovala do nejakych messaging (MQ) systemu, se kterymi nemam absolutne zadnou zkusenost a schopnost studovat uplne novy veci je pro me uz ta tam … teda ne ze by to neslo, jen to trva dlouho a cas na to neni. Nicmene s aktualnim fenomenem ChatGPT od OpenAI se s vecma pekne hnulo – nejen ze je schopny mi problematiku vysvetlit polopaticky, ale jsem schopny s nim na to tema i diskutovat a rovnou resit nejaky priklady integrace pri ktere clovek pochopi jak to cele funguje.

Potrebuji tedy robustni system, ktery dokaze zpracovat tisice a tisice prichozich zprav za sekundu, ulozit je a distribuovat jednotlivym klientum tak, aby kazdy klient dostal pouze jednu unikatni URL. Nesmi se tedy stat, ze by jedna URL se objevila u vicero klientu. Po ruznych debatach co pouzit a jak nastavit se ukazalo, ze toto splnuje Apache Kafka, kterou lze navic skrze Docker velmi jednoduse provozovat a ma plnou podporu pro GO jazyk. Pro moji smulu ma ale Kafka absenci jedne vlastnosti, ktere mi to cele komplikuje. Potrebuji totiz podporu unikatnich klicu. Kdyz nacrawluji stovky stranek, budu mit samozrejme v odkazech hromadu duplicit a tech se potrebuju zbavit abych je duplicitne neprochazel pri dalsim crawlovani. Bohuzel Kafka nesleduje unikatnost klice, jen prijima a posila zpravy, takze unikatnost je potreba resit nekde jinde. V puvodni verzi jsem k tomu vyuzival u Elastiku to, ze jsem jako id zpravy v indexu pouzil hash te URL. Tim jsem docilil, ze pri prichodu noveho odkazu se stejnou URL se ukladal zaznam do jiz existujci zpravy, a tedy se vlastne nic nestalo. Crawler tak tuto URL znovu nedostal. Rozhodl jsem se tedy Kafce predradit Redis, do ktereho si ukladam jen hashe URL a overuji tak jejich unikatnost pred samostatnym poslanim do Kafky.

Cele testovaci prostredi jsem si nainstaloval pres Docker, coz je proste genialni nastroj pro tyhle ucely:

version: '3.3'

services:

  zookeeper:
    image: 'zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: 'bitnami/kafka:latest'
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_CFG_LOG_RETENTION_HOURS: 72
      KAFKA_CFG_MESSAGE_MAX_BYTES: 10485760
      ALLOW_PLAINTEXT_LISTENER: "yes"

  kafka-manager:
    image: 'provectuslabs/kafka-ui:latest'
    depends_on:
      - kafka
      - zookeeper
    ports:
      - "9000:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: "local"
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka:9092"
      KAFKA_CLUSTERS_0_ZOOKEEPER: "zookeeper:2181"

  redis:
    image: redis:latest
    ports:
      - "6379:6379"

volumes:
  esdata:
    driver: local

V docker-compose.yaml mam obycejnou instalaci Kafka + Zookeper, Kafka-Manager (UI ktery docela doporucuji pro prehled co se v Kafce deje) a Redis. Je to testovaci instance, takze nejaky hlubsi zabezpeceni neresim.

Integrace producera do Go byla jednoducha:

package main

import (
	"bufio"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"log"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/redis/go-redis/v9"
	"os"
	"golang.org/x/net/context"
)

func hashURL(url string) string {
	hasher := sha256.New()
	hasher.Write([]byte(url))
	return hex.EncodeToString(hasher.Sum(nil))
}

func main() {
	topic := "links" // Název topicu

	// Otevření vstupního souboru
	file, err := os.Open("domains.txt")
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// Nastavení producenta Kafky
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"queue.buffering.max.kbytes": 102400000,
		"queue.buffering.max.messages": 10000000,
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Nastavení Redis klienta
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	defer rdb.Close()

	// Kontext pro Redis
	ctx := context.Background()

	// Čtení souboru po řádcích
	scanner := bufio.NewScanner(file)
	count := 0
	for scanner.Scan() {
		domain := scanner.Text()
		url := "https://" + domain // Přidání schématu https:// před doménu
		hashedURL := hashURL(url)

                count++
                if count%100000 == 0 {
                        log.Printf("Sent %d messages (%s | %s)\n", count, hashedURL, domain)
			p.Purge(kafka.PurgeQueue)
                }

		exists, err := rdb.SetNX(ctx, hashedURL, 1, 0).Result()
		if err != nil {
			fmt.Printf("Failed to check URL in Redis: %s - %v\n", domain, err)
			continue
		}
		if exists {

                	message := &kafka.Message{
                        	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                        	Key:            []byte(url),  // URL jako klíč
                        	Value:          []byte("1"),  // Hodnota 1
                	}
                	err = p.Produce(message, nil)

		} else {
			// URL již existuje, přeskočit
			continue
		}

	}

	// Čekání na odeslání všech zpráv
	p.Flush(15 * 1000)

	if err := scanner.Err(); err != nil {
		fmt.Println("Chyba při čtení souboru:", err)
	}
}

Tento program nacte domains.txt, a ulozi je skrze overeni pres Redis do Kafky … jednoduche/ucinne.

Pro zajimavost:

Ulozeni do Kafky bez Redisu: 200.000 domains / s
Ulozeni do Kafky s Redisem: 35.000 domains / s
Ulozeni do Kafky s Cassandrou (misto Redisu) bez overeni unikatnosti: 800 domains / s
Ulozeni do Kafky s Cassandrou (misto Redisu) s overenim unikatnosti: 700 domains / s

Na druhe strane pak mame neomezene mnozstvi consumeru, ktere mohou vypadat takto:

package main

import (
	"fmt"
	"log"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	broker := "localhost:9092"
	topic := "links"

	// Vytvoření Consumer klienta
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": broker,
		"group.id":          "myGroup",
		"auto.offset.reset": "latest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Předplatné topicu
	err = c.Subscribe(topic, nil)
	if err != nil {
		panic(err)
	}

	count := 0;

	// Neustálé čtení zpráv
	for {
		msg, err := c.ReadMessage(-1) // Blokující čekání na novou zprávu

		if err != nil {
			// Výpis chyb, které nejsou zprávy
			fmt.Printf("Consumer error: %v\n", err)
			continue
		}
		// fmt.Printf("Received message: %s\n", string(msg.Key))
                count++
                if count%100000 == 0 {
                        log.Printf("Received %d messages (%s)\n", count, msg.Key)
                }
	}
}

A to je zatim k update vse 🙂

One Comment

Napsat komentář

Vaše e-mailová adresa nebude zveřejněna. Vyžadované informace jsou označeny *

Tato stránka používá Akismet k omezení spamu. Podívejte se, jak vaše data z komentářů zpracováváme..