]>
Commit | Line | Data |
---|---|---|
a650f503 DM |
1 | //! Inter-process reader-writer lock builder. |
2 | //! | |
add5861e | 3 | //! This implementation uses fcntl record locks with non-blocking |
a650f503 | 4 | //! F_SETLK command (never blocks). |
11861a48 DM |
5 | //! |
6 | //! We maintain a map of shared locks with time stamps, so you can get | |
7 | //! the timestamp for the oldest open lock with | |
8 | //! `oldest_shared_lock()`. | |
a650f503 | 9 | |
11861a48 | 10 | use std::collections::HashMap; |
83771aa0 WB |
11 | use std::os::unix::io::AsRawFd; |
12 | use std::sync::{Arc, Mutex}; | |
13 | ||
14 | use anyhow::{bail, Error}; | |
a650f503 DM |
15 | |
16 | // fixme: use F_OFD_ locks when implemented with nix::fcntl | |
17 | ||
18 | // Note: flock lock conversion is not atomic, so we need to use fcntl | |
19 | ||
20 | /// Inter-process reader-writer lock | |
21 | pub struct ProcessLocker { | |
22 | file: std::fs::File, | |
23 | exclusive: bool, | |
24 | writers: usize, | |
11861a48 DM |
25 | next_guard_id: u64, |
26 | shared_guard_list: HashMap<u64, i64>, // guard_id => timestamp | |
a650f503 DM |
27 | } |
28 | ||
29 | /// Lock guard for shared locks | |
30 | /// | |
31 | /// Release the lock when it goes out of scope. | |
32 | pub struct ProcessLockSharedGuard { | |
11861a48 | 33 | guard_id: u64, |
a650f503 DM |
34 | locker: Arc<Mutex<ProcessLocker>>, |
35 | } | |
36 | ||
83771aa0 | 37 | impl Drop for ProcessLockSharedGuard { |
a650f503 DM |
38 | fn drop(&mut self) { |
39 | let mut data = self.locker.lock().unwrap(); | |
40 | ||
83771aa0 WB |
41 | if data.writers == 0 { |
42 | panic!("unexpected ProcessLocker state"); | |
43 | } | |
a650f503 | 44 | |
11861a48 DM |
45 | data.shared_guard_list.remove(&self.guard_id); |
46 | ||
a650f503 | 47 | if data.writers == 1 && !data.exclusive { |
a650f503 DM |
48 | let op = libc::flock { |
49 | l_type: libc::F_UNLCK as i16, | |
50 | l_whence: libc::SEEK_SET as i16, | |
51 | l_start: 0, | |
52 | l_len: 0, | |
53 | l_pid: 0, | |
54 | }; | |
55 | ||
83771aa0 WB |
56 | if let Err(err) = |
57 | nix::fcntl::fcntl(data.file.as_raw_fd(), nix::fcntl::FcntlArg::F_SETLKW(&op)) | |
58 | { | |
a650f503 DM |
59 | panic!("unable to drop writer lock - {}", err); |
60 | } | |
3e2984bc DC |
61 | } |
62 | if data.writers > 0 { | |
63 | data.writers -= 1; | |
a650f503 DM |
64 | } |
65 | } | |
66 | } | |
67 | ||
68 | /// Lock guard for exclusive locks | |
69 | /// | |
70 | /// Release the lock when it goes out of scope. | |
71 | pub struct ProcessLockExclusiveGuard { | |
72 | locker: Arc<Mutex<ProcessLocker>>, | |
73 | } | |
74 | ||
83771aa0 | 75 | impl Drop for ProcessLockExclusiveGuard { |
a650f503 DM |
76 | fn drop(&mut self) { |
77 | let mut data = self.locker.lock().unwrap(); | |
78 | ||
83771aa0 WB |
79 | if !data.exclusive { |
80 | panic!("unexpected ProcessLocker state"); | |
81 | } | |
a650f503 | 82 | |
83771aa0 WB |
83 | let ltype = if data.writers != 0 { |
84 | libc::F_RDLCK | |
85 | } else { | |
86 | libc::F_UNLCK | |
87 | }; | |
a650f503 DM |
88 | let op = libc::flock { |
89 | l_type: ltype as i16, | |
90 | l_whence: libc::SEEK_SET as i16, | |
91 | l_start: 0, | |
92 | l_len: 0, | |
93 | l_pid: 0, | |
94 | }; | |
95 | ||
83771aa0 WB |
96 | if let Err(err) = |
97 | nix::fcntl::fcntl(data.file.as_raw_fd(), nix::fcntl::FcntlArg::F_SETLKW(&op)) | |
98 | { | |
a650f503 DM |
99 | panic!("unable to drop exclusive lock - {}", err); |
100 | } | |
101 | ||
102 | data.exclusive = false; | |
103 | } | |
104 | } | |
105 | ||
106 | impl ProcessLocker { | |
a650f503 DM |
107 | /// Create a new instance for the specified file. |
108 | /// | |
109 | /// This simply creates the file if it does not exist. | |
abfc001f | 110 | pub fn new<P: AsRef<std::path::Path>>(lockfile: P) -> Result<Arc<Mutex<Self>>, Error> { |
a650f503 DM |
111 | let file = std::fs::OpenOptions::new() |
112 | .create(true) | |
113 | .read(true) | |
114 | .write(true) | |
115 | .open(lockfile)?; | |
116 | ||
117 | Ok(Arc::new(Mutex::new(Self { | |
653b1ca1 | 118 | file, |
a650f503 DM |
119 | exclusive: false, |
120 | writers: 0, | |
11861a48 DM |
121 | next_guard_id: 0, |
122 | shared_guard_list: HashMap::new(), | |
a650f503 DM |
123 | }))) |
124 | } | |
125 | ||
126 | fn try_lock(file: &std::fs::File, ltype: i32) -> Result<(), Error> { | |
a650f503 DM |
127 | let op = libc::flock { |
128 | l_type: ltype as i16, | |
129 | l_whence: libc::SEEK_SET as i16, | |
130 | l_start: 0, | |
131 | l_len: 0, | |
132 | l_pid: 0, | |
133 | }; | |
134 | ||
135 | nix::fcntl::fcntl(file.as_raw_fd(), nix::fcntl::FcntlArg::F_SETLK(&op))?; | |
136 | ||
137 | Ok(()) | |
138 | } | |
139 | ||
add5861e | 140 | /// Try to acquire a shared lock |
a650f503 | 141 | /// |
add5861e | 142 | /// On success, this makes sure that no other process can get an exclusive lock for the file. |
a650f503 | 143 | pub fn try_shared_lock(locker: Arc<Mutex<Self>>) -> Result<ProcessLockSharedGuard, Error> { |
a650f503 DM |
144 | let mut data = locker.lock().unwrap(); |
145 | ||
146 | if data.writers == 0 && !data.exclusive { | |
147 | if let Err(err) = Self::try_lock(&data.file, libc::F_RDLCK) { | |
148 | bail!("unable to get shared lock - {}", err); | |
149 | } | |
150 | } | |
151 | ||
152 | data.writers += 1; | |
153 | ||
83771aa0 WB |
154 | let guard = ProcessLockSharedGuard { |
155 | locker: locker.clone(), | |
156 | guard_id: data.next_guard_id, | |
157 | }; | |
11861a48 DM |
158 | data.next_guard_id += 1; |
159 | ||
160 | let now = unsafe { libc::time(std::ptr::null_mut()) }; | |
161 | ||
162 | data.shared_guard_list.insert(guard.guard_id, now); | |
163 | ||
164 | Ok(guard) | |
165 | } | |
166 | ||
167 | /// Get oldest shared lock timestamp | |
168 | pub fn oldest_shared_lock(locker: Arc<Mutex<Self>>) -> Option<i64> { | |
169 | let mut result = None; | |
170 | ||
171 | let data = locker.lock().unwrap(); | |
172 | ||
62ee2eb4 | 173 | for v in data.shared_guard_list.values() { |
11861a48 DM |
174 | result = match result { |
175 | None => Some(*v), | |
83771aa0 WB |
176 | Some(x) => { |
177 | if x < *v { | |
178 | Some(x) | |
179 | } else { | |
180 | Some(*v) | |
181 | } | |
182 | } | |
11861a48 DM |
183 | }; |
184 | } | |
185 | ||
186 | result | |
a650f503 DM |
187 | } |
188 | ||
add5861e | 189 | /// Try to acquire a exclusive lock |
a650f503 DM |
190 | /// |
191 | /// Make sure the we are the only process which has locks for this file (shared or exclusive). | |
83771aa0 WB |
192 | pub fn try_exclusive_lock( |
193 | locker: Arc<Mutex<Self>>, | |
194 | ) -> Result<ProcessLockExclusiveGuard, Error> { | |
a650f503 DM |
195 | let mut data = locker.lock().unwrap(); |
196 | ||
197 | if data.exclusive { | |
198 | bail!("already locked exclusively"); | |
199 | } | |
200 | ||
201 | if let Err(err) = Self::try_lock(&data.file, libc::F_WRLCK) { | |
202 | bail!("unable to get exclusive lock - {}", err); | |
203 | } | |
204 | ||
205 | data.exclusive = true; | |
206 | ||
83771aa0 WB |
207 | Ok(ProcessLockExclusiveGuard { |
208 | locker: locker.clone(), | |
209 | }) | |
a650f503 DM |
210 | } |
211 | } |