]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | # |
2 | # Licensed to the Apache Software Foundation (ASF) under one | |
3 | # or more contributor license agreements. See the NOTICE file | |
4 | # distributed with this work for additional information | |
5 | # regarding copyright ownership. The ASF licenses this file | |
6 | # to you under the Apache License, Version 2.0 (the | |
7 | # "License"); you may not use this file except in compliance | |
8 | # with the License. You may obtain a copy of the License at | |
9 | # | |
10 | # http://www.apache.org/licenses/LICENSE-2.0 | |
11 | # | |
12 | # Unless required by applicable law or agreed to in writing, | |
13 | # software distributed under the License is distributed on an | |
14 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | # KIND, either express or implied. See the License for the | |
16 | # specific language governing permissions and limitations | |
17 | # under the License. | |
18 | # | |
19 | ||
20 | require 'rubygems' | |
21 | $:.unshift File.dirname(__FILE__) + '/../lib' | |
22 | require 'thrift' | |
23 | require 'stringio' | |
24 | ||
25 | HOST = '127.0.0.1' | |
26 | PORT = 42587 | |
27 | ||
28 | ############### | |
29 | ## Server | |
30 | ############### | |
31 | ||
32 | class Server | |
33 | attr_accessor :serverclass | |
34 | attr_accessor :interpreter | |
35 | attr_accessor :host | |
36 | attr_accessor :port | |
37 | ||
38 | def initialize(opts) | |
39 | @serverclass = opts.fetch(:class, Thrift::NonblockingServer) | |
40 | @interpreter = opts.fetch(:interpreter, "ruby") | |
41 | @host = opts.fetch(:host, ::HOST) | |
42 | @port = opts.fetch(:port, ::PORT) | |
43 | end | |
44 | ||
45 | def start | |
46 | return if @serverclass == Object | |
47 | args = (File.basename(@interpreter) == "jruby" ? "-J-server" : "") | |
48 | @pipe = IO.popen("#{@interpreter} #{args} #{File.dirname(__FILE__)}/server.rb #{@host} #{@port} #{@serverclass.name}", "r+") | |
49 | Marshal.load(@pipe) # wait until the server has started | |
50 | sleep 0.4 # give the server time to actually start spawning sockets | |
51 | end | |
52 | ||
53 | def shutdown | |
54 | return unless @pipe | |
55 | Marshal.dump(:shutdown, @pipe) | |
56 | begin | |
57 | @pipe.read(10) # block until the server shuts down | |
58 | rescue EOFError | |
59 | end | |
60 | @pipe.close | |
61 | @pipe = nil | |
62 | end | |
63 | end | |
64 | ||
65 | class BenchmarkManager | |
66 | def initialize(opts, server) | |
67 | @socket = opts.fetch(:socket) do | |
68 | @host = opts.fetch(:host, 'localhost') | |
69 | @port = opts.fetch(:port) | |
70 | nil | |
71 | end | |
72 | @num_processes = opts.fetch(:num_processes, 40) | |
73 | @clients_per_process = opts.fetch(:clients_per_process, 10) | |
74 | @calls_per_client = opts.fetch(:calls_per_client, 50) | |
75 | @interpreter = opts.fetch(:interpreter, "ruby") | |
76 | @server = server | |
77 | @log_exceptions = opts.fetch(:log_exceptions, false) | |
78 | end | |
79 | ||
80 | def run | |
81 | @pool = [] | |
82 | @benchmark_start = Time.now | |
83 | puts "Spawning benchmark processes..." | |
84 | @num_processes.times do | |
85 | spawn | |
86 | sleep 0.02 # space out spawns | |
87 | end | |
88 | collect_output | |
89 | @benchmark_end = Time.now # we know the procs are done here | |
90 | translate_output | |
91 | analyze_output | |
92 | report_output | |
93 | end | |
94 | ||
95 | def spawn | |
96 | pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/client.rb #{"-log-exceptions" if @log_exceptions} #{@host} #{@port} #{@clients_per_process} #{@calls_per_client}") | |
97 | @pool << pipe | |
98 | end | |
99 | ||
100 | def socket_class | |
101 | if @socket | |
102 | Thrift::UNIXSocket | |
103 | else | |
104 | Thrift::Socket | |
105 | end | |
106 | end | |
107 | ||
108 | def collect_output | |
109 | puts "Collecting output..." | |
110 | # read from @pool until all sockets are closed | |
111 | @buffers = Hash.new { |h,k| h[k] = '' } | |
112 | until @pool.empty? | |
113 | rd, = select(@pool) | |
114 | next if rd.nil? | |
115 | rd.each do |fd| | |
116 | begin | |
117 | @buffers[fd] << fd.readpartial(4096) | |
118 | rescue EOFError | |
119 | @pool.delete fd | |
120 | end | |
121 | end | |
122 | end | |
123 | end | |
124 | ||
125 | def translate_output | |
126 | puts "Translating output..." | |
127 | @output = [] | |
128 | @buffers.each do |fd, buffer| | |
129 | strio = StringIO.new(buffer) | |
130 | logs = [] | |
131 | begin | |
132 | loop do | |
133 | logs << Marshal.load(strio) | |
134 | end | |
135 | rescue EOFError | |
136 | @output << logs | |
137 | end | |
138 | end | |
139 | end | |
140 | ||
141 | def analyze_output | |
142 | puts "Analyzing output..." | |
143 | call_times = [] | |
144 | client_times = [] | |
145 | connection_failures = [] | |
146 | connection_errors = [] | |
147 | shortest_call = 0 | |
148 | shortest_client = 0 | |
149 | longest_call = 0 | |
150 | longest_client = 0 | |
151 | @output.each do |logs| | |
152 | cur_call, cur_client = nil | |
153 | logs.each do |tok, time| | |
154 | case tok | |
155 | when :start | |
156 | cur_client = time | |
157 | when :call_start | |
158 | cur_call = time | |
159 | when :call_end | |
160 | delta = time - cur_call | |
161 | call_times << delta | |
162 | longest_call = delta unless longest_call > delta | |
163 | shortest_call = delta if shortest_call == 0 or delta < shortest_call | |
164 | cur_call = nil | |
165 | when :end | |
166 | delta = time - cur_client | |
167 | client_times << delta | |
168 | longest_client = delta unless longest_client > delta | |
169 | shortest_client = delta if shortest_client == 0 or delta < shortest_client | |
170 | cur_client = nil | |
171 | when :connection_failure | |
172 | connection_failures << time | |
173 | when :connection_error | |
174 | connection_errors << time | |
175 | end | |
176 | end | |
177 | end | |
178 | @report = {} | |
179 | @report[:total_calls] = call_times.inject(0.0) { |a,t| a += t } | |
180 | @report[:avg_calls] = @report[:total_calls] / call_times.size | |
181 | @report[:total_clients] = client_times.inject(0.0) { |a,t| a += t } | |
182 | @report[:avg_clients] = @report[:total_clients] / client_times.size | |
183 | @report[:connection_failures] = connection_failures.size | |
184 | @report[:connection_errors] = connection_errors.size | |
185 | @report[:shortest_call] = shortest_call | |
186 | @report[:shortest_client] = shortest_client | |
187 | @report[:longest_call] = longest_call | |
188 | @report[:longest_client] = longest_client | |
189 | @report[:total_benchmark_time] = @benchmark_end - @benchmark_start | |
190 | @report[:fastthread] = $".include?('fastthread.bundle') | |
191 | end | |
192 | ||
193 | def report_output | |
194 | fmt = "%.4f seconds" | |
195 | puts | |
196 | tabulate "%d", | |
197 | [["Server class", "%s"], @server.serverclass == Object ? "" : @server.serverclass], | |
198 | [["Server interpreter", "%s"], @server.interpreter], | |
199 | [["Client interpreter", "%s"], @interpreter], | |
200 | [["Socket class", "%s"], socket_class], | |
201 | ["Number of processes", @num_processes], | |
202 | ["Clients per process", @clients_per_process], | |
203 | ["Calls per client", @calls_per_client], | |
204 | [["Using fastthread", "%s"], @report[:fastthread] ? "yes" : "no"] | |
205 | puts | |
206 | failures = (@report[:connection_failures] > 0) | |
207 | tabulate fmt, | |
208 | [["Connection failures", "%d", [:red, :bold]], @report[:connection_failures]], | |
209 | [["Connection errors", "%d", [:red, :bold]], @report[:connection_errors]], | |
210 | ["Average time per call", @report[:avg_calls]], | |
211 | ["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]], | |
212 | ["Total time for all calls", @report[:total_calls]], | |
213 | ["Real time for benchmarking", @report[:total_benchmark_time]], | |
214 | ["Shortest call time", @report[:shortest_call]], | |
215 | ["Longest call time", @report[:longest_call]], | |
216 | ["Shortest client time (%d calls)" % @calls_per_client, @report[:shortest_client]], | |
217 | ["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]] | |
218 | end | |
219 | ||
220 | ANSI = { | |
221 | :reset => 0, | |
222 | :bold => 1, | |
223 | :black => 30, | |
224 | :red => 31, | |
225 | :green => 32, | |
226 | :yellow => 33, | |
227 | :blue => 34, | |
228 | :magenta => 35, | |
229 | :cyan => 36, | |
230 | :white => 37 | |
231 | } | |
232 | ||
233 | def tabulate(fmt, *labels_and_values) | |
234 | labels = labels_and_values.map { |l| Array === l ? l.first : l } | |
235 | label_width = labels.inject(0) { |w,l| l.size > w ? l.size : w } | |
236 | labels_and_values.each do |(l,v)| | |
237 | f = fmt | |
238 | l, f, c = l if Array === l | |
239 | fmtstr = "%-#{label_width+1}s #{f}" | |
240 | if STDOUT.tty? and c and v.to_i > 0 | |
241 | fmtstr = "\e[#{[*c].map { |x| ANSI[x] } * ";"}m" + fmtstr + "\e[#{ANSI[:reset]}m" | |
242 | end | |
243 | puts fmtstr % [l+":", v] | |
244 | end | |
245 | end | |
246 | end | |
247 | ||
248 | def resolve_const(const) | |
249 | const and const.split('::').inject(Object) { |k,c| k.const_get(c) } | |
250 | end | |
251 | ||
252 | puts "Starting server..." | |
253 | args = {} | |
254 | args[:interpreter] = ENV['THRIFT_SERVER_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby" | |
255 | args[:class] = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer | |
256 | args[:host] = ENV['THRIFT_HOST'] || HOST | |
257 | args[:port] = (ENV['THRIFT_PORT'] || PORT).to_i | |
258 | server = Server.new(args) | |
259 | server.start | |
260 | ||
261 | args = {} | |
262 | args[:host] = ENV['THRIFT_HOST'] || HOST | |
263 | args[:port] = (ENV['THRIFT_PORT'] || PORT).to_i | |
264 | args[:num_processes] = (ENV['THRIFT_NUM_PROCESSES'] || 40).to_i | |
265 | args[:clients_per_process] = (ENV['THRIFT_NUM_CLIENTS'] || 5).to_i | |
266 | args[:calls_per_client] = (ENV['THRIFT_NUM_CALLS'] || 50).to_i | |
267 | args[:interpreter] = ENV['THRIFT_CLIENT_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby" | |
268 | args[:log_exceptions] = !!ENV['THRIFT_LOG_EXCEPTIONS'] | |
269 | BenchmarkManager.new(args, server).run | |
270 | ||
271 | server.shutdown |