Luft: cómo desarrollaron un datastore que consulta 1,000 millones de datos en 10 segundos
(tv.naver.com)En un entorno con más de 10 mil millones de eventos mensuales en promedio, surgió la necesidad de analizar datos rápidamente para hacer análisis de comportamiento de usuarios (cohortes).
(Ej. mujeres de 30 y tantos que gastaron más de 100 mil wones al mes en nuestra app durante los últimos 6 meses → su tasa de regreso)
Este es el relato de cómo implementaron directamente un datastore que antes los desarrolladores solo usaban.
Para implementar consultas de análisis de comportamiento de usuarios…
-
Debe ser posible consultar métricas que no hayan sido precalculadas de antemano (+ también deben poder hacerse nuevos tipos de análisis sin reindexación)
-
Al hacer
Group Byde datos de eventos por usuario, el cuello de botella del high-cardinality shuffle debe ser mínimo
Evaluaron si usar una solución existente o crear una propia
-
Ya usaban Druid en otro lugar, pero por las limitaciones de
Pre-Aggregation(un enfoque en el que solo se leen valores ya calculados) no era adecuado para implementar esta funcionalidad -
Es posible operar data warehouses como Snowflake o Redshift a gran escala, pero por su naturaleza de uso general se necesita correr un clúster demasiado grande para el objetivo, así que resulta costoso
-
Para cubrir necesidades diversas como funnel, emparejamiento de ID y otras, una base de datos basada en SQL tiene limitaciones
Al final, crearon su propio datastore
-
Luft = un datastore optimizado desde el inicio para ejecutar rápidamente consultas de análisis de comportamiento de usuarios agrupadas por ID de usuario
-
Está construido con Golang
-
Analiza decenas de TB de datos de usuarios con menos de 5 nodos, en un promedio de 3 segundos y un máximo de 10 segundos
-
A diferencia de un RDBMS convencional, tiene inmutabilidad (si hace falta, sobrescribe los datos del mismo período) → diseño de clúster simple, alto rendimiento sin implementar un page manager complejo, y posibilidad de diseñar el formato de almacenamiento deseado
Revisando la base técnica
- TrailDB (motor de almacenamiento) - un rowstore de eventos de series de tiempo optimizado para particionamiento por ID de usuario
→ Diccionariza los valores y guarda solo sus ID
→ Ordena los eventos del usuario cronológicamente y almacena solo el incremento de tiempo respecto al evento anterior y las columnas que cambiaron (porque la mayoría de los atributos del usuario no cambian)
→ No tiene índices. Siempre hay que hacer full scan.
→ Pero sorprendentemente presume una tasa de compresión altísima (CSV 13GB → TrailDB ~300mb)
→ Como la complejidad temporal es O(n), pensaron que bastaba con reducir la complejidad espacial
- LLVM (motor de consultas)
→ Pero TrailDB solo ofrece equals con forma OR-AND, y además las consultas parseadas en Go deben pasarse a C/C++
→ Descubrieron que PostgreSQL compila consultas con LLVM JiT
→ Como las consultas amplían funcionalidad con frecuencia, escribirlas en C/C++ incrementaría el costo de desarrollo; eso puede evitarse si Golang solo genera LLVM IR y luego C/C++ lo ejecuta con compilación JiT
- Crearon directamente su propia capa de procesamiento
→ Se usa mucho MapReduce, pero no podían usarlo por trabajar con Golang
→ Spark/Hadoop están optimizados para long-running jobs, así que incluso al integrarlos el rendimiento no era bueno
→ También lo hicieron por su cuenta → https://github.com/ab180/lrmr
→ Combinación de gRPC + Protobuf + etcd, tomando bastante del diseño familiar de Spark
→ Decidieron renunciar a la resiliencia → si se lleva el rendimiento al extremo, incluso si ocurre una falla, volver a empezar desde cero toma menos de 10 segundos
→ Como el procesamiento de grandes volúmenes de datos causaba con frecuencia problemas de buffer overflow (backpressure), lo cambiaron por un pull-based event stream (adoptado en Kafka, Armeria, etc.)
- Implementaron el sharding por su cuenta
→ Shard = nodo histórico
→ ¿Y si se usa el rango de fechas de la partición como clave de sharding?
→ Todas las consultas tienen tiempo → fácil de filtrar
→ En el mismo rango temporal hay volúmenes de datos similares → fácil de distribuir
→ Los entornos distribuidos no son elegantes…
→ ¿Qué pasa si un nodo se cae o se agrega uno nuevo?
→ ¿Qué pasa si se llena el almacenamiento?
→ ¿Qué pasa si por una falla la carga se concentra en un solo nodo?
→ Personalizaron la cost function de Druid para que, mientras más cercanos y superpuestos sean los rangos de fecha de las particiones, mayor sea el costo
→ Para asegurar la disponibilidad de los shards, hicieron lo siguiente
→ Aplicaron TTL a la información de shards y la actualizan periódicamente (etcd)
→ Guardan las particiones en S3 y administran la lista de particiones con DynamoDB
Estado actual en producción
- Con solo 4 instancias
c5.2xlarge, escanean 500GB de datos en menos de 15 segundos
Objetivos a futuro (o tareas pendientes)
-
Quieren hacer análisis de funnel en tiempo real con un clúster de menos de 10 nodos
-
Quieren dar soporte a Spark para habilitar integración con ML y otros casos
-
Están desarrollando su propio column store (Ziegel) para reemplazar TrailDB
→ Optimización con SIMD y multinúcleo
→ Filtrado previo basado en atributos de usuario con Bitmap Index
2 comentarios
traildb es divertido. https://www.youtube.com/watch?v=-oPFxSwn0lM Está bueno. Aunque es un video viejo, probablemente traildb no haya cambiado en todo este tiempo.
Ahora que lo veo, también hay una entrada en el blog del desarrollador,
https://engineering.ab180.co/stories/introducing-luft
Es la primera vez que escucho de TrailDB, pero es algo así...
https://github.com/traildb/traildb