]>
git.proxmox.com Git - ceph.git/blob - ceph/src/compressor/AsyncCompressor.h
7ca8fad04d6804886e9c74c3570b9811f796d607
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2015 Haomai Wang <haomaiwang@gmail.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_ASYNCCOMPRESSOR_H
16 #define CEPH_ASYNCCOMPRESSOR_H
22 #include "include/str_list.h"
24 #include "Compressor.h"
25 #include "common/WorkQueue.h"
27 class AsyncCompressor
{
// Handle to the compressor implementation; CompressWQ::_process() calls
// compress()/decompress() through it.
29 CompressorRef compressor
;
// Atomic 64-bit counter, initialised to 0.
// NOTE(review): presumably the source of job ids -- confirm in the .cc file.
31 std::atomic
<uint64_t> job_id
{ 0 };
// Thread pool member.
// NOTE(review): presumably the pool that services CompressWQ -- confirm in the constructor.
33 ThreadPool compress_tp
;
// Current state of the job. It is atomic so that a worker can claim the job
// with compare_exchange_strong; a freshly built job starts out as WAIT.
44 std::atomic
<status_t
> status
{ status_t::WAIT
};
// Build a job with the given id and direction (is_compress selects
// compress() over decompress() in CompressWQ::_process()). status keeps its
// WAIT default; data is not mentioned in the initialiser list.
47 Job(uint64_t i
, bool compress
): id(i
), is_compress(compress
) {}
// Hand-written copy constructor: std::atomic is not copyable, so the status
// is copied as a snapshot taken with load(). id, is_compress and data are
// copied member by member.
48 Job(const Job
&j
): id(j
.id
), status(j
.status
.load()), is_compress(j
.is_compress
), data(j
.data
) {}
51 // An element may be inserted into or erased from jobs only while job_lock is held and only when job.status == DONE.
52 // A job's status may be changed, and the element's info modified afterwards, only while pool_lock is held and only when job.status == WAIT.
// Job table keyed by job id (CompressWQ::_dequeue() erases entries by
// item->id while holding job_lock).
53 unordered_map
<uint64_t, Job
> jobs
;
// Work queue of Job pointers, implemented on top of ThreadPool::WorkQueue<Job>.
// The overrides below keep pending jobs in job_queue and run them through the
// owning AsyncCompressor's compressor.
55 struct CompressWQ
: public ThreadPool::WorkQueue
<Job
> {
// Local alias so that Job refers to AsyncCompressor::Job inside this struct.
56 typedef AsyncCompressor::Job Job
;
// Back-pointer to the owning AsyncCompressor; used to reach its job_lock,
// jobs and compressor members. Not owned by the queue.
57 AsyncCompressor
*async_compressor
;
// Pending jobs in FIFO order: _enqueue() pushes at the back, _dequeue() pops
// from the front. Holds raw pointers only.
58 deque
<Job
*> job_queue
;
// Construct the queue.
//   ac              - owning AsyncCompressor, stored in async_compressor
//   timeout         - forwarded unchanged to ThreadPool::WorkQueue<Job>
//   suicide_timeout - forwarded unchanged to ThreadPool::WorkQueue<Job>
//   tp              - thread pool, forwarded to ThreadPool::WorkQueue<Job>
// The base class also receives the fixed name "AsyncCompressor::CompressWQ".
60 CompressWQ(AsyncCompressor
*ac
, time_t timeout
, time_t suicide_timeout
, ThreadPool
*tp
)
61 : ThreadPool::WorkQueue
<Job
>("AsyncCompressor::CompressWQ", timeout
, suicide_timeout
, tp
), async_compressor(ac
) {}
// WorkQueue hook: append item to the back of job_queue. The pointer is stored
// as-is; no copy of the Job is made.
63 bool _enqueue(Job
*item
) override
{
64 job_queue
.push_back(item
);
67 void _dequeue(Job
*item
) override
{
// WorkQueue hook: true when no jobs are pending in job_queue.
70 bool _empty() override
{
71 return job_queue
.empty();
// WorkQueue hook: take the next job from the front of job_queue and try to
// claim it for processing by moving its status from WAIT to WORKING.
73 Job
* _dequeue() override
{
// Nothing queued: handled up front before touching the queue.
74 if (job_queue
.empty())
// Consume entries from the front for as long as any remain.
77 while (!job_queue
.empty()) {
78 item
= job_queue
.front();
79 job_queue
.pop_front();
// Atomic claim: the exchange succeeds only if the status is still WAIT, in
// which case it becomes WORKING. On failure `expected` is overwritten with
// the status actually observed.
81 auto expected
= status_t::WAIT
;
82 if (item
->status
.compare_exchange_strong(expected
, status_t::WORKING
)) {
// Under job_lock, remove this job's entry from the owner's jobs table.
// NOTE(review): presumably this is the path taken when the claim above
// fails (job no longer in WAIT) -- confirm the branch structure.
// NOTE(review): if item points into jobs, it is invalid after the erase.
85 Mutex::Locker
l(async_compressor
->job_lock
);
86 async_compressor
->jobs
.erase(item
->id
);
// WorkQueue hook: run one claimed job through the compressor and record the
// outcome in the job's status. The TPHandle argument is unnamed and unused.
92 void _process(Job
*item
, ThreadPool::TPHandle
&) override
{
// Precondition: the job must already have been claimed (status WORKING).
93 assert(item
->status
== status_t::WORKING
);
// Pick the direction from the job: compress() when is_compress is set ...
96 if (item
->is_compress
)
97 r
= async_compressor
->compressor
->compress(item
->data
, out
);
// ... decompress() otherwise. Both read item->data and write into out.
99 r
= async_compressor
->compressor
->decompress(item
->data
, out
);
// Publish the result by swapping the output buffer into the job (no copy),
// then flip the status WORKING -> DONE.
// NOTE(review): presumably only reached when r reports success -- confirm.
101 item
->data
.swap(out
);
102 auto expected
= status_t::WORKING
;
// NOTE(review): the compare_exchange_strong call is the state transition
// itself, yet it is evaluated inside assert(). If assert() is compiled out
// (e.g. standard <cassert> with NDEBUG) the job would never reach DONE.
// Confirm that this build's assert always evaluates its argument, or hoist
// the call out of the assertion.
103 assert(item
->status
.compare_exchange_strong(expected
, status_t::DONE
));
// Mark the job as failed.
// NOTE(review): presumably the branch for a non-zero r -- confirm.
105 item
->status
= status_t::ERROR
;
// WorkQueue hook: intentionally empty, no per-job work is done here.
108 void _process_finish(Job
*item
) override
{}
// WorkQueue hook: intentionally empty; job_queue is not touched here.
109 void _clear() override
{}
// CompressWQ uses AsyncCompressor's job_lock, jobs and compressor members.
111 friend class CompressWQ
;
// Declaration only (defined elsewhere): takes an input and an output
// bufferlist, both by non-const reference.
112 void _compress(bufferlist
&in
, bufferlist
&out
);
// Declaration only (defined elsewhere): same signature shape as _compress().
113 void _decompress(bufferlist
&in
, bufferlist
&out
);
// Construct from a CephContext pointer. Marked explicit, so a CephContext*
// never converts implicitly. Defined elsewhere.
116 explicit AsyncCompressor(CephContext
*c
);
// Virtual destructor with an empty body.
117 virtual ~AsyncCompressor() {}
// Map an id onto an entry of coreids, wrapping around with a modulo so that
// any id selects some element.
// NOTE(review): coreids.size() == 0 would make the modulo below divide by
// zero -- confirm that an emptiness check precedes this return.
// NOTE(review): id is a signed int mixed with an unsigned size(); a negative
// id is converted to a large unsigned value before the modulo.
119 int get_cpuid(int id
) {
122 return coreids
[id
% coreids
.size()];
// Declaration only: takes the data by non-const reference, returns a uint64_t.
// NOTE(review): presumably the returned value is the job id to hand to
// get_compress_data() -- confirm against the implementation.
127 uint64_t async_compress(bufferlist
&data
);
// Declaration only: decompression counterpart of async_compress(), with the
// same signature shape.
128 uint64_t async_decompress(bufferlist
&data
);
// Declaration only.
//   compress_id - identifies the job to query
//   data        - bufferlist passed by non-const reference
//   blocking    - NOTE(review): presumably whether to wait for completion
//   finished    - bool out-pointer; NOTE(review): presumably set to tell the
//                 caller whether the job has completed -- confirm
// Returns an int. NOTE(review): presumably 0 / negative error -- confirm.
129 int get_compress_data(uint64_t compress_id
, bufferlist
&data
, bool blocking
, bool *finished
);
// Declaration only: decompression counterpart of get_compress_data(), with
// the same parameter shape (id, data, blocking, finished).
130 int get_decompress_data(uint64_t decompress_id
, bufferlist
&data
, bool blocking
, bool *finished
);