Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
temirovazat committed Oct 16, 2023
0 parents commit f62d43e
Show file tree
Hide file tree
Showing 48 changed files with 2,272 additions and 0 deletions.
152 changes: 152 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
name: kafka-to-clickhouse

on:
  push:
    branches: [main]

env:
  KAFKA_PACKAGE: "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1"
  CLICKHOUSE_PACKAGE: "com.github.housepower:clickhouse-native-jdbc-shaded:2.6.5"

jobs:
  tests:
    name: Tests
    runs-on: ubuntu-latest

    strategy:
      matrix:
        # Quoted so 3.10 is not parsed as the float 3.1.
        python-version: ['3.8', '3.9', '3.10']

    services:
      kafka:
        image: confluentinc/cp-kafka:7.3.1
        ports:
          - 29092:29092
        env:
          # Container env values are strings; quote number/boolean-looking
          # values so the YAML parser does not retype them (true -> bool).
          KAFKA_BROKER_ID: "1"
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
          KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
        options: >-
          --health-cmd "nc -z localhost 9092 || exit -1"
          --health-interval 5s
          --health-timeout 10s
          --health-retries 10
      clickhouse:
        image: clickhouse/clickhouse-server:22.12
        ports:
          - 8123:8123
          - 9000:9000
        options: >-
          --health-cmd "wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1"
          --health-interval 5s
          --health-timeout 10s
          --health-retries 10
      zookeeper:
        image: zookeeper:3.8
        options: >-
          --health-cmd "nc -z localhost 2181 || exit -1"
          --health-interval 5s
          --health-timeout 10s
          --health-retries 10

    steps:
      - name: Check out the repo
        uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
      - name: Set up Java
        uses: actions/setup-java@v3
        with:
          distribution: 'temurin'
          # Quoted so the version stays a string.
          java-version: '8'
      - name: Install dependencies
        run: |
          pip install --upgrade pip
          pip install -r backend/fastapi_consumer/requirements.txt --no-cache-dir
          pip install -r backend/pyspark_producer/requirements.txt --no-cache-dir
      - name: Lint with flake8
        run: |
          pip install wemake-python-styleguide flake8-html
          flake8 backend --format=html --htmldir=flake8/
      - name: Lint with mypy
        run: |
          pip install mypy lxml
          mypy backend --html-report=mypy/
      - name: Run API
        run: |
          cd backend/fastapi_consumer/src
          nohup python main.py &
      - name: Stop API
        run: |
          # pkill -f avoids the ps|grep|awk race and, with `|| true`, does
          # not fail the step when no matching process is left.
          pkill -f 'python main.py' || true
      - name: Run ETL
        run: |
          cd backend/pyspark_producer/src
          nohup python main.py &
        env:
          PYSPARK_PYTHON: "${{ env.Python3_ROOT_DIR }}"
          PYSPARK_DRIVER_PYTHON: "${{ env.Python3_ROOT_DIR }}"
          PYSPARK_SUBMIT_ARGS: "--packages ${{ env.KAFKA_PACKAGE }},${{ env.CLICKHOUSE_PACKAGE }} pyspark-shell"
      - name: Stop ETL
        run: |
          pkill -f 'python main.py' || true
      - name: Output results
        uses: actions/upload-artifact@v3
        with:
          name: Report
          path: |
            flake8/
            mypy/

  docker:
    name: Docker
    runs-on: ubuntu-latest
    needs: tests
    steps:
      - name: Check out the repo
        # v3 for consistency with the tests job (v2 uses a deprecated
        # Node.js runtime on the runner).
        uses: actions/checkout@v3
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
      - name: Login to Docker
        uses: docker/login-action@v2
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}
      - name: Push Backend (FastAPI Consumer) to Docker Hub
        uses: docker/build-push-action@v3
        with:
          push: true
          context: backend/fastapi_consumer
          tags: |
            ${{ secrets.DOCKER_USERNAME }}/event_sourcing:${{ vars.TAG }}
            ${{ secrets.DOCKER_USERNAME }}/event_sourcing:latest
      - name: Push Backend (PySpark Producer) to Docker Hub
        uses: docker/build-push-action@v3
        with:
          push: true
          context: backend/pyspark_producer
          tags: |
            ${{ secrets.DOCKER_USERNAME }}/kafka_to_clickhouse:${{ vars.TAG }}
            ${{ secrets.DOCKER_USERNAME }}/kafka_to_clickhouse:latest

  send_message:
    name: Send message
    runs-on: ubuntu-latest
    needs: docker
    steps:
      - name: Send message
        uses: appleboy/telegram-action@master
        with:
          to: ${{ secrets.TELEGRAM_TO }}
          token: ${{ secrets.TELEGRAM_TOKEN }}
          message: |
            В репозитории ${{ github.repository }} выполнен коммит:
            Автор: ${{ github.event.commits[0].author.name }}
            Сообщение: ${{ github.event.commits[0].message }}
            Ссылка: https://github.com/${{ github.repository }}/commit/${{ github.sha }}
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Editor and OS artifacts
.vscode
.DS_Store
.python-version
__pycache__
# Local configuration — holds credentials, never commit
.env
# Runtime logs and test caches
main.log
.pytest_cache
.mypy_cache
nohup.out
# Lint/type-check HTML report directories produced by CI
flake8
mypy
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
## Kafka to Clickhouse

[![python](https://img.shields.io/static/v1?label=python&message=3.8%20|%203.9%20|%203.10&color=informational)](https://github.com/temirovazat/kafka-to-clickhouse/actions/workflows/main.yml)
[![dockerfile](https://img.shields.io/static/v1?label=dockerfile&message=published&color=2CB3E8)](https://hub.docker.com/r/temirovazat/kafka_to_clickhouse)
[![lint](https://img.shields.io/static/v1?label=lint&message=flake8%20|%20mypy&color=brightgreen)](https://github.com/temirovazat/kafka-to-clickhouse/actions/workflows/main.yml)
[![code style](https://img.shields.io/static/v1?label=code%20style&message=WPS&color=orange)](https://wemake-python-styleguide.readthedocs.io/en/latest/)
[![platform](https://img.shields.io/static/v1?label=platform&message=linux%20|%20macos&color=inactive)](https://github.com/temirovazat/kafka-to-clickhouse/actions/workflows/main.yml)

### **Description**

_The aim of this project is to implement an ETL system for analysts that stores data about movie views. Since the service needs to handle the constant influx of information from each user, it uses the event streaming platform [Kafka](https://kafka.apache.org). To provide an API layer that sends events to Kafka without any transformations underneath, it leverages the [FastAPI](https://fastapi.tiangolo.com) framework. The ETL process for loading data into the analytical data store is implemented using the batch and stream data processing library [PySpark](https://spark.apache.org). The storage must handle very large data and do so within a reasonable time frame for analysts to conduct their research. Therefore, the project involved research to choose the right storage solution, and the best choice was the analytical OLAP system [ClickHouse](https://clickhouse.com)._

### **Technologies**

```Python``` ```Kafka``` ```FastAPI``` ```PySpark``` ```Clickhouse``` ```Vertica``` ```Jupyter Notebook``` ```Docker```

### **How to Run the Project:**

Clone the repository and navigate to the `infra` directory:
```
git clone https://github.com/temirovazat/kafka-to-clickhouse.git
```
```
cd kafka-to-clickhouse/infra/
```

Create a .env file and add project settings:
```
nano .env
```
```
# Kafka
KAFKA_HOST=kafka
KAFKA_PORT=9092
# Clickhouse
CLICKHOUSE_HOST=clickhouse-node1
CLICKHOUSE_PORT=9000
```

Deploy and run the project in containers:
```
docker-compose up
```

Send a POST request with the current movie view frame:
```
http://127.0.0.1/films/<UUID>/video_progress
```
```
{
"frame": <INTEGER>
}
```
18 changes: 18 additions & 0 deletions backend/fastapi_consumer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM python:3.10

WORKDIR /opt/event_sourcing

# Copy the dependency manifest on its own first so the pip layer below is
# cached until requirements.txt changes.
COPY requirements.txt requirements.txt

RUN pip install --upgrade pip \
    && pip install -r requirements.txt --no-cache-dir

# Application source (contents of src/ become the working directory).
COPY ./src .

EXPOSE 8000

# Entrypoint script — presumably waits for Kafka, then starts the server
# (see backend/fastapi_consumer/script.sh).
COPY script.sh /

RUN chmod +x /script.sh

ENTRYPOINT ["/script.sh"]
6 changes: 6 additions & 0 deletions backend/fastapi_consumer/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# FastAPI consumer runtime dependencies (pinned for reproducible builds).
fastapi==0.88.0
uvicorn==0.20.0
gunicorn==20.1.0
orjson==3.8.4
aiokafka==0.8.0
PyJWT==2.6.0
12 changes: 12 additions & 0 deletions backend/fastapi_consumer/script.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
# Entrypoint: start the API only once the Kafka broker accepts connections.

# Probe the broker with bash's /dev/tcp pseudo-device (no netcat required);
# host/port fall back to localhost:29092 when the env vars are unset.
kafka_ready() {
  (echo > /dev/tcp/${KAFKA_HOST:-localhost}/${KAFKA_PORT:-29092}) >/dev/null 2>&1
}

# Poll once per second until the TCP connect succeeds.
while ! kafka_ready; do
  >&2 echo 'Waiting for Kafka to become available...'
  sleep 1
done
>&2 echo 'Kafka is available.'

gunicorn main:app --bind 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker
62 changes: 62 additions & 0 deletions backend/fastapi_consumer/src/api/v1/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import logging
from http import HTTPStatus
from typing import Dict, Optional
from uuid import UUID, uuid4

import jwt
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jwt.exceptions import ExpiredSignatureError

from core.config import CONFIG

security = HTTPBearer(auto_error=not CONFIG.fastapi.debug)


class Auth:
    """Authenticate a request from its JWT bearer token.

    When no credentials are present (only possible while FastAPI debug mode
    makes the bearer header optional), a self-signed token with a random
    user id is minted so downstream code always has a token to decode.
    """

    def __init__(self, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)):
        """Initialize the class with a request header containing a JWT token.

        Args:
            credentials: HTTP authorization header with a token
        """
        if credentials:
            self.token = credentials.credentials
        else:
            # Debug-mode fallback: issue a throwaway token for an anonymous user.
            self.token = jwt.encode({'user_id': str(uuid4())}, key=CONFIG.fastapi.secret_key, algorithm='HS256')

    @property
    def user_id(self) -> UUID:
        """User ID taken from the token claims.

        Raises:
            HTTPException: 400 if the claim is missing or not a valid UUID

        Returns:
            UUID: Unique user identifier
        """
        claims = self.decode_token()
        if not (user_id := claims.get('user_id')):
            logging.critical('Problem with user identification: No user ID in the token!')
            raise HTTPException(status_code=HTTPStatus.BAD_REQUEST)
        try:
            # The claim is stored as a string (see __init__); convert it so the
            # value actually matches the declared UUID return type.
            return UUID(str(user_id))
        except ValueError as exc:
            logging.critical('Problem with user identification: Malformed user ID in the token!')
            raise HTTPException(status_code=HTTPStatus.BAD_REQUEST) from exc

    def decode_token(self) -> Dict:
        """Decode a JWT token.

        Raises:
            HTTPException: 401 when the token has expired, 400 on any other
                decode failure (bad signature, malformed token, ...)

        Returns:
            Dict: Token content
        """
        try:
            payload = jwt.decode(jwt=self.token, key=CONFIG.fastapi.secret_key, algorithms=['HS256'])
        except ExpiredSignatureError as exc:
            raise HTTPException(status_code=HTTPStatus.UNAUTHORIZED) from exc
        except Exception as exc:
            logging.error(f'Problem with user authorization: {exc}!')
            raise HTTPException(status_code=HTTPStatus.BAD_REQUEST) from exc
        return payload
33 changes: 33 additions & 0 deletions backend/fastapi_consumer/src/api/v1/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from functools import lru_cache

from aiokafka import AIOKafkaProducer
from fastapi import Depends, Path

from api.v1.auth import Auth
from services.post_event import PostEventService
from core.enums import KafkaTopics
from db.kafka import get_kafka
from models.key import UserFilmID


def get_film_event(
    auth: Auth = Depends(),
    film_id: str = Path(title='Фильм ID'),
    kafka: AIOKafkaProducer = Depends(get_kafka),
):
    """Provide a function to send events related to a movie.

    NOTE(review): the original @lru_cache() decorator was removed — it was
    keyed on the per-request Auth and producer instances, so it could never
    produce a cache hit and only accumulated entries (an unbounded memory
    leak) for the lifetime of the process.

    Args:
        auth: User authentication
        film_id: Movie ID
        kafka: Kafka connection

    Returns:
        PostEventService: Service for publishing the user's current movie position
    """
    return PostEventService(
        kafka=kafka,
        topic=KafkaTopics.video_progress.name,
        key=UserFilmID(user_id=auth.user_id, film_id=film_id),
    )
33 changes: 33 additions & 0 deletions backend/fastapi_consumer/src/api/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from http import HTTPStatus

from fastapi import APIRouter, Depends, HTTPException, Response

from api.v1.events import get_film_event
from services.post_event import PostEventService
from models.value import VideoProgress

router = APIRouter()


@router.post(
    '/films/{film_id}/video_progress',
    summary='Send current movie viewing time',
    description='Save the time in seconds where the user left off during the last movie viewing.',
    tags=['films'])
async def send_film_progress(video_progress: VideoProgress, film_event: PostEventService = Depends(get_film_event)):
    """Handle publishing the current movie viewing time to Kafka.

    Args:
        video_progress: A data label for video viewing
        film_event: The service for publishing the event

    Raises:
        HTTPException: Error 400 if the Kafka server is unavailable

    Returns:
        Response: An HTTP response with status code 201 (Created)
    """
    # post() reports success as a boolean; a falsy result means the event
    # could not be published.
    ok = await film_event.post(video_progress)
    if not ok:
        raise HTTPException(status_code=HTTPStatus.BAD_REQUEST)
    return Response(status_code=HTTPStatus.CREATED)
Empty file.
Loading

0 comments on commit f62d43e

Please sign in to comment.