// ceph/src/compressor/AsyncCompressor.cc
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "common/dout.h"
#include "common/errno.h"
#include "AsyncCompressor.h"

#define dout_subsys ceph_subsys_compressor
#undef dout_prefix
#define dout_prefix *_dout << "compressor "

// Both the compression backend and the worker thread pool are configured
// via the async_compressor_* options.
AsyncCompressor::AsyncCompressor(CephContext *c):
  compressor(Compressor::create(c, c->_conf->async_compressor_type)), cct(c),
  job_id(0),
  compress_tp(cct, "AsyncCompressor::compressor_tp", "tp_async_compr", cct->_conf->async_compressor_threads, "async_compressor_threads"),
  job_lock("AsyncCompressor::job_lock"),
  compress_wq(this, c->_conf->async_compressor_thread_timeout, c->_conf->async_compressor_thread_suicide_timeout, &compress_tp) {
}

void AsyncCompressor::init()
{
  ldout(cct, 10) << __func__ << dendl;
  compress_tp.start();
}

void AsyncCompressor::terminate()
{
  ldout(cct, 10) << __func__ << dendl;
  compress_tp.stop();
}

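// Each queued Job moves through a small state machine: it is inserted as
// WAIT, then flipped to DONE on success or ERROR on failure, either by the
// worker or by a blocking caller that wins the WAIT->DONE CAS and runs the
// job inline.  The matching get_*_data() call reaps it from the jobs map.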
uint64_t AsyncCompressor::async_compress(bufferlist &data)
{
  uint64_t id = job_id.inc();
  pair<unordered_map<uint64_t, Job>::iterator, bool> it;
  {
    Mutex::Locker l(job_lock);
    it = jobs.insert(make_pair(id, Job(id, true)));
    it.first->second.data = data;
  }
  // Safe to queue outside job_lock: unordered_map rehashing does not
  // invalidate element addresses, and the job cannot be reaped before
  // this function returns its id.
  compress_wq.queue(&it.first->second);
  ldout(cct, 10) << __func__ << " insert async compress job id=" << id << dendl;
  return id;
}

uint64_t AsyncCompressor::async_decompress(bufferlist &data)
{
  uint64_t id = job_id.inc();
  pair<unordered_map<uint64_t, Job>::iterator, bool> it;
  {
    Mutex::Locker l(job_lock);
    // Same pattern as async_compress(), but queue a decompress job.
    it = jobs.insert(make_pair(id, Job(id, false)));
    it.first->second.data = data;
  }
  compress_wq.queue(&it.first->second);
  ldout(cct, 10) << __func__ << " insert async decompress job id=" << id << dendl;
  return id;
}

int AsyncCompressor::get_compress_data(uint64_t compress_id, bufferlist &data, bool blocking, bool *finished)
{
  assert(finished);
  Mutex::Locker l(job_lock);
  unordered_map<uint64_t, Job>::iterator it = jobs.find(compress_id);
  if (it == jobs.end() || !it->second.is_compress) {
    ldout(cct, 10) << __func__ << " can't find compress job id=" << compress_id << dendl;
    return -ENOENT;
  }
  int status;

 retry:
  status = it->second.status.read();
  if (status == DONE) {
    ldout(cct, 20) << __func__ << " got compressed data, job id=" << compress_id << dendl;
    *finished = true;
    data.swap(it->second.data);
    jobs.erase(it);
  } else if (status == ERROR) {
    ldout(cct, 20) << __func__ << " compress job failed, job id=" << compress_id << dendl;
    jobs.erase(it);
    return -EIO;
  } else if (blocking) {
    // Try to steal the job: if the CAS from WAIT to DONE wins, the worker
    // hasn't started it yet, so compress inline instead of waiting.
    if (it->second.status.compare_and_swap(WAIT, DONE)) {
      ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't started, compressing inline" << dendl;
      if (compressor->compress(it->second.data, data)) {
        ldout(cct, 1) << __func__ << " compress job id=" << compress_id << " failed!" << dendl;
        it->second.status.set(ERROR);
        return -EIO;
      }
      *finished = true;
      jobs.erase(it);  // the worker skips jobs it lost the CAS on, so reap here
    } else {
      // The worker owns the job; poll until it flips the status.
      job_lock.Unlock();
      usleep(1000);
      job_lock.Lock();
      goto retry;
    }
  } else {
    ldout(cct, 10) << __func__ << " compress job id=" << compress_id << " hasn't finished." << dendl;
    *finished = false;
  }
  return 0;
}

int AsyncCompressor::get_decompress_data(uint64_t decompress_id, bufferlist &data, bool blocking, bool *finished)
{
  assert(finished);
  Mutex::Locker l(job_lock);
  unordered_map<uint64_t, Job>::iterator it = jobs.find(decompress_id);
  if (it == jobs.end() || it->second.is_compress) {
    ldout(cct, 10) << __func__ << " can't find decompress job id=" << decompress_id << dendl;
    return -ENOENT;
  }
  int status;

 retry:
  status = it->second.status.read();
  if (status == DONE) {
    ldout(cct, 20) << __func__ << " got decompressed data, job id=" << decompress_id << dendl;
    *finished = true;
    data.swap(it->second.data);
    jobs.erase(it);
  } else if (status == ERROR) {
    ldout(cct, 20) << __func__ << " decompress job failed, job id=" << decompress_id << dendl;
    jobs.erase(it);
    return -EIO;
  } else if (blocking) {
    // Same steal-or-wait logic as get_compress_data() above.
    if (it->second.status.compare_and_swap(WAIT, DONE)) {
      ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't started, decompressing inline" << dendl;
      if (compressor->decompress(it->second.data, data)) {
        ldout(cct, 1) << __func__ << " decompress job id=" << decompress_id << " failed!" << dendl;
        it->second.status.set(ERROR);
        return -EIO;
      }
      *finished = true;
      jobs.erase(it);  // reap the stolen job; the worker will skip it
    } else {
      // The worker owns the job; poll until it flips the status.
      job_lock.Unlock();
      usleep(1000);
      job_lock.Lock();
      goto retry;
    }
  } else {
    ldout(cct, 10) << __func__ << " decompress job id=" << decompress_id << " hasn't finished." << dendl;
    *finished = false;
  }
  return 0;
}
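
// ---------------------------------------------------------------------------
// Illustrative usage sketch (not part of the original file, kept compiled out
// with #if 0): a minimal caller-side round trip through the async API above,
// assuming an already-initialized CephContext *cct.  The 1 ms poll interval
// mirrors the internal blocking path and is otherwise arbitrary.
#if 0
static int example_compress_roundtrip(CephContext *cct, bufferlist &in, bufferlist &out)
{
  AsyncCompressor ac(cct);
  ac.init();                              // start the worker thread pool
  uint64_t id = ac.async_compress(in);    // enqueue a compress job
  bool finished = false;
  int r = 0;
  // Poll non-blocking; passing blocking=true instead would let
  // get_compress_data() steal and run an unstarted job inline.
  while (true) {
    r = ac.get_compress_data(id, out, false, &finished);
    if (r < 0 || finished)                // r < 0: -ENOENT or -EIO
      break;
    usleep(1000);
  }
  ac.terminate();                         // stop the worker thread pool
  return r;
}
#endif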