- xkafka es una librería open source que permite usar Kafka en entornos Go de forma tan simple como un servicio HTTP
- Al usar confluent-kafka-go normalmente se necesitan loops de procesamiento complejos y mucho código boilerplate, pero xkafka permite enfocarse en la lógica central con una estructura de Handler, Middleware y Message
- El publishing y consumo de mensajes se maneja de forma intuitiva, como si fuera un esquema de solicitud/respuesta HTTP, y oculta gran parte de la complejidad de Kafka, como la gestión de offsets, la configuración de concurrencia y el manejo de errores
- Soporta de forma sencilla varios patrones requeridos en servicios reales, como Streaming/Batch, procesamiento secuencial/asíncrono y garantías de At-most-once/At-least-once
- Permite aplicar fácilmente patrones necesarios en producción, como manejo de errores por capas y reintentos/logging/métricas basados en middleware
Kafka estilo HTTP
- xkafka es una librería que abstrae Kafka en Go como si fuera un servicio HTTP
- Message es similar a una solicitud HTTP e incluye topic, partición, offset, clave, valor, headers, callbacks, etc.
- Handler procesa la lógica de negocio, igual que un HTTP Handler
- Middleware permite aplicar funcionalidades adicionales como logging, métricas y reintentos por separado de la lógica de negocio
Publicación de mensajes (Publishing Messages)
- Se crea un Producer con
xkafka.NewProducer, luego se crea un objeto de mensaje y se publica con la función Publish
- Es posible hacer publicación asíncrona (
AsyncPublish) y registrar callbacks, lo que facilita el procesamiento de eventos asíncronos o de alto rendimiento
- La entrega de mensajes se procesa en una goroutine en segundo plano, y el estado de entrega puede rastrearse mediante callbacks
Consumo de mensajes (Consuming Messages)
- Al crear un Consumer, se especifican la función Handler y opciones como topic, broker y configuración
- Se pueden agregar middlewares con
consumer.Use()
- El consumo de mensajes comienza con
consumer.Run(ctx)
Streaming vs. Batch
- Streaming: procesa cada mensaje de inmediato, uno por uno, en cuanto llega. Es conveniente para cargas bajas, ahorro de memoria o garantías de procesamiento más fuertes
- Batch: procesa mensajes agrupados por cantidad o por ventana de tiempo. Es conveniente para sistemas de alto throughput o para reducir la carga en sistemas downstream
Secuencial o asíncrono
- Por defecto, el procesamiento es secuencial (Sequential): no se lee el siguiente mensaje hasta terminar el anterior
- Al usar
xkafka.Concurrency(N), se habilita un modo asíncrono (Async) que procesa en paralelo N mensajes (o batches)
Gestión de offsets
- En el comportamiento base de Kafka, el offset avanza apenas se entrega el mensaje, por lo que puede haber pérdida de mensajes ante una falla
- xkafka configura
enable.auto.offset.store=false, de modo que el offset solo se guarda después de completar el procesamiento del mensaje (o batch)
- Así, se puede lograr garantía de procesamiento en Kafka sin necesidad de gestionar el estado del mensaje en una base de datos o cola separada
-
Garantía At-Most-Once
- Por defecto, el offset se commitea en segundo plano según
enable.auto.commit=true de Kafka
- Con
xkafka.ManualCommit(true) y procesamiento secuencial, el offset se commitea antes de leer cada mensaje o batch, garantizando At-most-once
-
Garantía At-Least-Once
- Al combinar
xkafka.ManualCommit(true) con concurrencia (N>1), los offsets se commitean de forma síncrona y en orden incluso durante el procesamiento en paralelo
- Esto permite aplicar fácilmente un patrón con garantía At-least-once
Manejo de errores
-
Nivel Handler
- Dentro del Handler se pueden gestionar errores de aplicación o enviar mensajes a una Dead Letter Queue
- Se puede controlar explícitamente el resultado con
msg.AckSuccess() en caso de éxito, msg.AckSkip() para omitir y msg.AckFail(err) en caso de fallo
-
Nivel Middleware
- En el middleware se puede reutilizar lógica común, como reintentos o logging de errores, en múltiples Handlers
- También es fácil aplicar diferentes políticas de reintento o métodos de manejo según el tipo de error
-
Nivel Global
- Los errores del broker Kafka o de la librería se gestionan de forma centralizada con la opción obligatoria
xkafka.ErrorHandler
- Si este handler devuelve un error distinto de nil, se detiene el funcionamiento del Consumer o Producer
Conclusión
- xkafka transforma la experiencia compleja de usar Apache Kafka en una estructura familiar de servidor HTTP para desarrolladores Go
- Reduce el boilerplate innecesario y ofrece un entorno donde es posible concentrarse solo en la lógica de negocio
- Es mucho más conciso e intuitivo que el código tradicional con confluent-kafka-go
- Puedes empezar de inmediato consultando la documentación oficial y los ejemplos
1 comentarios
Mmm, según entendía, en golang
sarama era lo que más se prefería..
Más de lo que uno pensaría, el cliente de Kafka es... muy complejo cuando hay fallas del broker o excepciones,
y no sé si podrá cubrir todos los casos..