RxJava concurrency demo application

Filed under: development, java, rxJava — Tags: , , , — digitaljoel @ 2:25 pm


Back when I was in a reading group and had 2 weeks to learn Erlang I wrote a little air traffic control application to highlight the concurrency capabilities of Erlang.  Here is my blog post regarding that.

A few months ago I volunteered to do a presentation on rxJava at my employer’s internal technical conference.  What I didn’t tell them is that I didn’t know anything about it other than it was a current buzz word.  I thought it would be interesting and that giving a presentation on it would be a great reason to learn it.  Fortunately for me, a brilliant co-worker also proposed to present on it so we were paired up together.  I’m not going to lie, he did nearly all the powerpoint work, including the flow of the presentation.  I got to talk about operators (combining, filtering, subscribing) and testing/debugging/error handling.  All in all I thought it went quite well.

Since I had no experience, I decided I needed some application to get up to speed on rxJava.  I wanted to be able to answer questions and have more experience than just having read the documentation before the people attending the session.

So, I decided to write the air traffic control application in java.  Sadly, I don’t know that I could even really read the Erlang anymore, but the ideas are pretty easy to understand.

Enough jabbering about that, let’s got to the code.  I will do some explaining as we go, but you should have some familiarity with reactive concepts.  If you need a primer, head to and spend some time reading.

The Setup

Here’s the premise.  There are airplanes that need to land.  There is a flight tower that directs them where to go.  If two planes enter the same place then they collide.  If an airplane enters the space occupied by the tower then it has landed.  Airspace is represented by a square grid and the tower is in the middle of the grid.  The air traffic controllers are not very smart (but smarter than the Erlang version!)

Here’s how I broke things out.

  • The Radio – Responsible for transmitting messages from the tower to the plane.
  • The Radar – Responsible for broadcasting the position of the planes.
  • The Planes
    • Each plane receives messages from the radar so they can determine if another plane has entered their space (in which case they collide
    • Each plane also receives messages from the radio.  These tell the plane where to move to next.
    • Each plane sends a blip on the radar to broadcast its current location.
  • The Tower
    • Receives blips on the radar with the location of each plane.
    • Sends messages on the radio that tell the plane where to go next.
  • Radar Screen
    • Receives blips from the radar
    • Displays a graph showing where each plane is on the grid.

The Code

The code can be found in it’s fullness at The following does actually contain most of it, but if you want to run things, you’ll want to fetch it from github.


PublishSubject<RadioMessage> radio = PublishSubject.create();

Ok, this one is simple.  The radio is a PublishSubject for RadioMessages.  A RadioMessage simply contains the target flight number and the location that the target flight should fly to next.  Because it’s a PublishSubject it can subscribe to Observables that emit RadioMessages, and will also pass those through to any subscribers that are observing the radio.


PublishSubject<Blip> radar = PublishSubject.create();

Another nice, simple one.  The radar is a PublishSubject for Blips.  A Blip contains the id of the blip source (in this case a flight number), and the location of the blip source.  Finally, it contains a blip type, like MOVE, LAND, or CRASH.  It being a PublishSubject here has the same benefits as the Radio.


Now we start getting into some of the guts and putting rxJava to use.  First, here is the code that creates the Tower.

 // create the tower, which is where the planes try to get to land.
 Tower tower = new Tower(TOWER_LOCATION, radar);
 // and allow the tower to emit on the radio
 Observable.create( tower ).subscribe(radio);

So first we create the tower with a nice, simple constructor.  I went back and forth a few time on how to handle the tower and the radar.  The subscription isn’t a simple one like the one above with the radio.  I decided to encapsulate it within the Tower class.

Here is the bulk of the interesting code within the Tower class:

public class Tower implements Observable.OnSubscribe<RadioMessage>{

  private Pair location;

  Subscriber<? super RadioMessage> radio;
  Observable<Blip.  radar;

  public Tower( Pair location, Observable<Blip> radar ) {
    this.location = location;
    this.radar = radar;


   * Implementation of the OnSubscribe interface
  public void call(Subscriber<? super RadioMessage> t) {
    radio = t;

  private void connectRadar() {
    // on a MOVE blip from a plane we will send them information on where to go next.
    radar.filter(b -> b.type == MOVE &amp;amp;&amp;amp; !b.location.equals(location))
        .subscribe(b -> {
          if ( !radio.isUnsubscribed()) {
            radio.onNext(new RadioMessage(, getNewCoordinates(b.location)));

The interesting part here is in the connectRadar method.  The rxJava APIs are very fluent, but here’s a simple english explanation.

  1. First we only want to see the blips on the radar that are of the MOVE type and that are not within the tower’s location.
  2. We don’t want to handle all these blips on the main thread, so we observe on the computation scheduler.
  3. In onNext we use the current coordinates of the blip and then emit on the radio a message that tells the plane where to go next.

That’s really it for the tower.


Here is the code that creates the planes:

    long totalSleep = 0;
    // create all the planes.
    for ( int i = 0; i < PLANES; i++ ) { Plane plane = new Plane(i, getNewSpeed(), getStartingPair(GRID_SIZE), TOWER_LOCATION, radar); // subscribe the plane to the radio // if we subscribe on a different thread, then we may not get our first message radio.filter(msg -> msg.flightNumber == plane.flightNumber )
          .subscribe( plane );
      // tell the radar to listen to blips from the plane.
      // rather than start them all at once, they will enter the grid when this Observable calls onNext.
      Observable.timer(totalSleep, TimeUnit.MILLISECONDS)
          .subscribe( n -> plane.takeoff());
      totalSleep += getNextSleep();

I ran into one gotcha on this one.  As you can see by the comment, if I subscribe on a different scheduler then I would occasionally see the case where the plane would send a blip, the tower would receive the blip and send a radio message, all before the plane subscribed to the radio.  Ah the joys of concurrency.  While rxJava makes this a lot easier, there are still all the same concerns when it comes to threaded code.

Another tidbit you’ll see here.  I wanted to create all the planes at once and then have them come onto the grid using a staggered schedule.  To accomplish this I used an Observable.timer that when it fires would tell the plane to take off.  I accumulate the timeout so that each plane takes off some random time after the previous one.

Here are the details of the Plane class:

public class Plane implements Observer<RadioMessage>, Observable.OnSubscribe<Blip> {

  public final int flightNumber;
  private Pair location;
  private Pair towerLocation;
  private int speed;
  private AtomicBoolean flying = new AtomicBoolean(false);

  Observable<Blip> radar;
  Subscription radarSubscription;
  Subscriber<? super Blip> blipSubscriber;

  public Plane( int flightNumber, int speed, Pair startingLocation,
        Pair towerLocation, Observable<Blip> radar ) {
    this.flightNumber = flightNumber;
    this.speed = speed;
    this.radar = radar;
    this.location = startingLocation;
    // when we get to the tower location we have landed.
    this.towerLocation = towerLocation;

   * Implementation of onNext for the Observer interface.  The way we subscribe
   * means that we will only get RadioMessages that are directed at our flightNumber
  public void onNext(RadioMessage m) {
    Observable.timer( speed, TimeUnit.MILLISECONDS)
    .subscribe( n -> {
        // when the timer goes off, it calls this onNext message
        if (flying.get()) {
          // if we haven't crashed while traveling to our new location then set our current to the new.
          this.location = m.location;
          if ( this.location.equals(towerLocation) ) {
          else {
            // if we haven't landed, then send a blip to tell the tower our new location.

   * Implementation of the OnSubscribe interface
  public void call( Subscriber<? super Blip> t ) {
    blipSubscriber = t;

  private void sendBlip( Blip blip ) {
    if ( !blipSubscriber.isUnsubscribed()) {

  public void takeoff() {
    sendBlip( new Blip( flightNumber, location, MOVE ));

  private void land() {
    sendBlip(new Blip(flightNumber, location, LAND));

  private void move() {
    sendBlip(new Blip(flightNumber, location, MOVE));

   * Subscribe to the radar
  private void connectRadar() {
    // get blips that are in our airspace that is not us.
    // if we get a blip it must be another airplane that will cause us to crash.
    radarSubscription = radar.filter(b -> != this.flightNumber)
      .filter(b -> b.location.equals(this.location))
      .filter(b -> b.type == MOVE || b.type == CRASH )
      .subscribe(blip -> {
          // on any blip on the radar in our space, it will cause us to crash if we are still flying.
          if ( flying.get()) {

  * We have received a blip on the radar.  That means someone else has entered our space.
  * Because of that we must crash.
  private Blip getRadarResponse( Blip blip ) {
    // whether landing or crashing we are done with this flight.
    // we will unsubscribe from the radar because once crashed we don't need any more blips
    // send a crash blip on the radar.  This will notify the plane that hit us that 
    // we were already in this space and cause them to crash also.
    return new Blip( flightNumber, location, Blip.BlipType.CRASH);

The plane class has comments that hopefully explain the bulk of the code, but here are a few notes on it anyway.

First, for brevity I left off the onCompleted and onError methods that would normally be required of an implementation of the Observer interface.

Second, I would have liked this class to implement Observer AND Observer but that’s illegal in java.  This is why I had to pass the radar in to the constructor, so the plane could subscribe to it.  I could have at that point also just had the radar subject subscribe back to the plane, but that made it more tightly coupled.  This way, having the Blip Observable and the Blip Observer separate, the plane doesn’t need to know that it’s implemented as a subject.

One way I may have been able to get around not being able to have the Plane implement Observer twice would be to go to composition.  The Plane could have had an accessor method that would return the Observer and another that would return the Observer.  Then it would only be implementing the OnSubscribe interface.

I should probably also point out that the Tower and the Planes each expect to only have one subscriber.  It should probably be a thread-safe collection of subscribers so that more than one Observer can subscribe.  The PublishSubject helps obviate this in my case, but I’m not sure my way is the best way here.

Ancillary Code

Since my application creates all the planes at once on different schedulers, If I don’t have a mechanism to make the main thread wait for all the planes to complete then the application would start and finish nearly instantly.  To get around this I use a CountDownLatch initialized with the number of planes that we create.  Then we simply subscribe to the radar and look for crash and land events and decrement the latch on those.  Finally, we just wait for the latch to complete and then the program can terminate.  Here’s the code:

// Create a latch so we don't end the program prematurely.
CountDownLatch latch = new CountDownLatch(PLANES);

// subscribe to the radar so we countdown whenever a plane lands or crashes.
radar.filter(b -> != -1 &amp;&amp; (b.type == LAND || b.type == CRASH))
.subscribe(b -> latch.countDown());

// plane creation code here

// complete when all planes have landed or crashed.

Next, we have the radar screen.  This is purely for output.  The Erlang version didn’t have this nicety, but it sure was great for debugging… and entertainment.

public class RadarScreen implements Observer<Blip> {

  private BiMap<Integer, Pair> flightMap = Maps.synchronizedBiMap(HashBiMap.create());
  private Map<Pair, Integer> crashes = Maps.newConcurrentMap();

  private int gridSize;
  private Pair location;

  public RadarScreen( int gridSize, Pair location ) {
    this.gridSize = gridSize;
    this.location = location; // location is the location of the tower.

  public void onNext(Blip t) {
    if ( t.type == LAND ) {
      // if they are landing just remove by id.
    else {
      // remove by location
      if ( t.type == MOVE ) {
        // if they are moving then put them back on the map
        flightMap.put(, t.location);
      else if ( t.type == CRASH ) {
        // if they are crashing do not put them back on the map, but add it to the crashes collection.
        crashes.put(t.location, 10);

It is a very simple implementation that simply rewrites the entire graph to System.out when a radar event is received.  On my mac I can watch it in the STS console and it looks fairly animated.  Fortunately, since you could subscribe to the radar with anything, you could just as easily put together a swing UI to show the planes.

I left out the implementation that prints the graph (since it’s just creating a StringBuffer and then spitting it to System.out) and I also left out the onCompleted and onError methods.  I also left out some extra code that is used to print asterisks in crash locations.

Lessons Learned

This was quite a fun experiment, and was a great way to learn about reactive programming in general, and rxJava specifically.  My part of the presentation had to do with the operators, debugging, testing, and error handling.  I didn’t build much error handling into this example, but I did get to play with the creation, filtering, and mapping operators, and spent plenty of time testing and debugging.

My first implementation was kind of similar to this implementation, but had a lot more garbage that I just didn’t need.  I had extra layers of Observables and Observers and it was just a mess.  My second implementation was almost purely functional.  It was all contained in one class with a lot of chained calls to map, flatMap, filter, etc.  I kind of liked it, but these both had a fatal flaw.  Both implementations depended on some global state.  This is something I didn’t have in the Erlang implementation because the state was always passed around from function to function.

My final implementation is the one you see here.  It’s not perfect, but I managed to get the global state hidden in the RadarScreen class.  It feels more properly reactive than the first sample, but still has some of the strengths of object oriented programming using the encapsulation of the plane, tower, and radar screen objects.  I’ve learned plenty of times that whatever you first implement with a new technology is just not going to be right.  I’m sure there are holes in this project, things that could have been done differently, and perhaps things that a reactive pro will simply look at and say, “huh?!” but in any case, it gave me a good start.  Here are a few things I learned:

  1. Reactive code is still concurrent code.  Treat it as such
  2. System.out and log.debug are your friends.
  3. Being able to subscribe to an Observable simply for debug purposes is really awesome.  I did this a couple times with the subjects I created just so I could validate the messages going through.  For instance when the message would come through the radio but would not be received by the plane because the plane hadn’t subscribed yet.
  4. If you can’t subscribe with a purely debug Observer then doOnNext is also really awesome.  See #3.
  5. Reactive programming is mind bending for someone with 15 years of experience in core java.  You have to start thinking a little bit more like Javascript and less like java
  6. Java 8 lambdas are really awesome.
  7. There are a lot of ways to do the same thing with rxJava.
  8. I want to do more reactive programming.


How to make Jackson serialize null strings differently than null Objects

Filed under: development — Tags: , , , , — digitaljoel @ 3:36 pm

We’ve got an API.  It uses a certain technology to write JSON data, and this certain technology writes null strings as empty string (“”), but writes null objects as ‘null’.  It’s in production, and therefore changes to it need to be backward compatible.  Jackson is awesome. We want to replace “certain technology” with Jackson.

My first thought was that I would write a custom StringSerializer and associate it with the String type and then whenever a String is null I would output empty string instead of null and poof, done.  Sadly, in Jackson, if the value is null it will always call the NullValueSerializer instead of the Serializer that is configured for the type of field that contains the null value.

So, how to overcome this conundrum?  It can still be approached with a custom serializer implementation, but you also need to touch the SerializerProvider.  Here’s a quick solution we threw together that appears to be working.

// We need to customize the DefaultSerializerProvider so that when it is looking for a NullSerializer it
// will use one that is class sensitive, writing strings as "" and everything else using the default value.
public static class CustomNullStringSerializerProvider extends DefaultSerializerProvider {

  // A couple of constructors and factory methods to keep the compiler happy
  public CustomNullStringSerializerProvider() { super(); }
  public CustomNullStringSerializerProvider(CustomNullStringSerializerProvider provider, SerializationConfig config,
    SerializerFactory jsf) {
    super(provider, config, jsf);
  public CustomNullStringSerializerProvider createInstance(SerializationConfig config,
    SerializerFactory jsf) {
    return new CustomNullStringSerializerProvider(this, config, jsf);

  // This is the interesting part.  When the property has a null value it will call this method to get the
  // serializer for that null value.  At this point, we have the BeanProperty, which contains information about
  // the field that we are trying to serialize (including the type!)  So we can discriminate on the type to determine
  // which serializer is used to output the null value.
  public JsonSerializer<Object> findNullValueSerializer(BeanProperty property) throws JsonMappingException {
    if (property.getType().getRawClass().equals(String.class)) {
      return EmptyStringSerializer.INSTANCE;
    } else {
      return super.findNullValueSerializer(property);

// This is our fancy serializer that takes care of writing the value desired in the case of a null string.  We could
// write whatever we want in here, but in order to maintain backward compatibility we choose the empty string
// instead of something like "joel is awesome."
public static class EmptyStringSerializer extends JsonSerializer<Object> {
  public static final JsonSerializer<Object> INSTANCE = new EmptyStringSerializer();

  private EmptyStringSerializer() {}

  // Since we know we only get to this seralizer in the case where the value is null and the type is String, we can
  // do our handling without any additional logic and write that empty string we are so desperately wanting.
  public void serialize(Object o, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
    throws IOException, JsonProcessingException {


Now, when we are configuring the ObjectMapper we can simply call setSerializerProvider and give it our custom provider and voila.


Pair Programming Without The Bad Breath

Filed under: general — Tags: , , — digitaljoel @ 11:05 pm

Let’s say you are working from home, but you need to do a code review or want to do pair programming or something. I was in this situation last week with a good friend. He lives about 45 minutes from me and neither of us were really in the mood to do any driving. We are both working on macbooks. I was planning on driving the programming session, so he put together the following instructions for setting up SSH access to my computer

Steps for setup:

  1. Open TCP port of your choice on the router and forward that to port 22 of the IP for your Mac (ifconfig @ command prompt, or use the network control panel to get your IP).
  2. Create a user/pwd that I can use to connect with in the Accounts Preferences Panel*
  3. Turn on Remote Login in the Sharing Preferences Panel (System Preferences).
  4. Turn on Screen Sharing.
  5. Add that user to the list of who can connect in the Allow access list box.
  6. Go to the Firewall preference panel and click on Advanced… Ensure that ssh is allowed through.

Finally, I gave him my IP address and his account information and he logged in through SSH, started screen sharing and was then viewing EVERYTHING I was viewing on the screen.  We also started up iChat and started a voice chat through our AIM connection.  The sound was actually very good and the screen sharing performance was also very good.  My friend said he could see the change on the screen as soon as he heard the key press through the microphone.

In order to connect, he issued the following command line.

ssh -p <yourport> -f -L 1200:localhost:5900 <iphere> sleep 10 ; open vnc://localhost:1200

where <iphere> is replaced by the ip address I sent to him, and <yourport> is the port specified in step one.  Note that the username I setup on my machine was the same as the username he uses on his machine so he didn’t have to specify a username at login time.

We worked for about 3 hours this way, talking and programming without any issues.  Will it 100% replace in-person pair programming or code reviews, certainly not, but I plan to use it a lot with my buddy.

*You probably shouldn’t be sending the username/password combination to your friend through email or something like that.  I called mine to give him that sensitive information.


So Long Kenai

Filed under: development, java, tools — Tags: , , , — digitaljoel @ 11:25 pm

A couple of my recent posts have dealt with JSF 2 and Google’s app engine for Java. I was experimenting with this stuff in a small hobby project that was role-playing game related. It’s been benched for another role-playing game related project that I’m going to be writing in C# in XNA, but before I get to that, I want to say a fond farewell to Kenai.

This morning I had an email from the Kenai team in my inbox. Now that Sun is a part of Oracle, it has been decided by the powers that be on the Oracle side that hosting free, open source projects doesn’t help the bottom line. I think this is really unfortunate. Kenai had a really great integration story with Netbeans.

Kenai included free use of Jira, a wiki, Subversion hosting, downloads, and all of it integrated seamlessly with Netbeans. Setting up my project on Kenai was actually a joy. I simply clicked the menu in Netbeans 6.8 to “share this project on Kenai” and everything just worked. When I then went to my desktop from my laptop, I was able to “get” the project from Kenai and it checked it out, allowed me to login to Kenai and look at my jira tasks and modify them within Netbeans.

I haven’t used any other project hosting solution, but I’ve viewed plenty of projects in google code and sourceforge and have yet to see such a complete story. If I had the money to spare for a pet/hobby project I would certainly go to jira studio, but I suspect I’m going to have to find something a little more free to move my project too. I am happy that I didn’t have too much invested in Kenai, but it’s pretty sad that it’s been killed.

So long Kenai, it was good to know you while it lasted.

I just got this email from the Kenai folks. Fortunately, it looks like good news, and I think it will be a great benefit to all the projects on


In an effort to get information out to the Kenai community quickly, while trying to manage the integration of our two companies, I think we did a poor job at communicating our plans for to you. I would like to remedy that now. Our strategy is simple. We don’t believe it makes sense to continue investing in multiple hosted development sites that are basically doing the same thing. Our plan is to shut down and focus our efforts on as the hosted development community. We are in the process of migrating to the kenai technology. This means that any project currently hosted on will be able to continue as you are on We are still working out the technical details, but the goal is to make this migration as seamless as possible for the current projects. So in the meantime I suggest that you stay put on and let us work through the details and get back to you later this month.

Thanks for your feedback and patience.

Ted Farrell
Oracle Corporation

Blog at