]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "common/dout.h" | |
16 | #include "common/errno.h" | |
17 | #include "AsyncCompressor.h" | |
18 | ||
19 | #define dout_subsys ceph_subsys_compressor | |
20 | #undef dout_prefix | |
21 | #define dout_prefix *_dout << "compressor " | |
22 | ||
23 | AsyncCompressor::AsyncCompressor(CephContext *c): | |
24 | compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c), | |
7c673cae FG |
25 | compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"), |
26 | job_lock("AsyncCompressor::job_lock"), | |
27 | compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) { | |
28 | } | |
29 | ||
30 | void AsyncCompressor::init() | |
31 | { | |
32 | ldout(cct, 10) << __func__ << dendl; | |
33 | compress_tp.start(); | |
34 | } | |
35 | ||
36 | void AsyncCompressor::terminate() | |
37 | { | |
38 | ldout(cct, 10) << __func__ << dendl; | |
39 | compress_tp.stop(); | |
40 | } | |
41 | ||
42 | uint64_t AsyncCompressor::async_compress(bufferlist &data) | |
43 | { | |
31f18b77 | 44 | uint64_t id = ++job_id; |
7c673cae FG |
45 | pair<unordered_map<uint64_t, Job>::iterator, bool> it; |
46 | { | |
47 | Mutex::Locker l(job_lock); | |
48 | it = jobs.insert(make_pair(id, Job(id, true))); | |
49 | it.first->second.data = data; | |
50 | } | |
51 | compress_wq.queue(&it.first->second); | |
52 | ldout(cct, 10) << __func__ << " insert async compress job id=" << id << dendl; | |
53 | return id; | |
54 | } | |
55 | ||
56 | uint64_t AsyncCompressor::async_decompress(bufferlist &data) | |
57 | { | |
31f18b77 | 58 | uint64_t id = ++job_id; |
7c673cae FG |
59 | pair<unordered_map<uint64_t, Job>::iterator, bool> it; |
60 | { | |
61 | Mutex::Locker l(job_lock); | |
62 | it = jobs.insert(make_pair(id, Job(id, false))); | |
63 | it.first->second.data = data; | |
64 | } | |
65 | compress_wq.queue(&it.first->second); | |
66 | ldout(cct, 10) << __func__ << " insert async decompress job id=" << id << dendl; | |
67 | return id; | |
68 | } | |
69 | ||
70 | int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished) | |
71 | { | |
72 | assert(finished); | |
73 | Mutex::Locker l(job_lock); | |
74 | unordered_map<uint64_t, Job>::iterator it = jobs.find(compress_id); | |
75 | if (it == jobs.end() || !it->second.is_compress) { | |
76 | ldout(cct, 10) << __func__ << " missing to get compress job id=" << compress_id << dendl; | |
77 | return -ENOENT; | |
78 | } | |
7c673cae FG |
79 | |
80 | retry: | |
31f18b77 FG |
81 | auto status = it->second.status.load(); |
82 | if (status == status_t::DONE) { | |
7c673cae FG |
83 | ldout(cct, 20) << __func__ << " successfully getting compressed data, job id=" << compress_id << dendl; |
84 | *finished = true; | |
85 | data.swap(it->second.data); | |
86 | jobs.erase(it); | |
31f18b77 | 87 | } else if (status == status_t::ERROR) { |
7c673cae FG |
88 | ldout(cct, 20) << __func__ << " compressed data failed, job id=" << compress_id << dendl; |
89 | jobs.erase(it); | |
90 | return -EIO; | |
91 | } else if (blocking) { | |
31f18b77 FG |
92 | auto expected = status_t::WAIT; |
93 | if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) { | |
7c673cae FG |
94 | ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished, abort!"<< dendl; |
95 | if (compressor->compress(it->second.data, data)) { | |
96 | ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!"<< dendl; | |
31f18b77 | 97 | it->second.status = status_t::ERROR; |
7c673cae FG |
98 | return -EIO; |
99 | } | |
100 | *finished = true; | |
101 | } else { | |
102 | job_lock.Unlock(); | |
103 | usleep(1000); | |
104 | job_lock.Lock(); | |
105 | goto retry; | |
106 | } | |
107 | } else { | |
108 | ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished."<< dendl; | |
109 | *finished = false; | |
110 | } | |
111 | return 0; | |
112 | } | |
113 | ||
114 | int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished) | |
115 | { | |
116 | assert(finished); | |
117 | Mutex::Locker l(job_lock); | |
118 | unordered_map<uint64_t, Job>::iterator it = jobs.find(decompress_id); | |
119 | if (it == jobs.end() || it->second.is_compress) { | |
120 | ldout(cct, 10) << __func__ << " missing to get decompress job id=" << decompress_id << dendl; | |
121 | return -ENOENT; | |
122 | } | |
7c673cae | 123 | retry: |
31f18b77 FG |
124 | auto status = it->second.status.load(); |
125 | if (status == status_t::DONE) { | |
7c673cae FG |
126 | ldout(cct, 20) << __func__ << " successfully getting decompressed data, job id=" << decompress_id << dendl; |
127 | *finished = true; | |
128 | data.swap(it->second.data); | |
129 | jobs.erase(it); | |
31f18b77 | 130 | } else if (status == status_t::ERROR) { |
7c673cae FG |
131 | ldout(cct, 20) << __func__ << " compressed data failed, job id=" << decompress_id << dendl; |
132 | jobs.erase(it); | |
133 | return -EIO; | |
134 | } else if (blocking) { | |
31f18b77 FG |
135 | auto expected = status_t::WAIT; |
136 | if (it->second.status.compare_exchange_strong(expected, status_t::DONE)) { | |
7c673cae FG |
137 | ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, abort!"<< dendl; |
138 | if (compressor->decompress(it->second.data, data)) { | |
139 | ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!"<< dendl; | |
31f18b77 | 140 | it->second.status = status_t::ERROR; |
7c673cae FG |
141 | return -EIO; |
142 | } | |
143 | *finished = true; | |
144 | } else { | |
145 | job_lock.Unlock(); | |
146 | usleep(1000); | |
147 | job_lock.Lock(); | |
148 | goto retry; | |
149 | } | |
150 | } else { | |
151 | ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't finished."<< dendl; | |
152 | *finished = false; | |
153 | } | |
154 | return 0; | |
155 | } |