class MCollective::Client
Helpers for writing clients that can talk to agents, do discovery and so forth
Attributes
Public Class Methods
# File lib/mcollective/client.rb, line 6
#
# Sets up a client and connects it to the middleware.
#
# options is either a String holding the path to a config file, or a Hash
# whose :config entry holds that path. A Hash is also stored as @options and
# may carry :connection_timeout, which takes precedence over the configured
# connection timeout. Any other argument type raises a RuntimeError.
#
# Raises ClientTimeoutError when connecting takes longer than the
# connection timeout.
def initialize(options)
  @config = Config.instance
  @options = nil

  case options
  when String
    # A bare String is the path to a config file
    @config.loadconfig(options) unless @config.configured
  when Hash
    @config.loadconfig(options[:config]) unless @config.configured
    @options = options
    @connection_timeout = options[:connection_timeout]
  else
    raise "Invalid parameter passed to Client constructor. Valid types are Hash or String"
  end

  @connection_timeout ||= @config.connection_timeout

  @connection = PluginManager["connector_plugin"]
  @security = PluginManager["security_plugin"]
  @security.initiated_by = :client

  @subscriptions = {}
  @discoverer = Discovery.new(self)

  # Time box the connection if a timeout has been specified.
  # connection_timeout defaults to nil which means it will try forever if
  # not specified.
  begin
    Timeout::timeout(@connection_timeout, ClientTimeoutError) { @connection.connect }
  rescue ClientTimeoutError
    Log.error("Timeout occured while trying to connect to middleware")
    raise
  end
end
# File lib/mcollective/client.rb 45 def self.request_sequence 46 @@request_sequence 47 end
Public Instance Methods
Returns the configured main collective if no specific collective is specified in the options
# File lib/mcollective/client.rb, line 51
#
# Returns the collective named by the :collective option, or the configured
# main collective when that option is nil.
#
# @options is nil when the client was constructed from a config file path
# (a String) rather than a Hash, so it is checked before being indexed; in
# that case the configured main collective is returned as well.
def collective
  chosen = @options && @options[:collective]

  # Only nil falls back to the configured default, exactly as before.
  chosen.nil? ? @config.main_collective : chosen
end
# File lib/mcollective/client.rb, line 73
#
# Builds an encoded request ready for publishing.
#
# msg    - either a ready made Message, which is used as is (its own agent
#          replaces the agent argument), or a body that gets wrapped in a
#          new request Message
# agent  - agent the request is addressed to
# filter - filter stored on a newly built Message
#
# Newly built messages take their ttl from the :ttl option or the config,
# and their reply_to from the :reply_to option when that is set.
#
# Every call bumps the class-wide request sequence. Unless the request has a
# reply_to, the reply target of the agent is subscribed to.
#
# Returns the encoded request.
def createreq(msg, agent, filter = {})
  request = msg

  if request.is_a?(Message)
    agent = request.agent
  else
    ttl = @options[:ttl] || @config.ttl
    details = {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl}

    request = Message.new(msg, nil, details)
    request.reply_to = @options[:reply_to] if @options[:reply_to]
  end

  @@request_sequence += 1

  request.encode!
  subscribe(agent, :reply) unless request.reply_to

  request
end
Disconnects cleanly from the middleware
# File lib/mcollective/client.rb 60 def disconnect 61 Log.debug("Disconnecting from the middleware") 62 @connection.disconnect 63 end
Performs a discovery of nodes matching the filter passed and returns an array of nodes.
An integer limit can be supplied; this will have the effect of the discovery being cancelled as soon as it reaches the requested limit of hosts.
# File lib/mcollective/client.rb, line 147
#
# Delegates discovery to the discoverer.
#
# filter  - discovery filter; a copy with the current collective merged in
#           under the 'collective' key is what gets passed on
# timeout - passed through to the discoverer unchanged
# limit   - passed through to the discoverer unchanged (defaults to 0)
#
# Returns whatever the discoverer returns.
def discover(filter, timeout, limit=0)
  scoped_filter = filter.merge('collective' => collective)

  @discoverer.discover(scoped_filter, timeout, limit)
end
# File lib/mcollective/client.rb, line 300
#
# Removed API, kept only so old callers get a clear explanation.
#
# Always raises a RuntimeError; none of the arguments are looked at.
def discovered_req(body, agent, options=false)
  raise RuntimeError, "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
end
Prints out the stats returned from req and discovered_req
in a nice way
# File lib/mcollective/client.rb, line 305
#
# Prints a summary of a stats hash to standard output.
#
# stats   - hash of statistics. :responses and :blocktime are always read;
#           :discovered is optional; verbose output additionally reads
#           :starttime, :discoverytime and :totaltime. :noresponsefrom and
#           :unexpectedresponsefrom are lists of hosts and may be missing
#           or empty.
# options - hash checked for :verbose. When not given, @options is used,
#           and an empty hash when @options is not set either.
# caption - heading of the verbose summary
#
# Returns nil.
def display_stats(stats, options=false, caption="stomp call summary")
  options ||= @options || {}

  if options[:verbose]
    puts("\n---- #{caption} ----")

    if stats[:discovered]
      puts(" Nodes: #{stats[:discovered]} / #{stats[:responses]}")
    else
      puts(" Nodes: #{stats[:responses]}")
    end

    printf(" Start Time: %s\n", Time.at(stats[:starttime]))
    printf(" Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
    printf(" Agent Time: %.2fms\n", stats[:blocktime] * 1000)
    printf(" Total Time: %.2fms\n", stats[:totaltime] * 1000)
  elsif stats[:discovered]
    printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
  else
    printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
  end

  host_lists = [
    ["No response from", stats[:noresponsefrom]],
    ["Unexpected response from", stats[:unexpectedresponsefrom]]
  ]

  host_lists.each do |heading, hosts|
    hosts = Array(hosts)
    next if hosts.empty?

    puts("\n#{heading}:\n")

    # Four right aligned, 30 character wide columns per row. Rows are built
    # by position in the list; the host name itself is only ever used as a
    # format argument, never as a format string.
    hosts.each_slice(4) do |row|
      puts(row.map { |host| format("%30s", host) }.join)
    end
  end

  nil
end
# File lib/mcollective/client.rb, line 231
#
# Logs the request id, agent, ttl and collective of the request at info
# level and then tells the request to publish itself.
#
# Returns whatever request.publish returns.
def publish(request)
  summary = "Sending request #{request.requestid} for agent '#{request.agent}' with ttl #{request.ttl} in collective '#{request.collective}'"
  Log.info(summary)

  request.publish
end
Blocking call that waits forever for a message to arrive.
If you give it a requestid, this means you've previously sent a request with that ID and now you just want replies that match that id. In that case the current connection will just ignore all messages not directed at it and keep waiting for more until it finds a matching message.
# File lib/mcollective/client.rb 115 def receive(requestid = nil) 116 reply = nil 117 118 begin 119 reply = @connection.receive 120 reply.type = :reply 121 reply.expected_msgid = requestid 122 123 reply.decode! 124 125 unless reply.requestid == requestid 126 raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}") 127 end 128 129 Log.debug("Received reply to #{reply.requestid} from #{reply.payload[:senderid]}") 130 rescue SecurityValidationFailed => e 131 Log.warn("Ignoring a message that did not pass security validations") 132 retry 133 rescue MsgDoesNotMatchRequestID => e 134 Log.debug("Ignoring a message for some other client : #{e.message}") 135 retry 136 end 137 138 reply 139 end
Send a request, performs the passed block for each response
times = req("status", "mcollectived", options, client) {|resp|
pp resp
}
It returns a hash of times. Timeouts for discovery and total run are taken from the options hash, which in turn is generally built using MCollective::Optionparser
# File lib/mcollective/client.rb, line 159
#
# Publishes a request, hands every reply to the passed block and returns
# the stats hash for the run.
#
# body    - request body, or a Message; a Message supplies the agent, the
#           hosts to wait for (its discovered hosts, or none) and replaces
#           @options with its own options
# agent   - agent the request is addressed to
# options - when given, replaces @options for this and later calls
# waitfor - forwarded to the receiver to decide when all replies are in
#
# The :threaded option selects between the threaded and the unthreaded
# request routine. An Interrupt while waiting ends the wait early; the
# stats are still returned. The reply subscription for the agent is removed
# whatever happens.
def req(body, agent=nil, options=false, waitfor=[], &block)
  if body.is_a?(Message)
    agent = body.agent
    waitfor = body.discovered_hosts || []
    @options = body.options
  end

  @options = options if options

  threaded = @options[:threaded]
  timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
  request = createreq(body, agent, @options[:filter])
  publish_timeout = @options[:publish_timeout] || @config.publish_timeout

  stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
  STDOUT.sync = true

  responded = 0

  begin
    responded = if threaded
                  threaded_req(request, publish_timeout, timeout, waitfor, &block)
                else
                  unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
                end
  rescue Interrupt
    # Deliberately ignored: the count stays at 0 and stats are still built
  ensure
    unsubscribe(agent, :reply)
  end

  update_stat(stat, responded, request.requestid)
end
Sends a request and returns the generated request id. It doesn't wait for responses and doesn't execute any passed in code blocks for responses.
# File lib/mcollective/client.rb, line 67
#
# Builds a request with createreq, publishes it and returns its request id
# without waiting for any replies.
def sendreq(msg, agent, filter = {})
  createreq(msg, agent, filter).tap { |request| publish(request) }.requestid
end
Starts the request publishing routine
# File lib/mcollective/client.rb, line 220
#
# Publishes the request, giving up after publish_timeout seconds.
#
# Running into the timeout is not an error for the caller: it is logged as
# a warning and the method returns normally.
def start_publisher(request, publish_timeout)
  Log.debug("Starting publishing with publish timeout of #{publish_timeout}")

  begin
    Timeout.timeout(publish_timeout) { publish(request) }
  rescue Timeout::Error
    Log.warn("Could not publish all messages. Publishing timed out.")
  end
end
Starts the response receiver routine. Expected to return the number of received responses.
# File lib/mcollective/client.rb, line 238
#
# Receives replies to a request until enough have arrived or the timeout
# expires, and returns how many replies were received.
#
# requestid - id of the request whose replies are collected
# waitfor   - either an Array of sender ids, in which case receiving stops
#             once every listed sender has replied as often as it is listed
#             (an empty Array never stops early), or a number of replies to
#             wait for, where 0 means keep going until the timeout
# timeout   - seconds to wait in total
# block     - optional; called with the reply payload, or with the payload
#             and the reply itself when it takes two arguments. Without a
#             block replies are only counted.
#
# Running into the timeout is not an error: a warning is logged when fewer
# replies than expected arrived and the count so far is returned.
def start_receiver(requestid, waitfor, timeout, &block)
  Log.debug("Starting response receiver with timeout of #{timeout}")
  hosts_responded = 0

  expecting_hosts = waitfor.is_a?(Array)

  # Outstanding reply count per sender, only tracked for a list of hosts
  unfinished = Hash.new(0)
  waitfor.each { |w| unfinished[w] += 1 } if expecting_hosts

  begin
    Timeout.timeout(timeout) do
      loop do
        resp = receive(requestid)

        # The block may be absent when the caller only wants the count
        if block
          if block.arity == 2
            yield resp.payload, resp
          else
            yield resp.payload
          end
        end

        hosts_responded += 1

        if expecting_hosts
          sender = resp.payload[:senderid]
          if unfinished[sender] <= 1
            unfinished.delete(sender)
          else
            unfinished[sender] -= 1
          end

          break if !waitfor.empty? && unfinished.empty?
        else
          break unless waitfor == 0 || hosts_responded < waitfor
        end
      end
    end
  rescue Timeout::Error
    if expecting_hosts
      if !unfinished.empty?
        Log.warn("Could not receive all responses. Did not receive responses from #{unfinished.keys.join(', ')}")
      end
    elsif (waitfor > hosts_responded)
      Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
    end
  end

  hosts_responded
end
# File lib/mcollective/client.rb, line 90
#
# Subscribes to the given target type of an agent in the current
# collective, unless this client already holds a subscription for that
# agent. Subscriptions are tracked per agent only, not per type.
def subscribe(agent, type)
  return if @subscriptions.include?(agent)

  targets = Util.make_subscriptions(agent, type, collective)
  Log.debug("Subscribing to #{type} target for agent #{agent}")

  Util.subscribe(targets)
  @subscriptions[agent] = 1
end
Starts the client receiver and publisher in threads. This is activated when the 'threader_client' configuration option is set.
# File lib/mcollective/client.rb, line 199
#
# Runs the publisher and the receiver in separate threads and returns the
# number of replies the receiver counted.
#
# Only the receiver thread is waited for; the publisher thread is left to
# finish on its own.
def threaded_req(request, publish_timeout, timeout, waitfor, &block)
  Log.debug("Starting threaded client")

  Thread.new { start_publisher(request, publish_timeout) }

  # When the client is threaded we add the publishing timeout to
  # the agent timeout so that the receiver doesn't time out before
  # publishing has finished in cases where publish_timeout >= timeout.
  receive_window = publish_timeout + timeout
  hosts_responded = 0

  listener = Thread.new do
    hosts_responded = start_receiver(request.requestid, waitfor, receive_window, &block)
  end
  listener.join

  hosts_responded
end
# File lib/mcollective/client.rb, line 100
#
# Drops the subscription to the given target type of an agent in the
# current collective. Does nothing when this client holds no subscription
# for the agent.
def unsubscribe(agent, type)
  return unless @subscriptions.include?(agent)

  targets = Util.make_subscriptions(agent, type, collective)
  Log.debug("Unsubscribing #{type} target for #{agent}")

  Util.unsubscribe(targets)
  @subscriptions.delete(agent)
end
Starts the client receiver and publisher unthreaded. This is the default client behaviour.
# File lib/mcollective/client.rb 191 def unthreaded_req(request, publish_timeout, timeout, waitfor, &block) 192 start_publisher(request, publish_timeout) 193 start_receiver(request.requestid, waitfor, timeout, &block) 194 end
# File lib/mcollective/client.rb, line 289
#
# Completes a stats hash at the end of a request and remembers it in @stats.
#
# stat            - hash holding at least :starttime and :discoverytime;
#                   it is modified in place
# hosts_responded - stored as :responses
# requestid       - stored as :requestid
#
# :totaltime becomes the time elapsed since :starttime, :blocktime the
# total time minus the discovery time, and both host lists are reset to
# empty arrays.
#
# Returns the same hash that was passed in.
def update_stat(stat, hosts_responded, requestid)
  stat[:totaltime] = Time.now.to_f - stat[:starttime]
  stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]

  stat.update(
    :responses => hosts_responded,
    :noresponsefrom => [],
    :unexpectedresponsefrom => [],
    :requestid => requestid
  )

  @stats = stat
end