// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2016-2020, Intel Corporation */

/*
 * memops.c -- aggregated memory operations helper implementation
 *
 * The operation collects all of the required memory modifications that
 * need to happen in an atomic way (all of them or none), and abstracts
 * away the storage type (transient/persistent) and the underlying
 * implementation of how it's actually performed - in some cases using
 * the redo log is unnecessary and the allocation process can be sped up
 * a bit by completely omitting that whole machinery.
 *
 * The modifications are not visible until the context is processed.
 */
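
/*
 * For orientation, a typical lifecycle of a context looks roughly like the
 * sketch below (error handling and the pool-specific setup of 'ulog' and
 * 'p_ops' are omitted; 'extend_fn', 'free_fn', 'obj' and 'field' are
 * hypothetical names):
 *
 *	struct operation_context *ctx = operation_new(ulog, ulog_base_nbytes,
 *		extend_fn, free_fn, p_ops, LOG_TYPE_REDO);
 *	operation_start(ctx);
 *	operation_add_entry(ctx, &obj->field, value, ULOG_OPERATION_SET);
 *	operation_process(ctx); -- modifications become visible here
 *	operation_finish(ctx, 0);
 *	operation_delete(ctx);
 */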

#include "memops.h"
#include "obj.h"
#include "out.h"
#include "ravl.h"
#include "valgrind_internal.h"
#include "vecq.h"

#define ULOG_BASE_SIZE 1024
#define OP_MERGE_SEARCH 64

enum operation_state {
	OPERATION_IDLE,
	OPERATION_IN_PROGRESS,
	OPERATION_CLEANUP,
};

struct operation_log {
	size_t capacity; /* capacity of the ulog log */
	size_t offset; /* data offset inside of the log */
	struct ulog *ulog; /* DRAM allocated log of modifications */
};

/*
 * operation_context -- context of an ongoing palloc operation
 */
struct operation_context {
	enum log_type type;

	ulog_extend_fn extend; /* function to allocate next ulog */
	ulog_free_fn ulog_free; /* function to free next ulogs */

	const struct pmem_ops *p_ops;
	struct pmem_ops t_ops; /* used for transient data processing */
	struct pmem_ops s_ops; /* used for shadow copy data processing */

	size_t ulog_curr_offset; /* offset in the log for buffer stores */
	size_t ulog_curr_capacity; /* capacity of the current log */
	size_t ulog_curr_gen_num; /* transaction counter in the current log */
	struct ulog *ulog_curr; /* current persistent log */
	size_t total_logged; /* total amount of buffer stores in the logs */

	struct ulog *ulog; /* pointer to the persistent ulog log */
	size_t ulog_base_nbytes; /* available bytes in initial ulog log */
	size_t ulog_capacity; /* sum of capacity, incl all next ulog logs */
	int ulog_auto_reserve; /* whether to allow automatic ulog reservation */
	int ulog_any_user_buffer; /* set if any user buffer is added */

	struct ulog_next next; /* vector of 'next' fields of persistent ulog */

	enum operation_state state; /* operation sanity check */

	struct operation_log pshadow_ops; /* shadow copy of persistent ulog */
	struct operation_log transient_ops; /* log of transient changes */

	/* collection used to look for potential merge candidates */
	VECQ(, struct ulog_entry_val *) merge_entries;
};

/*
 * operation_log_transient_init -- (internal) initialize operation log
 *	containing transient memory resident changes
 */
static int
operation_log_transient_init(struct operation_log *log)
{
	log->capacity = ULOG_BASE_SIZE;
	log->offset = 0;

	struct ulog *src = Zalloc(sizeof(struct ulog) +
		ULOG_BASE_SIZE);
	if (src == NULL) {
		ERR("!Zalloc");
		return -1;
	}

	/* initialize underlying redo log structure */
	src->capacity = ULOG_BASE_SIZE;

	log->ulog = src;

	return 0;
}

/*
 * operation_log_persistent_init -- (internal) initialize operation log
 *	containing persistent memory resident changes
 */
static int
operation_log_persistent_init(struct operation_log *log,
	size_t ulog_base_nbytes)
{
	log->capacity = ULOG_BASE_SIZE;
	log->offset = 0;

	struct ulog *src = Zalloc(sizeof(struct ulog) +
		ULOG_BASE_SIZE);
	if (src == NULL) {
		ERR("!Zalloc");
		return -1;
	}

	/* initialize underlying redo log structure */
	src->capacity = ulog_base_nbytes;
	memset(src->unused, 0, sizeof(src->unused));

	log->ulog = src;

	return 0;
}

/*
 * operation_transient_clean -- cleans pmemcheck address state
 */
static int
operation_transient_clean(void *base, const void *addr, size_t len,
	unsigned flags)
{
	VALGRIND_SET_CLEAN(addr, len);

	return 0;
}

/*
 * operation_transient_drain -- noop
 */
static void
operation_transient_drain(void *base)
{
}

/*
 * operation_transient_memcpy -- transient memcpy wrapper
 */
static void *
operation_transient_memcpy(void *base, void *dest, const void *src, size_t len,
	unsigned flags)
{
	return memcpy(dest, src, len);
}

/*
 * operation_new -- creates new operation context
 */
struct operation_context *
operation_new(struct ulog *ulog, size_t ulog_base_nbytes,
	ulog_extend_fn extend, ulog_free_fn ulog_free,
	const struct pmem_ops *p_ops, enum log_type type)
{
	struct operation_context *ctx = Zalloc(sizeof(*ctx));
	if (ctx == NULL) {
		ERR("!Zalloc");
		goto error_ctx_alloc;
	}

	ctx->ulog = ulog;
	ctx->ulog_base_nbytes = ulog_base_nbytes;
	ctx->ulog_capacity = ulog_capacity(ulog,
		ulog_base_nbytes, p_ops);
	ctx->extend = extend;
	ctx->ulog_free = ulog_free;
	ctx->state = OPERATION_IDLE;
	VEC_INIT(&ctx->next);
	ulog_rebuild_next_vec(ulog, &ctx->next, p_ops);
	ctx->p_ops = p_ops;
	ctx->type = type;
	ctx->ulog_any_user_buffer = 0;

	ctx->ulog_curr_offset = 0;
	ctx->ulog_curr_capacity = 0;
	ctx->ulog_curr = NULL;

	ctx->t_ops.base = NULL;
	ctx->t_ops.flush = operation_transient_clean;
	ctx->t_ops.memcpy = operation_transient_memcpy;
	ctx->t_ops.drain = operation_transient_drain;

	ctx->s_ops.base = p_ops->base;
	ctx->s_ops.flush = operation_transient_clean;
	ctx->s_ops.memcpy = operation_transient_memcpy;
	ctx->s_ops.drain = operation_transient_drain;

	VECQ_INIT(&ctx->merge_entries);

	if (operation_log_transient_init(&ctx->transient_ops) != 0)
		goto error_ulog_alloc;

	if (operation_log_persistent_init(&ctx->pshadow_ops,
		ulog_base_nbytes) != 0)
		goto error_ulog_alloc;

	return ctx;

error_ulog_alloc:
	operation_delete(ctx);
error_ctx_alloc:
	return NULL;
}

/*
 * operation_delete -- deletes operation context
 */
void
operation_delete(struct operation_context *ctx)
{
	VECQ_DELETE(&ctx->merge_entries);
	VEC_DELETE(&ctx->next);
	Free(ctx->pshadow_ops.ulog);
	Free(ctx->transient_ops.ulog);
	Free(ctx);
}

/*
 * operation_user_buffer_remove -- removes range from the tree and returns 0
 */
static int
operation_user_buffer_remove(void *base, void *addr)
{
	PMEMobjpool *pop = base;
	if (!pop->ulog_user_buffers.verify)
		return 0;

	util_mutex_lock(&pop->ulog_user_buffers.lock);

	struct ravl *ravl = pop->ulog_user_buffers.map;
	enum ravl_predicate predict = RAVL_PREDICATE_EQUAL;

	struct user_buffer_def range;
	range.addr = addr;
	range.size = 0;

	struct ravl_node *n = ravl_find(ravl, &range, predict);
	ASSERTne(n, NULL);
	ravl_remove(ravl, n);

	util_mutex_unlock(&pop->ulog_user_buffers.lock);

	return 0;
}

/*
 * operation_free_logs -- free all logs except first
 */
void
operation_free_logs(struct operation_context *ctx, uint64_t flags)
{
	int freed = ulog_free_next(ctx->ulog, ctx->p_ops, ctx->ulog_free,
		operation_user_buffer_remove, flags);
	if (freed) {
		ctx->ulog_capacity = ulog_capacity(ctx->ulog,
			ctx->ulog_base_nbytes, ctx->p_ops);
		VEC_CLEAR(&ctx->next);
		ulog_rebuild_next_vec(ctx->ulog, &ctx->next, ctx->p_ops);
	}

	ASSERTeq(VEC_SIZE(&ctx->next), 0);
}

/*
 * operation_merge -- (internal) performs operation on a field
 */
static void
operation_merge(struct ulog_entry_base *entry, uint64_t value,
	ulog_operation_type type)
{
	struct ulog_entry_val *e = (struct ulog_entry_val *)entry;

	switch (type) {
		case ULOG_OPERATION_AND:
			e->value &= value;
			break;
		case ULOG_OPERATION_OR:
			e->value |= value;
			break;
		case ULOG_OPERATION_SET:
			e->value = value;
			break;
		default:
			ASSERT(0); /* unreachable */
	}
}
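
/*
 * As an illustrative example: merging an incoming OR of 0x10 into an existing
 * { ULOG_OPERATION_OR, value = 0x01 } entry for the same offset leaves a
 * single entry with value 0x11, so only one modification has to be persisted
 * and processed for that field.
 */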

/*
 * operation_try_merge_entry -- tries to merge the incoming log entry with
 *	an existing entry in the merge collection
 *
 * Because this requires a reverse foreach, it cannot be implemented using
 * the on-media ulog log structure since there's no way to find what's
 * the previous entry in the log. Instead, the last N entries are stored
 * in a collection and traversed backwards.
 */
static int
operation_try_merge_entry(struct operation_context *ctx,
	void *ptr, uint64_t value, ulog_operation_type type)
{
	int ret = 0;
	uint64_t offset = OBJ_PTR_TO_OFF(ctx->p_ops->base, ptr);

	struct ulog_entry_val *e;
	VECQ_FOREACH_REVERSE(e, &ctx->merge_entries) {
		if (ulog_entry_offset(&e->base) == offset) {
			if (ulog_entry_type(&e->base) == type) {
				operation_merge(&e->base, value, type);
				return 1;
			} else {
				break;
			}
		}
	}

	return ret;
}

/*
 * operation_merge_entry_add -- adds a new entry to the merge collection,
 *	keeps capacity at OP_MERGE_SEARCH. Removes old entries in FIFO fashion.
 */
static void
operation_merge_entry_add(struct operation_context *ctx,
	struct ulog_entry_val *entry)
{
	if (VECQ_SIZE(&ctx->merge_entries) == OP_MERGE_SEARCH)
		(void) VECQ_DEQUEUE(&ctx->merge_entries);

	if (VECQ_ENQUEUE(&ctx->merge_entries, entry) != 0) {
		/* this is fine, only runtime perf will get slower */
		LOG(2, "out of memory - unable to track entries");
	}
}

/*
 * operation_add_typed_entry -- adds a new entry to the current operation; if
 *	a persistent entry with the same ptr address and operation type already
 *	exists, the value is merged into it instead of appending a new entry.
 */
int
operation_add_typed_entry(struct operation_context *ctx,
	void *ptr, uint64_t value,
	ulog_operation_type type, enum operation_log_type log_type)
{
	struct operation_log *oplog = log_type == LOG_PERSISTENT ?
		&ctx->pshadow_ops : &ctx->transient_ops;

	/*
	 * Always make sure to have one extra spare cacheline so that the
	 * ulog log entry creation has enough room for zeroing.
	 */
	if (oplog->offset + CACHELINE_SIZE == oplog->capacity) {
		size_t ncapacity = oplog->capacity + ULOG_BASE_SIZE;
		struct ulog *ulog = Realloc(oplog->ulog,
			SIZEOF_ULOG(ncapacity));
		if (ulog == NULL)
			return -1;
		oplog->capacity += ULOG_BASE_SIZE;
		oplog->ulog = ulog;
		oplog->ulog->capacity = oplog->capacity;

		/*
		 * Realloc invalidated the ulog entries that are inside of this
		 * vector, need to clear it to avoid use after free.
		 */
		VECQ_CLEAR(&ctx->merge_entries);
	}

	if (log_type == LOG_PERSISTENT &&
		operation_try_merge_entry(ctx, ptr, value, type) != 0)
		return 0;

	struct ulog_entry_val *entry = ulog_entry_val_create(
		oplog->ulog, oplog->offset, ptr, value, type,
		log_type == LOG_TRANSIENT ? &ctx->t_ops : &ctx->s_ops);

	if (log_type == LOG_PERSISTENT)
		operation_merge_entry_add(ctx, entry);

	oplog->offset += ulog_entry_size(&entry->base);

	return 0;
}

/*
 * operation_add_entry -- adds a new entry to the current operation, with the
 *	entry type autodetected based on the memory location
 */
int
operation_add_entry(struct operation_context *ctx, void *ptr, uint64_t value,
	ulog_operation_type type)
{
	const struct pmem_ops *p_ops = ctx->p_ops;
	PMEMobjpool *pop = (PMEMobjpool *)p_ops->base;

	int from_pool = OBJ_OFF_IS_VALID(pop,
		(uintptr_t)ptr - (uintptr_t)p_ops->base);

	return operation_add_typed_entry(ctx, ptr, value, type,
		from_pool ? LOG_PERSISTENT : LOG_TRANSIENT);
}
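
/*
 * Example (a sketch, with a hypothetical 'obj'): clearing the lowest bit of a
 * 64-bit field that lives inside the pool
 *
 *	operation_add_entry(ctx, &obj->flags, ~1ULL, ULOG_OPERATION_AND);
 *
 * is routed to the persistent shadow log, because the address resolves to a
 * valid pool offset; a stack or heap address would go to the transient log
 * instead.
 */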

/*
 * operation_add_buffer -- adds a buffer operation to the log
 */
int
operation_add_buffer(struct operation_context *ctx,
	void *dest, void *src, size_t size, ulog_operation_type type)
{
	size_t real_size = size + sizeof(struct ulog_entry_buf);

	/* if there's no space left in the log, reserve some more */
	if (ctx->ulog_curr_capacity == 0) {
		ctx->ulog_curr_gen_num = ctx->ulog->gen_num;
		if (operation_reserve(ctx, ctx->total_logged + real_size) != 0)
			return -1;

		ctx->ulog_curr = ctx->ulog_curr == NULL ? ctx->ulog :
			ulog_next(ctx->ulog_curr, ctx->p_ops);
		ASSERTne(ctx->ulog_curr, NULL);
		ctx->ulog_curr_offset = 0;
		ctx->ulog_curr_capacity = ctx->ulog_curr->capacity;
	}

	size_t curr_size = MIN(real_size, ctx->ulog_curr_capacity);
	size_t data_size = curr_size - sizeof(struct ulog_entry_buf);
	size_t entry_size = ALIGN_UP(curr_size, CACHELINE_SIZE);

	/*
	 * To make sure that the log is consistent and contiguous, we need to
	 * make sure that the header of the entry that would be located
	 * immediately after this one is zeroed.
	 */
	struct ulog_entry_base *next_entry = NULL;
	if (entry_size == ctx->ulog_curr_capacity) {
		struct ulog *u = ulog_next(ctx->ulog_curr, ctx->p_ops);
		if (u != NULL)
			next_entry = (struct ulog_entry_base *)u->data;
	} else {
		size_t next_entry_offset = ctx->ulog_curr_offset + entry_size;
		next_entry = (struct ulog_entry_base *)(ctx->ulog_curr->data +
			next_entry_offset);
	}
	if (next_entry != NULL)
		ulog_clobber_entry(next_entry, ctx->p_ops);

	/* create a persistent log entry */
	struct ulog_entry_buf *e = ulog_entry_buf_create(ctx->ulog_curr,
		ctx->ulog_curr_offset,
		ctx->ulog_curr_gen_num,
		dest, src, data_size,
		type, ctx->p_ops);
	ASSERT(entry_size == ulog_entry_size(&e->base));
	ASSERT(entry_size <= ctx->ulog_curr_capacity);

	ctx->total_logged += entry_size;
	ctx->ulog_curr_offset += entry_size;
	ctx->ulog_curr_capacity -= entry_size;

	/*
	 * Recursively add the data to the log until the entire buffer is
	 * processed.
	 */
	return size - data_size == 0 ? 0 : operation_add_buffer(ctx,
			(char *)dest + data_size,
			(char *)src + data_size,
			size - data_size, type);
}
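
/*
 * Worked example with illustrative numbers: if the current log has 4096 bytes
 * of capacity left and a 6000-byte buffer is added, the first pass stores
 * 4096 - sizeof(struct ulog_entry_buf) bytes of data (entry_size equals the
 * remaining capacity, so the next header to zero lives in the next log), and
 * the recursive call logs the remainder into the next ulog in the chain.
 */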

/*
 * operation_user_buffer_range_cmp -- compares addresses of user buffers
 */
int
operation_user_buffer_range_cmp(const void *lhs, const void *rhs)
{
	const struct user_buffer_def *l = lhs;
	const struct user_buffer_def *r = rhs;

	if (l->addr > r->addr)
		return 1;
	else if (l->addr < r->addr)
		return -1;

	return 0;
}

/*
 * operation_user_buffer_try_insert -- adds a user buffer range to the tree;
 *	if the buffer already exists in the tree, the function returns -1,
 *	otherwise it returns 0
 */
static int
operation_user_buffer_try_insert(PMEMobjpool *pop,
	struct user_buffer_def *userbuf)
{
	int ret = 0;

	if (!pop->ulog_user_buffers.verify)
		return ret;

	util_mutex_lock(&pop->ulog_user_buffers.lock);

	void *addr_end = (char *)userbuf->addr + userbuf->size;
	struct user_buffer_def search;
	search.addr = addr_end;
	struct ravl_node *n = ravl_find(pop->ulog_user_buffers.map,
		&search, RAVL_PREDICATE_LESS_EQUAL);
	if (n != NULL) {
		struct user_buffer_def *r = ravl_data(n);
		void *r_end = (char *)r->addr + r->size;

		if (r_end > userbuf->addr && r->addr < addr_end) {
			/* what was found overlaps with what is being added */
			ret = -1;
			goto out;
		}
	}

	if (ravl_emplace_copy(pop->ulog_user_buffers.map, userbuf) == -1) {
		ASSERTne(errno, EEXIST);
		ret = -1;
	}

out:
	util_mutex_unlock(&pop->ulog_user_buffers.lock);

	return ret;
}

/*
 * operation_user_buffer_verify_align -- verify if the provided buffer can be
 *	used as a transaction log, and if so - perform necessary alignments
 */
int
operation_user_buffer_verify_align(struct operation_context *ctx,
	struct user_buffer_def *userbuf)
{
	/*
	 * Address of the buffer has to be aligned up, and the size
	 * has to be aligned down, taking into account the number of bytes
	 * the address was incremented by. The remaining size has to be large
	 * enough to contain the header and at least one ulog entry.
	 */
	uint64_t buffer_offset = OBJ_PTR_TO_OFF(ctx->p_ops->base,
		userbuf->addr);
	ptrdiff_t size_diff = (intptr_t)ulog_by_offset(buffer_offset,
		ctx->p_ops) - (intptr_t)userbuf->addr;
	ssize_t capacity_unaligned = (ssize_t)userbuf->size - size_diff
		- (ssize_t)sizeof(struct ulog);
	if (capacity_unaligned < (ssize_t)CACHELINE_SIZE) {
		ERR("Capacity insufficient");
		return -1;
	}

	size_t capacity_aligned = ALIGN_DOWN((size_t)capacity_unaligned,
		CACHELINE_SIZE);

	userbuf->addr = ulog_by_offset(buffer_offset, ctx->p_ops);
	userbuf->size = capacity_aligned + sizeof(struct ulog);

	if (operation_user_buffer_try_insert(ctx->p_ops->base, userbuf)) {
		ERR("Buffer currently used");
		return -1;
	}

	return 0;
}
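
/*
 * For instance, with illustrative numbers: a user buffer of 1024 bytes whose
 * address had to be aligned up by 40 bytes retains
 * 1024 - 40 - sizeof(struct ulog) bytes, aligned down to CACHELINE_SIZE, as
 * usable capacity; anything smaller than one cacheline is rejected above.
 */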

/*
 * operation_add_user_buffer -- add user buffer to the ulog
 */
void
operation_add_user_buffer(struct operation_context *ctx,
	struct user_buffer_def *userbuf)
{
	uint64_t buffer_offset = OBJ_PTR_TO_OFF(ctx->p_ops->base,
		userbuf->addr);
	size_t capacity = userbuf->size - sizeof(struct ulog);

	ulog_construct(buffer_offset, capacity, ctx->ulog->gen_num,
		1, ULOG_USER_OWNED, ctx->p_ops);

	struct ulog *last_log;
	/* if there is only one log */
	if (!VEC_SIZE(&ctx->next))
		last_log = ctx->ulog;
	else /* get last element from vector */
		last_log = ulog_by_offset(VEC_BACK(&ctx->next), ctx->p_ops);

	ASSERTne(last_log, NULL);
	size_t next_size = sizeof(last_log->next);
	VALGRIND_ADD_TO_TX(&last_log->next, next_size);
	last_log->next = buffer_offset;
	pmemops_persist(ctx->p_ops, &last_log->next, next_size);

	VEC_PUSH_BACK(&ctx->next, buffer_offset);
	ctx->ulog_capacity += capacity;
	operation_set_any_user_buffer(ctx, 1);
}

/*
 * operation_set_auto_reserve -- set auto reserve value for context
 */
void
operation_set_auto_reserve(struct operation_context *ctx, int auto_reserve)
{
	ctx->ulog_auto_reserve = auto_reserve;
}

/*
 * operation_set_any_user_buffer -- set ulog_any_user_buffer value for context
 */
void
operation_set_any_user_buffer(struct operation_context *ctx,
	int any_user_buffer)
{
	ctx->ulog_any_user_buffer = any_user_buffer;
}

/*
 * operation_get_any_user_buffer -- get ulog_any_user_buffer value from context
 */
int
operation_get_any_user_buffer(struct operation_context *ctx)
{
	return ctx->ulog_any_user_buffer;
}

/*
 * operation_process_persistent_redo -- (internal) process using ulog
 */
static void
operation_process_persistent_redo(struct operation_context *ctx)
{
	ASSERTeq(ctx->pshadow_ops.capacity % CACHELINE_SIZE, 0);

	ulog_store(ctx->ulog, ctx->pshadow_ops.ulog,
		ctx->pshadow_ops.offset, ctx->ulog_base_nbytes,
		ctx->ulog_capacity,
		&ctx->next, ctx->p_ops);

	ulog_process(ctx->pshadow_ops.ulog, OBJ_OFF_IS_VALID_FROM_CTX,
		ctx->p_ops);

	ulog_clobber(ctx->ulog, &ctx->next, ctx->p_ops);
}

/*
 * operation_process_persistent_undo -- (internal) process using ulog
 */
static void
operation_process_persistent_undo(struct operation_context *ctx)
{
	ASSERTeq(ctx->pshadow_ops.capacity % CACHELINE_SIZE, 0);

	ulog_process(ctx->ulog, OBJ_OFF_IS_VALID_FROM_CTX, ctx->p_ops);
}

/*
 * operation_reserve -- (internal) reserves new capacity in persistent ulog log
 */
int
operation_reserve(struct operation_context *ctx, size_t new_capacity)
{
	if (new_capacity > ctx->ulog_capacity) {
		if (ctx->extend == NULL) {
			ERR("no extend function present");
			return -1;
		}

		if (ulog_reserve(ctx->ulog,
			ctx->ulog_base_nbytes,
			ctx->ulog_curr_gen_num,
			ctx->ulog_auto_reserve,
			&new_capacity, ctx->extend,
			&ctx->next, ctx->p_ops) != 0)
			return -1;
		ctx->ulog_capacity = new_capacity;
	}

	return 0;
}

/*
 * operation_init -- initializes runtime state of an operation
 */
void
operation_init(struct operation_context *ctx)
{
	struct operation_log *plog = &ctx->pshadow_ops;
	struct operation_log *tlog = &ctx->transient_ops;

	VALGRIND_ANNOTATE_NEW_MEMORY(ctx, sizeof(*ctx));
	VALGRIND_ANNOTATE_NEW_MEMORY(tlog->ulog, sizeof(struct ulog) +
		tlog->capacity);
	VALGRIND_ANNOTATE_NEW_MEMORY(plog->ulog, sizeof(struct ulog) +
		plog->capacity);
	tlog->offset = 0;
	plog->offset = 0;
	VECQ_REINIT(&ctx->merge_entries);

	ctx->ulog_curr_offset = 0;
	ctx->ulog_curr_capacity = 0;
	ctx->ulog_curr_gen_num = 0;
	ctx->ulog_curr = NULL;
	ctx->total_logged = 0;
	ctx->ulog_auto_reserve = 1;
	ctx->ulog_any_user_buffer = 0;
}

/*
 * operation_start -- initializes and starts a new operation
 */
void
operation_start(struct operation_context *ctx)
{
	operation_init(ctx);

	ASSERTeq(ctx->state, OPERATION_IDLE);
	ctx->state = OPERATION_IN_PROGRESS;
}

/*
 * operation_resume -- resumes a previously started operation
 */
void
operation_resume(struct operation_context *ctx)
{
	operation_start(ctx);
	ctx->total_logged = ulog_base_nbytes(ctx->ulog);
}

/*
 * operation_cancel -- cancels a running operation
 */
void
operation_cancel(struct operation_context *ctx)
{
	ASSERTeq(ctx->state, OPERATION_IN_PROGRESS);
	ctx->state = OPERATION_IDLE;
}

/*
 * operation_process -- processes registered operations
 *
 * The order of processing is important: persistent, transient.
 * This is because the transient entries that reside on persistent memory might
 * require write to a location that is currently occupied by a valid persistent
 * state but becomes a transient state after operation is processed.
 */
void
operation_process(struct operation_context *ctx)
{
	/*
	 * If there's exactly one persistent entry there's no need to involve
	 * the redo log. We can simply assign the value, the operation will be
	 * atomic.
	 */
	int redo_process = ctx->type == LOG_TYPE_REDO &&
		ctx->pshadow_ops.offset != 0;
	if (redo_process &&
		ctx->pshadow_ops.offset == sizeof(struct ulog_entry_val)) {
		struct ulog_entry_base *e = (struct ulog_entry_base *)
			ctx->pshadow_ops.ulog->data;
		ulog_operation_type t = ulog_entry_type(e);
		if (t == ULOG_OPERATION_SET || t == ULOG_OPERATION_AND ||
			t == ULOG_OPERATION_OR) {
			ulog_entry_apply(e, 1, ctx->p_ops);
			redo_process = 0;
		}
	}

	if (redo_process) {
		operation_process_persistent_redo(ctx);
		ctx->state = OPERATION_CLEANUP;
	} else if (ctx->type == LOG_TYPE_UNDO && ctx->total_logged != 0) {
		operation_process_persistent_undo(ctx);
		ctx->state = OPERATION_CLEANUP;
	}

	/* process transient entries with transient memory ops */
	if (ctx->transient_ops.offset != 0)
		ulog_process(ctx->transient_ops.ulog, NULL, &ctx->t_ops);
}
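
/*
 * For example, a single logged ULOG_OPERATION_SET of one 64-bit word takes
 * the fast path above: the entry is applied directly via ulog_entry_apply(),
 * since an aligned 8-byte store is already atomic and going through the redo
 * log would only add overhead.
 */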

/*
 * operation_finish -- finalizes the operation
 */
void
operation_finish(struct operation_context *ctx, unsigned flags)
{
	ASSERTne(ctx->state, OPERATION_IDLE);

	if (ctx->type == LOG_TYPE_UNDO && ctx->total_logged != 0)
		ctx->state = OPERATION_CLEANUP;

	if (ctx->ulog_any_user_buffer) {
		flags |= ULOG_ANY_USER_BUFFER;
		ctx->state = OPERATION_CLEANUP;
	}

	if (ctx->state != OPERATION_CLEANUP)
		goto out;

	if (ctx->type == LOG_TYPE_UNDO) {
		int ret = ulog_clobber_data(ctx->ulog,
			ctx->total_logged, ctx->ulog_base_nbytes,
			&ctx->next, ctx->ulog_free,
			operation_user_buffer_remove,
			ctx->p_ops, flags);
		if (ret == 0)
			goto out;
	} else if (ctx->type == LOG_TYPE_REDO) {
		int ret = ulog_free_next(ctx->ulog, ctx->p_ops,
			ctx->ulog_free, operation_user_buffer_remove,
			flags);
		if (ret == 0)
			goto out;
	}

	/* clobbering shrunk the ulog */
	ctx->ulog_capacity = ulog_capacity(ctx->ulog,
		ctx->ulog_base_nbytes, ctx->p_ops);
	VEC_CLEAR(&ctx->next);
	ulog_rebuild_next_vec(ctx->ulog, &ctx->next, ctx->p_ops);

out:
	ctx->state = OPERATION_IDLE;
}