// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2016-2020, Intel Corporation */

/*
 * memops.c -- aggregated memory operations helper implementation
 *
 * The operation collects all of the required memory modifications that
 * need to happen in an atomic way (all of them or none), and abstracts
 * away the storage type (transient/persistent) and the underlying
 * implementation of how it's actually performed - in some cases using
 * the redo log is unnecessary and the allocation process can be sped up
 * a bit by completely omitting that whole machinery.
 *
 * The modifications are not visible until the context is processed.
 */
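
/*
 * Illustrative usage sketch (an assumption, simplified): the actual call
 * sites live in palloc and the transaction code and may differ, but a
 * typical lifecycle of a context looks roughly like this (extend_fn and
 * free_fn are placeholder callbacks):
 *
 *        ctx = operation_new(ulog, ulog_base_nbytes, extend_fn, free_fn,
 *                p_ops, LOG_TYPE_REDO);
 *        operation_start(ctx);
 *        operation_add_entry(ctx, &obj->field, value, ULOG_OPERATION_SET);
 *        operation_process(ctx);    -- modifications become visible here
 *        operation_finish(ctx, 0);
 *        operation_delete(ctx);
 */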

#include "memops.h"
#include "obj.h"
#include "out.h"
#include "ravl.h"
#include "valgrind_internal.h"
#include "vecq.h"
#include "sys_util.h"

#define ULOG_BASE_SIZE 1024 /* initial capacity of the DRAM operation logs */
#define OP_MERGE_SEARCH 64 /* number of recent entries searched for merging */

enum operation_state {
        OPERATION_IDLE,
        OPERATION_IN_PROGRESS,
        OPERATION_CLEANUP,
};

struct operation_log {
        size_t capacity; /* capacity of the ulog log */
        size_t offset; /* data offset inside of the log */
        struct ulog *ulog; /* DRAM allocated log of modifications */
};

/*
 * operation_context -- context of an ongoing palloc operation
 */
struct operation_context {
        enum log_type type;

        ulog_extend_fn extend; /* function to allocate next ulog */
        ulog_free_fn ulog_free; /* function to free next ulogs */

        const struct pmem_ops *p_ops;
        struct pmem_ops t_ops; /* used for transient data processing */
        struct pmem_ops s_ops; /* used for shadow copy data processing */

        size_t ulog_curr_offset; /* offset in the log for buffer stores */
        size_t ulog_curr_capacity; /* capacity of the current log */
        size_t ulog_curr_gen_num; /* transaction counter in the current log */
        struct ulog *ulog_curr; /* current persistent log */
        size_t total_logged; /* total number of bytes logged via buffer stores */

        struct ulog *ulog; /* pointer to the persistent ulog log */
        size_t ulog_base_nbytes; /* available bytes in initial ulog log */
        size_t ulog_capacity; /* sum of capacity, incl all next ulog logs */
        int ulog_auto_reserve; /* allow (or not) automatic ulog reservation */
        int ulog_any_user_buffer; /* set if any user buffer is added */

        struct ulog_next next; /* vector of 'next' fields of persistent ulog */

        enum operation_state state; /* operation sanity check */

        struct operation_log pshadow_ops; /* shadow copy of persistent ulog */
        struct operation_log transient_ops; /* log of transient changes */

        /* collection used to look for potential merge candidates */
        VECQ(, struct ulog_entry_val *) merge_entries;
};
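
/*
 * Note: for redo-type contexts, value entries accumulate in the DRAM shadow
 * log (pshadow_ops) and are copied into the persistent ulog and applied only
 * once operation_process() is called; entries in transient_ops are applied
 * with plain memory operations and are never persisted.
 */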

/*
 * operation_log_transient_init -- (internal) initialize operation log
 * containing transient memory resident changes
 */
static int
operation_log_transient_init(struct operation_log *log)
{
        log->capacity = ULOG_BASE_SIZE;
        log->offset = 0;

        struct ulog *src = Zalloc(sizeof(struct ulog) +
                ULOG_BASE_SIZE);
        if (src == NULL) {
                ERR("!Zalloc");
                return -1;
        }

        /* initialize underlying redo log structure */
        src->capacity = ULOG_BASE_SIZE;

        log->ulog = src;

        return 0;
}

/*
 * operation_log_persistent_init -- (internal) initialize operation log
 * containing persistent memory resident changes
 */
static int
operation_log_persistent_init(struct operation_log *log,
        size_t ulog_base_nbytes)
{
        log->capacity = ULOG_BASE_SIZE;
        log->offset = 0;

        struct ulog *src = Zalloc(sizeof(struct ulog) +
                ULOG_BASE_SIZE);
        if (src == NULL) {
                ERR("!Zalloc");
                return -1;
        }

        /* initialize underlying redo log structure */
        src->capacity = ulog_base_nbytes;
        memset(src->unused, 0, sizeof(src->unused));

        log->ulog = src;

        return 0;
}

/*
 * operation_transient_clean -- cleans pmemcheck address state
 */
static int
operation_transient_clean(void *base, const void *addr, size_t len,
        unsigned flags)
{
        VALGRIND_SET_CLEAN(addr, len);

        return 0;
}

/*
 * operation_transient_drain -- noop
 */
static void
operation_transient_drain(void *base)
{
}

/*
 * operation_transient_memcpy -- transient memcpy wrapper
 */
static void *
operation_transient_memcpy(void *base, void *dest, const void *src, size_t len,
        unsigned flags)
{
        return memcpy(dest, src, len);
}

/*
 * operation_new -- creates new operation context
 */
struct operation_context *
operation_new(struct ulog *ulog, size_t ulog_base_nbytes,
        ulog_extend_fn extend, ulog_free_fn ulog_free,
        const struct pmem_ops *p_ops, enum log_type type)
{
        struct operation_context *ctx = Zalloc(sizeof(*ctx));
        if (ctx == NULL) {
                ERR("!Zalloc");
                goto error_ctx_alloc;
        }

        ctx->ulog = ulog;
        ctx->ulog_base_nbytes = ulog_base_nbytes;
        ctx->ulog_capacity = ulog_capacity(ulog,
                ulog_base_nbytes, p_ops);
        ctx->extend = extend;
        ctx->ulog_free = ulog_free;
        ctx->state = OPERATION_IDLE;
        VEC_INIT(&ctx->next);
        ulog_rebuild_next_vec(ulog, &ctx->next, p_ops);
        ctx->p_ops = p_ops;
        ctx->type = type;
        ctx->ulog_any_user_buffer = 0;

        ctx->ulog_curr_offset = 0;
        ctx->ulog_curr_capacity = 0;
        ctx->ulog_curr = NULL;

        ctx->t_ops.base = NULL;
        ctx->t_ops.flush = operation_transient_clean;
        ctx->t_ops.memcpy = operation_transient_memcpy;
        ctx->t_ops.drain = operation_transient_drain;

        ctx->s_ops.base = p_ops->base;
        ctx->s_ops.flush = operation_transient_clean;
        ctx->s_ops.memcpy = operation_transient_memcpy;
        ctx->s_ops.drain = operation_transient_drain;

        VECQ_INIT(&ctx->merge_entries);

        if (operation_log_transient_init(&ctx->transient_ops) != 0)
                goto error_ulog_alloc;

        if (operation_log_persistent_init(&ctx->pshadow_ops,
            ulog_base_nbytes) != 0)
                goto error_ulog_alloc;

        return ctx;

error_ulog_alloc:
        operation_delete(ctx);
error_ctx_alloc:
        return NULL;
}

/*
 * operation_delete -- deletes operation context
 */
void
operation_delete(struct operation_context *ctx)
{
        VECQ_DELETE(&ctx->merge_entries);
        VEC_DELETE(&ctx->next);
        Free(ctx->pshadow_ops.ulog);
        Free(ctx->transient_ops.ulog);
        Free(ctx);
}

/*
 * operation_user_buffer_remove -- removes range from the tree and returns 0
 */
static int
operation_user_buffer_remove(void *base, void *addr)
{
        PMEMobjpool *pop = base;
        if (!pop->ulog_user_buffers.verify)
                return 0;

        util_mutex_lock(&pop->ulog_user_buffers.lock);

        struct ravl *ravl = pop->ulog_user_buffers.map;
        enum ravl_predicate predict = RAVL_PREDICATE_EQUAL;

        struct user_buffer_def range;
        range.addr = addr;
        range.size = 0;

        struct ravl_node *n = ravl_find(ravl, &range, predict);
        ASSERTne(n, NULL);
        ravl_remove(ravl, n);

        util_mutex_unlock(&pop->ulog_user_buffers.lock);

        return 0;
}

/*
 * operation_free_logs -- frees all logs except the first one
 */
void
operation_free_logs(struct operation_context *ctx, uint64_t flags)
{
        int freed = ulog_free_next(ctx->ulog, ctx->p_ops, ctx->ulog_free,
                operation_user_buffer_remove, flags);
        if (freed) {
                ctx->ulog_capacity = ulog_capacity(ctx->ulog,
                        ctx->ulog_base_nbytes, ctx->p_ops);
                VEC_CLEAR(&ctx->next);
                ulog_rebuild_next_vec(ctx->ulog, &ctx->next, ctx->p_ops);
        }

        ASSERTeq(VEC_SIZE(&ctx->next), 0);
}

/*
 * operation_merge -- (internal) performs operation on a field
 */
static inline void
operation_merge(struct ulog_entry_base *entry, uint64_t value,
        ulog_operation_type type)
{
        struct ulog_entry_val *e = (struct ulog_entry_val *)entry;

        switch (type) {
        case ULOG_OPERATION_AND:
                e->value &= value;
                break;
        case ULOG_OPERATION_OR:
                e->value |= value;
                break;
        case ULOG_OPERATION_SET:
                e->value = value;
                break;
        default:
                ASSERT(0); /* unreachable */
        }
}

/*
 * operation_try_merge_entry -- tries to merge the incoming log entry with
 * existing entries
 *
 * Because this requires a reverse foreach, it cannot be implemented using
 * the on-media ulog log structure since there's no way to find the previous
 * entry in the log. Instead, the last N entries are stored in a collection
 * and traversed backwards.
 */
static int
operation_try_merge_entry(struct operation_context *ctx,
        void *ptr, uint64_t value, ulog_operation_type type)
{
        int ret = 0;
        uint64_t offset = OBJ_PTR_TO_OFF(ctx->p_ops->base, ptr);

        struct ulog_entry_val *e;
        VECQ_FOREACH_REVERSE(e, &ctx->merge_entries) {
                if (ulog_entry_offset(&e->base) == offset) {
                        if (ulog_entry_type(&e->base) == type) {
                                operation_merge(&e->base, value, type);
                                return 1;
                        } else {
                                break;
                        }
                }
        }

        return ret;
}
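
/*
 * Illustrative example (the field name is hypothetical): two consecutive
 * value entries of the same type targeting the same location collapse into
 * a single shadow-log entry, e.g.
 *
 *        operation_add_entry(ctx, &obj->field, 1, ULOG_OPERATION_SET);
 *        operation_add_entry(ctx, &obj->field, 2, ULOG_OPERATION_SET);
 *
 * leaves one entry holding the value 2, while matching AND/OR entries fold
 * their masks together.
 */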

/*
 * operation_merge_entry_add -- adds a new entry to the merge collection,
 * keeps capacity at OP_MERGE_SEARCH. Removes old entries in FIFO fashion.
 */
static void
operation_merge_entry_add(struct operation_context *ctx,
        struct ulog_entry_val *entry)
{
        if (VECQ_SIZE(&ctx->merge_entries) == OP_MERGE_SEARCH)
                (void) VECQ_DEQUEUE(&ctx->merge_entries);

        if (VECQ_ENQUEUE(&ctx->merge_entries, entry) != 0) {
                /* this is fine, only runtime perf will get slower */
                LOG(2, "out of memory - unable to track entries");
        }
}

/*
 * operation_add_typed_entry -- adds a new entry to the current operation;
 * if a persistent entry with the same ptr address and operation type already
 * exists, the value is merged into it instead of appending a new entry.
 */
int
operation_add_typed_entry(struct operation_context *ctx,
        void *ptr, uint64_t value,
        ulog_operation_type type, enum operation_log_type log_type)
{
        struct operation_log *oplog = log_type == LOG_PERSISTENT ?
                &ctx->pshadow_ops : &ctx->transient_ops;

        /*
         * Always make sure to have one extra spare cacheline so that the
         * ulog log entry creation has enough room for zeroing.
         */
        if (oplog->offset + CACHELINE_SIZE == oplog->capacity) {
                size_t ncapacity = oplog->capacity + ULOG_BASE_SIZE;
                struct ulog *ulog = Realloc(oplog->ulog,
                        SIZEOF_ULOG(ncapacity));
                if (ulog == NULL)
                        return -1;
                oplog->capacity += ULOG_BASE_SIZE;
                oplog->ulog = ulog;
                oplog->ulog->capacity = oplog->capacity;

                /*
                 * Realloc invalidated the ulog entries that are inside of
                 * this vector; clear it to avoid use-after-free.
                 */
                VECQ_CLEAR(&ctx->merge_entries);
        }

        if (log_type == LOG_PERSISTENT &&
            operation_try_merge_entry(ctx, ptr, value, type) != 0)
                return 0;

        struct ulog_entry_val *entry = ulog_entry_val_create(
                oplog->ulog, oplog->offset, ptr, value, type,
                log_type == LOG_TRANSIENT ? &ctx->t_ops : &ctx->s_ops);

        if (log_type == LOG_PERSISTENT)
                operation_merge_entry_add(ctx, entry);

        oplog->offset += ulog_entry_size(&entry->base);

        return 0;
}

/*
 * operation_add_entry -- adds a new entry to the current operation with
 * the entry type autodetected based on the memory location
 */
int
operation_add_entry(struct operation_context *ctx, void *ptr, uint64_t value,
        ulog_operation_type type)
{
        const struct pmem_ops *p_ops = ctx->p_ops;
        PMEMobjpool *pop = (PMEMobjpool *)p_ops->base;

        int from_pool = OBJ_OFF_IS_VALID(pop,
                (uintptr_t)ptr - (uintptr_t)p_ops->base);

        return operation_add_typed_entry(ctx, ptr, value, type,
                from_pool ? LOG_PERSISTENT : LOG_TRANSIENT);
}
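
/*
 * For example (illustrative): a destination inside the pool's persistent
 * heap resolves to LOG_PERSISTENT, whereas a pointer to a DRAM-resident
 * runtime structure resolves to LOG_TRANSIENT and never touches the
 * persistent log.
 */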

/*
 * operation_add_buffer -- adds a buffer operation to the log
 */
int
operation_add_buffer(struct operation_context *ctx,
        void *dest, void *src, size_t size, ulog_operation_type type)
{
        size_t real_size = size + sizeof(struct ulog_entry_buf);

        /* if there's no space left in the log, reserve some more */
        if (ctx->ulog_curr_capacity == 0) {
                ctx->ulog_curr_gen_num = ctx->ulog->gen_num;
                if (operation_reserve(ctx, ctx->total_logged + real_size) != 0)
                        return -1;

                ctx->ulog_curr = ctx->ulog_curr == NULL ? ctx->ulog :
                        ulog_next(ctx->ulog_curr, ctx->p_ops);
                ASSERTne(ctx->ulog_curr, NULL);
                ctx->ulog_curr_offset = 0;
                ctx->ulog_curr_capacity = ctx->ulog_curr->capacity;
        }

        size_t curr_size = MIN(real_size, ctx->ulog_curr_capacity);
        size_t data_size = curr_size - sizeof(struct ulog_entry_buf);
        size_t entry_size = ALIGN_UP(curr_size, CACHELINE_SIZE);

        /*
         * To keep the log consistent and contiguous, we need to make sure
         * that the header of the entry that would be located immediately
         * after this one is zeroed.
         */
        struct ulog_entry_base *next_entry = NULL;
        if (entry_size == ctx->ulog_curr_capacity) {
                struct ulog *u = ulog_next(ctx->ulog_curr, ctx->p_ops);
                if (u != NULL)
                        next_entry = (struct ulog_entry_base *)u->data;
        } else {
                size_t next_entry_offset = ctx->ulog_curr_offset + entry_size;
                next_entry = (struct ulog_entry_base *)(ctx->ulog_curr->data +
                        next_entry_offset);
        }
        if (next_entry != NULL)
                ulog_clobber_entry(next_entry, ctx->p_ops);

        /* create a persistent log entry */
        struct ulog_entry_buf *e = ulog_entry_buf_create(ctx->ulog_curr,
                ctx->ulog_curr_offset,
                ctx->ulog_curr_gen_num,
                dest, src, data_size,
                type, ctx->p_ops);
        ASSERT(entry_size == ulog_entry_size(&e->base));
        ASSERT(entry_size <= ctx->ulog_curr_capacity);

        ctx->total_logged += entry_size;
        ctx->ulog_curr_offset += entry_size;
        ctx->ulog_curr_capacity -= entry_size;

        /*
         * Recursively add the data to the log until the entire buffer is
         * processed.
         */
        return size - data_size == 0 ? 0 : operation_add_buffer(ctx,
                (char *)dest + data_size,
                (char *)src + data_size,
                size - data_size, type);
}
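
/*
 * Illustrative sketch (sizes are assumptions, 64-byte cache lines): if the
 * current log has 256 bytes of capacity left and a 1 KiB buffer is added,
 * the first entry stores 256 - sizeof(struct ulog_entry_buf) bytes of data
 * and exhausts the current log; the tail call then reserves the next ulog
 * via operation_reserve() and logs the remaining bytes there.
 */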

/*
 * operation_user_buffer_range_cmp -- compares addresses of
 * user buffers
 */
int
operation_user_buffer_range_cmp(const void *lhs, const void *rhs)
{
        const struct user_buffer_def *l = lhs;
        const struct user_buffer_def *r = rhs;

        if (l->addr > r->addr)
                return 1;
        else if (l->addr < r->addr)
                return -1;

        return 0;
}

/*
 * operation_user_buffer_try_insert -- adds a user buffer range to the tree;
 * if the buffer overlaps a range that is already in the tree, the function
 * returns -1, otherwise it returns 0
 */
static int
operation_user_buffer_try_insert(PMEMobjpool *pop,
        struct user_buffer_def *userbuf)
{
        int ret = 0;

        if (!pop->ulog_user_buffers.verify)
                return ret;

        util_mutex_lock(&pop->ulog_user_buffers.lock);

        void *addr_end = (char *)userbuf->addr + userbuf->size;
        struct user_buffer_def search;
        search.addr = addr_end;
        struct ravl_node *n = ravl_find(pop->ulog_user_buffers.map,
                &search, RAVL_PREDICATE_LESS_EQUAL);
        if (n != NULL) {
                struct user_buffer_def *r = ravl_data(n);
                void *r_end = (char *)r->addr + r->size;

                if (r_end > userbuf->addr && r->addr < addr_end) {
                        /* what was found overlaps with what is being added */
                        ret = -1;
                        goto out;
                }
        }

        if (ravl_emplace_copy(pop->ulog_user_buffers.map, userbuf) == -1) {
                ASSERTne(errno, EEXIST);
                ret = -1;
        }

out:
        util_mutex_unlock(&pop->ulog_user_buffers.lock);
        return ret;
}

/*
 * operation_user_buffer_verify_align -- verifies that the provided buffer can
 * be used as a transaction log and, if so, performs the necessary alignments
 */
int
operation_user_buffer_verify_align(struct operation_context *ctx,
        struct user_buffer_def *userbuf)
{
        /*
         * Address of the buffer has to be aligned up, and the size
         * has to be aligned down, taking into account the number of bytes
         * the address was incremented by. The remaining size has to be large
         * enough to contain the header and at least one ulog entry.
         */
        uint64_t buffer_offset = OBJ_PTR_TO_OFF(ctx->p_ops->base,
                userbuf->addr);
        ptrdiff_t size_diff = (intptr_t)ulog_by_offset(buffer_offset,
                ctx->p_ops) - (intptr_t)userbuf->addr;
        ssize_t capacity_unaligned = (ssize_t)userbuf->size - size_diff
                - (ssize_t)sizeof(struct ulog);
        if (capacity_unaligned < (ssize_t)CACHELINE_SIZE) {
                ERR("Capacity insufficient");
                return -1;
        }

        size_t capacity_aligned = ALIGN_DOWN((size_t)capacity_unaligned,
                CACHELINE_SIZE);

        userbuf->addr = ulog_by_offset(buffer_offset, ctx->p_ops);
        userbuf->size = capacity_aligned + sizeof(struct ulog);

        if (operation_user_buffer_try_insert(ctx->p_ops->base, userbuf)) {
                ERR("Buffer currently used");
                return -1;
        }

        return 0;
}
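
/*
 * Illustrative example (numbers are assumptions): for a 4 KiB user buffer
 * whose start address is advanced by 48 bytes by ulog_by_offset(), the
 * usable capacity becomes 4096 - 48 - sizeof(struct ulog), rounded down to
 * a multiple of CACHELINE_SIZE; userbuf->addr and userbuf->size are then
 * rewritten to describe that aligned region before it is inserted into the
 * user-buffer tree.
 */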

/*
 * operation_add_user_buffer -- add user buffer to the ulog
 */
void
operation_add_user_buffer(struct operation_context *ctx,
        struct user_buffer_def *userbuf)
{
        uint64_t buffer_offset = OBJ_PTR_TO_OFF(ctx->p_ops->base,
                userbuf->addr);
        size_t capacity = userbuf->size - sizeof(struct ulog);

        ulog_construct(buffer_offset, capacity, ctx->ulog->gen_num,
                1, ULOG_USER_OWNED, ctx->p_ops);

        struct ulog *last_log;
        /* if there is only one log */
        if (!VEC_SIZE(&ctx->next))
                last_log = ctx->ulog;
        else /* get last element from vector */
                last_log = ulog_by_offset(VEC_BACK(&ctx->next), ctx->p_ops);

        ASSERTne(last_log, NULL);
        size_t next_size = sizeof(last_log->next);
        VALGRIND_ADD_TO_TX(&last_log->next, next_size);
        last_log->next = buffer_offset;
        pmemops_persist(ctx->p_ops, &last_log->next, next_size);

        VEC_PUSH_BACK(&ctx->next, buffer_offset);
        ctx->ulog_capacity += capacity;
        operation_set_any_user_buffer(ctx, 1);
}

/*
 * operation_set_auto_reserve -- set auto reserve value for context
 */
void
operation_set_auto_reserve(struct operation_context *ctx, int auto_reserve)
{
        ctx->ulog_auto_reserve = auto_reserve;
}

/*
 * operation_set_any_user_buffer -- set ulog_any_user_buffer value for context
 */
void
operation_set_any_user_buffer(struct operation_context *ctx,
        int any_user_buffer)
{
        ctx->ulog_any_user_buffer = any_user_buffer;
}

/*
 * operation_get_any_user_buffer -- get ulog_any_user_buffer value from context
 */
int
operation_get_any_user_buffer(struct operation_context *ctx)
{
        return ctx->ulog_any_user_buffer;
}

/*
 * operation_process_persistent_redo -- (internal) process using ulog
 */
static void
operation_process_persistent_redo(struct operation_context *ctx)
{
        ASSERTeq(ctx->pshadow_ops.capacity % CACHELINE_SIZE, 0);

        ulog_store(ctx->ulog, ctx->pshadow_ops.ulog,
                ctx->pshadow_ops.offset, ctx->ulog_base_nbytes,
                ctx->ulog_capacity,
                &ctx->next, ctx->p_ops);

        ulog_process(ctx->pshadow_ops.ulog, OBJ_OFF_IS_VALID_FROM_CTX,
                ctx->p_ops);

        ulog_clobber(ctx->ulog, &ctx->next, ctx->p_ops);
}

/*
 * operation_process_persistent_undo -- (internal) process using ulog
 */
static void
operation_process_persistent_undo(struct operation_context *ctx)
{
        ASSERTeq(ctx->pshadow_ops.capacity % CACHELINE_SIZE, 0);

        ulog_process(ctx->ulog, OBJ_OFF_IS_VALID_FROM_CTX, ctx->p_ops);
}

/*
 * operation_reserve -- (internal) reserves new capacity in persistent ulog log
 */
int
operation_reserve(struct operation_context *ctx, size_t new_capacity)
{
        if (new_capacity > ctx->ulog_capacity) {
                if (ctx->extend == NULL) {
                        ERR("no extend function present");
                        return -1;
                }

                if (ulog_reserve(ctx->ulog,
                    ctx->ulog_base_nbytes,
                    ctx->ulog_curr_gen_num,
                    ctx->ulog_auto_reserve,
                    &new_capacity, ctx->extend,
                    &ctx->next, ctx->p_ops) != 0)
                        return -1;
                ctx->ulog_capacity = new_capacity;
        }

        return 0;
}

/*
 * operation_init -- initializes runtime state of an operation
 */
void
operation_init(struct operation_context *ctx)
{
        struct operation_log *plog = &ctx->pshadow_ops;
        struct operation_log *tlog = &ctx->transient_ops;

        VALGRIND_ANNOTATE_NEW_MEMORY(ctx, sizeof(*ctx));
        VALGRIND_ANNOTATE_NEW_MEMORY(tlog->ulog, sizeof(struct ulog) +
                tlog->capacity);
        VALGRIND_ANNOTATE_NEW_MEMORY(plog->ulog, sizeof(struct ulog) +
                plog->capacity);
        tlog->offset = 0;
        plog->offset = 0;
        VECQ_REINIT(&ctx->merge_entries);

        ctx->ulog_curr_offset = 0;
        ctx->ulog_curr_capacity = 0;
        ctx->ulog_curr_gen_num = 0;
        ctx->ulog_curr = NULL;
        ctx->total_logged = 0;
        ctx->ulog_auto_reserve = 1;
        ctx->ulog_any_user_buffer = 0;
}

/*
 * operation_start -- initializes and starts a new operation
 */
void
operation_start(struct operation_context *ctx)
{
        operation_init(ctx);
        ASSERTeq(ctx->state, OPERATION_IDLE);
        ctx->state = OPERATION_IN_PROGRESS;
}
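
/*
 * operation_resume -- (re)starts an operation on an already populated ulog,
 * priming total_logged with the bytes already occupied in the first log
 * (as reported by ulog_base_nbytes())
 */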
void
operation_resume(struct operation_context *ctx)
{
        operation_start(ctx);
        ctx->total_logged = ulog_base_nbytes(ctx->ulog);
}

/*
 * operation_cancel -- cancels a running operation
 */
void
operation_cancel(struct operation_context *ctx)
{
        ASSERTeq(ctx->state, OPERATION_IN_PROGRESS);
        ctx->state = OPERATION_IDLE;
}

/*
 * operation_process -- processes registered operations
 *
 * The order of processing is important: persistent, transient.
 * This is because the transient entries that reside on persistent memory
 * might require a write to a location that is currently occupied by a valid
 * persistent state but becomes transient state once the operation is
 * processed.
 */
void
operation_process(struct operation_context *ctx)
{
        /*
         * If there's exactly one persistent entry there's no need to involve
         * the redo log. We can simply assign the value; the operation will be
         * atomic.
         */
        int redo_process = ctx->type == LOG_TYPE_REDO &&
                ctx->pshadow_ops.offset != 0;
        if (redo_process &&
            ctx->pshadow_ops.offset == sizeof(struct ulog_entry_val)) {
                struct ulog_entry_base *e = (struct ulog_entry_base *)
                        ctx->pshadow_ops.ulog->data;
                ulog_operation_type t = ulog_entry_type(e);
                if (t == ULOG_OPERATION_SET || t == ULOG_OPERATION_AND ||
                    t == ULOG_OPERATION_OR) {
                        ulog_entry_apply(e, 1, ctx->p_ops);
                        redo_process = 0;
                }
        }

        if (redo_process) {
                operation_process_persistent_redo(ctx);
                ctx->state = OPERATION_CLEANUP;
        } else if (ctx->type == LOG_TYPE_UNDO && ctx->total_logged != 0) {
                operation_process_persistent_undo(ctx);
                ctx->state = OPERATION_CLEANUP;
        }

        /* process transient entries with transient memory ops */
        if (ctx->transient_ops.offset != 0)
                ulog_process(ctx->transient_ops.ulog, NULL, &ctx->t_ops);
}

/*
 * operation_finish -- finalizes the operation
 */
void
operation_finish(struct operation_context *ctx, unsigned flags)
{
        ASSERTne(ctx->state, OPERATION_IDLE);

        if (ctx->type == LOG_TYPE_UNDO && ctx->total_logged != 0)
                ctx->state = OPERATION_CLEANUP;

        if (ctx->ulog_any_user_buffer) {
                flags |= ULOG_ANY_USER_BUFFER;
                ctx->state = OPERATION_CLEANUP;
        }

        if (ctx->state != OPERATION_CLEANUP)
                goto out;

        if (ctx->type == LOG_TYPE_UNDO) {
                int ret = ulog_clobber_data(ctx->ulog,
                        ctx->total_logged, ctx->ulog_base_nbytes,
                        &ctx->next, ctx->ulog_free,
                        operation_user_buffer_remove,
                        ctx->p_ops, flags);
                if (ret == 0)
                        goto out;
        } else if (ctx->type == LOG_TYPE_REDO) {
                int ret = ulog_free_next(ctx->ulog, ctx->p_ops,
                        ctx->ulog_free, operation_user_buffer_remove,
                        flags);
                if (ret == 0)
                        goto out;
        }

        /* clobbering shrunk the ulog */
        ctx->ulog_capacity = ulog_capacity(ctx->ulog,
                ctx->ulog_base_nbytes, ctx->p_ops);
        VEC_CLEAR(&ctx->next);
        ulog_rebuild_next_vec(ctx->ulog, &ctx->next, ctx->p_ops);

out:
        ctx->state = OPERATION_IDLE;
}