Spring Cloud Stream
Introducción
La comunicación asíncrona es sin duda un gran aliado de los microservicios. Si de una cosa pueden fardar las aplicaciones con microservicios frente a las aplicaciones monolíticas es del desacoplamiento que tienen estas, y este desacoplamiento también lo vemos en la comunicación asíncrona.
En el último artículo expliqué como funciona Kafka y cómo permite que nuestros servicios se comuniquen asíncronamente. Si no sabes lo que es Kafka te recomiendo encarecidamente ver ese artículo para poder comprender los términos que se utilizen(eventos, producer, consumer …)
Pero hoy veremos cómo implementar esta comunicación asíncrona con Spring Cloud Stream
Spring Cloud Stream
Spring Cloud Stream va a hacer todo el trabajo de la conexión con Kafka, RabbitMQ o cualquier otra tecnología de mensajería que decidamos usar. De esta manera nosotros nos podemos encargar directamente de la lógica de negocio y el procesamiento de los eventos, olvidándonos de toda la configuración.
Ejemplo
Considero que explicar la implementación al mismo tiempo que se muestra un ejemplo va a ser la mejor manera de interiorizar todo este maravillosos conocimiento.
- Imaginémonos una aplicación con dos microservicios, uno llamado
Noticias
encargado de todo lo relacionado a las noticias de una aplicación, y un segundo microservicio llamadoMensajería
que se encarga de enviar emails,sms … a los usuarios.
- De vez en cuando el microservicio
Noticias
quiere notificar a sus usuarios de una nueva noticia importante. Pero claro, quiere hacer esto de manera asíncrona, es decir, publicar la noticia y en un segundo plano avisar a los usuarios. - Como hemos decidido usar kafka, tendremos un servidor kafka activo:
Implementación en el microservicio Mensajería
El MS de mensajería quiere estar en escucha, para que cuando se cree una nueva noticia este se entere y envíe un correo a sus usuarios, para ello en el microservicio Mensajería
:
- Necesitaremos estas dependencias:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
</dependency>
-
La dependencia de
spring-cloud-stream
va a ser la que realize toda la configuración por detrás, tú utilizas canales(input y output) para los eventos, y te olvidas de topics, offsets, particiones … - Por otra parte
spring-cloud-stream-binder-kafka
conecta Apache Kafka con esta ‘abstracción’ que hemos conseguido con la anterior dependencia, la cual nos permitiría no solo integrar kafka, sino, también otros sistemas de mensajería como RabbitMQ(habría que usar la dependenciaspring-cloud-stream-binder-rabbit
para ese caso) spring-cloud-stream-test-binder
simulará el funcionamiento de los binders(luego veremos qué son) para el testing.
- Este microservicio podrá recibir los mensajes gracias a las Functions que proporciona Java. En este caso, estas funciones las podemos implementar en una carpeta que llamaremos
functions
, que ubicaremos en el paquete principal del microservicio(i.ecom.myApp.mensajeria.functions.MensajeríaFunctions
)
Concretamente crearemos una clase MensajeriaFunctions
:
package com.myApp.mensajeria.functions;
import java.util.function.Consumer;
@Configuration
@AllArgsConstructor
public class MensajeriaFunctions {
private static final Logger log = LoggerFactory.getLogger(MensajeriaFunctions.class);
@Bean
public Consumer<EmailDto> sendEmail() {
return emailDto -> {
log.info("sending Email to user " + emailDto.getRecipientEmail());
...
};
}
}
- Contendrá la anotación
@Configuration
. E implementará una función Consumer la cual se encargará de recibir los detalles del email, para luego enviarlo al usuario(He quitado la implementación de enviar un email ya que se desvía del tema)
Vale bien, hemos creado la función, pero cómo sabe Spring Cloud a qué topic/queue conectarse para poder recibir esos eventos?
- Para esto, en el application.yml necesitamos configurar un binding. Un binding es la conexión entre la función y el topic de Kafka:
spring: application: name: "mensajeria" cloud: function: definition: sendEmail stream: bindings: sendEmail-in-0: destination: send-email kafka: binder: brokers: - localhost:9092
Vayamos explicando cada parte:
-
spring.cloud.function.definition
: Aquí hemos añadido el nombre de la función que habíamos creado. Si queremos añadir otra función independiente las separaremos por un ;sendEmail;sendSMS
. Pero si quiseramos concatenar funciones, entonces tendríamos que usar el carácter **** spring.cloud.stream.bindings.sendEmail-in-0
: Para el nombre del binding, seguimos la convención oficial. Que consiste en poner en primer lugar el nombre de la funciónsendEmail
, luego indicarin
oout
dependiendo de si es un consumer o un producer respectivamente, y por último indicar el índice, por si quisieramos que nuestra función se conectara a más topics(o queues para RabbitMQ).- Por otra parte con la propiedad
spring.cloud.stream.bindings.sendEmail-in-0.destination
indicamos el nombre que le queremos dar al topic, en este caso send-email - Y por último hay que indicar donde se encuentra nuestro servicio kafka corriendo, en mi caso en el puerto localhost:9092. Lo indicamos de la siguiente manera: ``spring.cloud.stream.kafka.binder.brokers.
`
Hasta aquí ya hemos configurado totalmente el Consumer, es decir el microservicio que escucha los mensajes a través de un topic. Ahora nos queda por configurar el microservicio que envía los eventos(mensajes) al topic
Implementación en el microservicio Noticias
Para este ejemplo el microservicio noticias quiere publicar en un topic los emails a los que quiere enviar una notificación sobre esta nueva noticia, para ello en el microservicio Noticias
:
- Añadiremos exactamente las mismas dependencias que hemos añadido en el microservicio Mensajería
- Añadimos la configuración oportuna:
spring: application: name: "noticias" cloud: stream: bindings: sendEmail-out-0: destination: send-email kafka: binder: brokers: - localhost:9092
Es prácticamente la misma configuración que hemos hecho en el microservicio anterior (MS Mensajería), solo que el nombre del binding es sendEmail-out-0, es decir en vez de in hemos puesto out Y ya, ya hemos preparado todo el entorno, solo queda ver cómo enviar el mensaje desde el consumer de manera asíncrona:
- Lo único que hay que hacer es desde donde queramos enviar el mensaje ejecutar el siguiente comando :
streamBridge.send("event-creation-trigger", emailDto);
StreamBridge se puede inyectar en tu clase con el Bean
StreamBridge
, este bean lo tenemos gracias a la dependenciaspring-cloud-stream
. Indicaremos el topic como primer argumento, y como segundo argumento el dto del email que recibirá el microserviciomensajeria
.