Канал Андрея про бекенд
1.68K subscribers
9 photos
13 links
👨🏻‍🎓 Я Андрей Суховицкий - tech lead и лектор Университета ИТМО, @sukhoa

🔥 Темы в канале: kotlin, java, coroutines, многопоточное программирование, system design, реализация высоконагруженных и надежных систем.

Посты по средам

Присоединяйтесь!
Download Telegram
🔼 В посте выше мы обсудили, что для передачи данных между группами потоков удобно использовать потокобезопасную очередь, а для оптимизации дорогостоящего создания потоков стоит использовать "пул" потоков, которые можно переиспользовать в течение работы приложения.

👂 Нет необходимости реализовывать вышеописанные концепции вручную. Интерфейс ExecutorService и его реализации, например, ThreadPoolExecutor, берут эту работу на себя. Они уже поддерживают внутри себя очередь (по умолчанию LinkedBlockingQueue), пул предсозданных потоков, выставляют довольно простое API для отправки задач на выполнение и дают 100500 возможностей изменять свое поведение, как вам необходимо.

✍️ Как создать такой экзекутор? Воспользоваться статическими фабричными методами класса Executors. Самый простой из них принимает на вход только один параметр: необходимое количество потоков в пуле:

val threadsNum = 16 // number of threads-consumers
val executor = Executors.newFixedThreadPool(threadsNum)


🕶️ API экзекуторов дает вам возможность влиять на различные параментры и аспекты его поведения, например:

1. Количество потоков
2. Виды и размеры очередей
3. Стратегии поведения при переполнении очереди (интерфейс RejectedExecutionHandler): выбросить непоместившийся элемент / ждать, пока не появится место / любая кастомная стратегия: сохранене в базу данных, логирование и тд.
4. Стратегия создания потоков (интерфейс ThreadFactory): как будут именоваться потоки, будут ли они потоками демонами или нет и тд.

✍️ Как отправить задачу на выполнение в пул потоков? Достаточно вызвать метод submit и передать туда функцию (интерфейсы Runnable/Callable). Эта функция не будет выполнена мгновенно на текущем потоке, а будет помещена в очередь и выполнится на одном из потоков, поддерживаемых в пуле. Поток вызова в этот момент может продолжать выполняться дальше и жить своей жизнью.

executor.submit({ System.out.println(“я исполняюсь в потоке-консьюмере”) })


#kotlin #java #threads
🔥13👍32🤝2
Кейс с собеседования

⬇️ Ниже приведен фрагмент кода. Представьте, что этот код работает у вас на продакшене прямо сейчас.

Один поток Producer генерирует последовательность целых чисел. Несколько потоков Consumer получают эти числа из очереди и производят маппинг этих чисел на другие.

class Consumer(
private val codesQueue: BlockingQueue<Int>,
private val codeMappings: List<Int>
) {
@Volatile
var isActive: Boolean = true

fun consume() {
while (isActive || codesQueue.isNotEmpty()) {
val code = codesQueue.take()
log.info("Code $code mapped to ${codeMappings[code]}")
// do something with errorCodeMapping
}
}
}

class Producer(private val codesQueue: BlockingQueue<Int>) {
@Volatile
var isActive: Boolean = true

fun supply() {
while (isActive) {
codesQueue.put(Random().nextInt())
}
}
}


🦥 В какой-то момент вы замечаете, что маппинг не происходит: программа перестает писать в файл info-логи. В логах нет никаких ошибок. Вы проверяете ваш java-процесс с помощью ps -aux | grep java, он жив — то есть программа не завершилась.

👩🏻‍⚕️ Вы не знаете воспроизведется ли в следующий раз такая ситуация, поэтому не хотите останавливать процесс. Ваша задача придумать способ провести диагностику происходящего "на лету", здесь и сейчас. Что бы вы сделали?

🥟 Не спешите открывать ответ, подумайте, какие варианты вы бы могли предложить на собеседовании. Пишите ваши варианты в комментарии. Ответ, который я ожидал бы услышать: thread dump.Подробности в следующем посте.

💡 Попробуйте проанализировать код, попытайтесь найти обстоятельства, которые привели к симптомам, описанным выше. Какие изменения вы бы внесли в код для предотвращения подобных ситуаций? Пишите в комменты!

#java #kotlin #threads #case #interview
🔥11👍53🤝1
Thread dump и анализ кейса

📸 Дампом потоков называют снимок состояния всех текущих потоков JVM. Он очень полезен для диагностики проблем с производительностью, блокировками, утечками ресурсов. Дамп представляет из себя текстовый файл, где для каждого из существующих потоков java-процесса вы можете найти его состояние (RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED) и полный стек вызовов.

⬆️ Вспоминаем ситуацию, описанную в предыдущем посте:

Программа перестает писать info-логи в файл. В логах нет никаких ошибок. Вы проверяете ваш java-процесс с помощью ps -aux | grep java, - он жив, то есть программа не завершилась.

🔧 В этом случае для диагностики вы можете прибегнуть к thread dump. Примеры утилит, которые помогут вам сделать снимок потоков: jstack, jvisualvm, JMC (Java Mission Control), jcmd. Ниже пример команды для утилиты jstack, на вход необходимо передать pid java-процесса.

jstack <pid> > threaddump.txt


🧨 Ниже приведен фрагмент thread dump, представляющий интересующий нас поток-producer.

"Thread-0" #16 daemon prio=5 os_prio=31 cpu=0.97ms elapsed=3.79s tid=0x00007fd90d0ed800 nid=0x5f03 waiting on condition  [0x000070000a578000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java[email protected]/Native Method)
. . . . . . .
at java.util.concurrent.LinkedBlockingQueue.put(java[email protected]/LinkedBlockingQueue.java:343)
at ru.quipy.Producer.supply(Example.kt:30)
at ru.quipy.ExampleKt.main$lambda-4(Example.kt:48)
at ru.quipy.ExampleKt$$Lambda$32/0x0000000132014658.run(Unknown Source)
at java.lang.Thread.run(java[email protected]/Thread.java:840)


🧐 Как мы видим, поток находится в состоянии перманентного (не ограниченного временем) ожидания (java.lang.Thread.State: WAITING) в методе LinkedBlockingQueue.put. Если вы находитесь на собеседовании, то можете сказать, что лучше использовать аналогичный метод LinkedBlockingQueue.offer, который не блокируется навечно, а использует ограниченную временем блокировку, что даст нам возможность выйти из ожидания, понять, что операция не удалась и сделать об этом запись в лог. Аналогично и для метода LinkedBlockingQueue.take: вообще все блокирущие методы стоит ограничивать временем, тогда не возникнет ситуация с блокировкой потока при завершении программы, которую вы описали в комментах к предыдущему посту.

🔍 Очень подозрительно, но мы не можем найти дамп потока consumer из чего может следовать вывод, что такового в JVM нет. Вероятно, данный поток уже завершился. Это объясняет, почему он больше не делает записи в логи и почему поток supplier не может поместить ничего в очередь: она достигла максимального размера из-за отсутствия потока-потребителя.

🕵🏽 Поток завершился в следствие появления неожиданного исключения. Это могло произойти в следующей строке: log.info("Consumed $code. Mapped to ${codeMappings[code]}"), если элемента с индексом code в списке не существует.

🤯 А исключения мы могли не увидеть в логах по нескольким причинам. Например, вызывающий нас код мог поймать исключение, завершить поток и не записать об этом в лог. Или же был использован базовый обработчик исключения в потоке (interface UncaughtExceptionHandler), дефолтная версия которого (class ThreadGroup) просто делает запись в System.err, который мог не перенаправляться в лог.

🥇 Какие выводы мы можем сделать?

1. Следует оборачивать происходящее в потоке в глобальный обработчик исключений try-catch, логировать перехваченные исключения и поддерживать работу потока, чтобы приложение могло прогрессировать даже вопреки ошибочным ситуациям.

2. Следует явно ограничивать временем методы, которые блокируют поток исполнения или использовать их варианты/альтернативы с таймаутом!

#java #kotlin #threads #interview #case
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥18👍42🤝1
👨🏻‍🎓 Всем привет! Меня зовут Андрей Суховицкий. Я tech lead и лектор Университета ИТМО. На этом канале я рассказываю про backend. С предложениями тем, фидбеком пишите: @sukhoa

🔥 Основные темы: kotlin, java, coroutines, многопоточное программирование, system design, реализация высоконагруженных и надежных систем.

Вы можете проголосовать за канал
_______________________________

Мой Youtube-канал
Мой курс по event-sourcing. Промокод TG_TEAM на -30%. Сейчас доступен только тариф без ментора.
_______________

Навигация:
#java, #kotlin - посты, релевантные для разработчика на этих языках
#threads - многопоточка и другое полезное о потоках
#highload - все, связанное с реализацией высоконагруженных систем
#coroutines - связанное с kotlin-coroutines, их применением и реализацией
#queues - кейсы, связанные с различными видами очередей - in-memory, брокеры и тд
#case - описание реальных ситуаций и их траблшутинг
#interview - потенциальные вопросы на собеседовании, анализ, разбор
#metrics_basics - основы сбора, хранения и визуализации метрик, prometheus
🤝7🔥1
Давайте потокам осмысленные имена

🚨 Это архиважная вещь в промышленной разработке на java. Вы могли заметить, что необходимый нам поток из поста выше 👆🏼носит название Thread-0 . Я без труда нашел его в thread dump только потому, что программа была мала. Но даже ее дамп включал 21 поток, подавляющее большинство которых было служебными потоками JVM (в основном потоками сборщика мусора).

🔍 Крупные сервисы могут иметь сотни и даже тысячи потоков. Может быть очень сложно найти среди них именно те, что нужны вам. Но задача будет гораздо проще, если вы будете давать осмысленные имена потокам и пулам потоков, которые работают над одним типом задач.

🧵 Давайте начнем с одиночных потоков. Конечно, мы не часто создаем потоки через конструктор класса Thread, но полезно знать, что этот конструктор позволяет вам дать потоку имя.

val producerThread = Thread(null, { producer.supply() }, "producer-thread")


🏭Теперь давайте перейдем к пулам потоков. Статический метод Executors.newFixedThreadPool, через который удобно создавать пулы потоков, принимает на вход необходимый размер пула и еще один интересный параметр с типом ThreadFactory. У этого интерфейса всего один метод Thread newThread(Runnable task). Пул потоков делегирует переданному объекту ThreadFactory задачу создания потоков. Давайте реализуем свой вариант ThreadFactory, который именует потоки необходимым образом и передадим такой объект нашему пулу:

class NamedThreadFactory(private val prefix: String) : ThreadFactory {
private val sequence = AtomicInteger(1)

override fun newThread(r: Runnable): Thread {
val thread = Thread(r)
val seq = sequence.getAndIncrement()
thread.name = prefix + (if (seq > 1) "-$seq" else "")
return thread
}
}


🚲 На вход классу NamedThreadFactory передаем строку, которая будет являться префиксом имени всех потоков, созданных этой фабрикой. С помощью атомарного каунтера sequence мы сможем создавать уникальную часть имени для наших потоков🥇🥈🥉. Теперь давайте передадим эту фабрику пулу:

val httpCallsExecutor = Executors.newFixedThreadPool(16, NamedThreadFactory(“http-calls-pool”))


🎯 Вуаля, теперь в thread dump ваши потоки будут иметь осмысленные имена и вам не придется тратить уйму времени на изучение стека ненужных потоков!

#java #kotlin #threads
Please open Telegram to view this post
VIEW IN TELEGRAM
👍16🔥4🤝1
Использование Thread.sleep для тестирования асинхронных программ

📲 Тестируем сервис отправки push-уведомлений. Внешние процессы вызвают его, чтобы отправить push-уведомление c заданным текстом на мобильное устройство. Сервис возвращает идентификатор уведомления, по которому можно проверить его статус. Само уведомление может быть отправлено позже, для этого наш сервис совершает http-вызов к внешнему провайдеру.

interface NotificationService {
fun submitNotification(deviceId: UUID, text: String): UUID
}


Как будем тестировать?

👺 Если тестирование модульное, то, вероятно, мы хотим сделать mock для http-client-a, с помощью которого будет посылаться уведомление и проверить, что один из его методов был вызван. Аналогично можно сделать mock классов доступа к базе данных и проверить, что статус оповещения был обновлен. Однако, если написать такой код, то может оказаться, что тест в большом количестве случаев не проходит:

notificationService.submitNotification(deviceId, “Hello”)
verify(dbService, times(1)).updateStatus(any());


Между вызовом сервиса и проверкой прошло очень мало времени, запрос мог провести его в буферах-очередях, ждать выполнения http-вызова, или выполнения запроса в БД. Вероятность этого еще выше, если тест интеграционный - сразу после самбита уведомления мы пытаемся получить его статус из базы данных, но он не успевает измениться.

Чтобы сделать тест "зеленым", частенько выбирают некоторую “взятую с потолка” константу, например, 5 секунд и помещают инструкцию, которая заставляет поток "уснуть", между вызовом сервиса и проверкой:

notificationService.submitNotification(deviceId, “Hello”)
Thread.sleep(5000L) // 5000 milliseconds
verify(dbService, times(1)).updateStatus(any());


Как правило, это решает проблему прохождения теста, но генерирует несколько новых:

1. Пяти секунд может быть недостаточно в определенных обстоятельствах и тест будет падать, но не всегда, а, например, в 1% случаев. Не так много, чтобы переписать тест, но достаточно, чтобы с завидной регулярностью фейлить весь билд и раздражать 😤. То есть вы своими руками делаете свои тесты недетерминированными, flucky тестами.

2. Вторая причина часто более заметна для команды. Инструкция Thread.sleep(5000L) выставляет нижнюю границу выполнения теста равной пяти секундам, тогда как в удачном сценарии тест мог занять 10, 50, 100, 200 миллисекунд, секунду. Что, если у вас 200 тестов? Время выполнения 1000 секунд или около 16 минут, тогда как в удачном сценарии ваш билд мог занять во много раз меньше - 1, 2 … 4 минуты. Часто проблема нарастает очень плавно с увеличением количества тестов и неопытные команды не могут понять, в чем причина увеличения времени сборки 🤔.

Ставьте 🔥, если интересно прочитать про решение данной проблемы. Кто уже сталкивался - кидайте в комментарии ваши методы и интересные библиотеки, которые используются в вашей команде.

#kotlin #java #async
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥682👍1🤝1
Как “не спать” при тестировании асинхронных программ.

🛌 В предыдущем посте я описал мотивацию использования Thread.sleep при тестировании асинхронного кода и связанные с этим проблемы. Однако, пассивное ожидание потока очень помогает нам при тестировании асинхронного кода, ведь тесту нужно дождаться завершения асинхронной операции, чтобы начать проводить различные проверки.

Проблему для нас создает тот факт, что мы не знаем, сколько времени в действительности займет тестируемая операция (от выполнения к выполнению ее длительность может варьироваться). Поэтому приходится подбирать некую константу, которая в большинстве случаев превышает максимальную длительность операции. Пример: отправка оповещения в среднем занимает около 150ms, но в 3% случаев превышает 5 секунд. Thread.sleep(5_000) даст нам зеленый тест в c 97% вероятностью.

Нам бы хотелось, чтобы поток ожидал примерно столько же, сколько в действительности выполняется тестируемая операция. Если ее фактическая длительность 150ms, то поток должен находиться в ожидании (150ms + некоторая фиксированная константа, например, еще 50ms). Если операция выполнялась 5s, то и время ожидания должно быть 5s + 50ms.

Для этого необходим механизм, умеющий с заданной периодичностью проверять является ли некоторое условие истинным. Если условие не выполняется, то “засыпать” на короткий промежуток времени, если условие выполняется, то выходить из ожидания.

🔔Пример: клиент отправляет уведомление. Мы хотим протестировать, что после отправки его статус в базе данных будет обновлен на processed. Будем с периодичностью в 50ms делать запрос в базу данных, получать статус и, если он еще не изменен на processed, то ждать следующие 50ms. Нам станет известно, что статус обновился, через кратчайший промежуток времени.

Давайте посмотрим на пример кода, где в качестве такого механизма используется библиотека Awaitility.

id = service.submitNotification(deviceId, “Hello”)

with()
.pollInterval(50_MILLISECONDS)
.await()
.atMost(8, SECONDS)
.until(notificationStatusIsUpdated());


⚙️ У библиотеки есть большое количество конфигурируемых параметров. Например:
1. Период времени между проверкой условия (poll interval)
2. Начальная задержка перед первой проверкой
3. Минимальное время ожидания (например, вы хотите, чтобы оповещение отправлялось не раньше, чем через 5 секунд)
4. Максимальное время ожидания, за которое ваше условие должно выполниться
5. Игнорируемые исключения
6. Пул потоков, на котором будет выполняться тестирование

🐈‍⬛ В библиотеке также присутствует модуль для Kotlin, который помогает использовать API библиотеки в конструкциях, похожих на естественный язык, используя инфиксные функции.

🏋️‍♀️ Вы можете для тренировки написать собственный небольшой инструмент для ожидания (на java, kotlin, kotlin-coroutines) и поделиться кодом в комментариях 😊

#java #kotlin #async
Please open Telegram to view this post
VIEW IN TELEGRAM
👍25🔥8👎3
Недавно у меня возникла следующая задача: необходимо запросить данные у трех сервисов, но достаточно получить первый валидный ответ от любого их них, остальные 2 вызова можно не ожидать.

🤝 Интерфейс вызова следующий:
fun performCall(key: Key) : CompletableFuture<Value>


🤯 Я попытылся написать это на java, но ничего адекватного по количеству строк и общей читаемости кода у меня не получилось.

🐣 Решение я нашел в использовании корутин. Первое, что я сделал: превратил CompletableFuture<Value> в Deferred<Value>, используя extension-функцию public fun <T> CompletionStage<T>.asDeferred(): Deferred<T>. Это необходимо, чтобы переместить наш код в плоскость корутин.

🪢 Далее я использовал интересную функцию - select. Ей на вход можно передать любое количество “ожидающих (suspended)” функций и select будет ждать, пока любая из этих функций не вернет результат. Тогда исполнение продолжится и select вернет вам результат этой "победившей" корутины.

🍎 Таким образом, моя задача сводится к довольно простой вещи - поместим все наши вызовы в список и передадим функции select, а она вернет нам первый полученный результат. Это код, который нам нужен:

val calls = services.map { performCall(key).asDeferred() }.toList()
val res = select {
calls.forEach { call ->
call.onAwait { it } // it это результат вызова
}
}


💩 В реальности сервисы могут кидать исключения или возвращать пустой результат null. А нам необходимо дождаться именно первого валидного результата. Мы можем сделать следующее - проанализируем результат и в случае null будем снова заходить в select.

🚨 И тут надо помнить еще один интересный нюанс: если вы снова передадите в select тот же самый список, то он повторно выдаст вам результат ошибочного вызова, ведь он и правда завершен :) Поэтому давайте удалять из списка те вызовы, которые выполнились и не являются валидными. Только не забывайте поместить их в thread-safe контейнер, иначе легко словите ConcurrentModificationException.

Финальное решение выглядит как-то так (за исключением try-catch, чтобы сократить):
val calls = services.map { performCall(key).asDeferred() }.toCollection(ConcurrentHashMap.newKeySet())

var res: Value? = null
while (calls.isNotEmpty()) {
val res = select {
calls.forEach { call ->
call.onAwait { callRes ->
calls.remove(call)
callRes
}
}
}
if (res != null) break
}


☄️ Отмечу еще, что select бывает невероятно полезен в сочетании с использованием каналов (kotlinx.coroutines.Channel). Канал - аналог BlockingQueue в мире корутин, позже выложу пост о них. Если у вас есть задача мониторить множество очередей на предмет новых элементов и сразу же передавать их на обработку, то лучше средства не найти.

🪐 Интересно, что функция select по свой сути является аналогом системного вызова select на основе которого реализован "неблокирующий" ввод-вывод. Он позволяет мониторить множество TCP соединений на предмет новых событий IO и превращать их в единый "уплотненный" или иначе "мультиплексированный" поток событий, что позволяет очень эффективно использовать ресурсы системы. О "мультиплексировании" уже говорили тут и будем говорить еще.

Ставьте 🔥, если было интересно. Если вы знаете код на java, который решает аналогичную задачу и является плюс-минус таким же лаконичным, делитесь в комментах!

#kotlin #coroutines #java
Please open Telegram to view this post
VIEW IN TELEGRAM
Please open Telegram to view this post
VIEW IN TELEGRAM
🔥37👍73🆒1
Ждать вечно не лучший выбор

☠️ Помните задачку про перевод денег с одного аккаунта на другой? Одно из проблемных мест там - возможность взаимной блокировки (deadlock), когда первый поток выполняет перевод с аккаунта с id = 1 на аккаунт с id = 2, а второй поток переводит наоборот со второго на первый аккаунт. Соответственно, может возникнуть ситуация захвата ресурса “крест-накрест”. Один поток захватил блокировку на аккаунт 1 и ждет аккаунта 2, второй поток захватил второй аккаунт и ждет первого.

📌 На собеседовании могут спросить, как разрешить данную ситуацию. Каноничным ответом будет: “использовать иерархическую блокировку”. Суть подхода заключается в том, чтобы всегда захватывать и освобождать блокировки в одинаковом порядке. В данном случае, мы могли бы всегда сначала захватывать блокировку для аккаунта с меньшим ID, потом с большим. Это гарантирует отсутствие взаимной блокировки. Можете почитать про эту технику в интернете. Знать про иерархическую блокировку, конечно, стоит. Однако на практике я не видел ее использование в промышленном (не библиотечном) коде.

Есть более простая техника, которая по моему мнению должна применяться разработчиками гораздо чаще. Суть техники - не блокировать поток навечно в ожидании мьютекса. Большинство примитивов синхронизации включают в свой API методы, которые ограничивают максимальное время ожидания.

Пример 1: интерфейс java.util.concurrent.locks.Lock. Метод tryLock позволяет ограничивать максимальное время, за которое должна быть получена блокировка. Если этого не происходит, то метод возвращает false и поток может продолжать свое выполнение.

boolean tryLock(long timeout, TimeUnit unit)


Аналогичный метод выставляет и java.util.concurrent.Semaphore.

Пример 2: интерфейс java.util.concurrent.BlockingQueue. Методы offer и poll позволяют ограничить максимальное время ожидания при помещении и изъятии элемента из очереди соответственно.

boolean offer(E e, long timeout, TimeUnit unit)
boolean poll(long timeout, TimeUnit unit)


Само по себе использование блокировок с ограниченным временем ожидания уже поможет вам избежать deadlock. И все, что для этого придется сделать - внести минимальное изменение в код, добавив желаемый таймаут.

Может возникнуть логичный вопрос: “Но мне ведь нужен лок, а тут метод вернет false и что дальше?”. Если вам нужен лок, то вы можете пытаться захватить его в цикле (spin lock), делая несколько попыток. Таймаут даст вам возможность подсветить неудачные попытки захвата лока (логировать, писать метрики, сколько в среднем времени проходит до его успешного захвата). Иногда проблема заключается не в deadlock, а в банальном скоплении очереди на использование лока.

⚡️ Если проблема будет подсвечена, вы сможете перейти к ее решению более дорогостоящими, но и более эффективными методами. Например, реализовывать иерархическую блокировку.

🐕 Мой совет: поток - не Хатико, он не должен ждать вечно. Всегда используйте возможность ограничения максимального времени ожидания при использовании блокирующих 🔥 вызовов.

#java #kotlin #threads
5👍24🔥11💯42