Saturday, January 23, 2010

Flex data push with BlazeDS and EJB3

With BlazeDS we can push data from the J2EE container to Flex clients, just as Adobe LiveCycle Data Services can. In this blog post I use the Message Broker and a custom Service Adapter of BlazeDS in combination with an EJB3 entity and session bean to push every change to the clients. The Message Broker routes the messages containing the EJB entities to the custom service adapter. The service adapter is a service that keeps track of the subscribed Flex clients and publishes the changes to them.
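
To make the publish / subscribe idea concrete before we touch BlazeDS, here is a minimal plain-Java sketch of what "register the subscribed clients and publish to all of them" boils down to. The class PushSketch and its methods are hypothetical names of my own and are not part of the BlazeDS API; BlazeDS does this bookkeeping for you.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Toy stand-in for the subscriber bookkeeping; not BlazeDS code. */
class PushSketch {
    // clientId -> callback that delivers a message body to that client
    private final Map<String, Consumer<Object>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String clientId, Consumer<Object> callback) {
        subscribers.put(clientId, callback);
    }

    void unsubscribe(String clientId) {
        subscribers.remove(clientId);
    }

    /** Delivers the same body to every subscribed client and returns how many received it. */
    int publish(Object body) {
        int delivered = 0;
        for (Consumer<Object> callback : subscribers.values()) {
            callback.accept(body);
            delivered++;
        }
        return delivered;
    }
}
```

Every client that is subscribed at the moment of publish() receives the body; a client that has unsubscribed receives nothing more.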

For this example I used JDeveloper 11g and WebLogic (for more information, see my previous blog).
These are the steps we need to take to make this work:
  • Generate an Entity Bean with Entity from Tables in JDeveloper
  • Generate an EJB Session Bean and add the Message Broker code
  • Make a ServiceAdapter
  • Configure the BlazeDS files
  • Add a Flex class which maps to the Entity Bean
  • Add the Flex producer and consumer code
  • Create an EJB client for the changes
We start by creating an Employees Entity Bean (based on the employees table of the Oracle HR schema).

package nl.whitehorses.model.entities;

import java.io.Serializable;
import java.sql.Timestamp;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;

@Entity
@NamedQueries( { @NamedQuery(name = "Employees.findAll", query = "select o from Employees o") })
public class Employees implements Serializable {
    private Double commissionPct;
    private Long departmentId;
    @Column(nullable = false, unique = true, length = 25)
    private String email;
    @Id
    @Column(name="EMPLOYEE_ID", nullable = false)
    private Long employeeId;
    @Column(name="FIRST_NAME", length = 20)
    private String firstName;
    @Column(name="HIRE_DATE", nullable = false)
    private Timestamp hireDate;
    @Column(name="JOB_ID", nullable = false, length = 10)
    private String jobId;
    @Column(name="LAST_NAME", nullable = false, length = 25)
    private String lastName;
    @Column(name="PHONE_NUMBER", length = 20)
    private String phoneNumber;
    private Double salary;

    public Employees() {
    }

    // Getters and setters for all fields (getEmployeeId(), setHireDate(), ...) go here.
}

The HRSessionEJBBean session bean has a remote interface; JDeveloper can generate both the bean and its interface for you. The bean contains the Message Broker code, which routes a message with all the employees whenever the persist, merge or remove method of the EntityManager is called.
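
The pattern behind this bean is "change the data, then push the complete list". Here is a small plain-Java sketch of that pattern without EJB or BlazeDS. The class PushingStore, its in-memory map and the callback are hypothetical stand-ins of my own for the EntityManager and the Message Broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory stand-in for the session bean: every write is followed by a push. */
class PushingStore {
    private final Map<Long, String> rows = new LinkedHashMap<>();
    private final Consumer<List<String>> push; // stand-in for routing a message to the broker

    PushingStore(Consumer<List<String>> push) {
        this.push = push;
    }

    void persist(long id, String name) { rows.put(id, name); pushAll(); }

    void merge(long id, String name) { rows.put(id, name); pushAll(); }

    void remove(long id) { rows.remove(id); pushAll(); }

    /** Returns a copy, so a pushed snapshot does not change afterwards. */
    List<String> findAll() { return new ArrayList<>(rows.values()); }

    private void pushAll() { push.accept(findAll()); }
}
```

Pushing the whole list after every change is simple and keeps all clients consistent, but the message grows with the table; for a large table you would probably push only the changed row.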


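import java.util.List;

import javax.ejb.Remote;
import javax.ejb.Stateless;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

import nl.whitehorses.model.entities.Employees;

import flex.messaging.MessageBroker;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.util.UUIDUtils;

@Stateless(name = "HRSessionEJB", mappedName = "flex_ejb2-Model-HRSessionEJB")
public class HRSessionEJBBean implements HRSessionEJB {
    @PersistenceContext
    private EntityManager em;

    public HRSessionEJBBean() {
    }

    // Routes a message with the full employee list to the BlazeDS destination.
    private void pushEmployees() {
        String clientId = UUIDUtils.createUUID();
        MessageBroker msgBroker = MessageBroker.getMessageBroker(null);
        AsyncMessage msg = new AsyncMessage();
        // Assumption: the rest of this method is missing from the listing. The
        // destination id is the one defined in messaging-config.xml.
        msg.setDestination("EmployeesServicePush");
        msg.setClientId(clientId);
        msg.setMessageId(UUIDUtils.createUUID());
        msg.setBody(getEmployeesFindAll());
        msgBroker.routeMessageToService(msg, null);
    }

    // Assumption: the em.persist / em.remove and pushEmployees() calls below are
    // filled in from the description above; they are missing from the listing.
    public Employees persistEmployees(Employees employees) {
        em.persist(employees);
        pushEmployees();
        return employees;
    }

    public Employees mergeEmployees(Employees employees) {
        Employees emp = em.merge(employees);
        pushEmployees();
        return emp;
    }

    public void removeEmployees(Employees employees) {
        employees = em.find(Employees.class, employees.getEmployeeId());
        em.remove(employees);
        pushEmployees();
    }

    /** <code>select o from Employees o</code> */
    public List<Employees> getEmployeesFindAll() {
        return em.createNamedQuery("Employees.findAll").getResultList();
    }
}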

Next, create the custom BlazeDS service adapter; we will add it to the BlazeDS configuration files later. The adapter does a JNDI lookup of our EJB session bean and pushes the messages to the connected Flex clients.

package nl.whitehorses.blazeds.adapter;

import java.util.List;

import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.Message;
import flex.messaging.services.MessageService;
import flex.messaging.services.ServiceAdapter;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import nl.whitehorses.model.entities.Employees;

public class EmployeesServiceAdapter extends ServiceAdapter {
    Context context = null;
    HRSessionEJB hRSessionEJB = null;

    public EmployeesServiceAdapter() {
        try {
            context = new InitialContext();
            // Same lookup as in the test client; the JNDI name is not shown in this listing.
            hRSessionEJB = (HRSessionEJB)context.lookup("");
        } catch (NamingException e) {
            e.printStackTrace();
        }
        System.out.println("Adapter initialized");
    }

    public void start() {
        System.out.println("Adapter started");
    }

    public void stop() {
        System.out.println("Adapter stopped");
    }

    private List<Employees> getEmployees() {
        return hRSessionEJB.getEmployeesFindAll();
    }

    public Object invoke(Message msg) {
        // Constant first, so a message without a body cannot cause a NullPointerException.
        if ("New".equals(msg.getBody())) {
            System.out.println("Adapter received new");
            return getEmployees();
        } else {
            System.out.println("Adapter sending message");
            AsyncMessage newMessage = (AsyncMessage)msg;
            // Assumption: the right-hand side is missing from the listing; this is the
            // usual way for a BlazeDS adapter to reach its MessageService.
            MessageService msgService = (MessageService)getDestination().getService();
            msgService.pushMessageToClients(newMessage, true);
            return null;
        }
    }
}
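
The invoke method does two different things, so here is a minimal plain-Java sketch of just that dispatch: a "New" request is answered directly to the caller with the current list, and any other message is pushed to all subscribers. InvokeSketch, the loader and the pushed list are hypothetical stand-ins of my own for the adapter, the session bean and the MessageService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Stand-in for the adapter's invoke logic; not BlazeDS code. */
class InvokeSketch {
    private final Supplier<List<String>> loader;          // stand-in for the session bean
    private final List<Object> pushed = new ArrayList<>(); // stand-in for the push to clients

    InvokeSketch(Supplier<List<String>> loader) {
        this.loader = loader;
    }

    Object invoke(Object body) {
        // Constant first, so a null body simply falls through to the push branch.
        if ("New".equals(body)) {
            return loader.get();   // answered to the requesting client only
        }
        pushed.add(body);          // broadcast to every subscriber
        return null;
    }

    List<Object> pushed() {
        return pushed;
    }
}
```

The return value of invoke travels back to the sender in the acknowledgement, which is why a freshly started client can fill its grid without waiting for the next push.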

In messaging-config.xml we need to register this custom service adapter and add an employees destination that connects the adapter to the streaming AMF channel.

<?xml version="1.0" encoding="UTF-8"?>
<service id="message-service" class="">

    <adapters>
        <adapter-definition id="EmployeesServicePushAdapter" class="nl.whitehorses.blazeds.adapter.EmployeesServiceAdapter"/>
    </adapters>

    <destination id="EmployeesServicePush">
        <channels>
            <channel ref="my-streaming-amf" />
        </channels>
        <adapter ref="EmployeesServicePushAdapter"/>
    </destination>

</service>

The services-config.xml file imports the messaging-config.xml configuration file and contains the streaming AMF channel configuration.

<?xml version="1.0" encoding="UTF-8"?>
<service-include file-path="messaging-config.xml" />
<channel ref="my-amf"/>

<channel-definition id="my-amf"
<endpoint url="http://{}:{server.port}/{context.root}/messagebroker/amf"

<channel-definition id="my-streaming-amf"
<endpoint url="http://{}:{server.port}/{context.root}/messagebroker/streamingamf"
<user-agent match-on="MSIE" kickstart-bytes="2048" max-streaming-connections-per-session="3"/>
<user-agent match-on="Firefox" kickstart-bytes="2048" max-streaming-connections-per-session="3"/>

<target class="flex.messaging.log.ConsoleTarget" level="Error">

We are finished with the Java part and can now work on the Flex part.
In Flex we need to add a simple Employees class that maps to the entity bean.

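package entities
{
    // Assumption: the alias is the Java class name of the entity, so that BlazeDS can map the two types.
    [RemoteClass(alias="nl.whitehorses.model.entities.Employees")]
    public class Employees
    {
        public var commissionPct:Number;
        public var departmentId:int;
        public var email:String;
        public var employeeId:int;
        public var firstName:String;
        public var hireDate:Date;
        public var jobId:String; // a String on the Java side
        public var lastName:String;
        public var phoneNumber:String;
        public var salary:Number;

        public function Employees()
        {
        }
    }
}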

The Flex application MXML with the Producer and Consumer components:

<?xml version="1.0" encoding="utf-8"?>
<mx:Application xmlns:mx="" width="1200" height="500" creationComplete="init()">
    <mx:Script>
        <![CDATA[
            import mx.collections.ArrayCollection;
            import mx.messaging.events.MessageAckEvent;
            import mx.messaging.events.MessageEvent;
            import mx.messaging.messages.IMessage;
            import mx.messaging.messages.AsyncMessage;
            import entities.Employees;

            // Assumption: the creationComplete handler and the subscribe() and send()
            // calls are missing from the listing; they are filled in from the Producer
            // and Consumer declared below.
            private function init():void {
                consumer.subscribe();
                var message:AsyncMessage = new AsyncMessage();
                message.body = "New";
                producer.send(message);
            }

            private function onMsg(event:MessageEvent):void {
                grid.dataProvider = event.message.body as ArrayCollection;
            }

            private function pub():void {
                var message:AsyncMessage = new AsyncMessage();
                message.body = "New";
                producer.send(message);
            }

            private function ack(event:MessageAckEvent):void {
                grid.dataProvider = event.message.body as ArrayCollection;
            }
        ]]>
    </mx:Script>

    <mx:Producer id="producer" destination="EmployeesServicePush" acknowledge="ack(event)"/>
    <mx:Consumer id="consumer" destination="EmployeesServicePush" message="onMsg(event)"/>

    <mx:DataGrid id="grid">
        <mx:columns>
            <mx:DataGridColumn dataField="firstName" headerText="First Name"/>
            <mx:DataGridColumn dataField="lastName" headerText="Last Name"/>
            <mx:DataGridColumn dataField="departmentId" headerText="Department"/>
        </mx:columns>
    </mx:DataGrid>
</mx:Application>

Now we can start the J2EE container and the Flex client. To test the push, we can use an EJB test client that adds a new employee and then check in the Flex application whether the new employee shows up.

package test;

import java.util.Calendar;
import java.util.Hashtable;

import javax.naming.Context;
import javax.naming.InitialContext;

import javax.naming.NamingException;

import nl.whitehorses.model.entities.Employees;

public class HRSessionEJBClient {
    public static void main(String [] args) {
        try {
            final Context context = getInitialContext();
            HRSessionEJB hRSessionEJB = (HRSessionEJB)context.lookup("");

            Employees emp = new Employees();
            emp.setHireDate(new java.sql.Timestamp(Calendar.getInstance().getTime().getTime()));
            // Assumption: the remaining setters and the persist call are missing from the
            // listing; set the other non-nullable columns before persisting.
            hRSessionEJB.persistEmployees(emp);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private static Context getInitialContext() throws NamingException {
        Hashtable<String, String> env = new Hashtable<String, String>();
        // WebLogic Server 10.x connection details
        env.put( Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory" );
        env.put(Context.PROVIDER_URL, "t3://");
        return new InitialContext( env );
    }
}

  1. It is a magic job. I haven't tried it yet, but it is elegant.

  2. Could you please provide the full zipped project?

  3. Ok,

    Here we go the link is


  4. Very nice explanation and complete example. Well done.

    I have one question though. What if someone wants to notify just one specific user?

    Ex. Progress indication of a long operation. This could be initiated by users and each one of course should see his own progress.
    In other words not a global server push but based on id or something else.

    Any ideas would really be appreciated.
    Thank you.

  5. Hi

    Great question. I don't know exactly, but I think it works the same way as topics and queues in JMS.

    I think the solution must be in the message broker, using the client id and the message id.

    Let me know if you get it working.


  6. Great job. This is exactly what I am looking for.