Java: Ejemplo de JMS

1

JMS es una especificación de Java que define en esta plataforma una forma comunicación entre aplicaciones basada en el intercambio de mensajes. Los mensajes permiten a las aplicaciones no conocerse entre sí y comunicarse de forma asíncrona pudiendo hacer que los mensajes de una cola solo sean consumidos por un único receptor o por varios suscriptores interesados en un determinado tema. En el código de ejemplo muestro tanto la comunicación con colas (queues) como con temas (topics).

La platforma Java EE (Java Enterprise Edition) pone a disposición de los desarrolladores varias especificaciones, estas especificaciones describen las funcionalidades y la API que deben proporcionar las implementaciones y proporcionan al desarrollador herramientas para facilitar, hacer mejor las aplicaciones y de forma estándar según estas especificaciones. Haciendo uso de una de estas especificaciones es posible cambiar de una implementación a otra de forma transparente y sin modificar ninguna línea de código de la aplicación (en teoría). Hay especificaciones para persistencia en base de datos (JPA), para transacciones (JTA), para servicios web (JAX-WS ) y REST (JAX-RS) entre otras

Una de ellas es el servicio de mensajería JMS (Java Message Service). JMS es un sistema de comunicación entre aplicaciones en base a mensajes. El usar mensajes como forma de comunicación entre aplicaciones tiene los siguientes beneficios o ventajas:

  • Integración de sistemas: las aplicaciones que se comunican intercambiando mensajes puede ser desarrolladas con tecnologías diferentes el único requisito es que cada una de ellas tenga una forma de enviar y recibir los mensajes.
  • Escalabilidad: en caso de necesitar más capacidad para procesar los mensajes se pueden añadir más procesadores de mensajes sin que los emisores tengan ningún conocimiento de ello.
  • Asincronía: los mensajes puede ser procesados de forma asíncrona de forma que si un mensaje desencadena un proceso largo en tiempo el emisor del mensaje no tiene que esperar a que el proceso termine, el emisor puede enviar el mensaje y olvidarse. Además, el emisor no necesita que un receptor exista para enviar el mensaje tampoco el receptor necesita que el emisor exista para recibir el mensaje. Cuando haya un receptor este se encargará de procesar los mensajes que se hayan enviado y estén aún sin procesar.
  • No acoplamiento: el emisor y el receptor no se conocen directamente de forma que cada uno de ellos puede reemplazarse por una nueva implementación de forma transparente para el otro.

La comunicación puede realizarse de dos formas cada una con sus características:

  • Punto a punto (P2P): mediante esta comunicación el mensaje se garantiza que es procesado únicamente una vez independientemente del numero de posibles procesadores que podrían recibir el mensaje. El procesado del mensaje puede ser síncrono o asíncrono. En caso de que no haya un receptor disponible el mensaje se guarda hasta poder entregarse a un receptor. Se realiza mediante colas (Queue). En este modelo al emisor se le denomina Sender y al receptor Receiver.
  • Pub/Sub: en este modelo un mensaje es recibido por todos los receptores suscritos a un tema (Topic) de forma similar a una emisión broadcast. Al emisor se le denomina Publisher y al receptor Subscriber. El emisor y receptor están más desacoplados ya que el emisor no conoce cual de los receptores procesará el mensaje.

Esta comunicación de mensajes entre aplicaciones o entre diferentes partes de una aplicación tiene muchas posibilidades, podría ser utilizado para que un receptor enviase mensajes electrónicos en base a los mensajes enviados a una cola o para actualizar o pre calcular datos de una base de datos que puede llevar un considerable tiempo y que de hacerlo en la misma petición de una aplicación web haría que el cliente estuviese esperando hasta que el proceso terminase, en ambos casos no es necesario que los procesos se hagan inmediatamente, son solo dos ejemplos de aplicación real.

Los mensajes pueden tener cabeceras (asignada automáticamente por JMS o por el desarrollador), atributos y  los datos (payload) que pueden transportar texto, un stream de objetos primitivos, en función del tipo de mensaje. Cualquiera de estas cabeceras, atributos y datos puede utilizarse como información para procesar el mensaje.

A continuación se expone  el código de una sencilla aplicación que se conecta al servicio JMS de un  WildFly de forma remota y envía y recibe unos pocos mensajes de texto.

Primero el código de un modelo Pub/Sub. Como es propio de este modelo los mensajes se reciben por todos los receptores (los dos threads que escuchan en un topic que se crean), en este caso hay un publicador y dos suscriptores:


package com.jsm.mvit;

import java.util.Date;
import java.util.Properties;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Ejemplo que muestra como como enviar y recibir mensajes JMS de un Topic de forma remota.
*/
public class ModeloPubSub {

private static final Logger logger = LoggerFactory.getLogger(ModeloPubSub.class);
/**
* Antes de ejecutar este ejemplo, usando WildFly se ha de crear un usuario guest y clave guest con el
* script WILDFLY_HOME/bin/add-user.sh.
*/
public static void main(String[] args) throws Exception {
// Usuario y password para conectarse al servidor JNDI y al Topic
String usuario = "guest";
String contrasena = "guest";

// Propiedades para crear el contexto: clase factoría, url del servidor JNDI y credenciales
Properties env = new Properties();
logger.info("Inicio");
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
env.put(Context.PROVIDER_URL, "http-remoting://localhost:8080");
env.put(Context.SECURITY_PRINCIPAL, usuario);
env.put(Context.SECURITY_CREDENTIALS, contrasena);
logger.info("El ambiente ya está configurado");

// El objeto InitialContext permite obtener la referencias de los objetos registrado en el ábol JNDI
InitialContext ic = new InitialContext(env);
logger.info("El contexto IC está inicializado");
// Objetos a obtener para usar JMS:
// - TopicConnectionFactory
// - TopicConection
// - Topic
// - TopicSession
// - TopicSubscriber
// - TopicPublisher
TopicConnectionFactory connectionFactory = (TopicConnectionFactory) ic.lookup("jms/RemoteConnectionFactory");

logger.info("Conexión");
TopicConnection connection = connectionFactory.createTopicConnection(usuario, contrasena);
logger.info("Conxión establecida");
// Obtener el Topic en el cual se publicarán y del cual se recibirán los mensajes
javax.jms.Topic topic = (javax.jms.Topic) ic.lookup("jms/topic/test");

// Preparar el publicador y subscriptor al Topic
Subscriber subscriber1 = new Subscriber(connection, topic);
Subscriber subscriber2 = new Subscriber(connection, topic);
Publisher publisher = new Publisher(connection, topic);

// Inicializar la recepción y envío de los mensajes
connection.start();

// Empezar a publicar mensajes en el Topic (y a recibirlos)
Thread thread = new Thread(publisher);
thread.start();
// Esperar a que el publicador termine de enviar mensajes
thread.join();

// Parar el envío y recepción de mensajes
connection.stop();

// Terminar liberando los recursos
subscriber1.close();
subscriber2.close();
publisher.close();
connection.close();
ic.close();
}

private static class Subscriber implements MessageListener {

private TopicSession session;
private TopicSubscriber subscriber;

public Subscriber(TopicConnection connection, javax.jms.Topic topic) throws Exception {
this.session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
this.subscriber = this.session.createSubscriber(topic, null, false);
this.subscriber.setMessageListener(this);
}

public void close() throws Exception {
subscriber.close();
session.close();
}

public void onMessage(Message message) {
try {
TextMessage text = (TextMessage) message;
System.out.printf("Suscriptor solicita la hora (%s): El publicador dice: «%s»\n", this, text.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
}

private static class Publisher implements Runnable {

private TopicSession session;
private TopicPublisher publisher;

public Publisher(TopicConnection connection, javax.jms.Topic topic) throws Exception {
this.session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
this.publisher = this.session.createPublisher(topic);
}

public void close() throws Exception {
publisher.close();
session.close();
}

public void run() {
try {
for (int i = 0; i < 10; ++i) {
Message mensaje = session.createTextMessage(String.format("La hora es (%d) " + new Date(), i));
publisher.publish(mensaje);
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

A continuación el código de utilizando un modelo punto a punto en el que vuelve a haber un emisor y dos receptores. En el resultado de la ejecución puede observarse que a pesar de haber dos receptores solo uno de los dos recibe cada mensaje:


package com.jsm.mvit;

import java.util.Date;
import java.util.Properties;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Ejemplo que muestra como como enviar y recibir mensajes JMS de un Queue de forma remota.
*/
public class Queue {

/**
* Antes de ejecutar este ejemplo, usando WildFly se ha de crear un usuario guest y clave guest con el
* script WILDFLY_HOME/bin/add-user.sh.
* levantar wildfly con standalone.bat --server-config=standalone-full.xml
*/

private static final Logger logger = LoggerFactory.getLogger(Queue.class);

public static void main(String[] args) throws Exception {
// Usuario y password para conectarse al servidor JNDI y al Queue
String usuario = "guest";
String contrasena = "guest";

// Propiedades para crear el contexto: clase factoría, url del servidor JNDI y credenciales
logger.info("inicio");
Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
env.put(Context.PROVIDER_URL, "http-remoting://localhost:8080");
env.put(Context.SECURITY_PRINCIPAL, usuario);
env.put(Context.SECURITY_CREDENTIALS, contrasena);

// El objeto InitialContext permite obtener la referencias de los objetos registrado en el ábol JNDI
InitialContext ic = new InitialContext(env);

// Objetos a obtener para usar JMS:
// - QueueConnectionFactory
// - QueueConection
// - Queue
// - QueueSession
// - QueueSubscriber
// - QueuePublisher
QueueConnectionFactory connectionFactory = (QueueConnectionFactory) ic.lookup("jms/RemoteConnectionFactory");
QueueConnection connection = connectionFactory.createQueueConnection(usuario, contrasena);

// Obtener el Queue en el cual se publicarán y del cual se recibirán los mensajes
javax.jms.Queue queue = (javax.jms.Queue) ic.lookup("jms/queue/test");

// Preparar el publicador y subscriptor al Queue
Receiver receiver1 = new Receiver(connection, queue);
Receiver receiver2 = new Receiver(connection, queue);
Sender sender = new Sender(connection, queue);

// Inicializar la recepción y envío de los mensajes
connection.start();

// Empezar a enviar mensajes en el Queue (y a recibirlos)
Thread thread = new Thread(sender);
thread.start();
// Esperar a que el enviador termine de enviar mensajes
thread.join();

// Parar el envío y recepción de mensajes
connection.stop();

// Terminar liberando los recursos
receiver1.close();
receiver2.close();
sender.close();
connection.close();
ic.close();
}

private static class Receiver implements MessageListener {

private QueueSession session;
private QueueReceiver receiver;

public Receiver(QueueConnection connection, javax.jms.Queue queue) throws Exception {
this.session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
this.receiver = this.session.createReceiver(queue);
this.receiver.setMessageListener(this);
}

public void close() throws Exception {
receiver.close();
session.close();
}

public void onMessage(Message message) {
try {
TextMessage text = (TextMessage) message;
System.out.printf("Receptor ¿qué hora es? (%s): Un publicador dice: «%s»\n", this, text.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
}

private static class Sender implements Runnable {

private QueueSession session;
private QueueSender sender;

public Sender(QueueConnection connection, javax.jms.Queue queue) throws Exception {
this.session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
this.sender = this.session.createSender(queue);
}

public void close() throws Exception {
sender.close();
session.close();
}

public void run() {
try {
for (int i = 0; i < 10; ++i) {
Message mensaje = session.createTextMessage(String.format("La hora es (%d) " + new Date(), i));
sender.send(mensaje);
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

Comentar que los mensajes se procesan en serie por cada MessageListener, esto es, hasta que no termina el consumo de uno no se consume el siguiente. Esto se aplica por MensajeListener y sesión.

Si queremos probar los ejemplos deberemos disponer del servidor de aplicaciones WildFly. Para que los ejemplos funcionen deberemos configurarlo añadiendo un usuario «guest», de contraseña «guest» y de rol «guest», el añadirlo lo podemos hacer con la utilidad add-user.sh. También se debe modificar el archivo standalone-full.xml añadiendo el topic y el queue en la sección de JMS e iniciar WildFly usando esa configuración:


./standalone.sh --server-config=standalone-full.xml

Dentro del nodo <subsystem xmlns=”

</pre>
<subsystem xmlns="urn:jboss:domain:messaging:2.0">

<hornetq-server>

...

<jms-destinations>

<jms-queue name="testQueue">

<entry name="jms/queue/test"/>

<entry name="java:jboss/exported/jms/queue/test"/>

</jms-queue>

<jms-topic name="testTopic">

<entry name="jms/topic/test"/>

<entry name="java:jboss/exported/jms/topic/test"/>

</jms-topic>

</jms-destinations>

</hornetq-server>

</subsystem>
<pre>

 

 

 

 

Anuncios

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión / Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión / Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión / Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión / Cambiar )

Conectando a %s