/* * iterator.c * * Babeltrace Library * * Copyright 2010-2011 EfficiOS Inc. and Linux Foundation * * Author: Mathieu Desnoyers * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ #include #include #include #include #include #include #include #include #include #include #include static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream, const struct bt_iter_pos *begin_pos, unsigned long stream_id); struct stream_saved_pos { /* * Use file_stream pointer to check if the trace collection we * restore to match the one we saved from, for each stream. */ struct ctf_file_stream *file_stream; size_t cur_index; /* current index in packet index */ ssize_t offset; /* offset from base, in bits. EOF for end of file. */ uint64_t current_real_timestamp; uint64_t current_cycles_timestamp; }; struct bt_saved_pos { struct trace_collection *tc; GArray *stream_saved_pos; /* Contains struct stream_saved_pos */ }; static int stream_read_event(struct ctf_file_stream *sin) { int ret; ret = sin->pos.parent.event_cb(&sin->pos.parent, &sin->parent); if (ret == EOF) return EOF; else if (ret == EAGAIN) /* Stream is inactive for now (live reading). */ return EAGAIN; else if (ret) { fprintf(stderr, "[error] Reading event failed.\n"); return ret; } return 0; } /* * Return true if a < b, false otherwise. * If time stamps are exactly the same, compare by stream path. This * ensures we get the same result between runs on the same trace * collection on different environments. * The result will be random for memory-mapped traces since there is no * fixed path leading to those (they have empty path string). */ static int stream_compare(void *a, void *b) { struct ctf_file_stream *s_a = a, *s_b = b; if (s_a->parent.real_timestamp < s_b->parent.real_timestamp) { return 1; } else if (likely(s_a->parent.real_timestamp > s_b->parent.real_timestamp)) { return 0; } else { return strcmp(s_a->parent.path, s_b->parent.path); } } void bt_iter_free_pos(struct bt_iter_pos *iter_pos) { if (!iter_pos) return; if (iter_pos->type == BT_SEEK_RESTORE && iter_pos->u.restore) { if (iter_pos->u.restore->stream_saved_pos) { g_array_free( iter_pos->u.restore->stream_saved_pos, TRUE); } g_free(iter_pos->u.restore); } g_free(iter_pos); } /* * seek_file_stream_by_timestamp * * Browse a filestream by index, if an index contains the timestamp passed in * argument, seek inside the corresponding packet it until we find the event we * are looking for (either the exact timestamp or the event just after the * timestamp). * * Return 0 if the seek succeded, EOF if we didn't find any packet * containing the timestamp, or a positive integer for error. * * TODO: this should be turned into a binary search! It is currently * doing a linear search in the packets. This is a O(n) operation on a * very frequent code path. */ static int seek_file_stream_by_timestamp(struct ctf_file_stream *cfs, uint64_t timestamp) { struct ctf_stream_pos *stream_pos; struct packet_index *index; int i, ret; stream_pos = &cfs->pos; for (i = 0; i < stream_pos->packet_index->len; i++) { index = &g_array_index(stream_pos->packet_index, struct packet_index, i); if (index->ts_real.timestamp_end < timestamp) continue; stream_pos->packet_seek(&stream_pos->parent, i, SEEK_SET); ret = bt_packet_seek_get_error(); if (ret < 0) { return EOF; } do { ret = stream_read_event(cfs); } while (cfs->parent.real_timestamp < timestamp && ret == 0); /* Can return either EOF, 0, or error (> 0). */ return ret; } /* * Cannot find the timestamp within the stream packets, return * EOF. */ return EOF; } /* * seek_ctf_trace_by_timestamp : for each file stream, seek to the event with * the corresponding timestamp * * Return 0 on success. * If the timestamp is not part of any file stream, return EOF to inform the * user the timestamp is out of the scope. * On other errors, return positive value. */ static int seek_ctf_trace_by_timestamp(struct ctf_trace *tin, uint64_t timestamp, struct ptr_heap *stream_heap) { int i, j, ret; int found = 0; struct bt_trace_descriptor *td = &tin->parent; if (td->interval_set) { /* * If this trace has an interval selected, don't allow seeks * before the selected interval. We seek to the start of the * interval, thereby presenting a shorter "virtual" trace. */ timestamp = max(timestamp, td->interval_real.timestamp_begin); } /* for each stream_class */ for (i = 0; i < tin->streams->len; i++) { struct ctf_stream_declaration *stream_class; stream_class = g_ptr_array_index(tin->streams, i); if (!stream_class) continue; /* for each file_stream */ for (j = 0; j < stream_class->streams->len; j++) { struct ctf_stream_definition *stream; struct ctf_file_stream *cfs; stream = g_ptr_array_index(stream_class->streams, j); if (!stream) continue; cfs = container_of(stream, struct ctf_file_stream, parent); ret = seek_file_stream_by_timestamp(cfs, timestamp); if (ret == 0) { /* Add to heap */ ret = bt_heap_insert(stream_heap, cfs); if (ret) { /* Return positive error. */ return -ret; } found = 1; } else if (ret > 0) { /* * Error in seek (not EOF), failure. */ return ret; } /* on EOF just do not put stream into heap. */ } } return found ? 0 : EOF; } /* * Find timestamp of last event in the stream. * * Return value: 0 if OK, positive error value on error, EOF if no * events were found. */ static int find_max_timestamp_ctf_file_stream(struct ctf_file_stream *cfs, uint64_t *timestamp_end) { int ret, count = 0, i; uint64_t timestamp = 0; struct ctf_stream_pos *stream_pos; stream_pos = &cfs->pos; /* * We start by the last packet, and iterate backwards until we * either find at least one event, or we reach the first packet * (some packets can be empty). */ for (i = stream_pos->packet_index->len - 1; i >= 0; i--) { stream_pos->packet_seek(&stream_pos->parent, i, SEEK_SET); ret = bt_packet_seek_get_error(); if (ret < 0) { return EOF; } count = 0; /* read each event until we reach the end of the stream */ do { ret = stream_read_event(cfs); if (ret == 0) { count++; timestamp = cfs->parent.real_timestamp; } } while (ret == 0); /* Error */ if (ret > 0) goto end; assert(ret == EOF); if (count) break; } if (count) { *timestamp_end = timestamp; ret = 0; } else { /* Return EOF if no events were found */ ret = EOF; } end: return ret; } /* * Find the stream within a stream class that contains the event with * the largest timestamp, and save that timestamp. * * Return 0 if OK, EOF if no events were found in the streams, or * positive value on error. */ static int find_max_timestamp_ctf_stream_class( struct ctf_stream_declaration *stream_class, struct ctf_file_stream **cfsp, uint64_t *max_timestamp) { int ret = EOF, i, found = 0; for (i = 0; i < stream_class->streams->len; i++) { struct ctf_stream_definition *stream; struct ctf_file_stream *cfs; uint64_t current_max_ts = 0; stream = g_ptr_array_index(stream_class->streams, i); if (!stream) continue; cfs = container_of(stream, struct ctf_file_stream, parent); ret = find_max_timestamp_ctf_file_stream(cfs, ¤t_max_ts); if (ret == EOF) continue; if (ret != 0) break; if (current_max_ts >= *max_timestamp) { *max_timestamp = current_max_ts; *cfsp = cfs; found = 1; } } assert(ret >= 0 || ret == EOF); if (found) { return 0; } return ret; } /* * seek_last_ctf_trace_collection: seek trace collection to last event. * * Return 0 if OK, EOF if no events were found, or positive error value * on error. */ static int seek_last_ctf_trace_collection(struct trace_collection *tc, struct ctf_file_stream **cfsp) { int i, j, ret; int found = 0; uint64_t max_timestamp = 0; if (!tc) return 1; /* For each trace in the trace_collection */ for (i = 0; i < tc->array->len; i++) { struct ctf_trace *tin; struct bt_trace_descriptor *td_read; td_read = g_ptr_array_index(tc->array, i); if (!td_read) continue; tin = container_of(td_read, struct ctf_trace, parent); /* For each stream_class in the trace */ for (j = 0; j < tin->streams->len; j++) { struct ctf_stream_declaration *stream_class; stream_class = g_ptr_array_index(tin->streams, j); if (!stream_class) continue; ret = find_max_timestamp_ctf_stream_class(stream_class, cfsp, &max_timestamp); if (ret > 0) goto end; if (ret == 0) found = 1; assert(ret == EOF || ret == 0); } } /* * Now we know in which file stream the last event is located, * and we know its timestamp. */ if (!found) { ret = EOF; } else { ret = seek_file_stream_by_timestamp(*cfsp, max_timestamp); assert(ret == 0); } end: return ret; } int bt_iter_set_pos(struct bt_iter *iter, const struct bt_iter_pos *iter_pos) { struct trace_collection *tc; int i, ret; if (!iter || !iter_pos) return -EINVAL; switch (iter_pos->type) { case BT_SEEK_RESTORE: if (!iter_pos->u.restore) return -EINVAL; bt_heap_free(iter->stream_heap); ret = bt_heap_init(iter->stream_heap, 0, stream_compare); if (ret < 0) goto error_heap_init; for (i = 0; i < iter_pos->u.restore->stream_saved_pos->len; i++) { struct stream_saved_pos *saved_pos; struct ctf_stream_pos *stream_pos; struct ctf_stream_definition *stream; saved_pos = &g_array_index( iter_pos->u.restore->stream_saved_pos, struct stream_saved_pos, i); stream = &saved_pos->file_stream->parent; stream_pos = &saved_pos->file_stream->pos; stream_pos->packet_seek(&stream_pos->parent, saved_pos->cur_index, SEEK_SET); /* * the timestamp needs to be restored after * packet_seek, because this function resets * the timestamp to the beginning of the packet */ stream->real_timestamp = saved_pos->current_real_timestamp; stream->cycles_timestamp = saved_pos->current_cycles_timestamp; stream_pos->offset = saved_pos->offset; stream_pos->last_offset = LAST_OFFSET_POISON; stream->current.real.begin = 0; stream->current.real.end = 0; stream->current.cycles.begin = 0; stream->current.cycles.end = 0; stream->prev.real.begin = 0; stream->prev.real.end = 0; stream->prev.cycles.begin = 0; stream->prev.cycles.end = 0; printf_debug("restored to cur_index = %" PRId64 " and " "offset = %" PRId64 ", timestamp = %" PRIu64 "\n", stream_pos->cur_index, stream_pos->offset, stream->real_timestamp); ret = stream_read_event(saved_pos->file_stream); if (ret != 0) { goto error; } /* Add to heap */ ret = bt_heap_insert(iter->stream_heap, saved_pos->file_stream); if (ret) goto error; } return 0; case BT_SEEK_TIME: tc = iter->ctx->tc; bt_heap_free(iter->stream_heap); ret = bt_heap_init(iter->stream_heap, 0, stream_compare); if (ret < 0) goto error_heap_init; /* for each trace in the trace_collection */ for (i = 0; i < tc->array->len; i++) { struct ctf_trace *tin; struct bt_trace_descriptor *td_read; td_read = g_ptr_array_index(tc->array, i); if (!td_read) continue; tin = container_of(td_read, struct ctf_trace, parent); ret = seek_ctf_trace_by_timestamp(tin, iter_pos->u.seek_time, iter->stream_heap); /* * Positive errors are failure. Negative value * is EOF (for which we continue with other * traces). 0 is success. Note: on EOF, it just * means that no stream has been added to the * iterator for that trace, which is fine. */ if (ret != 0 && ret != EOF) goto error; } return 0; case BT_SEEK_BEGIN: tc = iter->ctx->tc; bt_heap_free(iter->stream_heap); ret = bt_heap_init(iter->stream_heap, 0, stream_compare); if (ret < 0) goto error_heap_init; for (i = 0; i < tc->array->len; i++) { struct ctf_trace *tin; struct bt_trace_descriptor *td_read; int stream_id; td_read = g_ptr_array_index(tc->array, i); if (!td_read) continue; tin = container_of(td_read, struct ctf_trace, parent); /* Populate heap with each stream */ for (stream_id = 0; stream_id < tin->streams->len; stream_id++) { struct ctf_stream_declaration *stream; int filenr; stream = g_ptr_array_index(tin->streams, stream_id); if (!stream) continue; for (filenr = 0; filenr < stream->streams->len; filenr++) { struct ctf_file_stream *file_stream; file_stream = g_ptr_array_index( stream->streams, filenr); if (!file_stream) continue; ret = babeltrace_filestream_seek( file_stream, iter_pos, stream_id); if (ret != 0 && ret != EOF) { goto error; } if (ret == EOF) { /* Do not add EOF streams */ continue; } ret = bt_heap_insert(iter->stream_heap, file_stream); if (ret) goto error; } } } break; case BT_SEEK_LAST: { struct ctf_file_stream *cfs = NULL; tc = iter->ctx->tc; ret = seek_last_ctf_trace_collection(tc, &cfs); if (ret != 0 || !cfs) goto error; /* remove all streams from the heap */ bt_heap_free(iter->stream_heap); /* Create a new empty heap */ ret = bt_heap_init(iter->stream_heap, 0, stream_compare); if (ret < 0) goto error; /* Insert the stream that contains the last event */ ret = bt_heap_insert(iter->stream_heap, cfs); if (ret) goto error; break; } default: /* not implemented */ return -EINVAL; } return 0; error: bt_heap_free(iter->stream_heap); error_heap_init: if (bt_heap_init(iter->stream_heap, 0, stream_compare) < 0) { bt_heap_free(iter->stream_heap); g_free(iter->stream_heap); iter->stream_heap = NULL; ret = -ENOMEM; } return ret; } struct bt_iter_pos *bt_iter_get_pos(struct bt_iter *iter) { struct bt_iter_pos *pos; struct trace_collection *tc; struct ctf_file_stream *file_stream = NULL, *removed; struct ptr_heap iter_heap_copy; int ret; if (!iter) return NULL; tc = iter->ctx->tc; pos = g_new0(struct bt_iter_pos, 1); pos->type = BT_SEEK_RESTORE; pos->u.restore = g_new0(struct bt_saved_pos, 1); pos->u.restore->tc = tc; pos->u.restore->stream_saved_pos = g_array_new(FALSE, TRUE, sizeof(struct stream_saved_pos)); if (!pos->u.restore->stream_saved_pos) goto error; ret = bt_heap_copy(&iter_heap_copy, iter->stream_heap); if (ret < 0) goto error_heap; /* iterate over each stream in the heap */ file_stream = bt_heap_maximum(&iter_heap_copy); while (file_stream != NULL) { struct stream_saved_pos saved_pos; assert(file_stream->pos.last_offset != LAST_OFFSET_POISON); saved_pos.offset = file_stream->pos.last_offset; saved_pos.file_stream = file_stream; saved_pos.cur_index = file_stream->pos.cur_index; saved_pos.current_real_timestamp = file_stream->parent.real_timestamp; saved_pos.current_cycles_timestamp = file_stream->parent.cycles_timestamp; g_array_append_val( pos->u.restore->stream_saved_pos, saved_pos); printf_debug("stream : %" PRIu64 ", cur_index : %zd, " "offset : %zd, " "timestamp = %" PRIu64 "\n", file_stream->parent.stream_id, saved_pos.cur_index, saved_pos.offset, saved_pos.current_real_timestamp); /* remove the stream from the heap copy */ removed = bt_heap_remove(&iter_heap_copy); assert(removed == file_stream); file_stream = bt_heap_maximum(&iter_heap_copy); } bt_heap_free(&iter_heap_copy); return pos; error_heap: g_array_free(pos->u.restore->stream_saved_pos, TRUE); error: g_free(pos); return NULL; } struct bt_iter_pos *bt_iter_create_time_pos(struct bt_iter *unused, uint64_t timestamp) { struct bt_iter_pos *pos; pos = g_new0(struct bt_iter_pos, 1); pos->type = BT_SEEK_TIME; pos->u.seek_time = timestamp; return pos; } /* * babeltrace_filestream_seek: seek a filestream to given position. * * The stream_id parameter is only useful for BT_SEEK_RESTORE. */ static int babeltrace_filestream_seek(struct ctf_file_stream *file_stream, const struct bt_iter_pos *begin_pos, unsigned long stream_id) { int ret = 0; if (!file_stream || !begin_pos) return -EINVAL; switch (begin_pos->type) { case BT_SEEK_CUR: /* * just insert into the heap we should already know * the timestamps */ break; case BT_SEEK_BEGIN: file_stream->pos.packet_seek(&file_stream->pos.parent, 0, SEEK_SET); ret = stream_read_event(file_stream); break; case BT_SEEK_TIME: case BT_SEEK_RESTORE: default: assert(0); /* Not yet defined */ } return ret; } int bt_iter_add_trace(struct bt_iter *iter, struct bt_trace_descriptor *td_read) { struct ctf_trace *tin; int stream_id, ret = 0; tin = container_of(td_read, struct ctf_trace, parent); /* Populate heap with each stream */ for (stream_id = 0; stream_id < tin->streams->len; stream_id++) { struct ctf_stream_declaration *stream; int filenr; stream = g_ptr_array_index(tin->streams, stream_id); if (!stream) continue; for (filenr = 0; filenr < stream->streams->len; filenr++) { struct ctf_file_stream *file_stream; struct bt_iter_pos pos; file_stream = g_ptr_array_index(stream->streams, filenr); if (!file_stream) continue; pos.type = BT_SEEK_BEGIN; ret = babeltrace_filestream_seek(file_stream, &pos, stream_id); if (ret == EOF) { ret = 0; continue; } else if (ret != 0 && ret != EAGAIN) { goto error; } /* Add to heap */ ret = bt_heap_insert(iter->stream_heap, file_stream); if (ret) goto error; } } error: return ret; } int bt_iter_init(struct bt_iter *iter, struct bt_context *ctx, const struct bt_iter_pos *begin_pos, const struct bt_iter_pos *end_pos) { int i; int ret = 0; if (!iter || !ctx || !ctx->tc || !ctx->tc->array) return -EINVAL; if (ctx->current_iterator) { ret = -1; goto error_ctx; } iter->stream_heap = g_new(struct ptr_heap, 1); iter->end_pos = end_pos; bt_context_get(ctx); iter->ctx = ctx; ret = bt_heap_init(iter->stream_heap, 0, stream_compare); if (ret < 0) goto error_heap_init; for (i = 0; i < ctx->tc->array->len; i++) { struct bt_trace_descriptor *td_read; td_read = g_ptr_array_index(ctx->tc->array, i); if (!td_read) continue; ret = bt_iter_add_trace(iter, td_read); if (ret < 0) goto error; } ctx->current_iterator = iter; if (begin_pos && begin_pos->type != BT_SEEK_BEGIN) { ret = bt_iter_set_pos(iter, begin_pos); if (ret) { goto error; } } return ret; error: bt_heap_free(iter->stream_heap); error_heap_init: bt_context_put(ctx); iter->ctx = NULL; g_free(iter->stream_heap); iter->stream_heap = NULL; error_ctx: return ret; } struct bt_iter *bt_iter_create(struct bt_context *ctx, const struct bt_iter_pos *begin_pos, const struct bt_iter_pos *end_pos) { struct bt_iter *iter; int ret; if (!ctx) return NULL; iter = g_new0(struct bt_iter, 1); ret = bt_iter_init(iter, ctx, begin_pos, end_pos); if (ret) { g_free(iter); return NULL; } return iter; } void bt_iter_fini(struct bt_iter *iter) { assert(iter); if (iter->stream_heap) { bt_heap_free(iter->stream_heap); g_free(iter->stream_heap); } iter->ctx->current_iterator = NULL; bt_context_put(iter->ctx); iter->ctx = NULL; } void bt_iter_destroy(struct bt_iter *iter) { assert(iter); bt_iter_fini(iter); g_free(iter); } int bt_iter_next(struct bt_iter *iter) { struct ctf_file_stream *file_stream, *removed; int ret; bool event_outside_interval = false; if (!iter) return -EINVAL; file_stream = bt_heap_maximum(iter->stream_heap); if (!file_stream) { /* end of file for all streams */ ret = 0; goto end; } ret = stream_read_event(file_stream); if (file_stream->pos.parent.trace && file_stream->pos.parent.trace->interval_set) { event_outside_interval = file_stream->parent.real_timestamp > file_stream->pos.parent.trace->interval_real.timestamp_end; } if (ret == EOF || event_outside_interval) { removed = bt_heap_remove(iter->stream_heap); assert(removed == file_stream); ret = 0; goto end; } else if (ret == EAGAIN) { /* * Live streaming: the stream is inactive for now, we * just updated the timestamp_end to skip over this * stream up to a certain point in time. * * Since we can't guarantee that a stream will ever have * any activity, we can't rely on the fact that * bt_iter_next will be called for each stream and deal * with inactive streams. So instead, we return 0 here * to the caller and let the read API handle the * retry case. */ ret = 0; goto reinsert; } else if (ret == -ERANGE) { removed = bt_heap_remove(iter->stream_heap); assert(removed == file_stream); goto end; } else if (ret) { goto end; } reinsert: /* Reinsert the file stream into the heap, and rebalance. */ removed = bt_heap_replace_max(iter->stream_heap, file_stream); assert(removed == file_stream); end: return ret; }