]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | #!/usr/bin/env python3 |
2 | # SPDX-License-Identifier: BSD-3-Clause | |
3 | # Copyright(c) 2019 Intel Corporation | |
4 | ||
5 | from scapy.all import * | |
6 | import unittest | |
7 | import pkttest | |
8 | ||
9 | #{ipv4{ipv4}} test | |
10 | SRC_ADDR_IPV4_1 = "192.168.1.1" | |
11 | DST_ADDR_IPV4_1 = "192.168.2.1" | |
12 | ||
13 | #{ipv6{ipv6}} test | |
14 | SRC_ADDR_IPV6_1 = "1111:0000:0000:0000:0000:0000:0000:0001" | |
15 | DST_ADDR_IPV6_1 = "2222:0000:0000:0000:0000:0000:0000:0001" | |
16 | ||
17 | #{ipv4{ipv6}} test | |
18 | SRC_ADDR_IPV4_2 = "192.168.11.1" | |
19 | DST_ADDR_IPV4_2 = "192.168.12.1" | |
20 | SRC_ADDR_IPV6_2 = "1111:0000:0000:0000:0000:0000:0001:0001" | |
21 | DST_ADDR_IPV6_2 = "2222:0000:0000:0000:0000:0000:0001:0001" | |
22 | ||
23 | #{ipv6{ipv4}} test | |
24 | SRC_ADDR_IPV4_3 = "192.168.21.1" | |
25 | DST_ADDR_IPV4_3 = "192.168.22.1" | |
26 | SRC_ADDR_IPV6_3 = "1111:0000:0000:0000:0000:0001:0001:0001" | |
27 | DST_ADDR_IPV6_3 = "2222:0000:0000:0000:0000:0001:0001:0001" | |
28 | ||
29 | def config(): | |
30 | return """ | |
31 | #outter-ipv4 inner-ipv4 tunnel mode test | |
32 | sp ipv4 out esp protect 5 pri 1 \\ | |
33 | src {0}/32 \\ | |
34 | dst {1}/32 \\ | |
35 | sport 0:65535 dport 0:65535 | |
36 | ||
37 | sp ipv4 in esp protect 6 pri 1 \\ | |
38 | src {1}/32 \\ | |
39 | dst {0}/32 \\ | |
40 | sport 0:65535 dport 0:65535 | |
41 | ||
42 | sa out 5 cipher_algo null auth_algo null mode ipv4-tunnel \\ | |
43 | src {0} dst {1} | |
44 | sa in 6 cipher_algo null auth_algo null mode ipv4-tunnel \\ | |
45 | src {1} dst {0} | |
46 | ||
47 | rt ipv4 dst {0}/32 port 1 | |
48 | rt ipv4 dst {1}/32 port 0 | |
49 | ||
50 | #outter-ipv6 inner-ipv6 tunnel mode test | |
51 | sp ipv6 out esp protect 7 pri 1 \\ | |
52 | src {2}/128 \\ | |
53 | dst {3}/128 \\ | |
54 | sport 0:65535 dport 0:65535 | |
55 | ||
56 | sp ipv6 in esp protect 8 pri 1 \\ | |
57 | src {3}/128 \\ | |
58 | dst {2}/128 \\ | |
59 | sport 0:65535 dport 0:65535 | |
60 | ||
61 | sa out 7 cipher_algo null auth_algo null mode ipv6-tunnel \\ | |
62 | src {2} dst {3} | |
63 | sa in 8 cipher_algo null auth_algo null mode ipv6-tunnel \\ | |
64 | src {3} dst {2} | |
65 | ||
66 | rt ipv6 dst {2}/128 port 1 | |
67 | rt ipv6 dst {3}/128 port 0 | |
68 | ||
69 | #outter-ipv4 inner-ipv6 tunnel mode test | |
70 | sp ipv6 out esp protect 9 pri 1 \\ | |
71 | src {4}/128 \\ | |
72 | dst {5}/128 \\ | |
73 | sport 0:65535 dport 0:65535 | |
74 | ||
75 | sp ipv6 in esp protect 10 pri 1 \\ | |
76 | src {5}/128 \\ | |
77 | dst {4}/128 \\ | |
78 | sport 0:65535 dport 0:65535 | |
79 | ||
80 | sa out 9 cipher_algo null auth_algo null mode ipv4-tunnel \\ | |
81 | src {6} dst {7} | |
82 | sa in 10 cipher_algo null auth_algo null mode ipv4-tunnel \\ | |
83 | src {7} dst {6} | |
84 | ||
85 | rt ipv6 dst {4}/128 port 1 | |
86 | rt ipv4 dst {7}/32 port 0 | |
87 | ||
88 | #outter-ipv6 inner-ipv4 tunnel mode test | |
89 | sp ipv4 out esp protect 11 pri 1 \\ | |
90 | src {8}/32 \\ | |
91 | dst {9}/32 \\ | |
92 | sport 0:65535 dport 0:65535 | |
93 | ||
94 | sp ipv4 in esp protect 12 pri 1 \\ | |
95 | src {9}/32 \\ | |
96 | dst {8}/32 \\ | |
97 | sport 0:65535 dport 0:65535 | |
98 | ||
99 | sa out 11 cipher_algo null auth_algo null mode ipv6-tunnel \\ | |
100 | src {10} dst {11} | |
101 | sa in 12 cipher_algo null auth_algo null mode ipv6-tunnel \\ | |
102 | src {11} dst {10} | |
103 | ||
104 | rt ipv4 dst {8}/32 port 1 | |
105 | rt ipv6 dst {11}/128 port 0 | |
106 | """.format(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, | |
107 | SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, | |
108 | SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, SRC_ADDR_IPV4_2, DST_ADDR_IPV4_2, | |
109 | SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, SRC_ADDR_IPV6_3, DST_ADDR_IPV6_3) | |
110 | ||
111 | ECN_ECT0 = 0x02 | |
112 | ECN_ECT1 = 0x01 | |
113 | ECN_CE = 0x03 | |
114 | DSCP_1 = 0x04 | |
115 | DSCP_3F = 0xFC | |
116 | ||
117 | class TestTunnelHeaderReconstruct(unittest.TestCase): | |
118 | def setUp(self): | |
119 | self.px = pkttest.PacketXfer() | |
120 | th = IP(src=DST_ADDR_IPV4_1, dst=SRC_ADDR_IPV4_1) | |
121 | self.sa_ipv4v4 = SecurityAssociation(ESP, spi=6, tunnel_header = th) | |
122 | ||
123 | th = IPv6(src=DST_ADDR_IPV6_1, dst=SRC_ADDR_IPV6_1) | |
124 | self.sa_ipv6v6 = SecurityAssociation(ESP, spi=8, tunnel_header = th) | |
125 | ||
126 | th = IP(src=DST_ADDR_IPV4_2, dst=SRC_ADDR_IPV4_2) | |
127 | self.sa_ipv4v6 = SecurityAssociation(ESP, spi=10, tunnel_header = th) | |
128 | ||
129 | th = IPv6(src=DST_ADDR_IPV6_3, dst=SRC_ADDR_IPV6_3) | |
130 | self.sa_ipv6v4 = SecurityAssociation(ESP, spi=12, tunnel_header = th) | |
131 | ||
132 | def gen_pkt_plain_ipv4(self, src, dst, tos): | |
133 | pkt = IP(src=src, dst=dst, tos=tos) | |
134 | pkt /= UDP(sport=123,dport=456)/Raw(load="abc") | |
135 | return pkt | |
136 | ||
137 | def gen_pkt_plain_ipv6(self, src, dst, tc): | |
138 | pkt = IPv6(src=src, dst=dst, tc=tc) | |
139 | pkt /= UDP(sport=123,dport=456)/Raw(load="abc") | |
140 | return pkt | |
141 | ||
142 | def gen_pkt_tun_ipv4v4(self, tos_outter, tos_inner): | |
143 | pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_1, SRC_ADDR_IPV4_1, | |
144 | tos_inner) | |
145 | pkt = self.sa_ipv4v4.encrypt(pkt) | |
146 | self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP) | |
147 | self.assertEqual(pkt[ESP].spi, 6) | |
148 | pkt[IP].tos = tos_outter | |
149 | return pkt | |
150 | ||
151 | def gen_pkt_tun_ipv6v6(self, tc_outter, tc_inner): | |
152 | pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_1, SRC_ADDR_IPV6_1, | |
153 | tc_inner) | |
154 | pkt = self.sa_ipv6v6.encrypt(pkt) | |
155 | self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP) | |
156 | self.assertEqual(pkt[ESP].spi, 8) | |
157 | pkt[IPv6].tc = tc_outter | |
158 | return pkt | |
159 | ||
160 | def gen_pkt_tun_ipv4v6(self, tos_outter, tc_inner): | |
161 | pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_2, SRC_ADDR_IPV6_2, | |
162 | tc_inner) | |
163 | pkt = self.sa_ipv4v6.encrypt(pkt) | |
164 | self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP) | |
165 | self.assertEqual(pkt[ESP].spi, 10) | |
166 | pkt[IP].tos = tos_outter | |
167 | return pkt | |
168 | ||
169 | def gen_pkt_tun_ipv6v4(self, tc_outter, tos_inner): | |
170 | pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_3, SRC_ADDR_IPV4_3, | |
171 | tos_inner) | |
172 | pkt = self.sa_ipv6v4.encrypt(pkt) | |
173 | self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP) | |
174 | self.assertEqual(pkt[ESP].spi, 12) | |
175 | pkt[IPv6].tc = tc_outter | |
176 | return pkt | |
177 | ||
178 | #RFC4301 5.1.2.1 & 5.1.2.2, outbound packets shall be copied ECN field | |
179 | def test_outb_ipv4v4_ecn(self): | |
180 | pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, | |
181 | ECN_ECT1) | |
182 | resp = self.px.xfer_unprotected(pkt) | |
183 | self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) | |
184 | self.assertEqual(resp[ESP].spi, 5) | |
185 | self.assertEqual(resp[IP].tos, ECN_ECT1) | |
186 | ||
187 | pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, | |
188 | ECN_ECT0) | |
189 | resp = self.px.xfer_unprotected(pkt) | |
190 | self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) | |
191 | self.assertEqual(resp[ESP].spi, 5) | |
192 | self.assertEqual(resp[IP].tos, ECN_ECT0) | |
193 | ||
194 | pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, | |
195 | ECN_CE) | |
196 | resp = self.px.xfer_unprotected(pkt) | |
197 | self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) | |
198 | self.assertEqual(resp[ESP].spi, 5) | |
199 | self.assertEqual(resp[IP].tos, ECN_CE) | |
200 | ||
201 | def test_outb_ipv6v6_ecn(self): | |
202 | pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, | |
203 | ECN_ECT1) | |
204 | resp = self.px.xfer_unprotected(pkt) | |
205 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) | |
206 | self.assertEqual(resp[IPv6].tc, ECN_ECT1) | |
207 | ||
208 | pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, | |
209 | ECN_ECT0) | |
210 | resp = self.px.xfer_unprotected(pkt) | |
211 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) | |
212 | self.assertEqual(resp[ESP].spi, 7) | |
213 | self.assertEqual(resp[IPv6].tc, ECN_ECT0) | |
214 | ||
215 | pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, | |
216 | ECN_CE) | |
217 | resp = self.px.xfer_unprotected(pkt) | |
218 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) | |
219 | self.assertEqual(resp[ESP].spi, 7) | |
220 | self.assertEqual(resp[IPv6].tc, ECN_CE) | |
221 | ||
222 | def test_outb_ipv4v6_ecn(self): | |
223 | pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, | |
224 | ECN_ECT1) | |
225 | resp = self.px.xfer_unprotected(pkt) | |
226 | self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) | |
227 | self.assertEqual(resp[IP].tos, ECN_ECT1) | |
228 | ||
229 | pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, | |
230 | ECN_ECT0) | |
231 | resp = self.px.xfer_unprotected(pkt) | |
232 | self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) | |
233 | self.assertEqual(resp[IP].tos, ECN_ECT0) | |
234 | ||
235 | pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, | |
236 | ECN_CE) | |
237 | resp = self.px.xfer_unprotected(pkt) | |
238 | self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) | |
239 | self.assertEqual(resp[IP].tos, ECN_CE) | |
240 | ||
241 | def test_outb_ipv6v4_ecn(self): | |
242 | pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, | |
243 | ECN_ECT1) | |
244 | resp = self.px.xfer_unprotected(pkt) | |
245 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) | |
246 | self.assertEqual(resp[IPv6].tc, ECN_ECT1) | |
247 | ||
248 | pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, | |
249 | ECN_ECT0) | |
250 | resp = self.px.xfer_unprotected(pkt) | |
251 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) | |
252 | self.assertEqual(resp[IPv6].tc, ECN_ECT0) | |
253 | ||
254 | pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, | |
255 | ECN_CE) | |
256 | resp = self.px.xfer_unprotected(pkt) | |
257 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) | |
258 | self.assertEqual(resp[IPv6].tc, ECN_CE) | |
259 | ||
260 | #RFC4301 5.1.2.1 & 5.1.2.2, if outbound packets ECN is CE (0x3), inbound packets | |
261 | #ECN is overwritten to CE, otherwise no change | |
262 | ||
263 | #Outter header not CE, Inner header should be no change | |
264 | def test_inb_ipv4v4_ecn_inner_no_change(self): | |
265 | pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT1, ECN_ECT0) | |
266 | resp = self.px.xfer_protected(pkt) | |
267 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
268 | self.assertEqual(resp[IP].tos, ECN_ECT0) | |
269 | ||
270 | pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT0, ECN_ECT1) | |
271 | resp = self.px.xfer_protected(pkt) | |
272 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
273 | self.assertEqual(resp[IP].tos, ECN_ECT1) | |
274 | ||
275 | pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT1, ECN_CE) | |
276 | resp = self.px.xfer_protected(pkt) | |
277 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
278 | self.assertEqual(resp[IP].tos, ECN_CE) | |
279 | ||
280 | def test_inb_ipv6v6_ecn_inner_no_change(self): | |
281 | pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT1, ECN_ECT0) | |
282 | resp = self.px.xfer_protected(pkt) | |
283 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
284 | self.assertEqual(resp[IPv6].tc, ECN_ECT0) | |
285 | ||
286 | pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT0, ECN_ECT1) | |
287 | resp = self.px.xfer_protected(pkt) | |
288 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
289 | self.assertEqual(resp[IPv6].tc, ECN_ECT1) | |
290 | ||
291 | pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT1, ECN_CE) | |
292 | resp = self.px.xfer_protected(pkt) | |
293 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
294 | self.assertEqual(resp[IPv6].tc, ECN_CE) | |
295 | ||
296 | def test_inb_ipv4v6_ecn_inner_no_change(self): | |
297 | pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT1, ECN_ECT0) | |
298 | resp = self.px.xfer_protected(pkt) | |
299 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
300 | self.assertEqual(resp[IPv6].tc, ECN_ECT0) | |
301 | ||
302 | pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT0, ECN_ECT1) | |
303 | resp = self.px.xfer_protected(pkt) | |
304 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
305 | self.assertEqual(resp[IPv6].tc, ECN_ECT1) | |
306 | ||
307 | pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT1, ECN_CE) | |
308 | resp = self.px.xfer_protected(pkt) | |
309 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
310 | self.assertEqual(resp[IPv6].tc, ECN_CE) | |
311 | ||
312 | def test_inb_ipv6v4_ecn_inner_no_change(self): | |
313 | pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT1, ECN_ECT0) | |
314 | resp = self.px.xfer_protected(pkt) | |
315 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
316 | self.assertEqual(resp[IP].tos, ECN_ECT0) | |
317 | ||
318 | pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT0, ECN_ECT1) | |
319 | resp = self.px.xfer_protected(pkt) | |
320 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
321 | self.assertEqual(resp[IP].tos, ECN_ECT1) | |
322 | ||
323 | pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT1, ECN_CE) | |
324 | resp = self.px.xfer_protected(pkt) | |
325 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
326 | self.assertEqual(resp[IP].tos, ECN_CE) | |
327 | ||
328 | #Outter header CE, Inner header should be changed to CE | |
329 | def test_inb_ipv4v4_ecn_inner_change(self): | |
330 | pkt = self.gen_pkt_tun_ipv4v4(ECN_CE, ECN_ECT0) | |
331 | resp = self.px.xfer_protected(pkt) | |
332 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
333 | self.assertEqual(resp[IP].tos, ECN_CE) | |
334 | ||
335 | pkt = self.gen_pkt_tun_ipv4v4(ECN_CE, ECN_ECT1) | |
336 | resp = self.px.xfer_protected(pkt) | |
337 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
338 | self.assertEqual(resp[IP].tos, ECN_CE) | |
339 | ||
340 | def test_inb_ipv6v6_ecn_inner_change(self): | |
341 | pkt = self.gen_pkt_tun_ipv6v6(ECN_CE, ECN_ECT0) | |
342 | resp = self.px.xfer_protected(pkt) | |
343 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
344 | self.assertEqual(resp[IPv6].tc, ECN_CE) | |
345 | ||
346 | pkt = self.gen_pkt_tun_ipv6v6(ECN_CE, ECN_ECT1) | |
347 | resp = self.px.xfer_protected(pkt) | |
348 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
349 | self.assertEqual(resp[IPv6].tc, ECN_CE) | |
350 | ||
351 | def test_inb_ipv4v6_ecn_inner_change(self): | |
352 | pkt = self.gen_pkt_tun_ipv4v6(ECN_CE, ECN_ECT0) | |
353 | resp = self.px.xfer_protected(pkt) | |
354 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
355 | self.assertEqual(resp[IPv6].tc, ECN_CE) | |
356 | ||
357 | pkt = self.gen_pkt_tun_ipv4v6(ECN_CE, ECN_ECT1) | |
358 | resp = self.px.xfer_protected(pkt) | |
359 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
360 | self.assertEqual(resp[IPv6].tc, ECN_CE) | |
361 | ||
362 | def test_inb_ipv6v4_ecn_inner_change(self): | |
363 | pkt = self.gen_pkt_tun_ipv6v4(ECN_CE, ECN_ECT0) | |
364 | resp = self.px.xfer_protected(pkt) | |
365 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
366 | self.assertEqual(resp[IP].tos, ECN_CE) | |
367 | ||
368 | pkt = self.gen_pkt_tun_ipv6v4(ECN_CE, ECN_ECT1) | |
369 | resp = self.px.xfer_protected(pkt) | |
370 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
371 | self.assertEqual(resp[IP].tos, ECN_CE) | |
372 | ||
373 | #RFC4301 5.1.2.1.5 Outer DS field should be copied from Inner DS field | |
374 | def test_outb_ipv4v4_dscp(self): | |
375 | pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, | |
376 | DSCP_1) | |
377 | resp = self.px.xfer_unprotected(pkt) | |
378 | self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) | |
379 | self.assertEqual(resp[ESP].spi, 5) | |
380 | self.assertEqual(resp[IP].tos, DSCP_1) | |
381 | ||
382 | pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1, | |
383 | DSCP_3F) | |
384 | resp = self.px.xfer_unprotected(pkt) | |
385 | self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) | |
386 | self.assertEqual(resp[ESP].spi, 5) | |
387 | self.assertEqual(resp[IP].tos, DSCP_3F) | |
388 | ||
389 | def test_outb_ipv6v6_dscp(self): | |
390 | pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, | |
391 | DSCP_1) | |
392 | resp = self.px.xfer_unprotected(pkt) | |
393 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) | |
394 | self.assertEqual(resp[ESP].spi, 7) | |
395 | self.assertEqual(resp[IPv6].tc, DSCP_1) | |
396 | ||
397 | pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1, | |
398 | DSCP_3F) | |
399 | resp = self.px.xfer_unprotected(pkt) | |
400 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) | |
401 | self.assertEqual(resp[ESP].spi, 7) | |
402 | self.assertEqual(resp[IPv6].tc, DSCP_3F) | |
403 | ||
404 | def test_outb_ipv4v6_dscp(self): | |
405 | pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, | |
406 | DSCP_1) | |
407 | resp = self.px.xfer_unprotected(pkt) | |
408 | self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) | |
409 | self.assertEqual(resp[ESP].spi, 9) | |
410 | self.assertEqual(resp[IP].tos, DSCP_1) | |
411 | ||
412 | pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, | |
413 | DSCP_3F) | |
414 | resp = self.px.xfer_unprotected(pkt) | |
415 | self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP) | |
416 | self.assertEqual(resp[ESP].spi, 9) | |
417 | self.assertEqual(resp[IP].tos, DSCP_3F) | |
418 | ||
419 | def test_outb_ipv6v4_dscp(self): | |
420 | pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, | |
421 | DSCP_1) | |
422 | resp = self.px.xfer_unprotected(pkt) | |
423 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) | |
424 | self.assertEqual(resp[ESP].spi, 11) | |
425 | self.assertEqual(resp[IPv6].tc, DSCP_1) | |
426 | ||
427 | pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, | |
428 | DSCP_3F) | |
429 | resp = self.px.xfer_unprotected(pkt) | |
430 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) | |
431 | self.assertEqual(resp[ESP].spi, 11) | |
432 | self.assertEqual(resp[IPv6].tc, DSCP_3F) | |
433 | ||
434 | #RFC4301 5.1.2.1.5 Inner DS field should not be affected by Outer DS field | |
435 | def test_inb_ipv4v4_dscp(self): | |
436 | pkt = self.gen_pkt_tun_ipv4v4(DSCP_3F, DSCP_1) | |
437 | resp = self.px.xfer_protected(pkt) | |
438 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
439 | self.assertEqual(resp[IP].tos, DSCP_1) | |
440 | ||
441 | pkt = self.gen_pkt_tun_ipv4v4(DSCP_1, DSCP_3F) | |
442 | resp = self.px.xfer_protected(pkt) | |
443 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
444 | self.assertEqual(resp[IP].tos, DSCP_3F) | |
445 | ||
446 | def test_inb_ipv6v6_dscp(self): | |
447 | pkt = self.gen_pkt_tun_ipv6v6(DSCP_3F, DSCP_1) | |
448 | resp = self.px.xfer_protected(pkt) | |
449 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
450 | self.assertEqual(resp[IPv6].tc, DSCP_1) | |
451 | ||
452 | pkt = self.gen_pkt_tun_ipv6v6(DSCP_1, DSCP_3F) | |
453 | resp = self.px.xfer_protected(pkt) | |
454 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
455 | self.assertEqual(resp[IPv6].tc, DSCP_3F) | |
456 | ||
457 | def test_inb_ipv4v6_dscp(self): | |
458 | pkt = self.gen_pkt_tun_ipv4v6(DSCP_3F, DSCP_1) | |
459 | resp = self.px.xfer_protected(pkt) | |
460 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
461 | self.assertEqual(resp[IPv6].tc, DSCP_1) | |
462 | ||
463 | pkt = self.gen_pkt_tun_ipv4v6(DSCP_1, DSCP_3F) | |
464 | resp = self.px.xfer_protected(pkt) | |
465 | self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) | |
466 | self.assertEqual(resp[IPv6].tc, DSCP_3F) | |
467 | ||
468 | def test_inb_ipv6v4_dscp(self): | |
469 | pkt = self.gen_pkt_tun_ipv6v4(DSCP_3F, DSCP_1) | |
470 | resp = self.px.xfer_protected(pkt) | |
471 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
472 | self.assertEqual(resp[IP].tos, DSCP_1) | |
473 | ||
474 | pkt = self.gen_pkt_tun_ipv6v4(DSCP_1, DSCP_3F) | |
475 | resp = self.px.xfer_protected(pkt) | |
476 | self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP) | |
477 | self.assertEqual(resp[IP].tos, DSCP_3F) | |
478 | ||
479 | pkttest.pkttest() |