#include "db/event_helpers.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/listener.h"
+#include "rocksdb/utilities/customizable_util.h"
+
namespace ROCKSDB_NAMESPACE {
+#ifndef ROCKSDB_LITE
+Status EventListener::CreateFromString(const ConfigOptions& config_options,
+ const std::string& id,
+ std::shared_ptr<EventListener>* result) {
+ return LoadSharedObject<EventListener>(config_options, id, nullptr, result);
+}
+#endif // ROCKSDB_LITE
namespace {
template <class T>
inline T SafeDivide(T a, T b) {
return b == 0 ? 0 : a / b;
}
-} // namespace
+} // anonymous namespace
void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
*jwriter << "time_micros"
const std::vector<std::shared_ptr<EventListener>>& listeners,
const std::string& db_name, const std::string& cf_name,
const std::string& file_path, int job_id, TableFileCreationReason reason) {
+ if (listeners.empty()) {
+ return;
+ }
TableFileCreationBriefInfo info;
info.db_name = db_name;
info.cf_name = cf_name;
BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
bool* auto_recovery) {
#ifndef ROCKSDB_LITE
- if (listeners.size() == 0U) {
+ if (listeners.empty()) {
return;
}
db_mutex->AssertHeld();
jwriter << "cf_name" << cf_name << "job" << job_id << "event"
<< "table_file_creation"
<< "file_number" << fd.GetNumber() << "file_size"
- << fd.GetFileSize() << "file_checksum" << file_checksum
- << "file_checksum_func_name" << file_checksum_func_name;
+ << fd.GetFileSize() << "file_checksum"
+ << Slice(file_checksum).ToString(true) << "file_checksum_func_name"
+ << file_checksum_func_name << "smallest_seqno" << fd.smallest_seqno
+ << "largest_seqno" << fd.largest_seqno;
// table_properties
{
table_properties.num_entries)
<< "num_data_blocks" << table_properties.num_data_blocks
<< "num_entries" << table_properties.num_entries
+ << "num_filter_entries" << table_properties.num_filter_entries
<< "num_deletions" << table_properties.num_deletions
<< "num_merge_operands" << table_properties.num_merge_operands
<< "num_range_deletions" << table_properties.num_range_deletions
<< table_properties.compression_options << "creation_time"
<< table_properties.creation_time << "oldest_key_time"
<< table_properties.oldest_key_time << "file_creation_time"
- << table_properties.file_creation_time << "db_id"
- << table_properties.db_id << "db_session_id"
- << table_properties.db_session_id;
+ << table_properties.file_creation_time
+ << "slow_compression_estimated_data_size"
+ << table_properties.slow_compression_estimated_data_size
+ << "fast_compression_estimated_data_size"
+ << table_properties.fast_compression_estimated_data_size
+ << "db_id" << table_properties.db_id << "db_session_id"
+ << table_properties.db_session_id << "orig_file_number"
+ << table_properties.orig_file_number << "seqno_to_time_mapping";
+
+ if (table_properties.seqno_to_time_mapping.empty()) {
+ jwriter << "N/A";
+ } else {
+ SeqnoToTimeMapping tmp;
+ Status status = tmp.Add(table_properties.seqno_to_time_mapping);
+ if (status.ok()) {
+ jwriter << tmp.ToHumanString();
+ } else {
+ jwriter << "Invalid";
+ }
+ }
// user collected properties
for (const auto& prop : table_properties.readable_properties) {
}
#ifndef ROCKSDB_LITE
- if (listeners.size() == 0) {
+ if (listeners.empty()) {
return;
}
TableFileCreationInfo info;
event_logger->Log(jwriter);
#ifndef ROCKSDB_LITE
+ if (listeners.empty()) {
+ return;
+ }
TableFileDeletionInfo info;
info.db_name = dbname;
info.job_id = job_id;
#endif // !ROCKSDB_LITE
}
-void EventHelpers::NotifyOnErrorRecoveryCompleted(
+void EventHelpers::NotifyOnErrorRecoveryEnd(
const std::vector<std::shared_ptr<EventListener>>& listeners,
- Status old_bg_error, InstrumentedMutex* db_mutex) {
+ const Status& old_bg_error, const Status& new_bg_error,
+ InstrumentedMutex* db_mutex) {
#ifndef ROCKSDB_LITE
- if (listeners.size() == 0U) {
- return;
- }
- db_mutex->AssertHeld();
- // release lock while notifying events
- db_mutex->Unlock();
- for (auto& listener : listeners) {
- listener->OnErrorRecoveryCompleted(old_bg_error);
+ if (!listeners.empty()) {
+ db_mutex->AssertHeld();
+ // release lock while notifying events
+ db_mutex->Unlock();
+ for (auto& listener : listeners) {
+ BackgroundErrorRecoveryInfo info;
+ info.old_bg_error = old_bg_error;
+ info.new_bg_error = new_bg_error;
+ listener->OnErrorRecoveryCompleted(old_bg_error);
+ listener->OnErrorRecoveryEnd(info);
+ info.old_bg_error.PermitUncheckedError();
+ info.new_bg_error.PermitUncheckedError();
+ }
+ db_mutex->Lock();
}
- old_bg_error.PermitUncheckedError();
- db_mutex->Lock();
#else
(void)listeners;
(void)old_bg_error;
+ (void)new_bg_error;
(void)db_mutex;
#endif // ROCKSDB_LITE
}
+#ifndef ROCKSDB_LITE
+void EventHelpers::NotifyBlobFileCreationStarted(
+ const std::vector<std::shared_ptr<EventListener>>& listeners,
+ const std::string& db_name, const std::string& cf_name,
+ const std::string& file_path, int job_id,
+ BlobFileCreationReason creation_reason) {
+ if (listeners.empty()) {
+ return;
+ }
+ BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
+ creation_reason);
+ for (const auto& listener : listeners) {
+ listener->OnBlobFileCreationStarted(info);
+ }
+}
+#endif // !ROCKSDB_LITE
+
+void EventHelpers::LogAndNotifyBlobFileCreationFinished(
+ EventLogger* event_logger,
+ const std::vector<std::shared_ptr<EventListener>>& listeners,
+ const std::string& db_name, const std::string& cf_name,
+ const std::string& file_path, int job_id, uint64_t file_number,
+ BlobFileCreationReason creation_reason, const Status& s,
+ const std::string& file_checksum,
+ const std::string& file_checksum_func_name, uint64_t total_blob_count,
+ uint64_t total_blob_bytes) {
+ if (s.ok() && event_logger) {
+ JSONWriter jwriter;
+ AppendCurrentTime(&jwriter);
+ jwriter << "cf_name" << cf_name << "job" << job_id << "event"
+ << "blob_file_creation"
+ << "file_number" << file_number << "total_blob_count"
+ << total_blob_count << "total_blob_bytes" << total_blob_bytes
+ << "file_checksum" << file_checksum << "file_checksum_func_name"
+ << file_checksum_func_name << "status" << s.ToString();
+
+ jwriter.EndObject();
+ event_logger->Log(jwriter);
+ }
+
+#ifndef ROCKSDB_LITE
+ if (listeners.empty()) {
+ return;
+ }
+ BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
+ creation_reason, total_blob_count, total_blob_bytes,
+ s, file_checksum, file_checksum_func_name);
+ for (const auto& listener : listeners) {
+ listener->OnBlobFileCreated(info);
+ }
+ info.status.PermitUncheckedError();
+#else
+ (void)listeners;
+ (void)db_name;
+ (void)file_path;
+ (void)creation_reason;
+#endif
+}
+
+void EventHelpers::LogAndNotifyBlobFileDeletion(
+ EventLogger* event_logger,
+ const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
+ uint64_t file_number, const std::string& file_path, const Status& status,
+ const std::string& dbname) {
+ if (event_logger) {
+ JSONWriter jwriter;
+ AppendCurrentTime(&jwriter);
+
+ jwriter << "job" << job_id << "event"
+ << "blob_file_deletion"
+ << "file_number" << file_number;
+ if (!status.ok()) {
+ jwriter << "status" << status.ToString();
+ }
+
+ jwriter.EndObject();
+ event_logger->Log(jwriter);
+ }
+#ifndef ROCKSDB_LITE
+ if (listeners.empty()) {
+ return;
+ }
+ BlobFileDeletionInfo info(dbname, file_path, job_id, status);
+ for (const auto& listener : listeners) {
+ listener->OnBlobFileDeleted(info);
+ }
+ info.status.PermitUncheckedError();
+#else
+ (void)listeners;
+ (void)dbname;
+ (void)file_path;
+#endif // !ROCKSDB_LITE
+}
+
} // namespace ROCKSDB_NAMESPACE