
Understanding Broadcaster

jfarcand edited this page Jul 9, 2012 · 32 revisions

A Broadcaster implements the publish/subscribe paradigm. An application can subscribe to one or more Broadcasters to get notified about events. By default, the framework creates a single Broadcaster and associates it with every new AtmosphereResource. The default Broadcaster's id is always "/*", which means that if you invoke

 Future<Object> f = broadcaster.broadcast("hello");

all AtmosphereResources will be notified and will have a chance to handle the event. By default, broadcaster.broadcast("hello") is an asynchronous operation and won't block. If you want to be notified when the asynchronous execution completes, you can either use the returned Future

  Future<Object> f = broadcaster.broadcast("hello");
  f.get(); // blocking until broadcast completed.

or you can add a BroadcasterListener that will be invoked when the broadcast operation completes.

broadcaster.addBroadcasterListener(new BroadcasterListener() {
   public void onComplete(Broadcaster b) {
       // do something
   }
}).broadcast("hello");
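The two ways of observing completion described above can be pictured with plain java.util.concurrent types. The following is a minimal sketch, not Atmosphere code: SketchBroadcaster, subscribe and onComplete are hypothetical names invented for illustration, and a single-threaded executor stands in for whatever the framework uses internally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical stand-in for a Broadcaster; not an Atmosphere class.
class SketchBroadcaster {
    private final List<Consumer<Object>> subscribers = new CopyOnWriteArrayList<>();
    private final List<Runnable> completionListeners = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void subscribe(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
    }

    // Registers a callback that runs after each broadcast has been delivered.
    void onComplete(Runnable listener) {
        completionListeners.add(listener);
    }

    // Returns immediately; delivery happens on the executor thread.
    Future<Object> broadcast(Object message) {
        return executor.submit(() -> {
            for (Consumer<Object> s : subscribers) {
                s.accept(message);
            }
            for (Runnable l : completionListeners) {
                l.run();
            }
            return message;
        });
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Calling get() on the returned Future blocks the caller until every subscriber has been invoked, while a callback registered through onComplete runs on the executor thread without blocking the caller.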

You can associate a Broadcaster with an AtmosphereResource by doing

  atmosphereResource.setBroadcaster(broadcaster);

You can associate an AtmosphereResource with several Broadcasters by doing

 broadcaster.addAtmosphereResource(atmosphereResource);

As an example, implementing a Twitter-like application using the Atmosphere Framework would consist of creating one Broadcaster per Twitter account:

 Broadcaster b = BroadcasterFactory.getDefault().get("jfarcand");
 Broadcaster b2 = BroadcasterFactory.getDefault().get("atmo_framework");

If user 'jfarcand' wants to be notified when 'atmo_framework' publishes a tweet, you just associate the AtmosphereResource representing 'jfarcand' with the 'atmo_framework' Broadcaster:

  b2.addAtmosphereResource(atmosphereResource);
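To make the one-Broadcaster-per-account idea concrete, here is a small self-contained model of it. It is only a sketch under stated assumptions: Follower, Channel and ChannelFactory are hypothetical stand-ins for AtmosphereResource, Broadcaster and BroadcasterFactory, and delivery is synchronous to keep the example short.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an AtmosphereResource: one connected user.
class Follower {
    final String name;
    final List<String> timeline = new ArrayList<>();

    Follower(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for a Broadcaster: one channel per account.
class Channel {
    final String id;
    private final Set<Follower> followers = new LinkedHashSet<>();

    Channel(String id) {
        this.id = id;
    }

    void addFollower(Follower f) {
        followers.add(f);
    }

    // Delivers the message to every follower of this channel only.
    void broadcast(String message) {
        for (Follower f : followers) {
            f.timeline.add(id + ": " + message);
        }
    }
}

// Hypothetical stand-in for a BroadcasterFactory.
class ChannelFactory {
    private final Map<String, Channel> channels = new ConcurrentHashMap<>();

    // Creates the channel on first use, then returns the same instance.
    Channel get(String id) {
        return channels.computeIfAbsent(id, Channel::new);
    }
}
```

A follower added to the "atmo_framework" channel sees only what is broadcast on that channel; messages broadcast on the "jfarcand" channel never reach it unless it is added there too.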

You can create channels (that is, Broadcasters) on the fly using the BroadcasterFactory class.

MetaBroadcaster

The MetaBroadcaster utility class is helpful when you want to broadcast messages to all, or a subset of, the existing Broadcasters. For example, executing

MetaBroadcaster.getDefault().broadcastTo("/", "hello world");

will broadcast the "hello world" message to all AtmosphereResources associated with all Broadcasters. To be more specific, assume you create

  BroadcasterFactory.getDefault().get("/a");
  BroadcasterFactory.getDefault().get("/a/a1");
  BroadcasterFactory.getDefault().get("/a/a2");
  BroadcasterFactory.getDefault().get("/b");
  BroadcasterFactory.getDefault().get("/c");

Doing

   MetaBroadcaster.getDefault().broadcastTo("/a/*", "hello world");

is equivalent to doing:

  // Retrieve the Broadcaster named "/a/a1"
  BroadcasterFactory.getDefault().lookup("/a/a1", true).broadcast("hello world");
  // Retrieve the Broadcaster named "/a/a2"
  BroadcasterFactory.getDefault().lookup("/a/a2", true).broadcast("hello world");
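The selection of "/a/a1" and "/a/a2" by the pattern "/a/*" can be sketched as a simple prefix match. This is an assumption about the matching rule made for illustration only; WildcardSketch is a hypothetical helper, it covers just the trailing-wildcard case shown above, and MetaBroadcaster's real matching may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of Atmosphere.
class WildcardSketch {
    // Returns the ids selected by the pattern, in their original order.
    static List<String> select(String pattern, List<String> ids) {
        List<String> selected = new ArrayList<>();
        for (String id : ids) {
            if (matches(pattern, id)) {
                selected.add(id);
            }
        }
        return selected;
    }

    // Assumption: a pattern ending in "/*" selects every id strictly below
    // that prefix; any other pattern must equal the id exactly.
    static boolean matches(String pattern, String id) {
        if (pattern.endsWith("/*")) {
            // Drop the '*' but keep the trailing '/'.
            String prefix = pattern.substring(0, pattern.length() - 1);
            return id.startsWith(prefix) && id.length() > prefix.length();
        }
        return pattern.equals(id);
    }
}
```

With the five ids created above, select("/a/*", ids) keeps "/a/a1" and "/a/a2" and leaves out "/a", "/b" and "/c", which matches the equivalence described in the text.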

You can also schedule periodic tasks using scheduleTo. Doing

   MetaBroadcaster.getDefault().scheduleTo("/a/*", new Callable<String>() {
         public String call() {
             return "Hello world";
         }
   }, 10, TimeUnit.SECONDS);

is equivalent to doing:

  // Retrieve the Broadcaster named "/a/a1"
  BroadcasterFactory.getDefault().lookup("/a/a1", true).scheduleFixedBroadcast(new Callable<String>() {
         public String call() {
             return "Hello world";
         }
   }, 10, TimeUnit.SECONDS);
  // Retrieve the Broadcaster named "/a/a2"
  BroadcasterFactory.getDefault().lookup("/a/a2", true).scheduleFixedBroadcast(new Callable<String>() {
         public String call() {
             return "Hello world";
         }
   }, 10, TimeUnit.SECONDS);
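The idea of a periodic broadcast driven by a Callable can be sketched with a ScheduledExecutorService from the JDK. PeriodicSketch and its parameters are hypothetical, and the sketch uses scheduleAtFixedRate; whether Atmosphere schedules at a fixed rate or with a fixed delay is not stated on this page, so treat that choice as an assumption.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper; not part of Atmosphere.
class PeriodicSketch {
    // Calls the message source once per period and hands the result to each sink.
    static ScheduledFuture<?> schedule(ScheduledExecutorService scheduler,
                                       Callable<String> source,
                                       List<Consumer<String>> sinks,
                                       long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(() -> {
            try {
                String message = source.call();
                for (Consumer<String> sink : sinks) {
                    sink.accept(message);
                }
            } catch (Exception e) {
                // An exception escaping the task would cancel all later runs,
                // so it is reported here instead of being rethrown.
                System.err.println("periodic broadcast failed: " + e);
            }
        }, period, period, unit);
    }
}
```

The returned ScheduledFuture can be cancelled to stop the periodic task, and the scheduler itself should be shut down when it is no longer needed.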

Finally, MetaBroadcaster also supports BroadcasterListener:

MetaBroadcaster.getDefault().addBroadcasterListener(new BroadcasterListener() {
   public void onComplete(Broadcaster b) {
       // do something
   }
}).broadcastTo("/*", "hello");

BroadcasterConfig

A unique instance of BroadcasterConfig is always associated with a Broadcaster. You can use a BroadcasterConfig to set ExecutorServices, add BroadcastFilters, and set a BroadcasterCache.

Configuring the default Broadcaster

By default, Atmosphere uses the DefaultBroadcaster, or the JerseyBroadcaster if atmosphere-jersey is used. You can either extend one of those Broadcasters or write your own, and then configure it as follows.

In web.xml

        <init-param>
            <param-name>org.atmosphere.cpr.broadcasterClass</param-name>
            <param-value>org....</param-value>
        </init-param>

or in atmosphere.xml

        <applicationConfig>
            <param-name>org.atmosphere.cpr.broadcasterClass</param-name>
            <param-value>org...</param-value>
        </applicationConfig>

You can also annotate your Broadcaster implementation by using the BroadcasterService annotation:

   @BroadcasterService
   public class MyBroadcaster implements Broadcaster {...}

Atmosphere auto discovery of Broadcaster

Atmosphere is able to auto-discover the following Broadcasters when they are available on the classpath

That means you don't have to specify them by default.

Asynchronous I/O and Broadcast

By default, a Broadcaster always creates three ExecutorServices: one for asynchronous broadcasts, one for asynchronous writes, and one for scheduling tasks. If you don't need asynchronous I/O, it is recommended that you use the SimpleBroadcaster or SimpleJerseyBroadcaster. Neither of them uses any ExecutorService.

Preventing Out Of Memory

Using shareable ExecutorServices

If your application creates a lot of Broadcasters, you may run into Out Of Memory errors because too many ExecutorService instances have been created (number of Broadcasters * 3). If that's the case, you can configure Atmosphere to share ExecutorServices among Broadcasters. In that case only two ExecutorServices will be created:

Programmatically

 Broadcaster b = BroadcasterFactory.getDefault().get();
 b.getBroadcasterConfig().setExecutorService(...).setAsyncWriteService(...);

Using web/atmosphere.xml

In web.xml

        <init-param>
            <param-name>org.atmosphere.cpr.broadcaster.shareableThreadPool</param-name>
            <param-value>true</param-value>
        </init-param>

or in atmosphere.xml

        <applicationConfig>
            <param-name>org.atmosphere.cpr.broadcaster.shareableThreadPool</param-name>
            <param-value>true</param-value>
        </applicationConfig>
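The effect of sharing can be pictured without any Atmosphere classes: many configuration objects hold references to the same two pools, so the number of pools no longer grows with the number of Broadcasters. PoolConfig and SharedPoolSketch below are hypothetical names used only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

// Hypothetical stand-in for a Broadcaster's configuration; not an Atmosphere class.
class PoolConfig {
    final ExecutorService broadcastPool;
    final ExecutorService writePool;

    PoolConfig(ExecutorService broadcastPool, ExecutorService writePool) {
        this.broadcastPool = broadcastPool;
        this.writePool = writePool;
    }
}

// Hypothetical helper; not part of Atmosphere.
class SharedPoolSketch {
    // Builds `count` configurations that all reuse the same two pools.
    static List<PoolConfig> shared(int count,
                                   ExecutorService broadcastPool,
                                   ExecutorService writePool) {
        List<PoolConfig> configs = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            configs.add(new PoolConfig(broadcastPool, writePool));
        }
        return configs;
    }
}
```

One consequence of this design is that the shared pools must be sized for the combined load of every Broadcaster, and they should be shut down once, by whoever created them, rather than by each Broadcaster.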

Using BroadcastFilter

Instead of creating several Broadcasters, you can reduce their number by adding BroadcastFilter and PerRequestBroadcastFilter instances to a single Broadcaster. You can then filter which broadcast messages get delivered to which AtmosphereResource (client).

 Broadcaster b = BroadcasterFactory.getDefault().get();
 b.getBroadcasterConfig().addFilter(...);
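As a rough illustration of per-client filtering on a single channel, the sketch below runs each message through a chain of filters that may rewrite it or drop it for a given client. FilteredChannel and its methods are hypothetical, and Atmosphere's own filter interfaces have a different shape; only the idea of deciding delivery per client is taken from the text above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical stand-in; not an Atmosphere class.
class FilteredChannel {
    // A filter sees (clientId, message) and returns the message to deliver,
    // or an empty Optional to drop it for that client.
    private final List<BiFunction<String, String, Optional<String>>> filters = new ArrayList<>();

    void addFilter(BiFunction<String, String, Optional<String>> filter) {
        filters.add(filter);
    }

    // Runs the filters in order for one client and stops as soon as one drops the message.
    Optional<String> messageFor(String clientId, String message) {
        Optional<String> current = Optional.of(message);
        for (BiFunction<String, String, Optional<String>> f : filters) {
            if (!current.isPresent()) {
                break;
            }
            current = f.apply(clientId, current.get());
        }
        return current;
    }
}
```

For example, a filter that drops messages starting with "@" unless they are addressed to the current client lets one channel serve both public and directed messages, instead of one channel per recipient.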

Using BroadcasterLifeCyclePolicy

Another way to prevent or reduce memory usage is to configure a BroadcasterLifeCyclePolicy. The supported policies are:

  • IDLE: Release all resources associated with the Broadcaster when the idle time expires. Suspended AtmosphereResources will NOT be resumed; they are closed right away instead.
  • IDLE_DESTROY: Release all resources associated with the Broadcaster when the idle time expires and destroy the Broadcaster. This operation removes the Broadcaster from its associated BroadcasterFactory. Suspended AtmosphereResources will NOT be resumed; they are closed right away instead.
  • IDLE_RESUME: Release all resources associated with the Broadcaster when the idle time expires. All associated AtmosphereResources WILL BE resumed and the Broadcaster destroyed.
  • EMPTY: If there is no AtmosphereResource associated with the Broadcaster, release all resources.
  • EMPTY_DESTROY: If there is no AtmosphereResource associated with the Broadcaster, release all resources and destroy the Broadcaster. This operation removes the Broadcaster from its associated BroadcasterFactory.
  • NEVER: Never release or destroy the Broadcaster, and never remove it from its associated BroadcasterFactory.
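The policies listed above differ along three axes: what triggers them, whether the Broadcaster is destroyed, and whether suspended resources are resumed. The enum below is a hypothetical model of that table written for illustration; it is not the Atmosphere class, and the field and method names are assumptions.

```java
// Hypothetical model of the policies listed above; not the Atmosphere class.
enum PolicySketch {
    IDLE(Trigger.IDLE_TIMEOUT, false, false),
    IDLE_DESTROY(Trigger.IDLE_TIMEOUT, true, false),
    IDLE_RESUME(Trigger.IDLE_TIMEOUT, true, true),
    EMPTY(Trigger.NO_RESOURCES, false, false),
    EMPTY_DESTROY(Trigger.NO_RESOURCES, true, false),
    NEVER(Trigger.NONE, false, false);

    enum Trigger { IDLE_TIMEOUT, NO_RESOURCES, NONE }

    final Trigger trigger;
    final boolean destroysBroadcaster;
    final boolean resumesResources;

    PolicySketch(Trigger trigger, boolean destroysBroadcaster, boolean resumesResources) {
        this.trigger = trigger;
        this.destroysBroadcaster = destroysBroadcaster;
        this.resumesResources = resumesResources;
    }

    // Decides whether the policy should run, given the current state of a Broadcaster.
    boolean firesWhen(boolean idleTimeExpired, int resourceCount) {
        switch (trigger) {
            case IDLE_TIMEOUT:
                return idleTimeExpired;
            case NO_RESOURCES:
                return resourceCount == 0;
            default:
                return false;
        }
    }
}
```

Read this way, EMPTY_DESTROY fires as soon as the last resource leaves and removes the Broadcaster, whereas IDLE fires only after the idle time expires and leaves the Broadcaster registered.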

The default is NEVER, which means that a fair number of Broadcasters may "pollute" the BroadcasterFactory if not handled properly. A BroadcasterLifeCyclePolicy can be configured as follows.

Programmatically

You can configure the policy on a Broadcaster directly:

 Broadcaster b = BroadcasterFactory.getDefault().get();
 b.setBroadcasterLifeCyclePolicy(BroadcasterLifeCyclePolicy.IDLE);

You can also associate a BroadcasterLifeCyclePolicyListener with a Broadcaster so you get notified when a policy is executed.

 b.addBroadcasterLifeCyclePolicyListener(new BroadcasterLifeCyclePolicyListener() {...});

Using web/atmosphere.xml

In web.xml

        <init-param>
            <param-name>org.atmosphere.cpr.broadcasterLifeCyclePolicy</param-name>
            <param-value>IDLE</param-value>
        </init-param>

or in atmosphere.xml

        <applicationConfig>
            <param-name>org.atmosphere.cpr.broadcasterLifeCyclePolicy</param-name>
            <param-value>IDLE</param-value>
        </applicationConfig>
