]> git.proxmox.com Git - mirror_ifupdown2.git/blame - nlmanager/nllistener.py
nlmanager: rdnbrd "Interrupted system call" traceback in nlmanager
[mirror_ifupdown2.git] / nlmanager / nllistener.py
CommitLineData
198ded6a
JF
1#!/usr/bin/env python
2
3from nlpacket import *
4from nlmanager import NetlinkManager
5from select import select
6from struct import pack, unpack, calcsize
7from threading import Thread, Event, Lock
8from Queue import Queue
9import logging
10import socket
11
12log = logging.getLogger(__name__)
13
14
15class NetlinkListener(Thread):
16
17 def __init__(self, manager, groups):
18 """
19 groups controls what types of messages we are interested in hearing
20 To get everything pass:
21 RTMGRP_LINK | \
22 RTMGRP_IPV4_IFADDR | \
23 RTMGRP_IPV4_ROUTE | \
24 RTMGRP_IPV6_IFADDR | \
25 RTMGRP_IPV6_ROUTE
26 """
27 Thread.__init__(self)
28 self.manager = manager
29 self.shutdown_event = Event()
30 self.groups = groups
31
32 def __str__(self):
33 return 'NetlinkListener'
34
35 def run(self):
36 manager = self.manager
37 header_PACK = 'IHHII'
38 header_LEN = calcsize(header_PACK)
39
40 # The RX socket is used to listen to all netlink messages that fly by
41 # as things change in the kernel. We need a very large SO_RCVBUF here
42 # else we tend to miss messages.
43 self.rx_socket = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, 0)
44 self.rx_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 10000000)
45 self.rx_socket.bind((manager.pid+1, self.groups))
46 self.rx_socket_prev_seq = {}
47
48 if not manager.tx_socket:
49 manager.tx_socket_allocate()
50
51 my_sockets = (manager.tx_socket, self.rx_socket)
52
53 socket_string = {
54 manager.tx_socket: "TX",
55 self.rx_socket: "RX"
56 }
57
58 supported_messages = (RTM_NEWLINK, RTM_DELLINK, RTM_NEWADDR,
59 RTM_DELADDR, RTM_NEWNEIGH, RTM_DELNEIGH,
60 RTM_NEWROUTE, RTM_DELROUTE)
61
62 ignore_messages = (RTM_GETLINK, RTM_GETADDR, RTM_GETNEIGH,
63 RTM_GETROUTE, RTM_GETQDISC, NLMSG_ERROR, NLMSG_DONE)
64
65 while True:
66
67 if self.shutdown_event.is_set():
68 log.info("%s: shutting down" % self)
69 return
70
d3d4f288 71 # Only block for 1 second so we can wake up to see if shutdown_event is set
198ded6a
JF
72 try:
73 (readable, writeable, exceptional) = select(my_sockets, [], my_sockets, 1)
74 except Exception as e:
75 log.error('select() error: ' + str(e))
76 continue
77
78 if not readable:
79 continue
80
81 set_alarm = False
82 set_tx_socket_rxed_ack_alarm = False
198ded6a
JF
83
84 for s in readable:
d3d4f288 85 data = []
198ded6a
JF
86
87 try:
88 data = s.recv(4096)
198ded6a
JF
89 except Exception as e:
90 log.error('recv() error: ' + str(e))
198ded6a
JF
91 continue
92
93 total_length = len(data)
94 while data:
95
96 # Extract the length, etc from the header
97 (length, msgtype, flags, seq, pid) = unpack(header_PACK, data[:header_LEN])
98
99 if manager.debug_listener:
d3d4f288
JF
100 log.debug('%s %s: RXed %s seq %d, pid %d, %d bytes (%d total)' %
101 (self, socket_string[s], NetlinkPacket.type_to_string[msgtype],
102 seq, pid, length, total_length))
198ded6a 103
d3d4f288
JF
104 if msgtype == NLMSG_ERROR:
105 # The error code is a signed negative number.
106 error_code = abs(unpack('=i', data[header_LEN:header_LEN+4])[0])
107 msg = Error(msgtype, True)
108 msg.decode_packet(length, flags, seq, pid, data)
198ded6a
JF
109
110 if error_code:
d3d4f288 111 log.debug("%s %s: RXed NLMSG_ERROR code %s (%d)" % (self, socket_string[s], msg.error_to_string.get(error_code), error_code))
198ded6a 112
d3d4f288 113 if seq == manager.target_seq and pid == manager.target_pid:
198ded6a 114 if manager.target_seq_pid_debug:
d3d4f288
JF
115 log.debug("%s %s: Setting RXed ACK alarm for seq %d, pid %d" %
116 (self, socket_string[s], seq, pid))
198ded6a
JF
117 set_tx_socket_rxed_ack_alarm = True
118
119 # Put the message on the manager's netlinkq
120 if msgtype in supported_messages:
121 set_alarm = True
122 manager.netlinkq.append((msgtype, length, flags, seq, pid, data[0:length]))
123
124 # There are certain message types we do not care about
125 # (RTM_GETs for example)
126 elif msgtype in ignore_messages:
127 pass
128
129 # And there are certain message types we have not added
130 # support for yet (QDISC). Log an error for these just
131 # as a reminder to add support for them.
132 else:
133 if msgtype in NetlinkPacket.type_to_string:
d3d4f288
JF
134 log.warning('%s %s: RXed unsupported message %s (type %d)' %
135 (self, socket_string[s], NetlinkPacket.type_to_string[msgtype], msgtype))
198ded6a 136 else:
d3d4f288
JF
137 log.warning('%s %s: RXed unknown message type %d' %
138 (self, socket_string[s], msgtype))
198ded6a
JF
139
140 # Track the previous PID sequence number for RX and TX sockets
141 if s == self.rx_socket:
142 prev_seq = self.rx_socket_prev_seq
143 elif s == manager.tx_socket:
144 prev_seq = manager.tx_socket_prev_seq
145
146 if pid in prev_seq and prev_seq[pid] and prev_seq[pid] != seq and (prev_seq[pid]+1 != seq):
d3d4f288 147 log.debug('%s %s: went from seq %d to %d' % (self, socket_string[s], prev_seq[pid], seq))
198ded6a
JF
148 prev_seq[pid] = seq
149
150 data = data[length:]
151
152 if set_tx_socket_rxed_ack_alarm:
153 manager.target_lock.acquire()
154 manager.target_seq = None
155 manager.target_pid = None
156 manager.target_lock.release()
157 manager.tx_socket_rxed_ack.set()
158
159 if set_alarm:
160 manager.workq.put(('SERVICE_NETLINK_QUEUE', None))
161 manager.alarm.set()
162
163 self.rx_socket.close()
164
165
166class NetlinkManagerWithListener(NetlinkManager):
167
168 def __init__(self, groups, start_listener=True):
169 NetlinkManager.__init__(self)
170 self.groups = groups
171 self.workq = Queue()
172 self.netlinkq = []
173 self.alarm = Event()
174 self.shutdown_event = Event()
175 self.tx_socket_rxed_ack = Event()
176 self.tx_socket_rxed_ack.clear()
177 self.target_seq = None
178 self.target_pid = None
179 self.target_seq_pid_debug = False
180 self.target_lock = Lock()
181 self.tx_socket_prev_seq = {}
182 self.debug_listener = False
183 self.debug_seq_pid = {}
184 self.ifname_by_index = {}
185 self.blacklist_filter = {}
186 self.whitelist_filter = {}
187
188 # Listen to netlink messages
189 if start_listener:
190 self.listener = NetlinkListener(self, self.groups)
191 self.listener.start()
192 else:
193 self.listener = None
194
195 def __str__(self):
196 return 'NetlinkManagerWithListener'
197
198 def signal_term_handler(self, signal, frame):
199 log.info("NetlinkManagerWithListener: Caught SIGTERM")
d3d4f288
JF
200
201 if self.listener:
202 self.listener.shutdown_event.set()
203
198ded6a
JF
204 self.shutdown_flag = True # For NetlinkManager shutdown
205 self.shutdown_event.set()
206 self.alarm.set()
207
208 def signal_int_handler(self, signal, frame):
209 log.info("NetlinkManagerWithListener: Caught SIGINT")
d3d4f288
JF
210
211 if self.listener:
212 self.listener.shutdown_event.set()
213
198ded6a
JF
214 self.shutdown_flag = True # For NetlinkManager shutdown
215 self.shutdown_event.set()
216 self.alarm.set()
217
d3d4f288 218 def tx_nlpacket_get_response(self, nlpacket):
198ded6a
JF
219 """
220 TX the message and wait for an ack
221 """
222
223 # NetlinkListener looks at the manager's target_seq and target_pid
224 # to know when we've RXed the ack that we want
225 self.target_lock.acquire()
226 self.target_seq = nlpacket.seq
227 self.target_pid = nlpacket.pid
228 self.target_lock.release()
229
230 if not self.tx_socket:
231 self.tx_socket_allocate()
232 self.tx_socket.sendall(nlpacket.message)
233
234 # Wait for NetlinkListener to RX an ACK or DONE for this (seq, pid)
235 self.tx_socket_rxed_ack.wait()
236 self.tx_socket_rxed_ack.clear()
237
238 # These are here to show some basic examples of how one might react to RXing
239 # various netlink message types. Odds are our child class will redefine these
240 # to do more than log a message.
241 def rx_rtm_newlink(self, msg):
d3d4f288
JF
242 log.debug("RXed RTM_NEWLINK seq %d, pid %d, %d bytes, for %s, state %s" %
243 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))
198ded6a
JF
244
245 def rx_rtm_dellink(self, msg):
d3d4f288
JF
246 log.debug("RXed RTM_DELLINK seq %d, pid %d, %d bytes, for %s, state %s" %
247 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))
198ded6a
JF
248
249 def rx_rtm_newaddr(self, msg):
d3d4f288
JF
250 log.debug("RXed RTM_NEWADDR seq %d, pid %d, %d bytes, for %s/%d on %s" %
251 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))
198ded6a
JF
252
253 def rx_rtm_deladdr(self, msg):
d3d4f288
JF
254 log.debug("RXed RTM_DELADDR seq %d, pid %d, %d bytes, for %s/%d on %s" %
255 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))
198ded6a
JF
256
257 def rx_rtm_newneigh(self, msg):
d3d4f288
JF
258 log.debug("RXed RTM_NEWNEIGH seq %d, pid %d, %d bytes, for %s on %s" %
259 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))
198ded6a
JF
260
261 def rx_rtm_delneigh(self, msg):
d3d4f288
JF
262 log.debug("RXed RTM_DELNEIGH seq %d, pid %d, %d bytes, for %s on %s" %
263 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))
198ded6a
JF
264
265 def rx_rtm_newroute(self, msg):
d3d4f288
JF
266 log.debug("RXed RTM_NEWROUTE seq %d, pid %d, %d bytes, for %s%s" %
267 (msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))
198ded6a
JF
268
269 def rx_rtm_delroute(self, msg):
d3d4f288
JF
270 log.debug("RXed RTM_DELROUTE seq %d, pid %d, %d bytes, for %s%s" %
271 (msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))
198ded6a 272
d3d4f288 273 # Note that tx_nlpacket_get_response will block until NetlinkListener has RXed
198ded6a
JF
274 # an Ack/DONE for the message we TXed
275 def get_all_addresses(self):
276 family = socket.AF_UNSPEC
277 debug = RTM_GETADDR in self.debug
278
279 addr = Address(RTM_GETADDR, debug)
280 addr.flags = NLM_F_REQUEST | NLM_F_DUMP
281 addr.body = pack('Bxxxi', family, 0)
282 addr.build_message(self.sequence.next(), self.pid)
283
284 if debug:
285 self.debug_seq_pid[(addr.seq, addr.pid)] = True
286
d3d4f288 287 self.tx_nlpacket_get_response(addr)
198ded6a
JF
288
289 def get_all_links(self):
290 family = socket.AF_UNSPEC
291 debug = RTM_GETLINK in self.debug
292
293 link = Link(RTM_GETLINK, debug)
294 link.flags = NLM_F_REQUEST | NLM_F_DUMP
295 link.body = pack('Bxxxiii', family, 0, 0, 0)
296 link.build_message(self.sequence.next(), self.pid)
297
298 if debug:
299 self.debug_seq_pid[(link.seq, link.pid)] = True
300
d3d4f288 301 self.tx_nlpacket_get_response(link)
198ded6a
JF
302
303 def get_all_neighbors(self):
304 family = socket.AF_UNSPEC
305 debug = RTM_GETNEIGH in self.debug
306
307 neighbor = Neighbor(RTM_GETNEIGH, debug)
308 neighbor.flags = NLM_F_REQUEST | NLM_F_DUMP
309 neighbor.body = pack('Bxxxii', family, 0, 0)
310 neighbor.build_message(self.sequence.next(), self.pid)
311
312 if debug:
313 self.debug_seq_pid[(neighbor.seq, neighbor.pid)] = True
314
d3d4f288 315 self.tx_nlpacket_get_response(neighbor)
198ded6a
JF
316
317 def get_all_routes(self):
318 family = socket.AF_UNSPEC
319 debug = RTM_GETROUTE in self.debug
320
321 route = Route(RTM_GETROUTE, debug)
322 route.flags = NLM_F_REQUEST | NLM_F_DUMP
323 route.body = pack('Bxxxii', family, 0, 0)
324 route.build_message(self.sequence.next(), self.pid)
325
326 if debug:
327 self.debug_seq_pid[(route.seq, route.pid)] = True
328
d3d4f288 329 self.tx_nlpacket_get_response(route)
198ded6a
JF
330
331 def nested_attributes_match(self, msg, attr_filter):
332 """
333 attr_filter will be a dictionary such as:
334 attr_filter = {
335 Link.IFLA_LINKINFO: {
336 Link.IFLA_INFO_KIND: 'vlan'
337 }
338 }
339 """
340 for (key, value) in attr_filter.items():
341 if type(value) is dict:
342 if not self.nested_attributes_match(msg, value):
343 return False
344 else:
345 attr_value = msg.get_attribute_value(key)
346 if attr_value != value:
347 return False
348 return True
349
350 def filter_rule_matches(self, msg, rule):
351 field = rule[0]
352 options = rule[1:]
353
354 if field == 'IFINDEX':
355 ifindex = options[0]
356
357 if msg.ifindex == ifindex:
358 return True
359
360 elif field == 'ATTRIBUTE':
361 (attr_type, target_value) = options[0:2]
362 attr_value = msg.get_attribute_value(attr_type)
363
364 if attr_value == target_value:
365 return True
366
367 elif field == 'NESTED_ATTRIBUTE':
368 if self.nested_attributes_match(msg, options[0]):
369 return True
370
371 elif field == 'FAMILY':
372 family = options[0]
373
374 if msg.family == family:
375 return True
376 else:
377 raise Exception("Add support to filter based on %s" % field)
378
379 return False
380
381 def filter_permit(self, msg):
382 """
383 Return True if our whitelist/blacklist filters permit this netlink msg
384 """
385 if msg.msgtype in self.whitelist_filter:
386 found_it = False
387
388 for rule in self.whitelist_filter[msg.msgtype]:
389 if self.filter_rule_matches(msg, rule):
390 found_it = True
391 break
392
393 return found_it
394
395 elif msg.msgtype in self.blacklist_filter:
396 for rule in self.blacklist_filter[msg.msgtype]:
397 if self.filter_rule_matches(msg, rule):
398 return False
399 return True
400
401 else:
402 return True
403
404 def _filter_update(self, add, filter_type, msgtype, filter_guts):
405 assert filter_type in ('whitelist', 'blacklist'), "whitelist and blacklist are the only supported filter options"
406
407 if add:
408 if filter_type == 'whitelist':
409
410 # Keep things simple, do not allow both whitelist and blacklist
411 if self.blacklist_filter and self.blacklist_filter.get(msgtype):
412 raise Exception("whitelist and blacklist filters cannot be used at the same time")
413
414 if msgtype not in self.whitelist_filter:
415 self.whitelist_filter[msgtype] = []
416 self.whitelist_filter[msgtype].append(filter_guts)
417
418 elif filter_type == 'blacklist':
419
420 # Keep things simple, do not allow both whitelist and blacklist
421 if self.whitelist_filter and self.whitelist_filter.get(msgtype):
422 raise Exception("whitelist and blacklist filters cannot be used at the same time")
423
424 if msgtype not in self.blacklist_filter:
425 self.blacklist_filter[msgtype] = []
426 self.blacklist_filter[msgtype].append(filter_guts)
427
428 else:
429 if filter_type == 'whitelist':
430 if msgtype in self.whitelist_filter:
431 self.whitelist_filter[msgtype].remove(filter_guts)
432
433 if not self.whitelist_filter[msgtype]:
434 del self.whitelist_filter[msgtype]
435
436 elif filter_type == 'blacklist':
437 if msgtype in self.blacklist_filter:
438 self.blacklist_filter[msgtype].remove(filter_guts)
439
440 if not self.blacklist_filter[msgtype]:
441 del self.blacklist_filter[msgtype]
442
443 def filter_by_address_family(self, add, filter_type, msgtype, family):
444 self._filter_update(add, filter_type, msgtype, ('FAMILY', family))
445
446 def filter_by_ifindex(self, add, filter_type, msgtype, ifindex):
447 self._filter_update(add, filter_type, msgtype, ('IFINDEX', ifindex))
448
449 def filter_by_attribute(self, add, filter_type, msgtype, attribute, attribute_value):
450 self._filter_update(add, filter_type, msgtype, ('ATTRIBUTE', attribute, attribute_value))
451
452 def filter_by_nested_attribute(self, add, filter_type, msgtype, attr_filter):
453 self._filter_update(add, filter_type, msgtype, ('NESTED_ATTRIBUTE', attr_filter))
454
455 def service_netlinkq(self):
456 msg_count = {}
457 processed = 0
458
459 for (msgtype, length, flags, seq, pid, data) in self.netlinkq:
460 processed += 1
461
462 # If this is a reply to a TX message that debugs were enabled for then debug the reply
463 if (seq, pid) in self.debug_seq_pid:
464 debug = True
465 else:
466 debug = self.debug_this_packet(msgtype)
467
468 if msgtype == RTM_NEWLINK or msgtype == RTM_DELLINK:
469 msg = Link(msgtype, debug)
470
471 elif msgtype == RTM_NEWADDR or msgtype == RTM_DELADDR:
472 msg = Address(msgtype, debug)
473
474 elif msgtype == RTM_NEWNEIGH or msgtype == RTM_DELNEIGH:
475 msg = Neighbor(msgtype, debug)
476
477 elif msgtype == RTM_NEWROUTE or msgtype == RTM_DELROUTE:
478 msg = Route(msgtype, debug)
479
480 else:
481 log.warning('RXed unknown netlink message type %s' % msgtype)
482 continue
483
484 msg.decode_packet(length, flags, seq, pid, data)
485
486 if not self.filter_permit(msg):
487 continue
488
489 if debug:
490 msg.dump()
491
492 # Only used for printing debugs about how many we RXed of each type
493 if msg.msgtype not in msg_count:
494 msg_count[msg.msgtype] = 0
495 msg_count[msg.msgtype] += 1
496
497 # Call the appropriate handler method based on the msgtype. The handler
498 # functions are defined in our child class.
499 if msg.msgtype == RTM_NEWLINK:
500
501 # We will use ifname_by_index to display the interface name in debug output
502 self.ifname_by_index[msg.ifindex] = msg.get_attribute_value(msg.IFLA_IFNAME)
503 self.rx_rtm_newlink(msg)
504
505 elif msg.msgtype == RTM_DELLINK:
506
507 # We will use ifname_by_index to display the interface name in debug output
508 if msg.ifindex in self.ifname_by_index:
509 del self.ifname_by_index[msg.ifindex]
510 self.rx_rtm_dellink(msg)
511
512 elif msg.msgtype == RTM_NEWADDR:
513 self.rx_rtm_newaddr(msg)
514
515 elif msg.msgtype == RTM_DELADDR:
516 self.rx_rtm_deladdr(msg)
517
518 elif msg.msgtype == RTM_NEWNEIGH:
519 self.rx_rtm_newneigh(msg)
520
521 elif msg.msgtype == RTM_DELNEIGH:
522 self.rx_rtm_delneigh(msg)
523
524 elif msg.msgtype == RTM_NEWROUTE:
525 self.rx_rtm_newroute(msg)
526
527 elif msg.msgtype == RTM_DELROUTE:
528 self.rx_rtm_delroute(msg)
529
530 else:
531 log.warning('RXed unknown netlink message type %s' % msgtype)
532
533 if processed:
534 self.netlinkq = self.netlinkq[processed:]
535
536 # too chatty
537 # for msgtype in msg_count:
538 # log.debug('RXed %d %s messages' % (msg_count[msgtype], NetlinkPacket.type_to_string[msgtype]))