End-to-end real-time data pipeline:
Web Scraping (producer.py)
↓
Kafka
↓
Spark Cleaner (spark_cleaner.py)
↓
cleaned_text / rejected_text (Kafka topics)
↓
InfluxDB Consumer (influx_consumer.py)
↓
InfluxDB
↓
Grafana → http://localhost:3001
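The flow above can be modeled in a few lines of plain Python to show which stage reads and writes which topic. This is a hypothetical, in-memory sketch: deques stand in for the Kafka topics, and the length-based validity rule is a placeholder, not the rule used by spark_cleaner.py.

```python
from collections import deque


def new_topics():
    """In-memory stand-ins for the three Kafka topics (hypothetical, for illustration)."""
    return {"raw_text": deque(), "cleaned_text": deque(), "rejected_text": deque()}


def produce(topics, text):
    """Producer stage: publish one scraped item to raw_text."""
    topics["raw_text"].append(text)


def clean_and_route(topics, min_len=20):
    """Cleaner stage: drain raw_text and route each item to cleaned_text or rejected_text.

    The min_len check is a placeholder rule, not the project's real validation.
    """
    while topics["raw_text"]:
        text = topics["raw_text"].popleft().strip()
        target = "cleaned_text" if len(text) >= min_len else "rejected_text"
        topics[target].append(text)


def consume_counts(topics):
    """Consumer stage: summarize the two output topics, as the InfluxDB bridge would."""
    return {"valid": len(topics["cleaned_text"]), "rejected": len(topics["rejected_text"])}


if __name__ == "__main__":
    topics = new_topics()
    produce(topics, "The Roman Empire was a state in antiquity.")
    produce(topics, "   ok  ")
    clean_and_route(topics)
    print(consume_counts(topics))  # {'valid': 1, 'rejected': 1}
```

In the real pipeline each stage is a separate process and the topics live in the Kafka broker, which is why the stages can be started, stopped and replaced independently.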
java -version
Expected: java version "17.x" or newer (PySpark 4.x, used by spark_cleaner.py, requires Java 17+)
python -m pip install kafka-python pyspark influxdb-client requests beautifulsoup4
- Download from: https://kafka.apache.org/downloads
- Extract to C:\kafka
- Verify: the C:\kafka\bin\windows\ folder exists
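After the pip install step, a quick way to confirm that the packages are visible to the interpreter you will actually run (for example inside the bda conda env used later) is to look up their import names. This is a small standalone sketch, not a file in the repository; note that several import names differ from the pip names (kafka-python imports as kafka, influxdb-client as influxdb_client, beautifulsoup4 as bs4).

```python
import importlib.util

# pip package name -> module name used in `import ...`
REQUIRED_MODULES = {
    "kafka-python": "kafka",
    "pyspark": "pyspark",
    "influxdb-client": "influxdb_client",
    "requests": "requests",
    "beautifulsoup4": "bs4",
}


def missing_packages(required=REQUIRED_MODULES):
    """Return the pip names whose top-level module cannot be found in this interpreter."""
    return [pip_name for pip_name, module in required.items()
            if importlib.util.find_spec(module) is None]


if __name__ == "__main__":
    missing = missing_packages()
    if missing:
        print("Missing: " + ", ".join(missing))
    else:
        print("All dependencies found")
```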
- Download from: https://www.docker.com/products/docker-desktop
- Install and reboot if prompted
- Press Win → search "Docker Desktop" → open it
- Wait for the 🐳 whale icon to appear in the system tray (bottom-right)
- Hover over it β wait until it says "Engine running"
⚠️ Docker Desktop must be fully running before Step 4.
cd C:\kafka
bin\windows\kafka-server-start.bat config\server.properties
Expected log line:
[BrokerServer id=1] Starting broker
Keep this terminal open. Kafka must stay running.
cd C:\kafka
bin\windows\kafka-topics.bat --create --topic raw_text --bootstrap-server localhost:9092
bin\windows\kafka-topics.bat --create --topic cleaned_text --bootstrap-server localhost:9092
bin\windows\kafka-topics.bat --create --topic rejected_text --bootstrap-server localhost:9092
Expected output for each:
Created topic raw_text.
If you see Topic 'X' already exists, that's fine; skip it.
cd C:\Codes\OpenSource\bda
docker-compose up
Wait until you see:
bda_grafana | logger=settings t=... msg="HTTP Server Listen" address=[::]:3000
Keep this terminal open. InfluxDB runs on localhost:8086 and Grafana on localhost:3001 (the 3000 in the log line above is Grafana's port inside the container).
conda activate bda
cd C:\Codes\OpenSource\bda
python producer.py
Expected output (every 5 seconds):
Scraping: scrape_hacker_news …
Got 30 items from hacker_news
✓ Sent [hacker_news]: AI startup raises $500M...
Scraping: scrape_wikipedia_random …
✓ Sent [wikipedia]: The Roman Empire was...
Scraping: scrape_bbc_news …
✓ Sent [bbc_news]: UK economy grows 0.3%...
Scrapes live from: Hacker News, Wikipedia (random), BBC News
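The producer loop described above (one scraper at a time, each item sent to raw_text, a pause of about 5 seconds) can be sketched as follows. The JSON field names, the round-robin order, and the build_message/run_producer helpers are assumptions for illustration, not the actual contents of producer.py; send stands in for a Kafka client's send call so the sketch runs without a broker.

```python
import json
import time


def build_message(source, text, now=None):
    """Serialize one scraped item as JSON bytes for the raw_text topic.

    Field names are hypothetical; producer.py may use a different layout.
    """
    payload = {
        "source": source,
        "text": text,
        # Producer-side timestamp, so a downstream consumer can compute latency.
        "produced_at": time.time() if now is None else now,
    }
    return json.dumps(payload).encode("utf-8")


def run_producer(scrapers, send, topic="raw_text", interval=5.0, rounds=1, sleep=time.sleep):
    """Call each scraper in turn, send every item it returns, then wait `interval` seconds.

    `scrapers` maps a source name to a zero-argument function returning a list of strings.
    `send(topic, value)` stands in for a Kafka producer's send method.
    Returns the number of messages sent.
    """
    sent = 0
    for _ in range(rounds):
        for source, scrape in scrapers.items():
            for text in scrape():
                send(topic, build_message(source, text))
                sent += 1
            sleep(interval)
    return sent
```

With kafka-python installed, send could be the bound method KafkaProducer(bootstrap_servers="localhost:9092").send, which takes the topic followed by the value. Passing sleep as a parameter keeps the loop testable without actually waiting.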
If you are already running spark_cleaner.py, use the default Spark output topics:
conda activate bda
cd C:\Codes\OpenSource\bda
python influx_consumer.py
If you want to skip Spark and still feed Grafana, use the raw-text fallback mode:
conda activate bda
cd C:\Codes\OpenSource\bda
python influx_consumer.py --source raw_text
Expected output:
Starting Kafka → InfluxDB bridge …
[VALID] topic=cleaned_text latency=0.021s rej_rate=12.0% | The Roman Empire...
[REJECTED] topic=rejected_text latency=0.018s rej_rate=14.0% | The Roman Empire...
Note: --source raw_text lets the bridge clean/reject messages in Python and removes the hard Spark dependency for Grafana metrics.
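In raw_text mode the bridge performs the cleaning and rejection itself. The rules in cleaning.py and influx_consumer.py are not shown here, so the sketch below assumes a simple policy (strip HTML tags, unescape entities, collapse whitespace, reject anything shorter than five words) purely to illustrate the clean-then-route step; clean_text and route are hypothetical names.

```python
import html
import re

TAG_RE = re.compile(r"<[^>]+>")  # crude HTML tag matcher, fine for scraped snippets
WS_RE = re.compile(r"\s+")


def clean_text(raw):
    """Remove HTML tags, decode entities such as &amp;, and collapse runs of whitespace."""
    text = TAG_RE.sub(" ", raw)
    text = html.unescape(text)
    return WS_RE.sub(" ", text).strip()


def route(raw, min_words=5):
    """Return (topic, cleaned_text); short messages go to rejected_text.

    The min_words threshold is an assumed rule, not the project's actual one.
    """
    text = clean_text(raw)
    topic = "cleaned_text" if len(text.split()) >= min_words else "rejected_text"
    return topic, text
```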
Requires Java 17+ and PySpark 4.x (already installed in the bda conda env). One-time setup: winutils.exe must be present at C:\hadoop\bin\winutils.exe (needed by Spark on Windows).
# One-time: download winutils.exe if not already done
New-Item -ItemType Directory -Force -Path "C:\hadoop\bin"
Invoke-WebRequest -Uri "https://github.com/kontext-tech/winutils/raw/master/hadoop-3.4.0/bin/winutils.exe" -OutFile "C:\hadoop\bin\winutils.exe"
Unblock-File "C:\hadoop\bin\winutils.exe"  # Unblock the downloaded file
# Clear SPARK_HOME so conda PySpark 4.1.1 is used (not any standalone install)
Remove-Item Env:SPARK_HOME -ErrorAction SilentlyContinue
$env:HADOOP_HOME = "C:\hadoop"
# Point both at the python.exe of your own bda env
$env:PYSPARK_PYTHON = "C:\Users\varsh\anaconda3\envs\bda\python.exe"
$env:PYSPARK_DRIVER_PYTHON = "C:\Users\varsh\anaconda3\envs\bda\python.exe"
conda activate bda
cd C:\Codes\OpenSource\bda
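python spark_cleaner.py
Note: The Kafka connector JAR (org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0) is declared inside spark_cleaner.py via spark.jars.packages and is downloaded automatically on the first run (~10 MB), so internet access is required. Spark connectors are generally expected to match the Spark version, so if the Kafka source fails to load, try aligning this version with your installed PySpark. If you see a NativeIO$Windows.access0 error, winutils.exe is missing or HADOOP_HOME is not set.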
Without the Spark cleaner running, the cleaned_text and rejected_text topics stay empty, and Grafana shows no data unless influx_consumer.py was started with --source raw_text.
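The environment variables above only last for the current terminal session. As an alternative sketch (not necessarily what spark_cleaner.py does), the same settings can be applied from Python before any SparkSession is created, because the JVM that PySpark launches inherits the process environment. The helper name prepare_spark_env is hypothetical; using sys.executable avoids hard-coding a user-specific python.exe path, assuming the script is started from the activated bda env.

```python
import os
import sys
from pathlib import Path


def prepare_spark_env(hadoop_home=r"C:\hadoop", env=None):
    """Apply the Windows Spark settings from Python; call before building a SparkSession.

    Returns a list of human-readable problems (empty if everything looks fine).
    """
    env = os.environ if env is None else env
    env.pop("SPARK_HOME", None)                 # prefer the PySpark installed in this env
    env["HADOOP_HOME"] = hadoop_home
    env["PYSPARK_PYTHON"] = sys.executable      # interpreter running this script
    env["PYSPARK_DRIVER_PYTHON"] = sys.executable
    problems = []
    if not (Path(hadoop_home) / "bin" / "winutils.exe").is_file():
        problems.append(f"winutils.exe not found in {Path(hadoop_home) / 'bin'}")
    return problems
```

A script would call prepare_spark_env() first, print any problems, and only then import SparkSession and build the session.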
- Open browser → http://localhost:3001
- Login: admin / admin
- Go to: Dashboards → BDA → BDA Pipeline Monitor
- Dashboard auto-refreshes every 5 seconds
| Panel | Description |
|---|---|
| 📈 Message Throughput | Valid vs Rejected messages over time |
| 🔴 Rejection Rate | Rolling % of rejected messages (gauge) |
| ✅ Total Valid | Cumulative valid message count |
| ❌ Total Rejected | Cumulative rejected message count |
| ⏱️ Processing Latency | Time from producer → InfluxDB (seconds) |
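The Rejection Rate, Total Valid/Rejected and Processing Latency panels correspond to the rej_rate and latency values the bridge prints. A minimal sketch of how such numbers can be computed per message follows; the window size, the PipelineStats name and the assumption that each message carries a producer timestamp are all hypothetical, and the real dashboard may aggregate differently (for example in its InfluxDB query).

```python
import time
from collections import deque


class PipelineStats:
    """Rolling rejection rate over the last `window` messages, cumulative counts, and latency."""

    def __init__(self, window=50):
        self.recent = deque(maxlen=window)  # True = rejected, False = valid
        self.total_valid = 0
        self.total_rejected = 0

    def record(self, rejected, produced_at, now=None):
        """Record one message; return (latency_seconds, rolling_rejection_percent)."""
        now = time.time() if now is None else now
        self.recent.append(rejected)
        if rejected:
            self.total_rejected += 1
        else:
            self.total_valid += 1
        latency = now - produced_at
        rate = 100.0 * sum(self.recent) / len(self.recent)
        return latency, rate
```

A bounded deque keeps the rate responsive to recent traffic, while the two counters match the cumulative Total Valid and Total Rejected panels.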
| Terminal | Command | Keep Open? |
|---|---|---|
| 1 | kafka-server-start.bat | ✅ Yes |
| 2 | Create topics (3 commands) | ❌ Close after |
| 3 | docker-compose up | ✅ Yes (Grafana → port 3001) |
| 4 | python producer.py | ✅ Yes |
| 5 | python influx_consumer.py | ✅ Yes |
| 6 | python spark_cleaner.py | ✅ Yes (optional) |
bda/
├── producer.py            # Scrapes web data → sends to Kafka raw_text
├── spark_cleaner.py       # Spark: cleans & routes to cleaned_text / rejected_text
├── cleaning.py            # Text cleaning utilities (used by Spark)
├── influx_consumer.py     # Kafka consumer → writes metrics to InfluxDB
├── docker-compose.yml     # Spins up InfluxDB + Grafana
├── requirements.txt       # Python dependencies
└── grafana/
    ├── provisioning/
    │   ├── datasources/influxdb.yml   # Auto-configures InfluxDB datasource
    │   └── dashboards/dashboard.yml   # Dashboard loader config
    └── dashboards/
        └── bda_pipeline.json          # Pre-built Grafana dashboard
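influx_consumer.py is described as writing metrics to InfluxDB. Whatever client API it uses, InfluxDB ingests points as line protocol: a measurement, comma-separated tags, fields, then a nanosecond timestamp. The sketch below formats one such record; the measurement, tag and field names are hypothetical, and the escaping covers only the common cases (commas, equals signs and spaces in keys and tag values, quotes and backslashes in string fields).

```python
def escape_key(value):
    """Escape commas, equals signs and spaces in tag keys, tag values and field keys."""
    return value.replace(",", r"\,").replace("=", r"\=").replace(" ", r"\ ")


def to_line_protocol(measurement, tags, fields, ts_ns):
    """Build one line-protocol record: measurement[,tags] fields timestamp."""
    tag_str = ",".join(f"{escape_key(k)}={escape_key(v)}" for k, v in sorted(tags.items()))
    parts = []
    for k, v in fields.items():
        key = escape_key(k)
        if isinstance(v, bool):            # check bool before int (bool subclasses int)
            parts.append(f"{key}={'true' if v else 'false'}")
        elif isinstance(v, int):
            parts.append(f"{key}={v}i")    # integer fields carry an "i" suffix
        elif isinstance(v, float):
            parts.append(f"{key}={v}")
        else:                              # string field: quote and escape
            s = str(v).replace("\\", "\\\\").replace('"', '\\"')
            parts.append(f'{key}="{s}"')
    head = f"{measurement},{tag_str}" if tag_str else measurement
    return f"{head} {','.join(parts)} {ts_ns}"
```

The influxdb-client package's write API accepts line-protocol strings as records, so a string built this way could be written directly; building Point objects with the same client is an equivalent alternative.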
| Error | Fix |
|---|---|
| open //./pipe/dockerDesktopLinuxEngine | Docker Desktop not running → open it first |
| Topic 'X' already exists | Not an error: the topic was already created, continue |
| DEPRECATED: Log4j 1.x | Harmless Kafka warning → ignore it |
| Grafana shows no data | Spark cleaner not running → start Terminal 6, or run influx_consumer.py --source raw_text |
| influx_consumer exits immediately | InfluxDB not ready yet → wait for the Docker step |
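For the "influx_consumer exits immediately" case, one option is to have the consumer wait until InfluxDB's port (localhost:8086 per the Docker step) accepts connections before it starts. The sketch below polls with a plain TCP connect; wait_for_port is a hypothetical helper, and an open port does not strictly guarantee that InfluxDB is ready to accept writes, so retrying the first write is still sensible.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Poll until a TCP connection to host:port succeeds or `timeout` seconds pass.

    Returns True if the port became reachable, False if the deadline passed.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:  # refused, unreachable or timed out
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

At the top of the consumer, something like `if not wait_for_port("localhost", 8086): raise SystemExit("InfluxDB not reachable")` would turn a silent exit into a clear message.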
# Check raw incoming data
cd C:\kafka
bin\windows\kafka-console-consumer.bat --topic raw_text --from-beginning --bootstrap-server localhost:9092
# Check cleaned data
bin\windows\kafka-console-consumer.bat --topic cleaned_text --bootstrap-server localhost:9092
# Check rejected data
bin\windows\kafka-console-consumer.bat --topic rejected_text --bootstrap-server localhost:9092