]> git.proxmox.com Git - mirror_ifupdown2.git/blame - nlmanager/nllistener.py
addons: bond: 'ifquery -c' doesn't re-order user's bond-slaves list
[mirror_ifupdown2.git] / nlmanager / nllistener.py
CommitLineData
198ded6a
JF
1#!/usr/bin/env python
2
3from nlpacket import *
4from nlmanager import NetlinkManager
5from select import select
6from struct import pack, unpack, calcsize
7from threading import Thread, Event, Lock
8from Queue import Queue
9import logging
10import socket
11
12log = logging.getLogger(__name__)
13
14
class NetlinkListener(Thread):
    """
    Background thread that receives netlink messages on behalf of a manager.

    The thread reads from its own RX socket (bound to the multicast groups
    passed to the constructor) and from the manager's TX socket.

    - Supported RTM_* messages are appended to manager.netlinkq and the
      manager is woken up through manager.workq / manager.alarm.
    - An NLMSG_DONE or NLMSG_ERROR whose (seq, pid) equals the manager's
      (target_seq, target_pid) sets manager.tx_socket_rxed_ack.

    Set shutdown_event to make run() return; it is polled once per second.
    """

    # Layout of the netlink message header that precedes every message:
    # length, type, flags, sequence number, port id
    HEADER_PACK = 'IHHII'
    HEADER_LEN = calcsize(HEADER_PACK)

    def __init__(self, manager, groups):
        """
        manager is the object whose sockets and queues this thread services.

        groups controls what types of messages we are interested in hearing.
        To get everything pass:
            RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV4_ROUTE |
            RTMGRP_IPV6_IFADDR | RTMGRP_IPV6_ROUTE
        """
        Thread.__init__(self)
        self.manager = manager
        self.shutdown_event = Event()
        self.groups = groups

    def __str__(self):
        return 'NetlinkListener'

    @staticmethod
    def _nlmsg_align(length):
        """
        Round length up to the 4 byte boundary on which the next message in
        a multi-message buffer starts (same arithmetic as NLMSG_ALIGN).
        """
        return (length + 3) & ~3

    @classmethod
    def _parse_header(cls, data):
        """
        Return (length, msgtype, flags, seq, pid) for the message at the
        start of data, or None if data does not start with a usable message.

        Mirrors the NLMSG_OK checks: the buffer must hold a full header and
        the advertised length must cover at least the header and no more
        than the bytes we actually have. Rejecting length < HEADER_LEN also
        guarantees the caller always advances through the buffer.
        """
        if len(data) < cls.HEADER_LEN:
            return None

        header = unpack(cls.HEADER_PACK, data[:cls.HEADER_LEN])
        length = header[0]

        if length < cls.HEADER_LEN or length > len(data):
            return None

        return header

    def run(self):
        """
        Thread entry point: open the RX socket, service both sockets until
        shutdown_event is set, then close the RX socket.
        """
        manager = self.manager

        # The RX socket is used to listen to all netlink messages that fly by
        # as things change in the kernel. We need a very large SO_RCVBUF here
        # else we tend to miss messages.
        self.rx_socket = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, 0)

        # The close() used to sit after the 'while True' loop where it could
        # never be reached; try/finally makes sure it runs on every exit path.
        try:
            self.rx_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 10000000)
            self.rx_socket.bind((manager.pid+1, self.groups))
            self.rx_socket_prev_seq = {}
            self._listen()
        finally:
            self.rx_socket.close()

    def _listen(self):
        """
        Receive loop for run(). Returns once shutdown_event is set.
        """
        manager = self.manager
        header_LEN = self.HEADER_LEN

        if not manager.tx_socket:
            manager.tx_socket_allocate()

        my_sockets = (manager.tx_socket, self.rx_socket)

        socket_string = {
            manager.tx_socket: "TX",
            self.rx_socket: "RX"
        }

        supported_messages = (RTM_NEWLINK, RTM_DELLINK, RTM_NEWADDR,
                              RTM_DELADDR, RTM_NEWNEIGH, RTM_DELNEIGH,
                              RTM_NEWROUTE, RTM_DELROUTE)

        ignore_messages = (RTM_GETLINK, RTM_GETADDR, RTM_GETNEIGH,
                           RTM_GETROUTE, RTM_GETQDISC, NLMSG_ERROR, NLMSG_DONE)

        while True:

            if self.shutdown_event.is_set():
                log.info("%s: shutting down" % self)
                return

            # Only block for 1 second so we can wake up to see if shutdown_event is set
            try:
                (readable, writeable, exceptional) = select(my_sockets, [], my_sockets, 1)
            except Exception as e:
                log.error('select() error: ' + str(e))
                continue

            if not readable:
                continue

            set_alarm = False
            set_tx_socket_rxed_ack_alarm = False

            for s in readable:

                try:
                    data = s.recv(4096)
                except Exception as e:
                    log.error('recv() error: ' + str(e))
                    continue

                total_length = len(data)
                while data:

                    # Extract the length, etc from the header. Bail out of this
                    # buffer if the header is short, truncated or advertises a
                    # length that would stop us from making progress.
                    header = self._parse_header(data)

                    if header is None:
                        log.warning('%s %s: dropping %d bytes of malformed netlink data' %
                                    (self, socket_string[s], len(data)))
                        break

                    (length, msgtype, flags, seq, pid) = header

                    # Unknown types must not raise here, they are reported by
                    # the 'unknown message type' warning further down.
                    type_known = msgtype in NetlinkPacket.type_to_string

                    if type_known:
                        type_name = NetlinkPacket.type_to_string[msgtype]
                    else:
                        type_name = 'UNKNOWN'

                    log.debug('%s %s: RXed %s seq %d, pid %d, %d bytes (%d total)' %
                              (self, socket_string[s], type_name,
                               seq, pid, length, total_length))
                    possible_ack = False

                    if msgtype == NLMSG_DONE:
                        possible_ack = True

                    elif msgtype == NLMSG_ERROR:
                        possible_ack = True

                        # Only decode the error code if the message is long
                        # enough to carry one.
                        if length >= header_LEN + 4:

                            # The error code is a signed negative number.
                            error_code = abs(unpack('=i', data[header_LEN:header_LEN+4])[0])
                            msg = Error(msgtype, True)
                            msg.decode_packet(length, flags, seq, pid, data)

                            if error_code:
                                log.debug("%s %s: RXed NLMSG_ERROR code %s (%d)" % (self, socket_string[s], msg.error_to_string.get(error_code), error_code))

                    if possible_ack and seq == manager.target_seq and pid == manager.target_pid:
                        log.debug("%s %s: Setting RXed ACK alarm for seq %d, pid %d" %
                                  (self, socket_string[s], seq, pid))
                        set_tx_socket_rxed_ack_alarm = True

                    # Put the message on the manager's netlinkq
                    if msgtype in supported_messages:
                        set_alarm = True
                        manager.netlinkq.append((msgtype, length, flags, seq, pid, data[0:length]))

                    # There are certain message types we do not care about
                    # (RTM_GETs for example)
                    elif msgtype in ignore_messages:
                        pass

                    # And there are certain message types we have not added
                    # support for yet (QDISC). Log an error for these just
                    # as a reminder to add support for them.
                    else:
                        if type_known:
                            log.warning('%s %s: RXed unsupported message %s (type %d)' %
                                        (self, socket_string[s], type_name, msgtype))
                        else:
                            log.warning('%s %s: RXed unknown message type %d' %
                                        (self, socket_string[s], msgtype))

                    # Track the previous PID sequence number for RX and TX sockets
                    if s == self.rx_socket:
                        prev_seq = self.rx_socket_prev_seq
                    elif s == manager.tx_socket:
                        prev_seq = manager.tx_socket_prev_seq

                    if pid in prev_seq and prev_seq[pid] and prev_seq[pid] != seq and (prev_seq[pid]+1 != seq):
                        log.debug('%s %s: went from seq %d to %d' % (self, socket_string[s], prev_seq[pid], seq))
                    prev_seq[pid] = seq

                    # The next message starts on an aligned boundary
                    data = data[self._nlmsg_align(length):]

            if set_tx_socket_rxed_ack_alarm:
                with manager.target_lock:
                    manager.target_seq = None
                    manager.target_pid = None
                manager.tx_socket_rxed_ack.set()

            if set_alarm:
                manager.workq.put(('SERVICE_NETLINK_QUEUE', None))
                manager.alarm.set()
168
169
class NetlinkManagerWithListener(NetlinkManager):
    """
    NetlinkManager that can run a NetlinkListener thread.

    The listener appends RXed messages to self.netlinkq and puts a
    'SERVICE_NETLINK_QUEUE' item on self.workq; service_netlinkq() then
    decodes the queued messages, applies the whitelist/blacklist filters and
    calls the matching rx_rtm_* handler. Child classes are expected to
    override the rx_rtm_* handlers.
    """

    def __init__(self, groups, start_listener=True):
        """
        groups is handed to the NetlinkListener (see its constructor).
        start_listener=False skips creating and starting the listener thread.
        """
        NetlinkManager.__init__(self)
        self.groups = groups
        self.workq = Queue()
        self.netlinkq = []
        self.alarm = Event()
        self.shutdown_event = Event()
        self.tx_socket_rxed_ack = Event()  # a new Event starts out cleared
        self.target_seq = None
        self.target_pid = None
        self.target_seq_pid_debug = False
        self.target_lock = Lock()
        self.tx_socket_prev_seq = {}
        self.debug_listener = False
        self.debug_seq_pid = {}
        self.ifname_by_index = {}
        self.blacklist_filter = {}
        self.whitelist_filter = {}

        # Listen to netlink messages
        if start_listener:
            self.listener = NetlinkListener(self, self.groups)
            self.listener.start()
        else:
            self.listener = None

    def __str__(self):
        return 'NetlinkManagerWithListener'

    def _request_shutdown(self):
        """
        Ask the listener thread (if any) and this manager to shut down.
        """
        if self.listener:
            self.listener.shutdown_event.set()

        self.shutdown_flag = True  # For NetlinkManager shutdown
        self.shutdown_event.set()
        self.alarm.set()

    def signal_term_handler(self, signal, frame):
        log.info("NetlinkManagerWithListener: Caught SIGTERM")
        self._request_shutdown()

    def signal_int_handler(self, signal, frame):
        log.info("NetlinkManagerWithListener: Caught SIGINT")
        self._request_shutdown()

    def tx_nlpacket_get_response(self, nlpacket):
        """
        TX the message and wait for an ack

        NOTE(review): the wait has no timeout, so this looks like it blocks
        forever when no listener thread is running - confirm callers never
        use it with start_listener=False.
        """

        # NetlinkListener looks at the manager's target_seq and target_pid
        # to know when we've RXed the ack that we want
        with self.target_lock:
            self.target_seq = nlpacket.seq
            self.target_pid = nlpacket.pid

        if not self.tx_socket:
            self.tx_socket_allocate()

        log.debug('%s TX: TXed %s seq %d, pid %d, %d bytes' %
                  (self, NetlinkPacket.type_to_string[nlpacket.msgtype],
                   nlpacket.seq, nlpacket.pid, nlpacket.length))

        self.tx_socket.sendall(nlpacket.message)

        # Wait for NetlinkListener to RX an ACK or DONE for this (seq, pid)
        self.tx_socket_rxed_ack.wait()
        self.tx_socket_rxed_ack.clear()

    # These are here to show some basic examples of how one might react to RXing
    # various netlink message types. Odds are our child class will redefine these
    # to do more than log a message.
    def rx_rtm_newlink(self, msg):
        log.debug("RXed RTM_NEWLINK seq %d, pid %d, %d bytes, for %s, state %s" %
                  (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))

    def rx_rtm_dellink(self, msg):
        log.debug("RXed RTM_DELLINK seq %d, pid %d, %d bytes, for %s, state %s" %
                  (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))

    def rx_rtm_newaddr(self, msg):
        log.debug("RXed RTM_NEWADDR seq %d, pid %d, %d bytes, for %s/%d on %s" %
                  (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))

    def rx_rtm_deladdr(self, msg):
        log.debug("RXed RTM_DELADDR seq %d, pid %d, %d bytes, for %s/%d on %s" %
                  (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))

    def rx_rtm_newneigh(self, msg):
        log.debug("RXed RTM_NEWNEIGH seq %d, pid %d, %d bytes, for %s on %s" %
                  (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))

    def rx_rtm_delneigh(self, msg):
        log.debug("RXed RTM_DELNEIGH seq %d, pid %d, %d bytes, for %s on %s" %
                  (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))

    def rx_rtm_newroute(self, msg):
        log.debug("RXed RTM_NEWROUTE seq %d, pid %d, %d bytes, for %s%s" %
                  (msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))

    def rx_rtm_delroute(self, msg):
        log.debug("RXed RTM_DELROUTE seq %d, pid %d, %d bytes, for %s%s" %
                  (msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))

    def _tx_dump_request(self, packet_class, msgtype, body):
        """
        Build an NLM_F_REQUEST | NLM_F_DUMP packet of packet_class/msgtype
        with the given body, TX it and block until the reply was RXed.
        """
        debug = msgtype in self.debug

        nlpacket = packet_class(msgtype, debug)
        nlpacket.flags = NLM_F_REQUEST | NLM_F_DUMP
        nlpacket.body = body
        nlpacket.build_message(self.sequence.next(), self.pid)

        # Remember the (seq, pid) so that service_netlinkq debugs the replies
        if debug:
            self.debug_seq_pid[(nlpacket.seq, nlpacket.pid)] = True

        self.tx_nlpacket_get_response(nlpacket)

    # Note that tx_nlpacket_get_response will block until NetlinkListener has RXed
    # an Ack/DONE for the message we TXed
    def get_all_addresses(self):
        family = socket.AF_UNSPEC
        self._tx_dump_request(Address, RTM_GETADDR, pack('Bxxxi', family, 0))

    def get_all_links(self):
        family = socket.AF_UNSPEC
        self._tx_dump_request(Link, RTM_GETLINK, pack('Bxxxiii', family, 0, 0, 0))

    def get_all_neighbors(self):
        family = socket.AF_UNSPEC
        self._tx_dump_request(Neighbor, RTM_GETNEIGH, pack('Bxxxii', family, 0, 0))

    def get_all_routes(self):
        family = socket.AF_UNSPEC
        self._tx_dump_request(Route, RTM_GETROUTE, pack('Bxxxii', family, 0, 0))

    def nested_attributes_match(self, msg, attr_filter):
        """
        attr_filter will be a dictionary such as:
        attr_filter = {
            Link.IFLA_LINKINFO: {
                Link.IFLA_INFO_KIND: 'vlan'
            }
        }

        Return True if every non-dict value in attr_filter equals
        msg.get_attribute_value() for its key.

        NOTE(review): nested keys are looked up on msg itself, the enclosing
        key (IFLA_LINKINFO in the example) is never used to descend into the
        nested attribute - confirm get_attribute_value() resolves nested
        attribute types as intended.
        """
        for (key, value) in attr_filter.items():
            if isinstance(value, dict):
                if not self.nested_attributes_match(msg, value):
                    return False
            elif msg.get_attribute_value(key) != value:
                return False
        return True

    def filter_rule_matches(self, msg, rule):
        """
        Return True if msg matches rule, a tuple of the form (field, option...)
        as built by the filter_by_* methods.

        Raises Exception for a field we do not know how to filter on.
        """
        field = rule[0]
        options = rule[1:]

        if field == 'IFINDEX':
            return msg.ifindex == options[0]

        if field == 'ATTRIBUTE':
            (attr_type, target_value) = options[0:2]
            return msg.get_attribute_value(attr_type) == target_value

        if field == 'NESTED_ATTRIBUTE':
            return bool(self.nested_attributes_match(msg, options[0]))

        if field == 'FAMILY':
            return msg.family == options[0]

        raise Exception("Add support to filter based on %s" % field)

    def filter_permit(self, msg):
        """
        Return True if our whitelist/blacklist filters permit this netlink msg
        """
        # A whitelist for this msgtype permits only what matches some rule
        if msg.msgtype in self.whitelist_filter:
            return any(self.filter_rule_matches(msg, rule)
                       for rule in self.whitelist_filter[msg.msgtype])

        # A blacklist for this msgtype permits everything that matches no rule
        if msg.msgtype in self.blacklist_filter:
            return not any(self.filter_rule_matches(msg, rule)
                           for rule in self.blacklist_filter[msg.msgtype])

        return True

    def _filter_update(self, add, filter_type, msgtype, filter_guts):
        """
        Add (add=True) or remove (add=False) the rule filter_guts to/from the
        'whitelist' or 'blacklist' filter of msgtype.

        Raises AssertionError for any other filter_type, Exception when adding
        to one kind of filter while the other kind has rules for msgtype, and
        ValueError (from list.remove) when removing a rule that is not there.
        """
        # Raised explicitly so that the check is not stripped by 'python -O'
        if filter_type not in ('whitelist', 'blacklist'):
            raise AssertionError("whitelist and blacklist are the only supported filter options")

        if filter_type == 'whitelist':
            (target, opposite) = (self.whitelist_filter, self.blacklist_filter)
        else:
            (target, opposite) = (self.blacklist_filter, self.whitelist_filter)

        if add:

            # Keep things simple, do not allow both whitelist and blacklist
            if opposite.get(msgtype):
                raise Exception("whitelist and blacklist filters cannot be used at the same time")

            target.setdefault(msgtype, []).append(filter_guts)

        elif msgtype in target:
            target[msgtype].remove(filter_guts)

            # Drop the msgtype entry once its last rule is gone
            if not target[msgtype]:
                del target[msgtype]

    def filter_by_address_family(self, add, filter_type, msgtype, family):
        self._filter_update(add, filter_type, msgtype, ('FAMILY', family))

    def filter_by_ifindex(self, add, filter_type, msgtype, ifindex):
        self._filter_update(add, filter_type, msgtype, ('IFINDEX', ifindex))

    def filter_by_attribute(self, add, filter_type, msgtype, attribute, attribute_value):
        self._filter_update(add, filter_type, msgtype, ('ATTRIBUTE', attribute, attribute_value))

    def filter_by_nested_attribute(self, add, filter_type, msgtype, attr_filter):
        self._filter_update(add, filter_type, msgtype, ('NESTED_ATTRIBUTE', attr_filter))

    def service_netlinkq(self):
        """
        Decode every message queued on self.netlinkq, drop the ones our
        filters do not permit and hand the rest to the rx_rtm_* handlers.
        """
        packet_classes = {
            RTM_NEWLINK: Link,
            RTM_DELLINK: Link,
            RTM_NEWADDR: Address,
            RTM_DELADDR: Address,
            RTM_NEWNEIGH: Neighbor,
            RTM_DELNEIGH: Neighbor,
            RTM_NEWROUTE: Route,
            RTM_DELROUTE: Route,
        }

        # Bound methods, so handlers redefined by our child class are used
        handlers = {
            RTM_NEWLINK: self.rx_rtm_newlink,
            RTM_DELLINK: self.rx_rtm_dellink,
            RTM_NEWADDR: self.rx_rtm_newaddr,
            RTM_DELADDR: self.rx_rtm_deladdr,
            RTM_NEWNEIGH: self.rx_rtm_newneigh,
            RTM_DELNEIGH: self.rx_rtm_delneigh,
            RTM_NEWROUTE: self.rx_rtm_newroute,
            RTM_DELROUTE: self.rx_rtm_delroute,
        }

        processed = 0

        for (msgtype, length, flags, seq, pid, data) in self.netlinkq:
            processed += 1

            # If this is a reply to a TX message that debugs were enabled for then debug the reply
            debug = (seq, pid) in self.debug_seq_pid or self.debug_this_packet(msgtype)

            packet_class = packet_classes.get(msgtype)

            if packet_class is None:
                log.warning('RXed unknown netlink message type %s' % msgtype)
                continue

            msg = packet_class(msgtype, debug)
            msg.decode_packet(length, flags, seq, pid, data)

            if not self.filter_permit(msg):
                continue

            if debug:
                msg.dump()

            # We will use ifname_by_index to display the interface name in debug output
            if msg.msgtype == RTM_NEWLINK:
                self.ifname_by_index[msg.ifindex] = msg.get_attribute_value(msg.IFLA_IFNAME)
            elif msg.msgtype == RTM_DELLINK:
                self.ifname_by_index.pop(msg.ifindex, None)

            # Call the appropriate handler method based on the msgtype. The handler
            # functions are defined in our child class.
            handler = handlers.get(msg.msgtype)

            if handler is None:
                log.warning('RXed unknown netlink message type %s' % msgtype)
            else:
                handler(msg)

        # Remove what we processed in place. NetlinkListener appends to this
        # same list from its own thread; rebinding self.netlinkq to a sliced
        # copy could lose a message appended between the slice and the store.
        if processed:
            del self.netlinkq[:processed]