]>
Commit | Line | Data |
---|---|---|
35cf5daa DM |
1 | use failure::*; |
2 | use std::path::{Path, PathBuf}; | |
128b37fe DM |
3 | use std::io::Write; |
4 | ||
5 | use crypto::digest::Digest; | |
6 | use crypto::sha2::Sha512Trunc256; | |
c5d82e5f DM |
7 | use std::sync::Mutex; |
8 | ||
45773720 DM |
9 | use std::fs::{File, OpenOptions}; |
10 | use nix::fcntl::{flock, FlockArg}; | |
11 | use std::os::unix::io::AsRawFd; | |
35cf5daa DM |
12 | |
13 | pub struct ChunkStore { | |
14 | base: PathBuf, | |
15 | chunk_dir: PathBuf, | |
128b37fe | 16 | hasher: Sha512Trunc256, |
c5d82e5f | 17 | mutex: Mutex<bool>, |
45773720 | 18 | lockfile: File, |
128b37fe DM |
19 | } |
20 | ||
/// Lowercase hexadecimal digits, indexed by nibble value.
const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";

/// Formats a 32-byte digest as a 64-character lowercase hexadecimal string.
///
/// Each byte yields two characters, high nibble first.
fn u256_to_hex(digest: &[u8; 32]) -> String {
    let mut hex = String::with_capacity(digest.len() * 2);

    for &byte in digest.iter() {
        // Every table entry is an ASCII byte, so the `as char` conversion is
        // lossless and no unchecked UTF-8 conversion is required.
        hex.push(HEX_CHARS[usize::from(byte >> 4)] as char);
        hex.push(HEX_CHARS[usize::from(byte & 0xf)] as char);
    }

    hex
}
34 | ||
128b37fe DM |
/// Derives the relative directory prefix `xxx/yy/` for a digest.
///
/// The prefix is built from the first five hexadecimal digits (lowercase) of
/// the digest: three digits for the first directory level, two for the
/// second. The trailing `/` is part of the returned path.
fn u256_to_prefix(digest: &[u8; 32]) -> PathBuf {
    // Three bytes give six hex digits; only the first five are used.
    let hex = format!("{:02x}{:02x}{:02x}", digest[0], digest[1], digest[2]);

    PathBuf::from(format!("{}/{}/", &hex[..3], &hex[3..5]))
}
52 | ||
12bb93b3 DM |
53 | fn lock_file<P: AsRef<Path>>(filename: P, timeout: usize) -> Result<File, Error> { |
54 | ||
55 | let path = filename.as_ref(); | |
56 | let lockfile = match OpenOptions::new() | |
57 | .create(true) | |
58 | .append(true) | |
59 | .open(path) { | |
60 | Ok(file) => file, | |
61 | Err(err) => bail!("Unable to open lock {:?} - {}", | |
62 | path, err), | |
63 | }; | |
64 | ||
65 | let fd = lockfile.as_raw_fd(); | |
66 | ||
67 | let now = std::time::SystemTime::now(); | |
68 | let mut print_msg = true; | |
69 | loop { | |
70 | match flock(fd, FlockArg::LockExclusiveNonblock) { | |
71 | Ok(_) => break, | |
72 | Err(_) => { | |
73 | if print_msg { | |
74 | print_msg = false; | |
75 | eprintln!("trying to aquire lock..."); | |
76 | } | |
77 | } | |
78 | } | |
79 | ||
80 | match now.elapsed() { | |
81 | Ok(elapsed) => { | |
82 | if elapsed.as_secs() >= (timeout as u64) { | |
83 | bail!("unable to aquire lock {:?} - got timeout", path); | |
84 | } | |
85 | } | |
86 | Err(err) => { | |
87 | bail!("unable to aquire lock {:?} - clock problems - {}", path, err); | |
88 | } | |
89 | } | |
461e62fc | 90 | std::thread::sleep(std::time::Duration::from_millis(100)); |
12bb93b3 DM |
91 | } |
92 | Ok(lockfile) | |
93 | } | |
94 | ||
35cf5daa DM |
95 | impl ChunkStore { |
96 | ||
45773720 DM |
97 | fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf { |
98 | ||
99 | let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref()); | |
100 | chunk_dir.push(".chunks"); | |
101 | ||
102 | chunk_dir | |
103 | } | |
104 | ||
35cf5daa DM |
105 | pub fn create<P: Into<PathBuf>>(path: P) -> Result<Self, Error> { |
106 | ||
45773720 DM |
107 | let base: PathBuf = path.into(); |
108 | let chunk_dir = Self::chunk_dir(&base); | |
35cf5daa | 109 | |
2989f6bf DM |
110 | if let Err(err) = std::fs::create_dir(&base) { |
111 | bail!("unable to create chunk store {:?} - {}", base, err); | |
112 | } | |
113 | ||
114 | if let Err(err) = std::fs::create_dir(&chunk_dir) { | |
115 | bail!("unable to create chunk store subdir {:?} - {}", chunk_dir, err); | |
116 | } | |
35cf5daa | 117 | |
128b37fe DM |
118 | // create 4096 subdir |
119 | for i in 0..4096 { | |
45773720 | 120 | let mut l1path = base.clone(); |
128b37fe | 121 | l1path.push(format!("{:03x}",i)); |
2989f6bf DM |
122 | if let Err(err) = std::fs::create_dir(&l1path) { |
123 | bail!("unable to create chunk subdir {:?} - {}", l1path, err); | |
124 | } | |
35cf5daa DM |
125 | } |
126 | ||
45773720 | 127 | Self::open(base) |
35cf5daa DM |
128 | } |
129 | ||
130 | pub fn open<P: Into<PathBuf>>(path: P) -> Result<Self, Error> { | |
131 | ||
45773720 DM |
132 | let base: PathBuf = path.into(); |
133 | let chunk_dir = Self::chunk_dir(&base); | |
134 | ||
2989f6bf DM |
135 | let metadata = match std::fs::metadata(&chunk_dir) { |
136 | Ok(data) => data, | |
137 | Err(err) => bail!("unable to open chunk store {:?} - {}", chunk_dir, err), | |
138 | }; | |
45773720 DM |
139 | |
140 | let mut lockfile_path = base.clone(); | |
141 | lockfile_path.push(".lock"); | |
142 | ||
12bb93b3 | 143 | let lockfile = lock_file(lockfile_path, 10)?; |
35cf5daa | 144 | |
b8d4766a DM |
145 | Ok(ChunkStore { |
146 | base, | |
147 | chunk_dir, | |
148 | hasher: Sha512Trunc256::new(), | |
149 | lockfile, | |
150 | mutex: Mutex::new(false) | |
151 | }) | |
35cf5daa DM |
152 | } |
153 | ||
391a2e43 | 154 | pub fn insert_chunk(&mut self, chunk: &[u8]) -> Result<(bool, [u8; 32]), Error> { |
128b37fe DM |
155 | |
156 | self.hasher.reset(); | |
157 | self.hasher.input(chunk); | |
158 | ||
159 | let mut digest = [0u8; 32]; | |
160 | self.hasher.result(&mut digest); | |
161 | println!("DIGEST {}", u256_to_hex(&digest)); | |
162 | ||
163 | let mut chunk_path = self.base.clone(); | |
128b37fe | 164 | let prefix = u256_to_prefix(&digest); |
c5d82e5f DM |
165 | chunk_path.push(&prefix); |
166 | let digest_str = u256_to_hex(&digest); | |
167 | chunk_path.push(&digest_str); | |
168 | ||
169 | let lock = self.mutex.lock(); | |
170 | ||
171 | if let Ok(metadata) = std::fs::metadata(&chunk_path) { | |
172 | if metadata.is_file() { | |
391a2e43 | 173 | return Ok((true, digest)); |
c5d82e5f DM |
174 | } else { |
175 | bail!("Got unexpected file type for chunk {}", digest_str); | |
176 | } | |
177 | } | |
128b37fe | 178 | |
c5d82e5f DM |
179 | let mut chunk_dir = self.base.clone(); |
180 | chunk_dir.push(&prefix); | |
128b37fe | 181 | |
c5d82e5f | 182 | if let Err(_) = std::fs::create_dir(&chunk_dir) { /* ignore */ } |
128b37fe | 183 | |
c5d82e5f DM |
184 | let mut tmp_path = chunk_path.clone(); |
185 | tmp_path.set_extension("tmp"); | |
186 | let mut f = std::fs::File::create(&tmp_path)?; | |
128b37fe DM |
187 | f.write_all(chunk)?; |
188 | ||
c5d82e5f DM |
189 | if let Err(err) = std::fs::rename(&tmp_path, &chunk_path) { |
190 | if let Err(_) = std::fs::remove_file(&tmp_path) { /* ignore */ } | |
191 | bail!("Atomic rename failed for chunk {} - {}", digest_str, err); | |
192 | } | |
193 | ||
128b37fe DM |
194 | println!("PATH {:?}", chunk_path); |
195 | ||
c5d82e5f DM |
196 | drop(lock); |
197 | ||
391a2e43 | 198 | Ok((false, digest)) |
128b37fe DM |
199 | } |
200 | ||
35cf5daa DM |
201 | } |
202 | ||
203 | ||
#[test]
fn test_chunk_store1() {
    const STORE_DIR: &str = ".testdir";

    // Start from a clean slate; the directory may not exist, so ignore errors.
    let _ = std::fs::remove_dir_all(STORE_DIR);

    // A store that was never created cannot be opened.
    assert!(ChunkStore::open(STORE_DIR).is_err());

    let mut store = ChunkStore::create(STORE_DIR).unwrap();
    let data = [0u8, 1u8];

    // The first insert writes the chunk, the second one finds it on disk.
    let (exists, _) = store.insert_chunk(&data).unwrap();
    assert!(!exists);

    let (exists, _) = store.insert_chunk(&data).unwrap();
    assert!(exists);

    // Creating a store over an existing directory must be rejected.
    assert!(ChunkStore::create(STORE_DIR).is_err());
}