]> git.proxmox.com Git - mirror_ifupdown2.git/blame - nlmanager/nllistener.py
addons: tunnel: Fix (re)creation of tunnelsof any kind.
[mirror_ifupdown2.git] / nlmanager / nllistener.py
CommitLineData
198ded6a
JF
1#!/usr/bin/env python
2
3from nlpacket import *
4from nlmanager import NetlinkManager
5from select import select
6from struct import pack, unpack, calcsize
7from threading import Thread, Event, Lock
8from Queue import Queue
9import logging
10import socket
11
12log = logging.getLogger(__name__)
13
14
15class NetlinkListener(Thread):
16
a61d1777 17 def __init__(self, manager, groups, pid_offset=1):
198ded6a
JF
18 """
19 groups controls what types of messages we are interested in hearing
20 To get everything pass:
21 RTMGRP_LINK | \
22 RTMGRP_IPV4_IFADDR | \
23 RTMGRP_IPV4_ROUTE | \
24 RTMGRP_IPV6_IFADDR | \
25 RTMGRP_IPV6_ROUTE
26 """
27 Thread.__init__(self)
28 self.manager = manager
29 self.shutdown_event = Event()
30 self.groups = groups
9f25ff0d 31 self.pid_offset = pid_offset
198ded6a
JF
32
33 def __str__(self):
34 return 'NetlinkListener'
35
36 def run(self):
37 manager = self.manager
38 header_PACK = 'IHHII'
39 header_LEN = calcsize(header_PACK)
40
41 # The RX socket is used to listen to all netlink messages that fly by
42 # as things change in the kernel. We need a very large SO_RCVBUF here
43 # else we tend to miss messages.
9f25ff0d
SE
44 # PID_MAX_LIMIT is 2^22 allowing 1024 sockets per-pid. We default to
45 # use 2 in the upper space (top 10 bits) instead of 0 to avoid conflicts
46 # with the netlink manager which always attempts to bind with the pid.
198ded6a
JF
47 self.rx_socket = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, 0)
48 self.rx_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 10000000)
9f25ff0d 49 self.rx_socket.bind((manager.pid | (self.pid_offset << 22), self.groups))
198ded6a
JF
50 self.rx_socket_prev_seq = {}
51
52 if not manager.tx_socket:
53 manager.tx_socket_allocate()
54
55 my_sockets = (manager.tx_socket, self.rx_socket)
56
57 socket_string = {
58 manager.tx_socket: "TX",
59 self.rx_socket: "RX"
60 }
61
62 supported_messages = (RTM_NEWLINK, RTM_DELLINK, RTM_NEWADDR,
63 RTM_DELADDR, RTM_NEWNEIGH, RTM_DELNEIGH,
64 RTM_NEWROUTE, RTM_DELROUTE)
65
66 ignore_messages = (RTM_GETLINK, RTM_GETADDR, RTM_GETNEIGH,
67 RTM_GETROUTE, RTM_GETQDISC, NLMSG_ERROR, NLMSG_DONE)
68
69 while True:
70
71 if self.shutdown_event.is_set():
72 log.info("%s: shutting down" % self)
73 return
74
d3d4f288 75 # Only block for 1 second so we can wake up to see if shutdown_event is set
198ded6a
JF
76 try:
77 (readable, writeable, exceptional) = select(my_sockets, [], my_sockets, 1)
78 except Exception as e:
79 log.error('select() error: ' + str(e))
80 continue
81
82 if not readable:
83 continue
84
85 set_alarm = False
86 set_tx_socket_rxed_ack_alarm = False
198ded6a
JF
87
88 for s in readable:
d3d4f288 89 data = []
198ded6a
JF
90
91 try:
92 data = s.recv(4096)
198ded6a
JF
93 except Exception as e:
94 log.error('recv() error: ' + str(e))
198ded6a
JF
95 continue
96
97 total_length = len(data)
98 while data:
99
100 # Extract the length, etc from the header
101 (length, msgtype, flags, seq, pid) = unpack(header_PACK, data[:header_LEN])
102
ddafc33d
JF
103 log.debug('%s %s: RXed %s seq %d, pid %d, %d bytes (%d total)' %
104 (self, socket_string[s], NetlinkPacket.type_to_string[msgtype],
105 seq, pid, length, total_length))
106 possible_ack = False
107
108 if msgtype == NLMSG_DONE:
109 possible_ack = True
110
111 elif msgtype == NLMSG_ERROR:
112 possible_ack = True
198ded6a 113
d3d4f288
JF
114 # The error code is a signed negative number.
115 error_code = abs(unpack('=i', data[header_LEN:header_LEN+4])[0])
116 msg = Error(msgtype, True)
117 msg.decode_packet(length, flags, seq, pid, data)
198ded6a
JF
118
119 if error_code:
d3d4f288 120 log.debug("%s %s: RXed NLMSG_ERROR code %s (%d)" % (self, socket_string[s], msg.error_to_string.get(error_code), error_code))
198ded6a 121
ddafc33d
JF
122 if possible_ack and seq == manager.target_seq and pid == manager.target_pid:
123 log.debug("%s %s: Setting RXed ACK alarm for seq %d, pid %d" %
124 (self, socket_string[s], seq, pid))
198ded6a
JF
125 set_tx_socket_rxed_ack_alarm = True
126
127 # Put the message on the manager's netlinkq
128 if msgtype in supported_messages:
129 set_alarm = True
130 manager.netlinkq.append((msgtype, length, flags, seq, pid, data[0:length]))
131
132 # There are certain message types we do not care about
133 # (RTM_GETs for example)
134 elif msgtype in ignore_messages:
135 pass
136
137 # And there are certain message types we have not added
138 # support for yet (QDISC). Log an error for these just
139 # as a reminder to add support for them.
140 else:
141 if msgtype in NetlinkPacket.type_to_string:
d3d4f288
JF
142 log.warning('%s %s: RXed unsupported message %s (type %d)' %
143 (self, socket_string[s], NetlinkPacket.type_to_string[msgtype], msgtype))
198ded6a 144 else:
d3d4f288
JF
145 log.warning('%s %s: RXed unknown message type %d' %
146 (self, socket_string[s], msgtype))
198ded6a
JF
147
148 # Track the previous PID sequence number for RX and TX sockets
149 if s == self.rx_socket:
150 prev_seq = self.rx_socket_prev_seq
151 elif s == manager.tx_socket:
152 prev_seq = manager.tx_socket_prev_seq
153
154 if pid in prev_seq and prev_seq[pid] and prev_seq[pid] != seq and (prev_seq[pid]+1 != seq):
d3d4f288 155 log.debug('%s %s: went from seq %d to %d' % (self, socket_string[s], prev_seq[pid], seq))
198ded6a
JF
156 prev_seq[pid] = seq
157
158 data = data[length:]
159
160 if set_tx_socket_rxed_ack_alarm:
161 manager.target_lock.acquire()
162 manager.target_seq = None
163 manager.target_pid = None
164 manager.target_lock.release()
165 manager.tx_socket_rxed_ack.set()
166
167 if set_alarm:
168 manager.workq.put(('SERVICE_NETLINK_QUEUE', None))
169 manager.alarm.set()
170
171 self.rx_socket.close()
172
173
174class NetlinkManagerWithListener(NetlinkManager):
175
a61d1777
SE
176 def __init__(self, groups, start_listener=True, use_color=True):
177 NetlinkManager.__init__(self, use_color=use_color)
198ded6a
JF
178 self.groups = groups
179 self.workq = Queue()
180 self.netlinkq = []
181 self.alarm = Event()
182 self.shutdown_event = Event()
183 self.tx_socket_rxed_ack = Event()
184 self.tx_socket_rxed_ack.clear()
185 self.target_seq = None
186 self.target_pid = None
187 self.target_seq_pid_debug = False
188 self.target_lock = Lock()
189 self.tx_socket_prev_seq = {}
190 self.debug_listener = False
191 self.debug_seq_pid = {}
192 self.ifname_by_index = {}
193 self.blacklist_filter = {}
194 self.whitelist_filter = {}
195
196 # Listen to netlink messages
197 if start_listener:
198 self.listener = NetlinkListener(self, self.groups)
199 self.listener.start()
200 else:
201 self.listener = None
202
203 def __str__(self):
204 return 'NetlinkManagerWithListener'
205
206 def signal_term_handler(self, signal, frame):
207 log.info("NetlinkManagerWithListener: Caught SIGTERM")
d3d4f288
JF
208
209 if self.listener:
210 self.listener.shutdown_event.set()
211
198ded6a
JF
212 self.shutdown_flag = True # For NetlinkManager shutdown
213 self.shutdown_event.set()
214 self.alarm.set()
215
216 def signal_int_handler(self, signal, frame):
217 log.info("NetlinkManagerWithListener: Caught SIGINT")
d3d4f288
JF
218
219 if self.listener:
220 self.listener.shutdown_event.set()
221
198ded6a
JF
222 self.shutdown_flag = True # For NetlinkManager shutdown
223 self.shutdown_event.set()
224 self.alarm.set()
225
d3d4f288 226 def tx_nlpacket_get_response(self, nlpacket):
198ded6a
JF
227 """
228 TX the message and wait for an ack
229 """
230
231 # NetlinkListener looks at the manager's target_seq and target_pid
232 # to know when we've RXed the ack that we want
233 self.target_lock.acquire()
234 self.target_seq = nlpacket.seq
235 self.target_pid = nlpacket.pid
236 self.target_lock.release()
237
238 if not self.tx_socket:
239 self.tx_socket_allocate()
ddafc33d
JF
240
241 log.debug('%s TX: TXed %s seq %d, pid %d, %d bytes' %
242 (self, NetlinkPacket.type_to_string[nlpacket.msgtype],
243 nlpacket.seq, nlpacket.pid, nlpacket.length))
244
198ded6a
JF
245 self.tx_socket.sendall(nlpacket.message)
246
247 # Wait for NetlinkListener to RX an ACK or DONE for this (seq, pid)
248 self.tx_socket_rxed_ack.wait()
249 self.tx_socket_rxed_ack.clear()
250
251 # These are here to show some basic examples of how one might react to RXing
252 # various netlink message types. Odds are our child class will redefine these
253 # to do more than log a message.
254 def rx_rtm_newlink(self, msg):
d3d4f288
JF
255 log.debug("RXed RTM_NEWLINK seq %d, pid %d, %d bytes, for %s, state %s" %
256 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))
198ded6a
JF
257
258 def rx_rtm_dellink(self, msg):
d3d4f288
JF
259 log.debug("RXed RTM_DELLINK seq %d, pid %d, %d bytes, for %s, state %s" %
260 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFLA_IFNAME), "up" if msg.is_up() else "down"))
198ded6a
JF
261
262 def rx_rtm_newaddr(self, msg):
d3d4f288
JF
263 log.debug("RXed RTM_NEWADDR seq %d, pid %d, %d bytes, for %s/%d on %s" %
264 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))
198ded6a
JF
265
266 def rx_rtm_deladdr(self, msg):
d3d4f288
JF
267 log.debug("RXed RTM_DELADDR seq %d, pid %d, %d bytes, for %s/%d on %s" %
268 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.IFA_ADDRESS), msg.prefixlen, self.ifname_by_index.get(msg.ifindex)))
198ded6a
JF
269
270 def rx_rtm_newneigh(self, msg):
d3d4f288
JF
271 log.debug("RXed RTM_NEWNEIGH seq %d, pid %d, %d bytes, for %s on %s" %
272 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))
198ded6a
JF
273
274 def rx_rtm_delneigh(self, msg):
d3d4f288
JF
275 log.debug("RXed RTM_DELNEIGH seq %d, pid %d, %d bytes, for %s on %s" %
276 (msg.seq, msg.pid, msg.length, msg.get_attribute_value(msg.NDA_DST), self.ifname_by_index.get(msg.ifindex)))
198ded6a
JF
277
278 def rx_rtm_newroute(self, msg):
d3d4f288
JF
279 log.debug("RXed RTM_NEWROUTE seq %d, pid %d, %d bytes, for %s%s" %
280 (msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))
198ded6a
JF
281
282 def rx_rtm_delroute(self, msg):
d3d4f288
JF
283 log.debug("RXed RTM_DELROUTE seq %d, pid %d, %d bytes, for %s%s" %
284 (msg.seq, msg.pid, msg.length, msg.get_prefix_string(), msg.get_nexthops_string(self.ifname_by_index)))
198ded6a 285
d3d4f288 286 # Note that tx_nlpacket_get_response will block until NetlinkListener has RXed
198ded6a
JF
287 # an Ack/DONE for the message we TXed
288 def get_all_addresses(self):
289 family = socket.AF_UNSPEC
290 debug = RTM_GETADDR in self.debug
291
a61d1777 292 addr = Address(RTM_GETADDR, debug, use_color=self.use_color)
198ded6a
JF
293 addr.flags = NLM_F_REQUEST | NLM_F_DUMP
294 addr.body = pack('Bxxxi', family, 0)
295 addr.build_message(self.sequence.next(), self.pid)
296
297 if debug:
298 self.debug_seq_pid[(addr.seq, addr.pid)] = True
299
d3d4f288 300 self.tx_nlpacket_get_response(addr)
198ded6a
JF
301
302 def get_all_links(self):
303 family = socket.AF_UNSPEC
304 debug = RTM_GETLINK in self.debug
305
a61d1777 306 link = Link(RTM_GETLINK, debug, use_color=self.use_color)
198ded6a
JF
307 link.flags = NLM_F_REQUEST | NLM_F_DUMP
308 link.body = pack('Bxxxiii', family, 0, 0, 0)
309 link.build_message(self.sequence.next(), self.pid)
310
311 if debug:
312 self.debug_seq_pid[(link.seq, link.pid)] = True
313
d3d4f288 314 self.tx_nlpacket_get_response(link)
198ded6a
JF
315
316 def get_all_neighbors(self):
317 family = socket.AF_UNSPEC
318 debug = RTM_GETNEIGH in self.debug
319
a61d1777 320 neighbor = Neighbor(RTM_GETNEIGH, debug, use_color=self.use_color)
198ded6a
JF
321 neighbor.flags = NLM_F_REQUEST | NLM_F_DUMP
322 neighbor.body = pack('Bxxxii', family, 0, 0)
323 neighbor.build_message(self.sequence.next(), self.pid)
324
325 if debug:
326 self.debug_seq_pid[(neighbor.seq, neighbor.pid)] = True
327
d3d4f288 328 self.tx_nlpacket_get_response(neighbor)
198ded6a
JF
329
330 def get_all_routes(self):
331 family = socket.AF_UNSPEC
332 debug = RTM_GETROUTE in self.debug
333
a61d1777 334 route = Route(RTM_GETROUTE, debug, use_color=self.use_color)
198ded6a
JF
335 route.flags = NLM_F_REQUEST | NLM_F_DUMP
336 route.body = pack('Bxxxii', family, 0, 0)
337 route.build_message(self.sequence.next(), self.pid)
338
339 if debug:
340 self.debug_seq_pid[(route.seq, route.pid)] = True
341
d3d4f288 342 self.tx_nlpacket_get_response(route)
198ded6a
JF
343
344 def nested_attributes_match(self, msg, attr_filter):
345 """
346 attr_filter will be a dictionary such as:
347 attr_filter = {
348 Link.IFLA_LINKINFO: {
349 Link.IFLA_INFO_KIND: 'vlan'
350 }
351 }
352 """
353 for (key, value) in attr_filter.items():
354 if type(value) is dict:
355 if not self.nested_attributes_match(msg, value):
356 return False
357 else:
358 attr_value = msg.get_attribute_value(key)
359 if attr_value != value:
360 return False
361 return True
362
363 def filter_rule_matches(self, msg, rule):
364 field = rule[0]
365 options = rule[1:]
366
367 if field == 'IFINDEX':
368 ifindex = options[0]
369
370 if msg.ifindex == ifindex:
371 return True
372
373 elif field == 'ATTRIBUTE':
374 (attr_type, target_value) = options[0:2]
375 attr_value = msg.get_attribute_value(attr_type)
376
377 if attr_value == target_value:
378 return True
379
380 elif field == 'NESTED_ATTRIBUTE':
381 if self.nested_attributes_match(msg, options[0]):
382 return True
383
384 elif field == 'FAMILY':
385 family = options[0]
386
387 if msg.family == family:
388 return True
389 else:
390 raise Exception("Add support to filter based on %s" % field)
391
392 return False
393
394 def filter_permit(self, msg):
395 """
396 Return True if our whitelist/blacklist filters permit this netlink msg
397 """
398 if msg.msgtype in self.whitelist_filter:
399 found_it = False
400
401 for rule in self.whitelist_filter[msg.msgtype]:
402 if self.filter_rule_matches(msg, rule):
403 found_it = True
404 break
405
406 return found_it
407
408 elif msg.msgtype in self.blacklist_filter:
409 for rule in self.blacklist_filter[msg.msgtype]:
410 if self.filter_rule_matches(msg, rule):
411 return False
412 return True
413
414 else:
415 return True
416
417 def _filter_update(self, add, filter_type, msgtype, filter_guts):
418 assert filter_type in ('whitelist', 'blacklist'), "whitelist and blacklist are the only supported filter options"
419
420 if add:
421 if filter_type == 'whitelist':
422
423 # Keep things simple, do not allow both whitelist and blacklist
424 if self.blacklist_filter and self.blacklist_filter.get(msgtype):
425 raise Exception("whitelist and blacklist filters cannot be used at the same time")
426
427 if msgtype not in self.whitelist_filter:
428 self.whitelist_filter[msgtype] = []
429 self.whitelist_filter[msgtype].append(filter_guts)
430
431 elif filter_type == 'blacklist':
432
433 # Keep things simple, do not allow both whitelist and blacklist
434 if self.whitelist_filter and self.whitelist_filter.get(msgtype):
435 raise Exception("whitelist and blacklist filters cannot be used at the same time")
436
437 if msgtype not in self.blacklist_filter:
438 self.blacklist_filter[msgtype] = []
439 self.blacklist_filter[msgtype].append(filter_guts)
440
441 else:
442 if filter_type == 'whitelist':
443 if msgtype in self.whitelist_filter:
444 self.whitelist_filter[msgtype].remove(filter_guts)
445
446 if not self.whitelist_filter[msgtype]:
447 del self.whitelist_filter[msgtype]
448
449 elif filter_type == 'blacklist':
450 if msgtype in self.blacklist_filter:
451 self.blacklist_filter[msgtype].remove(filter_guts)
452
453 if not self.blacklist_filter[msgtype]:
454 del self.blacklist_filter[msgtype]
455
456 def filter_by_address_family(self, add, filter_type, msgtype, family):
457 self._filter_update(add, filter_type, msgtype, ('FAMILY', family))
458
459 def filter_by_ifindex(self, add, filter_type, msgtype, ifindex):
460 self._filter_update(add, filter_type, msgtype, ('IFINDEX', ifindex))
461
462 def filter_by_attribute(self, add, filter_type, msgtype, attribute, attribute_value):
463 self._filter_update(add, filter_type, msgtype, ('ATTRIBUTE', attribute, attribute_value))
464
465 def filter_by_nested_attribute(self, add, filter_type, msgtype, attr_filter):
466 self._filter_update(add, filter_type, msgtype, ('NESTED_ATTRIBUTE', attr_filter))
467
468 def service_netlinkq(self):
469 msg_count = {}
470 processed = 0
471
472 for (msgtype, length, flags, seq, pid, data) in self.netlinkq:
473 processed += 1
474
475 # If this is a reply to a TX message that debugs were enabled for then debug the reply
476 if (seq, pid) in self.debug_seq_pid:
477 debug = True
478 else:
479 debug = self.debug_this_packet(msgtype)
480
481 if msgtype == RTM_NEWLINK or msgtype == RTM_DELLINK:
a61d1777 482 msg = Link(msgtype, debug, use_color=self.use_color)
198ded6a
JF
483
484 elif msgtype == RTM_NEWADDR or msgtype == RTM_DELADDR:
a61d1777 485 msg = Address(msgtype, debug, use_color=self.use_color)
198ded6a
JF
486
487 elif msgtype == RTM_NEWNEIGH or msgtype == RTM_DELNEIGH:
a61d1777 488 msg = Neighbor(msgtype, debug, use_color=self.use_color)
198ded6a
JF
489
490 elif msgtype == RTM_NEWROUTE or msgtype == RTM_DELROUTE:
a61d1777 491 msg = Route(msgtype, debug, use_color=self.use_color)
198ded6a
JF
492
493 else:
494 log.warning('RXed unknown netlink message type %s' % msgtype)
495 continue
496
497 msg.decode_packet(length, flags, seq, pid, data)
498
499 if not self.filter_permit(msg):
500 continue
501
502 if debug:
503 msg.dump()
504
505 # Only used for printing debugs about how many we RXed of each type
506 if msg.msgtype not in msg_count:
507 msg_count[msg.msgtype] = 0
508 msg_count[msg.msgtype] += 1
509
510 # Call the appropriate handler method based on the msgtype. The handler
511 # functions are defined in our child class.
512 if msg.msgtype == RTM_NEWLINK:
513
514 # We will use ifname_by_index to display the interface name in debug output
515 self.ifname_by_index[msg.ifindex] = msg.get_attribute_value(msg.IFLA_IFNAME)
516 self.rx_rtm_newlink(msg)
517
518 elif msg.msgtype == RTM_DELLINK:
519
520 # We will use ifname_by_index to display the interface name in debug output
521 if msg.ifindex in self.ifname_by_index:
522 del self.ifname_by_index[msg.ifindex]
523 self.rx_rtm_dellink(msg)
524
525 elif msg.msgtype == RTM_NEWADDR:
526 self.rx_rtm_newaddr(msg)
527
528 elif msg.msgtype == RTM_DELADDR:
529 self.rx_rtm_deladdr(msg)
530
531 elif msg.msgtype == RTM_NEWNEIGH:
532 self.rx_rtm_newneigh(msg)
533
534 elif msg.msgtype == RTM_DELNEIGH:
535 self.rx_rtm_delneigh(msg)
536
537 elif msg.msgtype == RTM_NEWROUTE:
538 self.rx_rtm_newroute(msg)
539
540 elif msg.msgtype == RTM_DELROUTE:
541 self.rx_rtm_delroute(msg)
542
543 else:
544 log.warning('RXed unknown netlink message type %s' % msgtype)
545
546 if processed:
547 self.netlinkq = self.netlinkq[processed:]
548
549 # too chatty
550 # for msgtype in msg_count:
551 # log.debug('RXed %d %s messages' % (msg_count[msgtype], NetlinkPacket.type_to_string[msgtype]))