]>
git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/mpi/test/python/nonblocking_test.py
2 # Andreas Kloeckner <inform -at- tiker.net>
4 # Use, modification and distribution is subject to the Boost Software
5 # License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
6 # http://www.boost.org/LICENSE_1_0.txt)
8 # Authors: Andreas Kloeckner
10 from __future__
import print_function
19 TAG_PROGRESS_REPORT
= 3
24 class TagGroupListener
:
25 """Class to help listen for only a given set of tags.
27 This is contrived: Typicallly you could just listen for
28 mpi.any_tag and filter."""
29 def __init__(self
, comm
, tags
):
32 self
.active_requests
= {}
36 if tag
not in self
.active_requests
:
37 self
.active_requests
[tag
] = self
.comm
.irecv(tag
=tag
)
38 requests
= mpi
.RequestList(self
.active_requests
.values())
39 data
, status
, index
= mpi
.wait_any(requests
)
40 del self
.active_requests
[status
.tag
]
44 for r
in self
.active_requests
.itervalues():
47 self
.active_requests
= {}
52 sent_histories
= (mpi
.size
-1)*15
53 print ("sending %d packets on their way" % sent_histories
)
54 send_reqs
= mpi
.RequestList()
55 for i
in range(sent_histories
):
56 dest
= random
.randrange(1, mpi
.size
)
57 send_reqs
.append(mpi
.world
.isend(dest
, TAG_DATA
, []))
59 mpi
.wait_all(send_reqs
)
61 completed_histories
= []
65 tgl
= TagGroupListener(mpi
.world
,
66 [TAG_DATA
, TAG_DEBUG
, TAG_PROGRESS_REPORT
, TAG_TERMINATE
])
69 for i
in progress_reports
.values():
70 if i
!= sent_histories
:
72 return len(dead_kids
) == mpi
.size
-1
75 status
, data
= tgl
.wait()
77 if status
.tag
== TAG_DATA
:
78 #print ("received completed history %s from %d" % (data, status.source))
79 completed_histories
.append(data
)
80 if len(completed_histories
) == sent_histories
:
81 print ("all histories received, exiting")
82 for rank
in range(1, mpi
.size
):
83 mpi
.world
.send(rank
, TAG_TERMINATE
, None)
84 elif status
.tag
== TAG_PROGRESS_REPORT
:
85 progress_reports
[len(data
)] = progress_reports
.get(len(data
), 0) + 1
86 elif status
.tag
== TAG_DEBUG
:
87 print ("[DBG %d] %s" % (status
.source
, data
))
88 elif status
.tag
== TAG_TERMINATE
:
89 dead_kids
.append(status
.source
)
91 print ("unexpected tag %d from %d" % (status
.tag
, status
.source
))
100 data
, status
= mpi
.world
.recv(return_status
=True)
101 if status
.tag
== TAG_DATA
:
102 mpi
.world
.send(0, TAG_PROGRESS_REPORT
, data
)
103 data
.append(mpi
.rank
)
104 if len(data
) >= MAX_GENERATIONS
:
107 dest
= random
.randrange(1, mpi
.size
)
108 mpi
.world
.send(dest
, TAG_DATA
, data
)
109 elif status
.tag
== TAG_TERMINATE
:
110 from time
import sleep
111 mpi
.world
.send(0, TAG_TERMINATE
, 0)
114 print ("[DIRECTDBG %d] unexpected tag %d from %d" % (mpi
.rank
, status
.tag
, status
.source
))
118 # this program sends around messages consisting of lists of visited nodes
119 # randomly. After MAX_GENERATIONS, they are returned to rank 0.
128 if __name__
== "__main__":