class MCollective::Aggregate
Attributes
action[RW]
ddl[RW]
failed[RW]
functions[RW]
Public Class Methods
new(ddl)
click to toggle source
# File lib/mcollective/aggregate.rb
# Builds an aggregator for the action described by +ddl+.
#
# ddl - the action description; ddl[:action] is read here, the rest of it is
#       consulted by create_functions.
#
# All state is set up before create_functions runs, because that method
# appends to both @functions and @failed.
def initialize(ddl)
  @ddl = ddl
  @action = @ddl[:action]
  @functions, @failed = [], []

  create_functions
end
Public Instance Methods
call_functions(reply)
click to toggle source
Call all the appropriate functions with the reply data received from RPC::Client
# File lib/mcollective/aggregate.rb
# Passes one reply to every aggregate function.
#
# reply - the reply to aggregate; reply[:data] is indexed by each function's
#         output_name and the whole reply is handed to the function as well.
#
# A function whose process_result raises is logged, recorded in @failed with
# :type => :process_result, and removed so it is not called for later replies.
#
# Returns the array of functions that are still active.
def call_functions(reply)
  # reject! rather than each + delete: deleting from the array that is being
  # iterated shifts the remaining elements, which made the iteration skip the
  # function that followed a failed one.
  @functions.reject! do |function|
    Log.debug("Calling aggregate function #{function} for result")

    begin
      function.process_result(reply[:data][function.output_name], reply)
      false
    rescue StandardError, ScriptError => e
      # ScriptError is included so a function raising NotImplementedError is
      # still isolated; signals and SystemExit are deliberately not swallowed.
      Log.error("Could not process aggregate function for '#{function.output_name}'. #{e.to_s}")
      @failed << {:name => function.output_name, :type => :process_result}
      true
    end
  end

  @functions
end
contains_output?(output)
click to toggle source
Checks whether the given output name is declared as an output of the action in the DDL.
# File lib/mcollective/aggregate.rb
# Tells whether +output+ is one of the keys of @ddl[:output].
#
# output - the output name to look up.
#
# Returns true or false.
def contains_output?(output)
  # key? is a direct hash lookup; keys.include? built a temporary array and
  # scanned it linearly.
  @ddl[:output].key?(output)
end
create_functions()
click to toggle source
Creates instances of the Aggregate functions and stores them in the functions array.
All call_functions and summarize calls operate on these functions as a batch.
# File lib/mcollective/aggregate.rb
# Instantiates one aggregate function per entry of @ddl[:aggregate] and
# appends it to @functions.
#
# Each entry supplies :function (the name passed to load_function) and :args,
# where args[0] is the output name and args[1] is an optional hash of
# arguments. A :format key in that hash is split off and passed to the
# function separately.
#
# Entries that cannot be set up are logged and recorded in @failed:
#   :type => :create   - the output is not declared (see contains_output?)
#   :type => :startup  - loading or instantiating the function raised
def create_functions
  @ddl[:aggregate].each do |agg|
    output = agg[:args][0]

    unless contains_output?(output)
      Log.error("Cannot create aggregate function '#{output}'. '#{output}' has not been specified as a valid ddl output.")
      @failed << {:name => output, :type => :create}
      next
    end

    # Work on a copy: deleting :format from the hash owned by the DDL would
    # strip the format from every aggregator built from the same DDL later.
    arguments = agg[:args][1]
    arguments = arguments.dup if arguments
    result_format = (arguments.delete(:format) if arguments) || nil

    begin
      @functions << load_function(agg[:function]).new(output, arguments, result_format, @action)
    rescue StandardError, ScriptError => e
      # ScriptError covers NotImplementedError/LoadError raised by a function;
      # signals and SystemExit are deliberately not swallowed.
      Log.error("Cannot create aggregate function '#{output}'. #{e.to_s}")
      @failed << {:name => output, :type => :startup}
    end
  end
end
load_function(function_name)
click to toggle source
Loads function from disk for use
# File lib/mcollective/aggregate.rb
# Resolves an aggregate function name to its class under Aggregate.
#
# function_name - name of the function; it is converted with to_s.capitalize
#                 to form the constant name.
#
# If the constant is not yet defined, PluginManager.loadclass is asked to load
# "MCollective::Aggregate::<Name>" first.
#
# Returns the constant found under Aggregate.
# Raises RuntimeError when loading or the constant lookup fails.
def load_function(function_name)
  function_name = function_name.to_s.capitalize

  PluginManager.loadclass("MCollective::Aggregate::#{function_name}") unless Aggregate.const_defined?(function_name)
  Aggregate.const_get(function_name)
rescue StandardError, ScriptError
  # ScriptError covers LoadError and SyntaxError from the plugin file. Rescuing
  # Exception here used to turn Interrupt and SystemExit into a bogus
  # "cannot be loaded" error.
  raise "Aggregate function file '#{function_name.downcase}.rb' cannot be loaded"
end
summarize()
click to toggle source
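Finalizes every function and returns their result objects sorted by output name. Functions whose summarize call fails are recorded in failed and left out of the result.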
# File lib/mcollective/aggregate.rb
# Calls summarize on every function in @functions and collects the results.
#
# A function whose summarize raises is logged, recorded in @failed with
# :type => :summarize, and omitted from the returned list.
#
# Returns an array of the functions' summarize results, ordered by each
# result's result[:output] value.
def summarize
  summary = @functions.map do |function|
    begin
      function.summarize
    rescue StandardError, ScriptError => e
      # ScriptError keeps a NotImplementedError from a function isolated;
      # signals and SystemExit are deliberately not swallowed.
      Log.error("Could not summarize aggregate result for '#{function.output_name}'. #{e.to_s}")
      @failed << {:name => function.output_name, :type => :summarize}
      nil
    end
  end

  summary.compact.sort_by { |result| result.result[:output] }
end