-
Notifications
You must be signed in to change notification settings - Fork 84
Expand file tree
/
Copy pathnode.rb
More file actions
201 lines (171 loc) · 6.78 KB
/
node.rb
File metadata and controls
201 lines (171 loc) · 6.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
module CloudCrowd

  # A Node is a Sinatra/Thin application that runs a single instance per-machine.
  # It registers with the central server, receives WorkUnits, and forks off
  # Workers to process them. The actions are:
  #
  # [get /heartbeat] Returns 200 OK to let monitoring tools know the server's up.
  # [post /work]     The central server hits <tt>/work</tt> to dispatch a WorkUnit to this Node.
  class Node < Sinatra::Base

    # A Node's default port. You only run a single node per machine, so they
    # can all use the same port without any problems.
    DEFAULT_PORT = 9063

    # A list of regex scrapers, which let us extract the one-minute load
    # average and the amount of free memory on different flavors of UNIX.
    SCRAPE_UPTIME       = /\d+\.\d+/
    SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/
    SCRAPE_MAC_MEMORY   = /Pages free:\s+(\d+)./
    SCRAPE_MAC_PAGE     = /page size of (\d+) bytes/

    # The interval (in seconds) at which the node monitors the machine's load
    # and memory use (if configured to do so in config.yml).
    MONITOR_INTERVAL = 3

    # The interval (in seconds) at which the node regularly checks in with
    # central (5 min).
    CHECK_IN_INTERVAL = 300

    # The response sent back when this node is overloaded.
    OVERLOADED_MESSAGE = 'Node Overloaded'

    attr_reader :enabled_actions, :host, :port, :tag, :central

    set :root, ROOT
    set :authorization_realm, "CloudCrowd"

    helpers Helpers

    # methodoverride allows the _method param.
    enable :methodoverride

    # Enabling HTTP Authentication turns it on for all requests.
    # This works the same way as in the central CloudCrowd::Server.
    before do
      login_required if CloudCrowd.config[:http_authentication]
    end

    # To monitor a Node with Monit, God, Nagios, or another tool, you can hit
    # /heartbeat to make sure it's still online.
    get '/heartbeat' do
      "buh-bump"
    end

    # Posts a WorkUnit to this Node. Forks a Worker and returns the process id.
    # Returns a 503 if this Node is overloaded.
    post '/work' do
      halt 503, OVERLOADED_MESSAGE if @overloaded
      unit = JSON.parse(params[:work_unit])
      pid = fork { Worker.new(self, unit).run }
      # Detach so the finished child's exit status is reaped instead of
      # lingering as a zombie.
      Process.detach(pid)
      json :pid => pid
    end

    # When creating a node, specify the port it should run on.
    #
    # Options:
    # [:port]      Port to listen on (defaults to DEFAULT_PORT).
    # [:daemonize] Daemonize the Thin server (creating the log directory first).
    # [:tag]       Optional tag reported to central on every check-in.
    #
    # Unless Sinatra's test environment is active, this immediately calls
    # #start, which blocks until the server thread finishes.
    def initialize(options={})
      # Run Sinatra::Base's own initializer so its per-instance state is set
      # up; it must happen before #start, which never returns while serving.
      super()
      require 'json'
      CloudCrowd.identity = :node
      @central         = CloudCrowd.central_server
      @host            = Socket.gethostname
      @enabled_actions = CloudCrowd.actions.keys - (CloudCrowd.config[:disabled_actions] || [])
      @port            = options[:port] || DEFAULT_PORT
      @id              = "#{@host}:#{@port}"
      @daemon          = !!options[:daemonize]
      @tag             = options[:tag]
      @overloaded      = false
      @max_load        = CloudCrowd.config[:max_load]
      @min_memory      = CloudCrowd.config[:min_free_memory]
      start unless test?
    end

    # Starting up a Node registers with the central server and begins to listen
    # for incoming WorkUnits. Blocks until the server thread exits.
    def start
      # File.exist? (not the removed-in-Ruby-3.2 File.exists?).
      FileUtils.mkdir_p(CloudCrowd.log_path) if @daemon && !File.exist?(CloudCrowd.log_path)
      # Thin's own signal handling is disabled; trap_signals installs ours.
      @server          = Thin::Server.new('0.0.0.0', @port, self, :signals => false)
      @server.tag      = 'cloud-crowd-node'
      @server.pid_file = CloudCrowd.pid_path('node.pid')
      @server.log_file = CloudCrowd.log_path('node.log')
      @server.daemonize if @daemon
      trap_signals
      asset_store
      @server_thread = Thread.new { @server.start }
      check_in(true)
      check_in_periodically
      monitor_system if @max_load || @min_memory
      @server_thread.join
    end

    # Checking in with the central server informs it of the location and
    # configuration of this Node. If it can't check-in, there's no point in
    # starting.
    #
    # critical - when true (the start-up check-in), a failed check-in exits
    #            the process; otherwise the failure is printed and the node
    #            carries on, to try again later.
    #
    # Besides RestClient's own exceptions, network-level failures (DNS lookup,
    # unreachable host, refused/reset/timed-out connections) surface as
    # SocketError or an Errno::* (SystemCallError). Rescuing them here keeps
    # the periodic check-in and monitor threads alive through transient
    # outages instead of letting one hiccup kill them for good.
    def check_in(critical=false)
      @central["/node/#{@id}"].put(
        :busy            => @overloaded,
        :tag             => @tag,
        :max_workers     => CloudCrowd.config[:max_workers],
        :enabled_actions => @enabled_actions.join(',')
      )
    rescue RestClient::Exception, SocketError, SystemCallError
      puts "Failed to connect to the central server (#{@central.to_s})."
      raise SystemExit if critical
    end

    # Before exiting, the Node checks out with the central server, releasing all
    # of its WorkUnits for other Nodes to handle.
    def check_out
      @central["/node/#{@id}"].delete
    end

    # Lazy-initialize the asset_store, preferably after the Node has launched.
    def asset_store
      @asset_store ||= AssetStore.new
    end

    # Is the node overloaded? If configured, checks if the load average is
    # greater than 'max_load', or if the available RAM is less than
    # 'min_free_memory'.
    def overloaded?
      (@max_load && load_average > @max_load) ||
        (@min_memory && free_memory < @min_memory)
    end

    # The current one-minute load average (0.0 if `uptime` output can't be
    # parsed).
    def load_average
      `uptime`.match(SCRAPE_UPTIME).to_s.to_f
    end

    # The current amount of free memory in megabytes.
    #
    # Raises NotImplementedError on platforms other than macOS and Linux.
    def free_memory
      case RUBY_PLATFORM
      when /darwin/
        stats = `vm_stat`
        # vm_stat reports pages; convert the page size to megabytes once.
        @mac_page_size ||= stats.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0
        stats.match(SCRAPE_MAC_MEMORY)[1].to_f * @mac_page_size
      when /linux/
        # Read procfs directly rather than spawning `cat` on every poll.
        # /proc/meminfo reports kB; divide by 1024 for megabytes.
        File.read('/proc/meminfo').match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0
      else
        raise NotImplementedError, "'min_free_memory' is not yet implemented on your platform"
      end
    end


    private

    # Launch a monitoring thread that periodically checks the node's load
    # average and the amount of free memory remaining. If we transition out of
    # the overloaded state, let central know.
    def monitor_system
      @monitor_thread = Thread.new do
        loop do
          was_overloaded = @overloaded
          @overloaded    = overloaded?
          check_in if was_overloaded && !@overloaded
          sleep MONITOR_INTERVAL
        end
      end
    end

    # If communication is interrupted for external reasons, the central server
    # will assume that the node has gone down. Checking in will let central know
    # it's still online.
    def check_in_periodically
      @check_in_thread = Thread.new do
        loop do
          sleep CHECK_IN_INTERVAL
          check_in
        end
      end
    end

    # Trap exit signals in order to shut down cleanly. SIGKILL is deliberately
    # not trapped: it can never be caught, and Ruby raises ArgumentError when
    # asked to trap it, which would abort start-up before TERM is installed.
    def trap_signals
      %w[QUIT INT TERM].each do |signal|
        Signal.trap(signal) { shut_down }
      end
    end

    # At shut down, de-register with the central server before exiting.
    # Invoked from the signal handlers installed by trap_signals.
    # NOTE(review): check_out performs an HTTP request from inside a trap
    # handler; on Ruby 2.0+ locking a Mutex in trap context raises ThreadError,
    # so confirm the HTTP client stack in use works here.
    def shut_down
      @check_in_thread.kill if @check_in_thread
      @monitor_thread.kill  if @monitor_thread
      check_out
      @server_thread.kill   if @server_thread
      Process.exit
    end

  end

end