]> git.proxmox.com Git - ceph.git/blame - ceph/src/spdk/dpdk/examples/ipsec-secgw/test/tun_null_header_reconstruct.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / spdk / dpdk / examples / ipsec-secgw / test / tun_null_header_reconstruct.py
CommitLineData
f67539c2
TL
1#!/usr/bin/env python3
2# SPDX-License-Identifier: BSD-3-Clause
3# Copyright(c) 2019 Intel Corporation
4
5from scapy.all import *
6import unittest
7import pkttest
8
# Address pairs used by the four tunnel scenarios.  In the {outer{inner}}
# notation below the first family is the tunnel (outer) header and the
# second one is the encapsulated (inner) packet.

# {ipv4{ipv4}} test: the same pair serves as inner and tunnel addresses
SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1 = "192.168.1.1", "192.168.2.1"

# {ipv6{ipv6}} test: the same pair serves as inner and tunnel addresses
SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1 = (
    "1111:0000:0000:0000:0000:0000:0000:0001",
    "2222:0000:0000:0000:0000:0000:0000:0001",
)

# {ipv4{ipv6}} test: IPv4 tunnel endpoints, IPv6 inner addresses
SRC_ADDR_IPV4_2, DST_ADDR_IPV4_2 = "192.168.11.1", "192.168.12.1"
SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2 = (
    "1111:0000:0000:0000:0000:0000:0001:0001",
    "2222:0000:0000:0000:0000:0000:0001:0001",
)

# {ipv6{ipv4}} test: IPv6 tunnel endpoints, IPv4 inner addresses
SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3 = "192.168.21.1", "192.168.22.1"
SRC_ADDR_IPV6_3, DST_ADDR_IPV6_3 = (
    "1111:0000:0000:0000:0000:0001:0001:0001",
    "2222:0000:0000:0000:0000:0001:0001:0001",
)
28
def config():
    """Return the SP/SA/route configuration text used by these tests.

    The text defines, for each of the four outer/inner address-family
    combinations, one outbound and one inbound ESP tunnel-mode policy and
    SA with null cipher and null authentication (SPIs 5 to 12), plus the
    routes for both directions.  The module-level address constants are
    substituted into the positional placeholders of the template.
    """
    # Order matters: the position in this tuple is the placeholder index.
    addresses = (
        SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,    # {0}, {1}: ipv4-in-ipv4
        SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,    # {2}, {3}: ipv6-in-ipv6
        SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,    # {4}, {5}: inner of ipv4 tunnel
        SRC_ADDR_IPV4_2, DST_ADDR_IPV4_2,    # {6}, {7}: outer of that tunnel
        SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,    # {8}, {9}: inner of ipv6 tunnel
        SRC_ADDR_IPV6_3, DST_ADDR_IPV6_3,    # {10}, {11}: outer of that tunnel
    )
    template = """
#outter-ipv4 inner-ipv4 tunnel mode test
sp ipv4 out esp protect 5 pri 1 \\
src {0}/32 \\
dst {1}/32 \\
sport 0:65535 dport 0:65535

sp ipv4 in esp protect 6 pri 1 \\
src {1}/32 \\
dst {0}/32 \\
sport 0:65535 dport 0:65535

sa out 5 cipher_algo null auth_algo null mode ipv4-tunnel \\
src {0} dst {1}
sa in 6 cipher_algo null auth_algo null mode ipv4-tunnel \\
src {1} dst {0}

rt ipv4 dst {0}/32 port 1
rt ipv4 dst {1}/32 port 0

#outter-ipv6 inner-ipv6 tunnel mode test
sp ipv6 out esp protect 7 pri 1 \\
src {2}/128 \\
dst {3}/128 \\
sport 0:65535 dport 0:65535

sp ipv6 in esp protect 8 pri 1 \\
src {3}/128 \\
dst {2}/128 \\
sport 0:65535 dport 0:65535

sa out 7 cipher_algo null auth_algo null mode ipv6-tunnel \\
src {2} dst {3}
sa in 8 cipher_algo null auth_algo null mode ipv6-tunnel \\
src {3} dst {2}

rt ipv6 dst {2}/128 port 1
rt ipv6 dst {3}/128 port 0

#outter-ipv4 inner-ipv6 tunnel mode test
sp ipv6 out esp protect 9 pri 1 \\
src {4}/128 \\
dst {5}/128 \\
sport 0:65535 dport 0:65535

sp ipv6 in esp protect 10 pri 1 \\
src {5}/128 \\
dst {4}/128 \\
sport 0:65535 dport 0:65535

sa out 9 cipher_algo null auth_algo null mode ipv4-tunnel \\
src {6} dst {7}
sa in 10 cipher_algo null auth_algo null mode ipv4-tunnel \\
src {7} dst {6}

rt ipv6 dst {4}/128 port 1
rt ipv4 dst {7}/32 port 0

#outter-ipv6 inner-ipv4 tunnel mode test
sp ipv4 out esp protect 11 pri 1 \\
src {8}/32 \\
dst {9}/32 \\
sport 0:65535 dport 0:65535

sp ipv4 in esp protect 12 pri 1 \\
src {9}/32 \\
dst {8}/32 \\
sport 0:65535 dport 0:65535

sa out 11 cipher_algo null auth_algo null mode ipv6-tunnel \\
src {10} dst {11}
sa in 12 cipher_algo null auth_algo null mode ipv6-tunnel \\
src {11} dst {10}

rt ipv4 dst {8}/32 port 1
rt ipv6 dst {11}/128 port 0
"""
    return template.format(*addresses)
110
# Values written to / compared against the whole IPv4 TOS or IPv6
# traffic-class byte in the tests below.

# ECN codepoints: the two least-significant bits of that byte.
ECN_ECT1 = 0b01
ECN_ECT0 = 0b10
ECN_CE = 0b11

# DSCP values 0x01 and 0x3F, shifted into the upper six bits of the byte.
DSCP_1 = 0x01 << 2
DSCP_3F = 0x3F << 2
116
class TestTunnelHeaderReconstruct(unittest.TestCase):
    """ECN/DSCP handling across ESP tunnel encapsulation (RFC 4301 5.1.2).

    Four tunnels are exercised, one per outer/inner address-family pair.
    In method names the first family is the outer (tunnel) header and the
    second the inner packet, so ``ipv4v6`` is an IPv6 packet carried in an
    IPv4 tunnel.

    Outbound tests send a plain packet with ``px.xfer_unprotected`` and
    inspect the ESP packet that comes back.  Inbound tests build the ESP
    packet locally with scapy, send it with ``px.xfer_protected`` and
    inspect the decapsulated packet that comes back.

    NOTE(review): ``socket`` is not imported by this file itself; it is
    reached through the ``from scapy.all import *`` star import.
    """

    def setUp(self):
        """Create the packet transfer helper and one scapy SA per tunnel.

        The SAs are used to build inbound test traffic, so their tunnel
        headers run from the DST_* to the SRC_* address and their SPIs
        (6, 8, 10, 12) are the ones the gen_pkt_tun_* builders assert.
        """
        self.px = pkttest.PacketXfer()
        self.sa_ipv4v4 = SecurityAssociation(
            ESP, spi=6,
            tunnel_header=IP(src=DST_ADDR_IPV4_1, dst=SRC_ADDR_IPV4_1))
        self.sa_ipv6v6 = SecurityAssociation(
            ESP, spi=8,
            tunnel_header=IPv6(src=DST_ADDR_IPV6_1, dst=SRC_ADDR_IPV6_1))
        self.sa_ipv4v6 = SecurityAssociation(
            ESP, spi=10,
            tunnel_header=IP(src=DST_ADDR_IPV4_2, dst=SRC_ADDR_IPV4_2))
        self.sa_ipv6v4 = SecurityAssociation(
            ESP, spi=12,
            tunnel_header=IPv6(src=DST_ADDR_IPV6_3, dst=SRC_ADDR_IPV6_3))

    # ---- header field access, independent of address family ----

    @staticmethod
    def _next_proto(pkt, layer):
        """Return the next-protocol field of the *layer* header of *pkt*.

        *layer* is scapy's ``IP`` or ``IPv6`` class; the field is ``proto``
        for the former and ``nh`` for the latter.
        """
        hdr = pkt[layer]
        return hdr.proto if layer is IP else hdr.nh

    @staticmethod
    def _tos(pkt, layer):
        """Return the TOS (``IP``) or traffic-class (``IPv6``) byte."""
        hdr = pkt[layer]
        return hdr.tos if layer is IP else hdr.tc

    # ---- packet builders ----

    def gen_pkt_plain_ipv4(self, src, dst, tos):
        """Build an IPv4/UDP packet with payload "abc" and the given TOS."""
        pkt = IP(src=src, dst=dst, tos=tos)
        pkt /= UDP(sport=123, dport=456)/Raw(load="abc")
        return pkt

    def gen_pkt_plain_ipv6(self, src, dst, tc):
        """Build an IPv6/UDP packet with payload "abc" and the given TC."""
        pkt = IPv6(src=src, dst=dst, tc=tc)
        pkt /= UDP(sport=123, dport=456)/Raw(load="abc")
        return pkt

    def _encapsulate(self, inner_pkt, sa, outer, spi, outer_tos):
        """Wrap *inner_pkt* with *sa* and stamp the outer TOS/TC byte.

        Asserts that the result carries ESP with the expected *spi* before
        returning it.  *outer* is the scapy class of the tunnel header.
        """
        pkt = sa.encrypt(inner_pkt)
        self.assertEqual(self._next_proto(pkt, outer), socket.IPPROTO_ESP)
        self.assertEqual(pkt[ESP].spi, spi)
        # Written after encrypt() so the outer value is chosen by the test,
        # independently of the inner header.
        if outer is IP:
            pkt[IP].tos = outer_tos
        else:
            pkt[IPv6].tc = outer_tos
        return pkt

    def gen_pkt_tun_ipv4v4(self, tos_outter, tos_inner):
        """Build an inbound ESP packet: IPv4 inner in the IPv4 tunnel."""
        inner_pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_1, SRC_ADDR_IPV4_1,
                                            tos_inner)
        return self._encapsulate(inner_pkt, self.sa_ipv4v4, IP, 6,
                                 tos_outter)

    def gen_pkt_tun_ipv6v6(self, tc_outter, tc_inner):
        """Build an inbound ESP packet: IPv6 inner in the IPv6 tunnel."""
        inner_pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_1, SRC_ADDR_IPV6_1,
                                            tc_inner)
        return self._encapsulate(inner_pkt, self.sa_ipv6v6, IPv6, 8,
                                 tc_outter)

    def gen_pkt_tun_ipv4v6(self, tos_outter, tc_inner):
        """Build an inbound ESP packet: IPv6 inner in the IPv4 tunnel."""
        inner_pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_2, SRC_ADDR_IPV6_2,
                                            tc_inner)
        return self._encapsulate(inner_pkt, self.sa_ipv4v6, IP, 10,
                                 tos_outter)

    def gen_pkt_tun_ipv6v4(self, tc_outter, tos_inner):
        """Build an inbound ESP packet: IPv4 inner in the IPv6 tunnel."""
        inner_pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_3, SRC_ADDR_IPV4_3,
                                            tos_inner)
        return self._encapsulate(inner_pkt, self.sa_ipv6v4, IPv6, 12,
                                 tc_outter)

    # ---- shared transfer-and-verify steps ----

    def _check_outbound(self, pkt, outer, spi, expected):
        """Send plain *pkt* and verify the ESP packet that comes back.

        The response must carry ESP directly after its *outer* header, use
        the given *spi*, and have *expected* in the outer TOS/TC byte.
        """
        resp = self.px.xfer_unprotected(pkt)
        note = "plain packet sent with tos/tc 0x%02x" % expected
        self.assertEqual(self._next_proto(resp, outer), socket.IPPROTO_ESP,
                         note)
        self.assertEqual(resp[ESP].spi, spi, note)
        self.assertEqual(self._tos(resp, outer), expected, note)

    def _check_outb_ecn(self, gen_plain, src, dst, outer, spi):
        """Each ECN codepoint must be copied to the outer header."""
        for ecn in (ECN_ECT1, ECN_ECT0, ECN_CE):
            self._check_outbound(gen_plain(src, dst, ecn), outer, spi, ecn)

    def _check_outb_dscp(self, gen_plain, src, dst, outer, spi):
        """Each DSCP value must be copied to the outer header."""
        for dscp in (DSCP_1, DSCP_3F):
            self._check_outbound(gen_plain(src, dst, dscp), outer, spi, dscp)

    def _check_inbound(self, gen_tun, inner, outer_val, inner_val, expected):
        """Send one tunnelled packet and verify the decapsulated result.

        *gen_tun* is one of the gen_pkt_tun_* builders, called with
        (*outer_val*, *inner_val*).  The response must be the UDP packet
        with *expected* in the TOS/TC byte of its *inner*-family header.
        """
        resp = self.px.xfer_protected(gen_tun(outer_val, inner_val))
        note = "tunnel packet sent with outer 0x%02x, inner 0x%02x" % (
            outer_val, inner_val)
        self.assertEqual(self._next_proto(resp, inner), socket.IPPROTO_UDP,
                         note)
        self.assertEqual(self._tos(resp, inner), expected, note)

    def _check_inb_ecn_no_change(self, gen_tun, inner):
        """Outer ECN is not CE: the inner ECN must come through as sent."""
        cases = ((ECN_ECT1, ECN_ECT0),
                 (ECN_ECT0, ECN_ECT1),
                 (ECN_ECT1, ECN_CE))
        for outer_ecn, inner_ecn in cases:
            self._check_inbound(gen_tun, inner, outer_ecn, inner_ecn,
                                inner_ecn)

    def _check_inb_ecn_change(self, gen_tun, inner):
        """Outer ECN is CE: the inner ECN must be rewritten to CE."""
        for inner_ecn in (ECN_ECT0, ECN_ECT1):
            self._check_inbound(gen_tun, inner, ECN_CE, inner_ecn, ECN_CE)

    def _check_inb_dscp(self, gen_tun, inner):
        """The inner DSCP must come through whatever the outer one is."""
        for outer_dscp, inner_dscp in ((DSCP_3F, DSCP_1), (DSCP_1, DSCP_3F)):
            self._check_inbound(gen_tun, inner, outer_dscp, inner_dscp,
                                inner_dscp)

    # RFC4301 5.1.2.1 & 5.1.2.2: on outbound packets the ECN field shall be
    # copied from the inner header to the outer header.  The SPI is checked
    # for every packet, as the outbound DSCP tests do.
    def test_outb_ipv4v4_ecn(self):
        self._check_outb_ecn(self.gen_pkt_plain_ipv4,
                             SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, IP, 5)

    def test_outb_ipv6v6_ecn(self):
        self._check_outb_ecn(self.gen_pkt_plain_ipv6,
                             SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, IPv6, 7)

    def test_outb_ipv4v6_ecn(self):
        self._check_outb_ecn(self.gen_pkt_plain_ipv6,
                             SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, IP, 9)

    def test_outb_ipv6v4_ecn(self):
        self._check_outb_ecn(self.gen_pkt_plain_ipv4,
                             SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, IPv6, 11)

    # RFC4301 5.1.2.1 & 5.1.2.2: on inbound packets, if the outer header
    # ECN is CE (0x3) the inner header ECN is overwritten with CE,
    # otherwise the inner header is left unchanged.

    # Outer header not CE, inner header should not change
    def test_inb_ipv4v4_ecn_inner_no_change(self):
        self._check_inb_ecn_no_change(self.gen_pkt_tun_ipv4v4, IP)

    def test_inb_ipv6v6_ecn_inner_no_change(self):
        self._check_inb_ecn_no_change(self.gen_pkt_tun_ipv6v6, IPv6)

    def test_inb_ipv4v6_ecn_inner_no_change(self):
        self._check_inb_ecn_no_change(self.gen_pkt_tun_ipv4v6, IPv6)

    def test_inb_ipv6v4_ecn_inner_no_change(self):
        self._check_inb_ecn_no_change(self.gen_pkt_tun_ipv6v4, IP)

    # Outer header CE, inner header should be changed to CE
    def test_inb_ipv4v4_ecn_inner_change(self):
        self._check_inb_ecn_change(self.gen_pkt_tun_ipv4v4, IP)

    def test_inb_ipv6v6_ecn_inner_change(self):
        self._check_inb_ecn_change(self.gen_pkt_tun_ipv6v6, IPv6)

    def test_inb_ipv4v6_ecn_inner_change(self):
        self._check_inb_ecn_change(self.gen_pkt_tun_ipv4v6, IPv6)

    def test_inb_ipv6v4_ecn_inner_change(self):
        self._check_inb_ecn_change(self.gen_pkt_tun_ipv6v4, IP)

    # RFC4301 5.1.2.1.5: outer DS field should be copied from inner DS field
    def test_outb_ipv4v4_dscp(self):
        self._check_outb_dscp(self.gen_pkt_plain_ipv4,
                              SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, IP, 5)

    def test_outb_ipv6v6_dscp(self):
        self._check_outb_dscp(self.gen_pkt_plain_ipv6,
                              SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, IPv6, 7)

    def test_outb_ipv4v6_dscp(self):
        self._check_outb_dscp(self.gen_pkt_plain_ipv6,
                              SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, IP, 9)

    def test_outb_ipv6v4_dscp(self):
        self._check_outb_dscp(self.gen_pkt_plain_ipv4,
                              SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, IPv6, 11)

    # RFC4301 5.1.2.1.5: inner DS field should not be affected by the outer
    # DS field
    def test_inb_ipv4v4_dscp(self):
        self._check_inb_dscp(self.gen_pkt_tun_ipv4v4, IP)

    def test_inb_ipv6v6_dscp(self):
        self._check_inb_dscp(self.gen_pkt_tun_ipv6v6, IPv6)

    def test_inb_ipv4v6_dscp(self):
        self._check_inb_dscp(self.gen_pkt_tun_ipv4v6, IPv6)

    def test_inb_ipv6v4_dscp(self):
        self._check_inb_dscp(self.gen_pkt_tun_ipv6v4, IP)
478
# Module-level entry point: hand control to the project-local pkttest helper.
# NOTE(review): pkttest is not visible here; presumably it either emits
# config() or runs the unittest cases above depending on how the script is
# invoked -- confirm against pkttest.py before adding a __main__ guard.
pkttest.pkttest()