Add DSL support for declaring named streams for Bolts #129

Open
wants to merge 7 commits into
base: master

cattywampus

This is an initial attempt at adding support for declaring named streams and emitting messages on those streams for bolts. The intent of this feature is to make it easier to build Storm topologies in which bolts control where their messages go by emitting on named streams, and in which other bolts subscribe to a source through those declared streams. These modifications should not introduce any breaking changes to the RedStorm API, and should let existing users upgrade without having to rewrite their bolts or topologies.

I am submitting this as an initial draft implementation, hoping to receive comments or open a discussion about what the API should look like to offer this kind of support to the user.

Declaring a new stream occurs with the call to output_fields within the bolt implementation. Instead of an array of field names, streams are declared using a hash where the key is the name of the stream and the value is a single field or array of fields.

class Bolt2 < RedStorm::DSL::Bolt
  output_fields :custom_stream => :f1

  on_receive :stream => :custom_stream do |tuple|
    tuple
  end
end

In this example, a new stream called custom_stream is declared with a single output field. The :stream option passed to on_receive names the stream that the block's result is emitted to, since auto emit is enabled. For more granular control, a bolt that defines multiple streams can emit values using the emit_tuple_stream method of the OutputCollector, which is called from the bolt's anchored_stream_emit and unanchored_stream_emit helper methods.

class Bolt1 < RedStorm::DSL::Bolt
  output_fields :f1, :custom_stream => :f1

  on_receive :emit => false do |tuple|
    if something
      unanchored_stream_emit 'custom_stream', tuple
    else
      unanchored_emit tuple
    end
  end
end
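To make the routing easier to picture outside of a Storm cluster, here is a minimal plain-Ruby sketch. RecordingCollector and RoutingBolt are hypothetical stand-ins, not RedStorm classes, and the argument shapes of the emit helpers are assumptions; only the method names come from the description above.

```ruby
# Hypothetical stand-in for Storm's OutputCollector: it only records what
# was emitted on each stream so the routing can be inspected.
class RecordingCollector
  attr_reader :emitted

  def initialize
    @emitted = Hash.new { |hash, stream| hash[stream] = [] }
  end

  def emit_tuple_stream(stream, values)
    @emitted[stream.to_s] << values
  end
end

# Minimal bolt-like object (an assumption, not RedStorm's Bolt class).
class RoutingBolt
  DEFAULT_STREAM = "default" # Storm's default stream id

  def initialize(collector)
    @collector = collector
  end

  # Emit on the default stream.
  def unanchored_emit(*values)
    @collector.emit_tuple_stream(DEFAULT_STREAM, values)
  end

  # Emit on an explicitly named stream.
  def unanchored_stream_emit(stream, *values)
    @collector.emit_tuple_stream(stream, values)
  end

  # Mirrors the if/else in the on_receive block above, with an
  # arbitrary condition (odd numbers) standing in for `something`.
  def receive(value)
    if value.odd?
      unanchored_stream_emit "custom_stream", value
    else
      unanchored_emit value
    end
  end
end

collector = RecordingCollector.new
bolt = RoutingBolt.new(collector)
[1, 2, 3].each { |n| bolt.receive(n) }
```

After this runs, the odd values are recorded under "custom_stream" and the even value under "default", which is the split a downstream subscriber would see.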

Once a bolt defines which streams it will emit messages on, other bolts can subscribe to those streams when they are configured in their topology. I added a third parameter to the source method which allows the bolt to provide the name of the stream it wants to receive messages from. This introduces a small API change: if the user originally provided a Hash for the grouping and now also passes a stream name, that hash needs to be wrapped in curly braces to avoid a syntax error from the Ruby parser.

class Topology1 < RedStorm::DSL::Topology
  spout SomeSpout

  bolt Bolt1 do
    source SomeSpout, :fields => ["f1"]
  end

  bolt Bolt2 do
    source Bolt1, { :fields => ["f1"] }, "custom_stream"
  end

  ...
end
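The curly-brace requirement follows from how Ruby parses a braceless hash, which is only allowed as the last argument. The sketch below uses a hypothetical SourceRecorder class with a three-parameter source method; the parameter names and the "default" fallback are assumptions for illustration, not RedStorm's actual implementation.

```ruby
# Hypothetical recorder, not RedStorm's implementation of `source`.
class SourceRecorder
  DEFAULT_STREAM = "default" # Storm's default stream id

  attr_reader :sources

  def initialize
    @sources = []
  end

  # The third parameter is optional, so two-argument calls keep working.
  def source(source_id, grouping, stream = DEFAULT_STREAM)
    @sources << { :id => source_id, :grouping => grouping, :stream => stream.to_s }
  end
end

recorder = SourceRecorder.new

# Two arguments: the hash is the last argument, so no braces are needed.
recorder.source "SomeSpout", :fields => ["f1"]

# Three arguments: the hash is no longer last, so it must be braced.
recorder.source "Bolt1", { :fields => ["f1"] }, "custom_stream"

# Without braces the parser rejects the call:
#   recorder.source "Bolt1", :fields => ["f1"], "custom_stream"  # SyntaxError
```

Making the stream argument optional is what keeps existing two-argument topologies valid; only calls that add a stream name have to brace the grouping hash.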

What's not implemented (yet)

Currently only bolts can declare streams; I still need to add support for spouts. This means that bolts can only subscribe to named streams declared by other bolts.

Also, you cannot declare streams and fields for bolts within the bolt definition of a topology. The output_fields that get declared in the definition block of the topology are passed to the constructor of the JRubyShellBolt class, which currently accepts an array of Strings for the fields parameter. I wasn't sure what the best path was to add stream support here. I thought an overloaded constructor which expects a HashMap instead of an array might work, but that would prevent users from declaring custom streams together with fields for the default stream, like this:

output_fields :f1, :f2, :my_stream => [:f3, :f4]

I think a decision needs to be made about how this should impact the API. Restricting the fields parameter of that Java class to use a map would then result in:

output_fields :default => [:f1, :f2], :my_stream => [:f3, :f4]

...where the default stream needs to be explicitly declared if the user wants to use it. Alternatively, the BoltDefinition code in Ruby could be smart enough to parse out this data structure and invoke the appropriate constructor and pass in the default fields if provided through a setter on the Java class.
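As a rough illustration of the second option, the Ruby side could normalize both spellings into one stream-to-fields map before calling into Java. The helper name normalize_output_fields below is hypothetical and this is only a sketch of the parsing step, not code from this pull request.

```ruby
DEFAULT_STREAM = "default" # Storm's default stream id

# Hypothetical helper: splits mixed output_fields arguments into a
# stream-name => field-names map. Bare fields go to the default stream;
# a trailing hash declares named streams.
def normalize_output_fields(*args)
  streams = args.last.is_a?(Hash) ? args.pop : {}
  result = {}

  default_fields = args.flatten.map(&:to_s)
  result[DEFAULT_STREAM] = default_fields unless default_fields.empty?

  streams.each do |stream, fields|
    # Array() accepts both a single field and an array of fields.
    result[stream.to_s] = Array(fields).map(&:to_s)
  end

  result
end

mixed    = normalize_output_fields(:f1, :f2, :my_stream => [:f3, :f4])
explicit = normalize_output_fields(:default => [:f1, :f2], :my_stream => [:f3, :f4])
```

Both calls produce the same map, so the Java class could accept a single map-based representation while the DSL keeps accepting either form.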

Anyways, I am looking for lots of feedback and an open discussion about what direction makes the most sense for adding in this type of feature. Any and all comments are welcome and I'm happy to make any changes necessary to help get this accepted and into the RedStorm baseline.

@coveralls

Coverage Status

Coverage increased (+0.15%) to 97.91% when pulling dbf7888 on cattywampus:streams into cbb62ef on colinsurprenant:master.

This commit makes it possible to declare streams and fields for bolts
and streams inside of the Topology definition.
@coveralls

Coverage Status

Coverage decreased (-76.98%) to 20.78% when pulling 7b7f3f7 on cattywampus:streams into cbb62ef on colinsurprenant:master.

Fixes an issue where output fields that are defined in the Spout or Bolt
are not recognized by the Nimbus server when it validates the existence
of streams for all bolts in the topology.
@coveralls

Coverage Status

Coverage increased (+0.04%) to 97.8% when pulling db8547e on cattywampus:streams into cbb62ef on colinsurprenant:master.

@coveralls

Coverage Status

Coverage increased (+0.09%) to 97.85% when pulling b82a4a9 on cattywampus:streams into cbb62ef on colinsurprenant:master.

Not all bolts defined in a topology inherit from the RedStorm::DSL. This
ensures the output fields in the ComponentDefinition are only copied if
the class uses the OutputFields module. Also, redefining field names for
the default stream will replace any previously defined default field
names.
@coveralls

Coverage Status

Coverage increased (+0.09%) to 97.85% when pulling f593b21 on cattywampus:streams into cbb62ef on colinsurprenant:master.

@rtyler

rtyler commented Apr 17, 2015

@cattywampus I've been maintaining a fork of redstorm which includes some patches we've developed at @lookout and build tooling changes for Gradle at jruby-gradle/redstorm. Would you be interested in merging this code into that repository?

@cattywampus
Author

@rtyler Definitely. These changes fundamentally expand the way topologies can exchange data between bolts, so I would love to fold this into a baseline that gets more eyes on it, especially since I'm very new to Storm and hope I am not enabling any Storm cluster faux pas.

I'll check out your fork and rebase my changes on top of it to make sure everything still works.

Thanks!

@rtyler

rtyler commented Aug 28, 2015

@cattywampus FWIW, this change has been merged in and released as part of the jruby-gradle/redstorm 0.8.0 release. This version of the library is available in jcenter and is what the jruby-gradle-storm-plugin consumes by default.
