DataДжунгли🌳
286 subscribers
11 photos
8 videos
1 file
27 links
Data Engineering на пальцах:
🥸Про Airflow-DAG.
🍀ETL, с сарказмом.
🅾️оптимизируем SQL.
🐍Python трюки каждый пн.
Download Telegram
Пример 1️⃣ Spark Streaming – подсчёт слов в реальном времени с мини-батчами
Предположим что вы из потока данных хотите знать количество упоминаний каких-то конкретных слов, напрмиер, из постов в Х(твиттер) скажем про биткоин или эфириум чтобы принимать решение или строить аналитику.

Подключимся к источнику данных через сокетный поток. Spark будет считывать данные по TCP-протоколу на определённом порту:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# SparkContext и StreamingContext с интервалом в 5 секунд
sc = SparkContext("local[2]", "WordCountStreaming")
ssc = StreamingContext(sc, 5)

# gодключаемся к источнику (здесь TCP сокет)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" ")) -- конечно тут должен быть токенайзер, а не простой сплит.
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y) -- кстати отличный пример использование reduceByKey.
word_counts.pprint() # выводим результат в консоль ну или конечно вставляем в таблицу но для простоты примера просто.


Логируем - этот даже самый базовый пример легко масштабировать, расширяя сетки исключений.

import logging

# уровень логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("StreamingApp")

try:
ssc.start()
logger.info("Streaming started...")
ssc.awaitTermination()
except Exception as e:
logger.error("An error occurred in streaming: %s", e)
finally:
ssc.stop(stopSparkContext=True, stopGraceFully=True)
logger.info("Streaming stopped.")


Небольшая оптимизация тонкого места. Тонкое место у нас расчеты - и в нашем случае подсчет слов.
Допустим, наш поток генерирует слишком много мелких партиций. Мы можем уменьшить их число, используя coalesce:

optimized_word_counts = word_counts.coalesce(1)
optimized_word_counts.pprint()
Please open Telegram to view this post
VIEW IN TELEGRAM
3
Пример 2️⃣ Structured Streaming – обновление таблицы в реальном времени
Structured Streaming — это уже другой уровень, он позволяет обрабатывать данные как таблицу, обновляющуюся в реальном времени. Давай предположим, что у нас есть Kafka, откуда мы будем получать данные для стрима.

Настройка источника Kafka

import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# SparkSession для Structured Streaming
spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()

# подключаемся к Kafka
df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "word_topic") \
.load()

# конвертируем байты Kafka в строки - да это всегда дополнительные упражнения 🙂
df = df.selectExpr("CAST(value AS STRING)")


Преобразуем данные и считаем слова


words = df.select(explode(split(df.value, " ")).alias("word")) # ну вы помните про токенайзер
word_counts = words.groupBy("word").count()

# консольный вывод (можно вывести в HDFS, Cassandra и другие модные слова)
query = word_counts.writeStream \
.outputMode("complete") \
.format("console") \
.option("checkpointLocation", "/path/to/checkpoint/dir") \
.start()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("StructuredStreamingApp")

try:
logger.info("Starting Structured Streaming...")
query.awaitTermination()
except Exception as e:
logger.error("An error occurred: %s", e)
finally:
spark.stop()
logger.info("Structured Streaming stopped.")



Watermarkи и триггеры

# watermark 2 минуты если позже проигнорируем
word_counts = words.withWatermark("timestamp", "2 minutes") \
.groupBy("word", window("timestamp", "10 minutes")) \
.count()


query = word_counts.writeStream \
.outputMode("complete") \
.format("console") \
.trigger(processingTime="10 seconds") \ # ставим не реал-тайм а раз в 10 секунд чтобы не насиловать систему все время а лишь раз в 10 секунд
.option("checkpointLocation", "/path/to/checkpoint/dir") \
.start()



Вот как то так друзья🍀

🆒И не забывайте: чем больше данных в реальном времени, тем больше внимания к логам, оптимизациям и обработке опоздавших данных!🔽

Friendly Reminder: У нас с вами есть чат, приходите туда задавайте вопросы, комментируйте посты. Это помогает моему каналу жить и нести дата Вести во все уголки интернета.
Вот ссылка: Диалоги DataДжунгли
Вы можете прийти туда с любым вопросом по данным, хранению, архитектуре, аналитике, дата инжинирингу, дата сайнсу, python, SQL, airflow, Spark или просто хотите помощи с тестовым - я всегда рад откликнуться вам на помощь.
Моя цель не просто писать посты и нести просветление а еще и создание комьюнити дата инженеров чтобы в последствии делать вместе проекты.

Отличного вам понедельника 🍷🟢
Please open Telegram to view this post
VIEW IN TELEGRAM
3👍2
🧠Spark MLlib 🤖🤖
Привет, дорогой подписчик! 👋
Сегодня я расскажу тебе про Apache Spark MLlib – библиотеку машинного обучения на базе Spark, которая помогает решать задачи аналитики и предсказаний на больших данных. 🔥

🧠 Что такое MLlib и почему она крутая? -- ну не круче Вас мои подписчики 🙂

1. Работать с огромными объемами данных, благодаря Spark'у и алгоритмы оптимизированы для работы в распределенной среде.
2. Параллельно обрабатывать данные, экономя время (и нервы). Обработка петабайтов данных – легко.
3. Выполнять машинное обучение без необходимости копировать данные из Spark в другие инструменты. Одни и те же данные и коды для ETL, аналитики и обучения моделей.

И да, она отлично вписывается в задачи реального времени, если ты используешь Structured Streaming!

🚀 Что можно попробовать самому с MLlib? -- Это простой пример очень коррелированных данных.

Начнем с линейной регрессии, чтобы предсказать, например, цену квартиры по площади и количеству комнат. 🏡

## Сначала берем спарк и данные
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

## Spark
spark = SparkSession.builder.appName("MLlibExample").getOrCreate()

data = spark.createDataFrame([
(50, 2, 300),
(60, 3, 400),
(80, 3, 500),
(100, 4, 700),
], ["square", "rooms", "price"])

## асемблируем -- если не знаете что это такое не страшно.
assembler = VectorAssembler(inputCols=["square", "rooms"], outputCol="features")
data = assembler.transform(data)

## Учим нашу простую модель

# Инициализация и обучение модели
lr = LinearRegression(featuresCol="features", labelCol="price")
model = lr.fit(data)

## Предсказываем цену

predictions = model.transform(data)
predictions.select("features", "price", "prediction").show()



Конечно мой пример детский, но если у вас есть хороший кусок данных об объектах недвижимости например Москвы. Где будет куча параметров. То вы можете настроить крутую модель предсказания, но посчитайте корреляцию сначала 🙂

🎯На что обратить внимание когда используете MlLib?

Перед обучением всегда нормализуй данные и удаляй выбросы. Используй StandardScaler или MinMaxScaler.
Экспериментируй с параметрами моделей с помощью CrossValidator.
Убедись, что данные сбалансированы между узлами кластера. Используй .repartition() или .coalesce().

💡 Какие модели можно учить?
В MLlib есть всё самое важное:
Классификация: логистическая регрессия, SVM, Random Forest.
Регрессия: линейная, Decision Tree.
Кластеризация: K-means.
Рекомендации: ALS (для рекомендательных систем).

Иными словами есть все плохо работающее и классическое, ведь мы знаем что нейронные сети давно переплюнули эти алгоритмы.
Но если делать что то по проще и по дешевле то MLlib прекрасно справится, например кластеризация довольно тяжелая штука и ее можно красиво использовать для аналитики крипто данных.

🤖 В общем и целом.
MLlib – это хороший инструмент для решения задач машинного обучения в распределенной среде. Не больше не меньше 💥
Если приложить немного фантазии и усилий можно родить крутой аналитический пайплайн который поможет решить задачу.
Или нет 🙂
Please open Telegram to view this post
VIEW IN TELEGRAM
👍2
Мы с Вами прошлись уже по Spark тематике, я постарался для вас собрать важную информацию, которая пригодится на собеседованиях или для общего понимания что это за технология и вот вписок всех тем:

1. Spark ML
2. Spark Streaming
3. SQL Spark
4. RDD
5. Cluster Manager
6. Spark Core
7. Apache Spark начало

Делитесь этим постом с дата-друзьями кто бы хотел ознакомится с технологией !
5👍1
#мыслиПередМонитором -- иногда меня посещают разные философские мысли о профессии и пройденном пути.
Решил с Вами поделится.

Стать высококвалифицированным дата-гонщиком — задача, знаешь ли, со звездочкой.
Иногда кажется, что технологии растут быстрее, чем ты успеваешь за ними.
Новый инструмент == новая концепция == новые челенджи.
Мой главный вывод: что настоящий смысл не в гонке за результатом, а в самом пути. Что то типо


print("no matter how skilled you are, the matter how much you can take(learn) and move forward")


Когда я начинал свой путь в дата-области, меня влекли блестящие перспективы: стать экспертом, решать сложные задачи, проектировать архитектуры, которые выдерживают нагрузки в миллионы событий в секунду, имплементировать модели ML, оптимизировать запросы SQL и так далее.
И чем больше росла экспертиза, тем больше казалось что я ничего не знаю на самом деле 🙂

Но спустя годы.
Те бесконечные вечера, проведенные за отладкой сложных ETL-пайплайнов, те моменты, когда искал утечку памяти в Spark или оптимизировал запросы изучая планы — это и есть школа.
Не конечный результат, не красиво сданный проект или масштабируемая архитектура, а сами эти «сражения», из которых и складывается ваше мастерство.
Путь к высококвалифицированности — это история маленьких побед. Конечно они не кажутся маленькими 🙂
Когда ты впервые разобрался с ДАГом в Airflow, а через пару лет уже проектируешь сложнейшие оркестрации с куб экзекьютором.
Когда не боишься признаться, что не знаешь чего-то, буквально каждый день чуствуешь себя - тупым. И это осознание тупости - это и есть основной драйвер для будущих побед 🙂

Результат важен, конечно.
Но если ты сместишь акцент лишь на результат - ты рискуешь пропустить саму суть профессии.
Это не просто работа - это
1. возможность научиться очень восстребованным навыкам, -- каждую минуту количество данных увеличивается по экспоненте.
2. встретить единомышленников, -- только в командах создаются самые лютые продукты.
3. поделиться знаниями и, возможно, вдохновить кого-то.

Если ты не можешь определиться, что изучать дальше, просто начни с первого "Hello, World".
Ретроспектируя, ты поймешь, что все эти небольшие "Hello World" сложились в увлекательное путешествие, в котором ты обрел больше знаний и особенно soft skills: Способность адаптироваться к изменениям, Решение проблем, Критическое мышление, Настойчивость и Тайм менеджмент.


К чему это я.
Путь важнее результата. В резульате ты починишь упавший пайплайн, ты оптимизируешь тяжелый запрос, ты построишь крутое хранилище и ты имплементируешь нейросеть в продукт.
Но все самое важное ты получаешь во время поиска решений, так как само решение не так важно, а вот сколько дерьма ты съел пока искал его - это стоит дорого.

Ставьте лайки если согласны 🙂
11👍2🔥2👏1💯1
👋Привет дорогой подписчик,
знакомьтесь это Эрик 🆒и он хочет стать дата инженером!
Мы очень хотим чтобы у него получилось!

Однако не за этим я вас хочу с ним познакомить 🙂

IT сферу качественно выделяет то что вы - будучи в айти - являетесь частью мощного комьюнити, в котором принято легко делится знаниями буквально безвозмездно.
Я сам вспоминаю когда я искал ответы на ХабрВопросах или на СтекОвере или сам создавал тикеты.
И не один мой вопрос не остался без ответа, всегда найдется кто то, кто захочет поедлится знаниями. Потом я и сам делился знаниями также - мне помогли и я помог 🙂 поэтому есть мой канал и поэтому есть еще не равнодушные такие как Эрик.

Сейчас он проходит путь самурая в дата сферу из фронтенда 🙂 Это не легкий путь 😀
Когда он только начинал изучать Data Engineering, он заметил, что не хватает удобного и структурированного ресурса, который мог бы помочь новичкам развиваться в этой области. Поэтому он решил создать сайт!

Основная функция сайта — агрегировать учебные и другие полезные справочные материалы.
Есть функция отображения вакансий, но она пока простая.
Вакансии активно публикуются на моём Telegram-канале — https://t.iss.one/data_engineer_jobs

В общем без лишних слов вот ссылка на его пост из канала в котором вы найдете ссылку на сайт и более подробную инфу!

Подписывайтесь на его канал, пользуйтесь классным ресурсом!

Эрик, тебе успеха в обучении, если есть вопросы приходи в наш чат Диалоги ДатаДжунгли тебе всегда помогут !
Please open Telegram to view this post
VIEW IN TELEGRAM
6👍3
😎Привет дорогой подписчик, давненько не общались!

Почему, спросите вы но уйдете так и не получив ответ...

Уверен что вы (как и я) в той или иной степени пользуетесь JupyterNotebook чтобы писать свой плохой код.
Я его использую в основном, чтобы набросать что то быстро и поглядеть где ошибка закралась или ответить на главный вопрос "почему это дерьмо не работает". 😡
Ответ почему оно не работет всегда в зеркале в 99.9% случаев.

⚙️И уверен когда вы запускали что то тяжелое в одной ячейке и оно долго молотило типо минут 10-15 или даже 20.
💡У вас мелькала в голове светлая мысль...
"А что если пока выполняется этот долгий процесс я бы мог запускать другой код в соседних ячейках 💡"

Штож вызов принят.

Нам известо что базово Jupyter не умеет такого потому что:
- нет такого функционала
- один поток никаких тебе параллелизьмов (написано верно)

Но программист это не тот кто умеет код писать - это тот кто готов потратить на автоматизацию 5-минутного дела 4 часа и никогда не тратить 5 минут более.
Как же заставить этот блокнот работать в параллель и чтобы я еще мог бы в других ячейках смотреть как это красиво выглядит.(Или не красиво)
Как вы успели догадаться речь пойдет про Threading и вы сегодня узнаете как это просто и легко запускать и не боятся сломать ваш компуктер.

Что будем делать ?
1 поток - будет в бесконечном цикле искать простые числа (простое это то которое делится на 1 и само на себя без остатка)
2 поток - будем искать четные числа также в бесконечном цикле
а в другой ячейке мы будем просто смотреть какое последнее число стало простым(prime) или четным(even)

Итак начнем

Ячейка 1

import threading -- для контроля потоков
import time -- для времени ведь его всегда так мало

# Тут будем хранить какое последнее число посчитали простым или четным
progress = {"prime": None, "even": None}


Ячейка 2
Поток 1 поиск простого числа.

stop_prime = threading.Event()

def prime_worker():
n = 2
while not stop_prime.is_set():
is_prime = all(n % i != 0 for i in range(2, int(n**0.5) + 1))
if is_prime:
progress["prime"] = n
n += 1
time.sleep(0.01)

prime_thread = threading.Thread(target=prime_worker, daemon=True)
prime_thread.start()

print("Поток prime_worker запущен")


Ячейка 3
Поток 2 поиск четного числа

stop_even = threading.Event()

def even_worker():
n = 0
while not stop_even.is_set():
if n % 2 == 0:
progress["even"] = n
n += 1
time.sleep(0.01)

even_thread = threading.Thread(target=even_worker, daemon=True)
even_thread.start()

print("Поток even_worker запущен")


Ячейка 4
Просто чтобы чекать как работает


progress



А как это остановить вы посмотрите в моем видео примере 😄
Иначе ваш CPU будет искать вечно эти числа 😂

Как вы успели заметить это вообще не сложно пользоваться потоками
Можете запустить абсолютно любую функцию python в отдельном потоке и не блокировать весь Notebook
Хороший пример когда вам надо взять особенно жирный кусок данных из базы который долго отрабатывает и не дожидаясь пока он вернется выполнять другие действия.

А как же понять что процесс уже закончился ? Расскажу в видео примере 💡

😮‍💨 Каждый, кто говорит «давай просто запустим это в потоке», не запускал ничего, кроме воздушного змея 🐍
Please open Telegram to view this post
VIEW IN TELEGRAM
👍9🔥2😎1
Media is too big
VIEW IN TELEGRAM
👍Как и обещал видео пример того как пользовать потоки в JupyterNotebook 😎

Пользуйтесь на здоровье 🐍
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥2
Голосуем.

Интересно ли видео о том, как разворачивать свой Airflow на проде? - Как выбрать сервер(конфигурацию) - Как сделать Docker образ - Как настроить docker-compose - Как настроить CI/CD из репозитория - Какие даги у вас должныть быть 100% всегда
Final Results
92%
😮 Конечно да!
3%
😒 Нет не надо я все умею
5%
💡 Кто такой Airflow?
🔥4
DataДжунгли🌳
Голосуем.

Интересно ли видео о том, как разворачивать свой Airflow на проде? - Как выбрать сервер(конфигурацию) - Как сделать Docker образ - Как настроить docker-compose - Как настроить CI/CD из репозитория - Какие даги у вас должныть быть 100% всегда
Еще добавлю

Это будет несколько видео эксклюзивно только для тех, кто подписан на мой ТГ-канал так что приводите друзей 💡
🔤Мы рассмотрим полный путь как из ничего развернуть настоящий боевой Airflow на удаленной машине и сколько нам это будет стоить.
🔤Мы рассмотрим основные проблемы про которые все забывают (а они копятся).
🔤Мы сделаем свой репозиторий и настроим правильно CI/CD для того чтобы можно было делать комиты не ломая при этом ваш Airflow.
🔤И конечно мы сделаем первый DAG.

Никакого локального использования только боевой режим.

Потом на его базе попробуем поиграть в настоящих 🔤🔤
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥141
Media is too big
VIEW IN TELEGRAM
Всем добрейшего вторничного вечера 👏🤘

Как и обещал это первое видео из серии.
"Airflow твари и где они обедают.."

В этом видосе вы узнаете:

- Как и какой выбрать сервер под ваш могучий Airflow.
- Где арендовать и сколько деняк он вам будет стоить.
- Как зайти на сервер по SSH и не вводить пароль каждый раз.
- Где лежит докер композ для Airflow и какую версию взять.

Я сделал для Вас общий репозиторий с airflow
https://github.com/da-eos/datajungle_airflow 🔫

Там будет все что на этом видео а также там будет появляться код остальных дагов и прочие шалости 🌿

В следующих видео будем настраивать airflow.cfg и запускать докер на сервере!
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥8👍31
Media is too big
VIEW IN TELEGRAM
Добрейшего утра четверга 😠😃

А мы с Вами продолжаем протаптывать путь к тому как делать свой Airflow на удаленном сервере.
Сегодня разбираем следующие важные шаги:

- Executor какой выбрать чем они отличаются - простыми словами.
- Добавляем в репозиторий Dockerfile + requirements.txt.
- Клонируем репозиторий на арендованный сервер.
- Делаем инициализацию базы данных для Arflow.
- Найдем где лежит airflow.cfg важнейший файл настроек вашего Airflow и добавим его в репозиторий.


Я понимаю, что многие вещи на видео делаю на автомате уже и вам возможно не ясно как использовать команды в терминале вот список того что я использую с описанием.


cd <имя папки> #чтобы перейти в папку в какую то.

cd .. #чтобы выйти на уровень выше из папки.

ls -la #это просмотр содержимого в папке в которой вы находитесь, выпадает список файлов.


Так же уверен что не совсем понятно как я делаю комиты. У молодежи уже не принятно комитить через терминал все пользуют свою IDE 🙂
Но как вы успели заметить на сервере когда вы зашли по SSH нет никакой IDE 🙂 И там придется комит делать по старинке 😆
Вот как это легко делается в 4 команды.


git switch -c <ИМЯ ВЕТКИ> -- это создает новую ветку с именем что вы придумали.

git add <ИМЯ ФАЙЛА ИЛИ ПАПКИ> -- это добавляет в вашу ветки файлы что вы изменили, можно через пробел добавить несколько.

git commit -m 'Например: Добавил конфиг' -- Это сам коммит и коментарий в котором вы пишите что вы там добавили, чтобы ваши коллеги понимали что вы там сделали 🙂

## Но ваши файлы еще не в репозитории это только коммит изменений. Теперь делаем пуш в репозиторий.

git push --set-upstream origin <ИМЯ ВЕТКИ> -- имя ветки что вы указывали в начале.


Все вот так легко и просто сделать коммит в основной репозиторий, потом через интерфейс github можно сделать мердж в главную ветку main.

Пользуйтесь! 😄🤗
Please open Telegram to view this post
VIEW IN TELEGRAM
7👍1
Media is too big
VIEW IN TELEGRAM
#Интрересная история про GIT.
Как связать локальный репозиторий с удаленным сервером и ничего не сломать 🧞

Git был создан Линусом Торвальдсом (создателем ядра Linux) в 2005 году.
Причина - конфликт с владельцами проприетарной системы контроля версий BitKeeper, которую использовали для разработки ядра Linux.
Линус захотел:

- Полностью распределённую систему (каждый разработчик имеет полную копию репозитория).
- Очень быструю работу.
- Защиту от подмены истории (Git использует SHA-1/2 хэши).
- Надёжное управление ветками и слияниями.

С тех пор Git стал де-факто стандартом в мире разработки ⚠️

- Локальный репозиторий - находится у тебя на компьютере.
- Удалённый репозиторий - размещён на сервере (например, GitHub, GitLab, Bitbucket) и используется для совместной работы.

Что хранится в репозитории:
- Файлы проекта (код, конфиги и т.д.)
- История изменений (git log)
- Ветки разработки (git branch)
- Метаданные Git (например, .git папка)
- Твои обнаженные фотки 🤫

В видео примере показал как на практике использовать один и тот же .git в разных местах 🔗
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥6👍1
Media is too big
VIEW IN TELEGRAM
Что может быть лучше чем финальное видео про Airflow и настройку
`airflow.cfg`
пятничным вечером?

Да что угодно лучше этого 😆

Но нельзя просто так взять и не завершить базовую настройку Airflow для дальнейшей игры в дата инженеров(мамкиных).
`airflow.cfg`

Это буквально сердце Airflow в котором есть набор самых важных параметров необходимых для того чтобы он отдавал вам всю мощь.🌿
Давайте посмотрим на популярные настройки которыми нельзя принебречь:

ВСЕГДА используйте UTC. Если у вас возникнет вопрос почему, ответ простой: "потому". Сами напишите в комментариях ответ 😁


default_timezone = utc


executor - Из прошлого видео вы уже знаете какие бывают.

executor = LocalExecutor


Параллелизм - максимальное число запущенных одновременно задач во всех дагах(включая сами даги).
Макс таск пердаг - максимальное одновременное количество активных задач в даге.
Макс актив ранс пердаг - максиальное количество запущенных ранов 1 дага.

parallelism = 16

max_active_tasks_per_dag = 4

max_active_runs_per_dag = 8


Последние три настройки базируются исключительно на вашей железке и способе деплоя.
Красивый пример как считать как эти параметры настраивать:
parallelism = ваши cpu * 2 очень грубо, но это близко к правде. Как вы успели заметить на наших 4 cpu ни о каких 16ти речи идти не может. Но мы люди творческие че хотим то и делаем 😆

max_active_tasks_per_dag = тут просто сколько одновременно можно задач в даге делать. Это важно, чтобы один DAG не занял всё. У нас маленькая тачка 4 вполне оптимально. Да и сам даг подразумевает некую последовательность в выполнении тасок, но парарлелить их тоже можно 😠

max_active_runs_per_dag = ну 8 для нашей машины много, оставим потом 2-3. Суть в том что если один запуск еще не закончился а расписание подошло то таких случаев 8 к ряду можно сделать, ситуация очень редкая и такое вообще допускать плохая практика. Однако это роляет при каче исторических данных за большой период. Пока что для нас параметр бесполезный в целом, но его тоже важно уметь настраивать.

🥳 Что ж поздравляю вы дошли до финиша этого образотаельного трэка дальше будем упражняться это самое веселое 🤪
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥6👍31
#DEMeme

Вся правда в одной картинке 📣
Please open Telegram to view this post
VIEW IN TELEGRAM
😁5
#DEMeme

За каждым дашбордом свой проект «Манхеттен»
Мем смешной ситуация не простая 📃✂️🪨
Please open Telegram to view this post
VIEW IN TELEGRAM
😁3
#PyTrickMonday

dict.get() : маленькая таблетка от KeyError-ов

Вы уверен знаете что из словаря можно взять значение по ключу несколькими способами


payload = {"user": {"name": "Alice"}}

# Явное обращение по ключу
nickname = payload["user"]["nickname"] # KeyError, DAG падает

# Спокойный вариант c .get()
nickname = payload.get("user", {}).get("nickname", "Anon")

# "Если ты сигма" - способ (когда поле может отсутствовать целиком)
nickname = (payload.get("user") or {}).get("nickname", "Anon")


Так вот я вам предлагаю использовать .get() ВСЕГДА и у меня на это пять три причины:

1️⃣Безопасность: API вернул 200 OK, но ключа "user" нет - прямое обращение бросит KeyError и уронит скрипт. .get() вернёт None (или ваш дефолт) и программа продолжит работу.

2️⃣Меньше грязного кода: вместо громоздкого: (Плоское лучше вложенного ZenOfPython)

if "key" in resp:
value = resp["key"]

# одна строчка
value = resp.get("key")

3️⃣Гибкий дефолт: jobs.get("DA", "DE") # вернёт 'DE', если ключа 'DA' нет

💡 Если надо "нырнуть" глубоко и не облажаться по пути, так как .get есть только у словарей, можно дефлтом указывать пустой словарь и ваш код не упадет.
email = payload.get("user", {}).get("contacts", {}).get("email")

📣 Отправь этот полезный пост коллеге чтобы он/она не ловили KeyError и были тебе всегда благодарны 😎

#DETip #Python #DataJungle
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥11
#SQLWednesday
Как же на самом деле крутится "SELECT" 🤪

Как же "движок" базы данных обрабатывает ваш запрос в каком порядке.
Когда то на собеседовании на джуновскую позицию мне задали такой вопрос, может и вам пригодится. И кстати может вы избавитесь от ошибок в текущей работе ⚠️
Логический порядок такой:
FROM → JOIN → WHERE → GROUP BY → HAVING → WINDOW → SELECT → DISTINCT → ORDER BY → LIMIT/OFFSET

Давайте поближе поглядим что происходит на каждом этапе(Да не все запросы содержат ВСЕ операторы но порядок в любом случае такой):

1. FROM --Сначала SQL надо сказать откуда мы берем данные из какой таблицы.
2. JOIN --Логично если на первом месте таблицы то JOIN точно на втором.
3. WHERE --Фильтруем строки после соединения. Теперь понимаете почему нельзя использовать алиасы из SELECT, потому что он только 7 на очереди👀
4. GROUP BY --Лепим группы, считаем агрегаты.
5. HAVING --Фильтруем уже сведённые группы.
6. WINDOW --Считаем оконные функции (ROW_NUMBER, avg() over…). Они видят итог после WHERE, но до SELECT (уверен вы сейчас такие ЧЕ??)
7. SELECT --Выбираем финальные столбцы, присваиваем алиасы. Только сейчас ⚠️
8. DISTINCT --Убираем дубликаты.
9. ORDER BY --Сортируем. Уже видим алиасы из SELECT.
10. LIMIT/OFFSET --Отрезаем кусок результата.

❗️Ставь 🔥 если не знал что порядок именно такой 👀

Давайте примеры посмотрим:

-- Сколько авторов имеют > 3 статей
SELECT author_id,
COUNT(*) AS article_cnt
FROM articles
GROUP BY author_id
HAVING COUNT(*) > 3;

1. FROM - берём articles
2. GROUP BY - группируем по author_id
3. HAVING - фильтруем готовые группы (в WHERE так не выйдет - агрегатов ещё нет)
4. SELECT - выводим автора и число статей


-- Хотим все ордера + успешные доставки
SELECT o.id,
d.tracking_code
FROM orders o
LEFT JOIN deliveries d
ON d.order_id = o.id
AND d.status = 'shipped' -- фильтр СРАЗУ при соединении
WHERE o.created_at >= current_date - interval '30 days';

Фильтр в JOIN … ON ускоряет LEFT JOIN
Предикат status = 'shipped' выполняется до WHERE, поэтому пустые доставки всё ещё вернутся как NULL, но оптимизатор не тащит лишние статусы.
Это один из способов как можно ускорить ваш запрос вынося из WHERE в JOIN.


WITH active AS (
SELECT *
FROM sessions
WHERE started_at >= current_date - 7
)
SELECT user_id,
started_at,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY started_at) AS rn
FROM active
ORDER BY user_id, rn;

Оконка «видит» фильтр из WHERE, но идёт раньше SELECT
WHERE уже отрезал старые сессии.
ROW_NUMBER считается, потом идёт SELECT, где мы выводим rn.

Красиво не правда ли? 😃
Пересылайте коллегам и друзьям шпаргалку по SQL сохраняйте себе в телеграм 😮


Давайте в комментариях обсудим как улучшить этот запрос:


-- Ищем заказы c дорогими товарами и уже оплаченным счётом
SELECT o.id,
o.created_at,
c.total_amount,
c.status AS charge_status,
p.name AS product_name,
p.price
FROM orders o
JOIN order_items oi ON oi.order_id = o.id
JOIN products p ON p.id = oi.product_id
JOIN charges c ON c.order_id = o.id
WHERE p.price > 1000 -- фильтр по цене
AND c.status = 'paid' -- фильтр по чеку
AND o.created_at >= current_date - interval '90 days';

За лучший ответ получите статус "Порядочного" в чате DE Data Talks 🙂
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥4👍3
Media is too big
VIEW IN TELEGRAM
Ну что сегодня пятница а значит время продолжить большой разбор #Airflow 🚀

Сегодня в большом видео примере вы узнаете:

1. Как писать свой первый #DAG.
2. В чем разница между requests.get() и requests.Session().get() она есть и приличная.
3. Напишем на #python красивый класс для API #CoinMarketCap.
4. Запустим наш даг, получим ошибку и еще разок запустим 😎


Также напоминаю что весь код лежит тут в репозитории, он публичный и доступен для Вас.

Пересылайте друзьям и коллегам кто хочет научится писать свой первый даг но не знает с чего начать 📣

Любые вопросы в комментариях, всегда к вашим услугам.

В следующем видео сделаем #CICD и установим #Postgres на наш сервер чтобы там хранить данные не пропускайте! 🙄
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥7
#DEmeme

Сколько дашбордов не делай, а бизнесс все равно в эксель смотрит 😶
Please open Telegram to view this post
VIEW IN TELEGRAM
👍3
#DEMeme
Слово AI из каждого утюга, возможно даже сам утюг под собой скрывает нейросеть ⚙️

В этом меме много боли на самом деле 😄
Please open Telegram to view this post
VIEW IN TELEGRAM
💯5😁1