From 54e43e39dc0b110b09fe8e0a6af1ff9f67afbcff Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Tue, 3 Mar 2015 22:47:37 +0000 Subject: [PATCH 1/7] Select stream when setting source for bolt --- lib/red_storm/dsl/bolt.rb | 36 ++++- lib/red_storm/dsl/output_collector.rb | 9 ++ lib/red_storm/dsl/topology.rb | 24 +-- spec/red_storm/dsl/bolt_spec.rb | 70 ++++++++- spec/red_storm/dsl/output_collector_spec.rb | 6 + spec/red_storm/dsl/topology_spec.rb | 159 +++++++++++++++++--- 6 files changed, 275 insertions(+), 29 deletions(-) diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index c3895ce..8db2f2c 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -58,14 +58,26 @@ def log self.class.log end + def stream + self.class.stream + end + def unanchored_emit(*values) @collector.emit_tuple(Values.new(*values)) end + def unanchored_stream_emit(stream, *values) + @collector.emit_tuple_stream(stream, Values.new(*values)) + end + def anchored_emit(tuple, *values) @collector.emit_anchor_tuple(tuple, Values.new(*values)) end + def anchored_stream_emit(stream, tuple, *values) + @collector.emit_anchor_tuple_stream(stream, tuple, Values.new(*values)) + end + def ack(tuple) @collector.ack(tuple) end @@ -80,7 +92,21 @@ def execute(tuple) output = on_receive(tuple) if output && self.class.emit? values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output - values_list.each{|values| self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)} + values_list.each do |values| + if self.class.anchor? + if self.class.stream? + anchored_stream_emit(self.stream, tuple, *values) + else + anchored_emit(tuple, *values) + end + else + if self.class.stream? + unanchored_stream_emit(self.stream, *values) + else + unanchored_emit(*values) + end + end + end @collector.ack(tuple) if self.class.ack? end end @@ -137,6 +163,14 @@ def self.anchor? !!self.receive_options[:anchor] end + def self.stream? + self.receive_options[:stream] && !self.receive_options[:stream].empty? + end + + def self.stream + self.receive_options[:stream] + end + # below non-dry see Spout class def self.inherited(subclass) path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(BoltError, "unable to extract base topology class path from #{caller.first.inspect}") diff --git a/lib/red_storm/dsl/output_collector.rb b/lib/red_storm/dsl/output_collector.rb index d7a56e2..1cca8d1 100644 --- a/lib/red_storm/dsl/output_collector.rb +++ b/lib/red_storm/dsl/output_collector.rb @@ -6,4 +6,13 @@ class OutputCollector java_alias :emit_tuple, :emit, [java.lang.Class.for_name("java.util.List")] java_alias :emit_anchor_tuple, :emit, [Tuple.java_class, java.lang.Class.for_name("java.util.List")] + java_alias :emit_tuple_stream, :emit, [ + java.lang.String, + java.lang.Class.for_name("java.util.List") + ] + java_alias :emit_anchor_tuple_stream, :emit, [ + java.lang.String, + Tuple.java_class, + java.lang.Class.for_name("java.util.List") + ] end diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index 121bca2..b09429e 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -60,29 +60,33 @@ def initialize(*args) @sources = [] end - def source(source_id, grouping) - @sources << [source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}] + def source(source_id, grouping, stream = 'default') + @sources << [ + source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, + grouping.is_a?(Hash) ? grouping : {grouping => nil}, + stream.to_s + ] end def define_grouping(declarer) - @sources.each do |source_id, grouping| + @sources.each do |source_id, grouping, stream| grouper, params = grouping.first # declarer.fieldsGrouping(source_id, Fields.new()) case grouper when :fields - declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s)))) + declarer.fieldsGrouping(source_id, stream, Fields.new(*([params].flatten.map(&:to_s)))) when :global - declarer.globalGrouping(source_id) + declarer.globalGrouping(source_id, stream) when :shuffle - declarer.shuffleGrouping(source_id) + declarer.shuffleGrouping(source_id, stream) when :local_or_shuffle - declarer.localOrShuffleGrouping(source_id) + declarer.localOrShuffleGrouping(source_id, stream) when :none - declarer.noneGrouping(source_id) + declarer.noneGrouping(source_id, stream) when :all - declarer.allGrouping(source_id) + declarer.allGrouping(source_id, stream) when :direct - declarer.directGrouping(source_id) + declarer.directGrouping(source_id, stream) else raise("unknown grouper=#{grouper.inspect}") end diff --git a/spec/red_storm/dsl/bolt_spec.rb b/spec/red_storm/dsl/bolt_spec.rb index 52b05a1..83cdcd4 100644 --- a/spec/red_storm/dsl/bolt_spec.rb +++ b/spec/red_storm/dsl/bolt_spec.rb @@ -603,6 +603,74 @@ def on_receive(tuple) bolt.prepare(nil, nil, collector) bolt.execute("output") end + + it "should emit tuple on a stream" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :stream => :custom_stream do |tuple| + tuple + end + end + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method, :stream => :custom_stream + def my_method(tuple); tuple; end + end + class Bolt3 < RedStorm::SimpleBolt + on_receive :stream => :custom_stream + def on_receive(tuple); tuple; end + end + + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit_tuple_stream).with(:custom_stream, "values").exactly(3).times + + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + + bolt = Bolt3.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + end + + it "should emit anchored tuple on a stream" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :anchor => true, :stream => :custom_stream do |tuple| + "output" + end + end + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method, :anchor => true, :stream => :custom_stream + def my_method(tuple) + "output" + end + end + class Bolt3 < RedStorm::SimpleBolt + on_receive :anchor => true, :stream => :custom_stream + def on_receive(tuple) + "output" + end + end + + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit_anchor_tuple_stream).with(:custom_stream, "tuple", "values").exactly(3).times + + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + + bolt = Bolt3.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + end end describe "prepare" do @@ -701,4 +769,4 @@ class Bolt1 < RedStorm::SimpleBolt; end end end -end \ No newline at end of file +end diff --git a/spec/red_storm/dsl/output_collector_spec.rb b/spec/red_storm/dsl/output_collector_spec.rb index d604337..82ddeb7 100644 --- a/spec/red_storm/dsl/output_collector_spec.rb +++ b/spec/red_storm/dsl/output_collector_spec.rb @@ -10,5 +10,11 @@ # We should have an alias for #emit_anchor_tuple it { should respond_to :emit_anchor_tuple } + + # We should have an alias for #emit_tuple_stream + it { should respond_to :emit_tuple_stream } + + # We should have an alias for #emit_anchor_tuple_stream + it { should respond_to :emit_anchor_tuple_stream } end end diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index 9500679..64866ea 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -340,6 +340,7 @@ class Topology1 < RedStorm::SimpleTopology configurator = mock(RedStorm::Configurator) jruby_bolt1 = mock(RedStorm::JRubyBolt) jruby_bolt2 = mock(RedStorm::JRubyBolt) + jruby_bolt3 = mock(RedStorm::JRubyBolt) declarer = mock("Declarer") RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) @@ -394,7 +395,20 @@ class Topology1 < RedStorm::SimpleTopology end RedStorm::Fields.should_receive(:new).with("f1").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support single string fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => "f1" }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("f1").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -407,7 +421,20 @@ class Topology1 < RedStorm::SimpleTopology end RedStorm::Fields.should_receive(:new).with("s1").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support single symbolic fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => :s1 }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("s1").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -420,7 +447,20 @@ class Topology1 < RedStorm::SimpleTopology end RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support string array fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => ["f1", "f2"] }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -433,7 +473,20 @@ class Topology1 < RedStorm::SimpleTopology end RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support symbolic array fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => [:s1, :s2] }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -445,7 +498,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("shuffleGrouping").with('1') + @declarer.should_receive("shuffleGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support shuffle with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :shuffle, 'custom_stream' + end + end + + @declarer.should_receive("shuffleGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -457,7 +522,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("localOrShuffleGrouping").with('1') + @declarer.should_receive("localOrShuffleGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support local_or_shuffle with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :local_or_shuffle, 'custom_stream' + end + end + + @declarer.should_receive("localOrShuffleGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -469,7 +546,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("noneGrouping").with('1') + @declarer.should_receive("noneGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support none" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :none, 'custom_stream' + end + end + + @declarer.should_receive("noneGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -481,7 +570,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("globalGrouping").with('1') + @declarer.should_receive("globalGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support global with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :global, 'custom_stream' + end + end + + @declarer.should_receive("globalGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -493,7 +594,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("allGrouping").with('1') + @declarer.should_receive("allGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support all with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :all, 'custom_stream' + end + end + + @declarer.should_receive("allGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -505,7 +618,19 @@ class Topology1 < RedStorm::SimpleTopology end end - @declarer.should_receive("directGrouping").with('1') + @declarer.should_receive("directGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support direct with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :direct, 'custom_stream' + end + end + + @declarer.should_receive("directGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end end @@ -563,7 +688,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == '1' Topology1.bolts.first.id.should == '2' - Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}, 'default'] end it "should support explicit string ids" do @@ -577,7 +702,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == "id1" Topology1.bolts.first.id.should == "id2" - Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}, 'default'] end it "should support implicit string ids" do @@ -591,7 +716,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}, 'default'] end it "should support implicit symbol ids" do @@ -605,7 +730,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}, 'default'] end it "should support implicit class ids" do @@ -619,7 +744,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}, 'default'] end it "should raise on unresolvable" do @@ -633,7 +758,7 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}, 'default'] lambda {Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)}.should raise_error RuntimeError, "cannot resolve BoltClass1 source id=dummy" end @@ -651,4 +776,4 @@ class Topology1 < RedStorm::SimpleTopology end end -end \ No newline at end of file +end From 32cceb724ea4b1514e0d2a642c2ad51b89489cc4 Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Tue, 10 Mar 2015 16:09:21 +0000 Subject: [PATCH 2/7] Bolts support declaring output streams --- lib/red_storm/dsl/bolt.rb | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index 8db2f2c..b37537a 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -23,7 +23,14 @@ def self.log end def self.output_fields(*fields) - @fields = fields.map(&:to_s) + @fields ||= [] + fields.each do |field| + if field.kind_of? Hash + @fields << Hash[ field.map { |k, v| [k.to_s, v.to_s] } ] + else + @fields << field.to_s + end + end end def self.configure(&configure_block) @@ -124,7 +131,18 @@ def cleanup end def declare_output_fields(declarer) - declarer.declare(Fields.new(self.class.fields)) + default_fields = [] + self.class.fields.each do |field| + if field.kind_of? Hash + field.each do |stream, fields| + declarer.declareStream(stream, Fields.new(fields)) + end + else + default_fields << field + end + end + + declarer.declare(Fields.new(default_fields.flatten)) end def get_component_configuration From dbf7888bb79fa81c782170fcb37cc55d2bd786f0 Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Tue, 10 Mar 2015 21:37:22 +0000 Subject: [PATCH 3/7] New test/specs for bolt stream support --- lib/red_storm/dsl/bolt.rb | 6 +- spec/red_storm/dsl/bolt_spec.rb | 101 ++++++++++++++++++++++++++++---- 2 files changed, 93 insertions(+), 14 deletions(-) diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index b37537a..f0f851c 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -26,7 +26,9 @@ def self.output_fields(*fields) @fields ||= [] fields.each do |field| if field.kind_of? Hash - @fields << Hash[ field.map { |k, v| [k.to_s, v.to_s] } ] + @fields << Hash[ + field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] } + ] else @fields << field.to_s end @@ -142,7 +144,7 @@ def declare_output_fields(declarer) end end - declarer.declare(Fields.new(default_fields.flatten)) + declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty? end def get_component_configuration diff --git a/spec/red_storm/dsl/bolt_spec.rb b/spec/red_storm/dsl/bolt_spec.rb index 83cdcd4..97872e6 100644 --- a/spec/red_storm/dsl/bolt_spec.rb +++ b/spec/red_storm/dsl/bolt_spec.rb @@ -62,6 +62,27 @@ class Bolt1 < RedStorm::SimpleBolt Bolt1.send(:fields).should == ["f1", "f2"] end + it "should parse single hash argument" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :stream => :f1 + end + Bolt1.send(:fields).should == [{"stream" => "f1"}] + end + + it "should parse hash of string and symbols" do + class Bolt1 < RedStorm::SimpleBolt + output_fields "stream" => [:f1, :f2] + end + Bolt1.send(:fields).should == [{"stream" => ["f1", "f2"]}] + end + + it "should parse string and hash arguments" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :f1, :stream => :f2 + end + Bolt1.send(:fields).should == ["f1", {"stream" => "f2"}] + end + it "should not share state over mutiple classes" do class Bolt1 < RedStorm::SimpleBolt output_fields :f1 @@ -115,6 +136,7 @@ class Bolt1 < RedStorm::SimpleBolt Bolt1.send(:emit?).should be_true Bolt1.send(:ack?).should be_false Bolt1.send(:anchor?).should be_false + Bolt1.send(:stream?).should be_false end it "should parse :emit option" do @@ -147,16 +169,27 @@ class Bolt1 < RedStorm::SimpleBolt Bolt1.send(:anchor?).should be_true end + it "should parse :stream option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :stream => "test" do + end + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test") + Bolt1.send(:stream?).should be_true + end + it "should parse multiple option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :emit => false, :ack =>true, :anchor => true do + on_receive :emit => false, :ack =>true, :anchor => true, :stream => "test" do end end - Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test") Bolt1.send(:emit?).should be_false Bolt1.send(:ack?).should be_true Bolt1.send(:anchor?).should be_true + Bolt1.send(:stream?).should be_true end end @@ -166,13 +199,13 @@ class Bolt1 < RedStorm::SimpleBolt class Bolt1 < RedStorm::SimpleBolt def test_method; end on_receive :test_method - end Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS Bolt1.send(:emit?).should be_true Bolt1.send(:ack?).should be_false Bolt1.send(:anchor?).should be_false + Bolt1.send(:stream?).should be_false end it "should parse :emit option" do @@ -186,8 +219,7 @@ class Bolt1 < RedStorm::SimpleBolt it "should parse :ack option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :ack => true do - end + on_receive :test_method, :ack => true end Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:ack => true) @@ -196,24 +228,32 @@ class Bolt1 < RedStorm::SimpleBolt it "should parse :anchor option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :anchor => true do - end + on_receive :test_method, :anchor => true end Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:anchor => true) Bolt1.send(:anchor?).should be_true end + it "should parse :stream option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :test_method, :stream => "test" + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test") + Bolt1.send(:stream?).should be_true + end + it "should parse multiple option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :emit => false, :ack =>true, :anchor => true do - end + on_receive :test_method, :emit => false, :ack =>true, :anchor => true, :stream => "test" end - Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test") Bolt1.send(:emit?).should be_false Bolt1.send(:ack?).should be_true Bolt1.send(:anchor?).should be_true + Bolt1.send(:stream?).should be_true end end @@ -227,6 +267,7 @@ class Bolt1 < RedStorm::SimpleBolt Bolt1.send(:emit?).should be_true Bolt1.send(:ack?).should be_false Bolt1.send(:anchor?).should be_false + Bolt1.send(:stream?).should be_false end it "should parse :emit option" do @@ -256,15 +297,25 @@ class Bolt1 < RedStorm::SimpleBolt Bolt1.send(:anchor?).should be_true end + it "should parse :stream option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :stream => "test" + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test") + Bolt1.send(:stream?).should be_true + end + it "should parse multiple option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :emit => false, :ack =>true, :anchor => true + on_receive :emit => false, :ack =>true, :anchor => true, :stream => "test" end - Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test") Bolt1.send(:emit?).should be_false Bolt1.send(:ack?).should be_true Bolt1.send(:anchor?).should be_true + Bolt1.send(:stream?).should be_true end end end @@ -757,6 +808,32 @@ class RedStorm::Fields; end RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") bolt.declare_output_fields(declarer) end + + it "should declare stream with fields" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :stream => [:f1, :f2] + end + bolt = Bolt1.new + class RedStorm::Fields; end + declarer = mock("Declarer") + declarer.should_receive(:declareStream).with("stream", "fields") + RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") + bolt.declare_output_fields(declarer) + end + + it "should declare default stream fields and custom stream fields" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :f1, :f2, :stream => [:f3, :f4] + end + bolt = Bolt1.new + class RedStorm::Fields; end + declarer = mock("Declarer") + declarer.should_receive(:declareStream).with("stream", "stream_fields") + declarer.should_receive(:declare).with("default_fields") + RedStorm::Fields.should_receive(:new).with(["f3", "f4"]).and_return("stream_fields") + RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("default_fields") + bolt.declare_output_fields(declarer) + end end describe "get_component_configuration" do From 7b7f3f7c993cb208f40e613bb32ce504626d944f Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Tue, 31 Mar 2015 19:56:04 +0000 Subject: [PATCH 4/7] Bolt/Spouts in topos need to support streams This commit makes it possible to declare streams and fields for bolts and streams inside of the Topology definition. --- lib/red_storm/dsl/bolt.rb | 47 +-------------- lib/red_storm/dsl/output_fields.rb | 58 +++++++++++++++++++ lib/red_storm/dsl/spout.rb | 15 +---- lib/red_storm/dsl/topology.rb | 29 ++++++++-- spec/red_storm/dsl/topology_spec.rb | 25 ++++---- src/main/redstorm/storm/jruby/JRubyBolt.java | 15 +++-- src/main/redstorm/storm/jruby/JRubySpout.java | 14 +++-- 7 files changed, 125 insertions(+), 78 deletions(-) create mode 100644 lib/red_storm/dsl/output_fields.rb diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index f0f851c..a0c6e9b 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -1,6 +1,7 @@ require 'java' require 'red_storm/configurator' require 'red_storm/environment' +require 'red_storm/dsl/output_fields' require 'pathname' java_import 'backtype.storm.tuple.Fields' @@ -14,6 +15,8 @@ class BoltError < StandardError; end class Bolt attr_reader :collector, :context, :config + include OutputFields + def self.java_proxy; "Java::RedstormStormJruby::JRubyBolt"; end # DSL class methods @@ -22,19 +25,6 @@ def self.log @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end - def self.output_fields(*fields) - @fields ||= [] - fields.each do |field| - if field.kind_of? Hash - @fields << Hash[ - field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] } - ] - else - @fields << field.to_s - end - end - end - def self.configure(&configure_block) @configure_block = block_given? ? configure_block : lambda {} end @@ -67,10 +57,6 @@ def log self.class.log end - def stream - self.class.stream - end - def unanchored_emit(*values) @collector.emit_tuple(Values.new(*values)) end @@ -132,21 +118,6 @@ def cleanup on_close end - def declare_output_fields(declarer) - default_fields = [] - self.class.fields.each do |field| - if field.kind_of? Hash - field.each do |stream, fields| - declarer.declareStream(stream, Fields.new(fields)) - end - else - default_fields << field - end - end - - declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty? - end - def get_component_configuration configurator = Configurator.new configurator.instance_exec(&self.class.configure_block) @@ -159,10 +130,6 @@ def get_component_configuration def on_init; end def on_close; end - def self.fields - @fields ||= [] - end - def self.configure_block @configure_block ||= lambda {} end @@ -183,14 +150,6 @@ def self.anchor? !!self.receive_options[:anchor] end - def self.stream? - self.receive_options[:stream] && !self.receive_options[:stream].empty? - end - - def self.stream - self.receive_options[:stream] - end - # below non-dry see Spout class def self.inherited(subclass) path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(BoltError, "unable to extract base topology class path from #{caller.first.inspect}") diff --git a/lib/red_storm/dsl/output_fields.rb b/lib/red_storm/dsl/output_fields.rb new file mode 100644 index 0000000..a506711 --- /dev/null +++ b/lib/red_storm/dsl/output_fields.rb @@ -0,0 +1,58 @@ + +module RedStorm + module DSL + module OutputFields + + def self.included(base) + base.extend ClassMethods + end + + def declare_output_fields(declarer) + default_fields = [] + self.class.fields.each do |field| + if field.kind_of? Hash + field.each do |stream, fields| + declarer.declareStream(stream, Fields.new(fields)) + end + else + default_fields << field + end + end + + declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty? + end + + def stream + self.class.stream + end + + module ClassMethods + def output_fields(*fields) + @fields ||= [] + fields.each do |field| + if field.kind_of? Hash + @fields << Hash[ + field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] } + ] + else + @fields << field.to_s + end + end + end + + def fields + @fields ||= [] + end + + def stream? + self.receive_options[:stream] && !self.receive_options[:stream].empty? + end + + def stream + self.receive_options[:stream] + end + end + end + end +end + diff --git a/lib/red_storm/dsl/spout.rb b/lib/red_storm/dsl/spout.rb index 4846742..14ab5e8 100644 --- a/lib/red_storm/dsl/spout.rb +++ b/lib/red_storm/dsl/spout.rb @@ -1,6 +1,7 @@ require 'java' require 'red_storm/configurator' require 'red_storm/environment' +require 'red_storm/dsl/output_fields' require 'pathname' module RedStorm @@ -11,6 +12,8 @@ class SpoutError < StandardError; end class Spout attr_reader :config, :context, :collector + include OutputFields + def self.java_proxy; "Java::RedstormStormJruby::JRubySpout"; end # DSL class methods @@ -23,10 +26,6 @@ def self.log @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end - def self.output_fields(*fields) - @fields = fields.map(&:to_s) - end - def self.on_send(*args, &on_send_block) options = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first @@ -126,10 +125,6 @@ def deactivate on_deactivate end - def declare_output_fields(declarer) - declarer.declare(Fields.new(self.class.fields)) - end - def ack(msg_id) on_ack(msg_id) end @@ -154,10 +149,6 @@ def on_deactivate; end def on_ack(msg_id); end def on_fail(msg_id); end - def self.fields - @fields ||= [] - end - def self.configure_block @configure_block ||= lambda {} end diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index b09429e..ecb6dcb 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -26,16 +26,37 @@ def initialize(component_class, constructor_args, id, parallelism) @constructor_args = constructor_args @id = id.to_s @parallelism = parallelism - @output_fields = [] + @output_fields = Hash.new([]) end def output_fields(*args) - args.empty? ? @output_fields : @output_fields = args.map(&:to_s) + args.each do |field| + if field.kind_of? Hash + field.each { |k, v| merge_fields(k.to_s, v) } + else + merge_fields('default', field) + end + end + @output_fields end def is_java? @clazz.name.split('::').first.downcase == 'java' end + + private + + def java_safe_fields + java_hash = java.util.HashMap.new() + @output_fields.each do |k, v| + java_hash.put(k, v.to_java('java.lang.String')) + end + java_hash + end + + def merge_fields(stream, fields) + @output_fields[stream] |= fields.kind_of?(Array) ? fields.map(&:to_s) : [fields.to_s] + end end class SpoutDefinition < ComponentDefinition @@ -47,7 +68,7 @@ def new_instance elsif is_java? @clazz.new(*constructor_args) else - Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields) + Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields) end end end @@ -100,7 +121,7 @@ def new_instance elsif is_java? @clazz.new(*constructor_args) else - Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields) + Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields) end end end diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index 64866ea..c1b25ce 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -1,6 +1,8 @@ require 'spec_helper' require 'red_storm/dsl/topology' +require 'pry' + describe RedStorm::SimpleTopology do # mock Storm imported classes @@ -112,8 +114,11 @@ class Topology1 < RedStorm::SimpleTopology output_fields :f3 end end - Topology1.spouts.first.output_fields.should == ["f1", "f2"] - Topology1.spouts.last.output_fields.should == [ "f3"] + # Pry.config.input = STDIN + # Pry.config.output = STDOUT + # binding.pry + Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] } + Topology1.spouts.last.output_fields.should == { "default" => ["f3"] } end end @@ -195,8 +200,8 @@ class Topology1 < RedStorm::SimpleTopology output_fields :f3 end end - Topology1.bolts.first.output_fields.should == ["f1", "f2"] - Topology1.bolts.last.output_fields.should == [ "f3"] + Topology1.bolts.first.output_fields.should == { "default" => ["f1", "f2"] } + Topology1.bolts.last.output_fields.should == { "default" => ["f3"] } end end @@ -307,8 +312,8 @@ class Topology1 < RedStorm::SimpleTopology RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) RedStorm::Configurator.should_receive(:new).and_return(configurator) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout1) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", []).and_return(jruby_spout2) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout1) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", {}).and_return(jruby_spout2) builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1).and_return(declarer) builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1).and_return(declarer) @@ -345,8 +350,8 @@ class Topology1 < RedStorm::SimpleTopology RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) RedStorm::Configurator.should_receive(:new).and_return(configurator) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt1) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", []).and_return(jruby_bolt2) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt1) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", {}).and_return(jruby_bolt2) builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer) builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer) @@ -377,8 +382,8 @@ class Topology1 < RedStorm::SimpleTopology backtype_config = mock(Backtype::Config) Backtype::Config.should_receive(:new).any_number_of_times.and_return(backtype_config) backtype_config.should_receive(:put) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout) builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer) builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer) @declarer.should_receive("addConfigurations").twice diff --git a/src/main/redstorm/storm/jruby/JRubyBolt.java b/src/main/redstorm/storm/jruby/JRubyBolt.java index a4106e5..abbb49f 100644 --- a/src/main/redstorm/storm/jruby/JRubyBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyBolt.java @@ -6,9 +6,11 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; +import java.util.Iterator; import java.util.Map; import org.jruby.Ruby; +import org.jruby.RubyHash; import org.jruby.RubyObject; import org.jruby.runtime.Helpers; import org.jruby.runtime.builtin.IRubyObject; @@ -27,7 +29,7 @@ */ public class JRubyBolt implements IRichBolt { private final String _realBoltClassName; - private final String[] _fields; + private final Map _fields; private final String _bootstrap; // transient to avoid serialization @@ -41,7 +43,7 @@ public class JRubyBolt implements IRichBolt { * @param realBoltClassName the fully qualified JRuby bolt implementation class name * @param fields the output fields names */ - public JRubyBolt(String baseClassPath, String realBoltClassName, String[] fields) { + public JRubyBolt(String baseClassPath, String realBoltClassName, Map fields) { _realBoltClassName = realBoltClassName; _fields = fields; _bootstrap = "require '" + baseClassPath + "'"; @@ -72,8 +74,13 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { // declareOutputFields is executed in the topology creation time, before serialisation. // just create tmp bolt instance to call declareOutputFields. - if (_fields.length > 0) { - declarer.declare(new Fields(_fields)); + if (_fields.size() > 0) { + Iterator iterator = _fields.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry field = (Map.Entry)iterator.next(); + declarer.declareStream(field.getKey(), new Fields(field.getValue())); + iterator.remove(); + } } else { IRubyObject ruby_bolt = initialize_ruby_bolt(); IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); diff --git a/src/main/redstorm/storm/jruby/JRubySpout.java b/src/main/redstorm/storm/jruby/JRubySpout.java index b30d292..a6d81f2 100644 --- a/src/main/redstorm/storm/jruby/JRubySpout.java +++ b/src/main/redstorm/storm/jruby/JRubySpout.java @@ -6,6 +6,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; +import java.util.Iterator; import java.util.Map; import org.jruby.Ruby; @@ -27,7 +28,7 @@ */ public class JRubySpout implements IRichSpout { private final String _realSpoutClassName; - private final String[] _fields; + private final Map _fields; private final String _bootstrap; // transient to avoid serialization @@ -41,7 +42,7 @@ public class JRubySpout implements IRichSpout { * @param realSpoutClassName the fully qualified JRuby spout implementation class name * @param fields the output fields names */ - public JRubySpout(String baseClassPath, String realSpoutClassName, String[] fields) { + public JRubySpout(String baseClassPath, String realSpoutClassName, Map fields) { _realSpoutClassName = realSpoutClassName; _fields = fields; _bootstrap = "require '" + baseClassPath + "'"; @@ -93,8 +94,13 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { // declareOutputFields is executed in the topology creation time, before serialisation. // just create tmp spout instance to call declareOutputFields. - if (_fields.length > 0) { - declarer.declare(new Fields(_fields)); + if (_fields.size() > 0) { + Iterator iterator = _fields.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry field = (Map.Entry)iterator.next(); + declarer.declareStream(field.getKey(), new Fields(field.getValue())); + iterator.remove(); + } } else { IRubyObject ruby_spout = initialize_ruby_spout(); IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); From db8547ebd681ebaebe9510ce594b104686aa06e3 Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Thu, 16 Apr 2015 21:16:33 +0000 Subject: [PATCH 5/7] Output fields in ruby classes are declared in java Fixes an issue where outfields that are defined in the Spout or Bolt are not recognized by the Nimbus server when it validates the existence of streams for all bolts in the topology. --- lib/red_storm/dsl/output_fields.rb | 28 +++++++++------------------- lib/red_storm/dsl/topology.rb | 23 +++++++++++------------ spec/red_storm/dsl/bolt_spec.rb | 22 +++++++++++----------- spec/red_storm/dsl/spout_spec.rb | 16 ++++++++-------- spec/red_storm/dsl/topology_spec.rb | 15 ++++++--------- 5 files changed, 45 insertions(+), 59 deletions(-) diff --git a/lib/red_storm/dsl/output_fields.rb b/lib/red_storm/dsl/output_fields.rb index a506711..fb09d9e 100644 --- a/lib/red_storm/dsl/output_fields.rb +++ b/lib/red_storm/dsl/output_fields.rb @@ -1,4 +1,3 @@ - module RedStorm module DSL module OutputFields @@ -8,18 +7,9 @@ def self.included(base) end def declare_output_fields(declarer) - default_fields = [] - self.class.fields.each do |field| - if field.kind_of? Hash - field.each do |stream, fields| - declarer.declareStream(stream, Fields.new(fields)) - end - else - default_fields << field - end + self.class.fields.each do |stream, fields| + declarer.declareStream(stream, Fields.new(fields)) end - - declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty? end def stream @@ -27,21 +17,21 @@ def stream end module ClassMethods + def output_fields(*fields) - @fields ||= [] + @output_fields ||= Hash.new([]) fields.each do |field| - if field.kind_of? Hash - @fields << Hash[ - field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] } - ] + case field + when Hash + field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] } else - @fields << field.to_s + @output_fields['default'] |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s] end end end def fields - @fields ||= [] + @output_fields ||= Hash.new([]) end def stream? diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index ecb6dcb..de1efdb 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -4,6 +4,7 @@ java_import 'backtype.storm.topology.TopologyBuilder' java_import 'backtype.storm.generated.SubmitOptions' +java_import 'backtype.storm.utils.Utils' module RedStorm module DSL @@ -26,17 +27,19 @@ def initialize(component_class, constructor_args, id, parallelism) @constructor_args = constructor_args @id = id.to_s @parallelism = parallelism - @output_fields = Hash.new([]) + @output_fields = @clazz.fields.clone end - def output_fields(*args) - args.each do |field| - if field.kind_of? Hash - field.each { |k, v| merge_fields(k.to_s, v) } + def output_fields(*fields) + fields.each do |field| + case field + when Hash + field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] } else - merge_fields('default', field) + @output_fields[Utils::DEFAULT_STREAM_ID] |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s] end end + @output_fields end @@ -49,14 +52,10 @@ def is_java? def java_safe_fields java_hash = java.util.HashMap.new() @output_fields.each do |k, v| - java_hash.put(k, v.to_java('java.lang.String')) + java_hash.put(k, v.to_java('java.lang.String')) unless v.empty? end java_hash end - - def merge_fields(stream, fields) - @output_fields[stream] |= fields.kind_of?(Array) ? fields.map(&:to_s) : [fields.to_s] - end end class SpoutDefinition < ComponentDefinition @@ -81,7 +80,7 @@ def initialize(*args) @sources = [] end - def source(source_id, grouping, stream = 'default') + def source(source_id, grouping, stream = Utils::DEFAULT_STREAM_ID) @sources << [ source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}, diff --git a/spec/red_storm/dsl/bolt_spec.rb b/spec/red_storm/dsl/bolt_spec.rb index 97872e6..fc7ada6 100644 --- a/spec/red_storm/dsl/bolt_spec.rb +++ b/spec/red_storm/dsl/bolt_spec.rb @@ -45,42 +45,42 @@ class Bolt1 < RedStorm::SimpleBolt output_fields :f1 end bolt = Bolt1.new - Bolt1.send(:fields).should == ["f1"] + Bolt1.send(:fields).should == {"default" => ["f1"]} end it "should parse multiple arguments" do class Bolt1 < RedStorm::SimpleBolt output_fields :f1, :f2 end - Bolt1.send(:fields).should == ["f1", "f2"] + Bolt1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should parse string and symbol arguments" do class Bolt1 < RedStorm::SimpleBolt output_fields :f1, "f2" end - Bolt1.send(:fields).should == ["f1", "f2"] + Bolt1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should parse single hash argument" do class Bolt1 < RedStorm::SimpleBolt output_fields :stream => :f1 end - Bolt1.send(:fields).should == [{"stream" => "f1"}] + Bolt1.send(:fields).should == {"stream" => ["f1"]} end it "should parse hash of string and symbols" do class Bolt1 < RedStorm::SimpleBolt output_fields "stream" => [:f1, :f2] end - Bolt1.send(:fields).should == [{"stream" => ["f1", "f2"]}] + Bolt1.send(:fields).should == {"stream" => ["f1", "f2"]} end it "should parse string and hash arguments" do class Bolt1 < RedStorm::SimpleBolt output_fields :f1, :stream => :f2 end - Bolt1.send(:fields).should == ["f1", {"stream" => "f2"}] + Bolt1.send(:fields).should == {"default" => ["f1"], "stream" => ["f2"]} end it "should not share state over mutiple classes" do @@ -90,9 +90,9 @@ class Bolt1 < RedStorm::SimpleBolt class Bolt2 < RedStorm::SimpleBolt output_fields :f2 end - RedStorm::SimpleBolt.send(:fields).should == [] - Bolt1.send(:fields).should == ["f1"] - Bolt2.send(:fields).should == ["f2"] + RedStorm::SimpleBolt.send(:fields).should == {} + Bolt1.send(:fields).should == {"default" => ["f1"]} + Bolt2.send(:fields).should == {"default" => ["f2"]} end end @@ -804,7 +804,7 @@ class Bolt1 < RedStorm::SimpleBolt bolt = Bolt1.new class RedStorm::Fields; end declarer = mock("Declarer") - declarer.should_receive(:declare).with("fields") + declarer.should_receive(:declareStream).with("default", "fields") RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") bolt.declare_output_fields(declarer) end @@ -829,7 +829,7 @@ class Bolt1 < RedStorm::SimpleBolt class RedStorm::Fields; end declarer = mock("Declarer") declarer.should_receive(:declareStream).with("stream", "stream_fields") - declarer.should_receive(:declare).with("default_fields") + declarer.should_receive(:declareStream).with("default", "default_fields") RedStorm::Fields.should_receive(:new).with(["f3", "f4"]).and_return("stream_fields") RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("default_fields") bolt.declare_output_fields(declarer) diff --git a/spec/red_storm/dsl/spout_spec.rb b/spec/red_storm/dsl/spout_spec.rb index ece2a8d..e04d677 100644 --- a/spec/red_storm/dsl/spout_spec.rb +++ b/spec/red_storm/dsl/spout_spec.rb @@ -65,21 +65,21 @@ class Spout1 < RedStorm::SimpleSpout output_fields :f1 end - Spout1.send(:fields).should == ["f1"] + Spout1.send(:fields).should == {"default" => ["f1"]} end it "should parse multiple arguments" do class Spout1 < RedStorm::SimpleSpout output_fields :f1, :f2 end - Spout1.send(:fields).should == ["f1", "f2"] + Spout1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should parse string and symbol arguments" do class Spout1 < RedStorm::SimpleSpout output_fields :f1, "f2" end - Spout1.send(:fields).should == ["f1", "f2"] + Spout1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should not share state over mutiple classes" do @@ -89,9 +89,9 @@ class Spout1 < RedStorm::SimpleSpout class Spout2 < RedStorm::SimpleSpout output_fields :f2 end - RedStorm::SimpleSpout.send(:fields).should == [] - Spout1.send(:fields).should == ["f1"] - Spout2.send(:fields).should == ["f2"] + RedStorm::SimpleSpout.send(:fields).should == {} + Spout1.send(:fields).should == {"default" => ["f1"]} + Spout2.send(:fields).should == {"default" => ["f2"]} end end @@ -787,7 +787,7 @@ class Spout1 < RedStorm::SimpleSpout spout = Spout1.new class RedStorm::Fields; end declarer = mock("Declarer") - declarer.should_receive(:declare).with("fields") + declarer.should_receive(:declareStream).with("default", "fields") RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") spout.declare_output_fields(declarer) end @@ -879,4 +879,4 @@ class Spout1 < RedStorm::SimpleSpout; end end end end -end \ No newline at end of file +end diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index c1b25ce..2e52208 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' require 'red_storm/dsl/topology' - -require 'pry' +require 'red_storm/dsl/spout' +require 'red_storm/dsl/bolt' describe RedStorm::SimpleTopology do @@ -23,10 +23,10 @@ class RedStorm::Fields; end Object.send(:remove_const, "SpoutClass2") if Object.const_defined?("SpoutClass2") Object.send(:remove_const, "BoltClass1") if Object.const_defined?("BoltClass1") Object.send(:remove_const, "BoltClass2") if Object.const_defined?("BoltClass2") - class SpoutClass1; end - class SpoutClass2; end - class BoltClass1; end - class BoltClass2; end + class SpoutClass1 < RedStorm::DSL::Spout; end + class SpoutClass2 < RedStorm::DSL::Spout; end + class BoltClass1 < RedStorm::DSL::Bolt; end + class BoltClass2 < RedStorm::DSL::Bolt; end SpoutClass1.should_receive(:base_class_path).at_least(0).times.and_return("base_path") SpoutClass2.should_receive(:base_class_path).at_least(0).times.and_return("base_path") SpoutClass1.should_receive(:java_proxy).at_least(0).times.and_return("RedStorm::JRubySpout") @@ -114,9 +114,6 @@ class Topology1 < RedStorm::SimpleTopology output_fields :f3 end end - # Pry.config.input = STDIN - # Pry.config.output = STDOUT - # binding.pry Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] } Topology1.spouts.last.output_fields.should == { "default" => ["f3"] } end From 67fd211d4d77b792ba2682df610f878dfbf04bc6 Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Thu, 16 Apr 2015 22:58:34 +0000 Subject: [PATCH 6/7] Wrongly assumed topo bolts inherit from redstorm Not all bolts defined in a topology inherit from the RedStorm::DSL. This ensures the output fields in the ComponentDefinition are only copied if the class uses the OutputFields module. Also, redefining field names for the default stream will replace any previously defined default field names. --- lib/red_storm/dsl/topology.rb | 14 +++++++-- spec/red_storm/dsl/topology_spec.rb | 45 +++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index de1efdb..f259522 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -27,18 +27,22 @@ def initialize(component_class, constructor_args, id, parallelism) @constructor_args = constructor_args @id = id.to_s @parallelism = parallelism - @output_fields = @clazz.fields.clone + @output_fields = Hash.new([]) + + initialize_output_fields end def output_fields(*fields) + default_fields = [] fields.each do |field| case field when Hash field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] } else - @output_fields[Utils::DEFAULT_STREAM_ID] |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s] + default_fields |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s] end end + @output_fields[Utils::DEFAULT_STREAM_ID] = default_fields unless default_fields.empty? @output_fields end @@ -49,6 +53,12 @@ def is_java? private + def initialize_output_fields + if @clazz.ancestors.include?(RedStorm::DSL::OutputFields) + @output_fields = @clazz.fields.clone + end + end + def java_safe_fields java_hash = java.util.HashMap.new() @output_fields.each do |k, v| diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index 2e52208..1c24b25 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -118,6 +118,28 @@ class Topology1 < RedStorm::SimpleTopology Topology1.spouts.last.output_fields.should == { "default" => ["f3"] } end + it "should default output_fields to the class defined fields" do + class SpoutClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1 + end + Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] } + end + + it "should override class defined fields with topology output fields" do + class SpoutClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1 do + output_fields :f3, :f4 + end + end + Topology1.spouts.first.output_fields.should == { "default" => ["f3", "f4"] } + end + end describe "bolt statement" do @@ -201,6 +223,29 @@ class Topology1 < RedStorm::SimpleTopology Topology1.bolts.last.output_fields.should == { "default" => ["f3"] } end + it "should default output_fields to the class defined fields" do + class BoltClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + end + end + Topology1.bolts.first.output_fields.should == { "default" => ["f1", "f2"] } + end + + it "should override class defined fields with topology output fields" do + class BoltClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + output_fields :f3, :f4 + end + end + Topology1.bolts.first.output_fields.should == { "default" => ["f3", "f4"] } + end + end describe "configure statement" do From f593b216ca628ef6d489f95343ae64f3991be095 Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Thu, 16 Apr 2015 23:23:08 +0000 Subject: [PATCH 7/7] Use Pry for helpful debugging --- Gemfile.lock | 12 ++++++++++++ redstorm.gemspec | 1 + 2 files changed, 13 insertions(+) diff --git a/Gemfile.lock b/Gemfile.lock index feaa583..124deed 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -7,6 +7,7 @@ PATH GEM remote: https://rubygems.org/ specs: + coderay (1.1.0) colorize (0.5.8) coveralls (0.6.7) colorize @@ -15,8 +16,15 @@ GEM simplecov (>= 0.7) thor diff-lcs (1.2.4) + ffi (1.9.8-java) + method_source (0.8.2) mime-types (1.23) multi_json (1.7.7) + pry (0.10.1-java) + coderay (~> 1.1.0) + method_source (~> 0.8.1) + slop (~> 3.4) + spoon (~> 0.0) rake (10.0.4) redis (3.0.4) rest-client (1.6.7) @@ -33,6 +41,9 @@ GEM multi_json (~> 1.0) simplecov-html (~> 0.7.1) simplecov-html (0.7.1) + slop (3.6.0) + spoon (0.0.4) + ffi thor (0.18.1) PLATFORMS @@ -40,6 +51,7 @@ PLATFORMS DEPENDENCIES coveralls + pry redis redstorm! rspec (~> 2.13) diff --git a/redstorm.gemspec b/redstorm.gemspec index fe760b3..7c4ef68 100644 --- a/redstorm.gemspec +++ b/redstorm.gemspec @@ -21,5 +21,6 @@ Gem::Specification.new do |s| s.executables = ['redstorm'] s.add_development_dependency 'rspec', '~> 2.13' + s.add_development_dependency 'pry' s.add_runtime_dependency 'rake' end