In Pravega Metrics Framework, we use Dropwizard Metrics as the underlying library, and provide our own API to make it easier to use.
1. Metrics interfaces and example usage
There are four basic interfaces: StatsProvider, StatsLogger (short for Statistics Logger), OpStatsLogger (short for Operation Statistics Logger, obtained through StatsLogger), and DynamicLogger. StatsProvider provides the whole metrics service; StatsLogger is where we register and obtain the required metrics (Counter/Gauge/Timer/Histogram); OpStatsLogger is a sub-logger for the complex metrics (Timer/Histogram).
1.1. Metrics Service Provider: Interface StatsProvider
The starting point of the Pravega Metrics framework is the StatsProvider interface, which provides the start and stop methods for the metrics service. Reporters are currently available for CSV and StatsD.
    public interface StatsProvider {
        void start(MetricsConfig conf);
        void close();
        StatsLogger createStatsLogger(String scope);
        DynamicLogger createDynamicLogger();
    }
- start(): Initializes MetricRegistry and reporters for our Metrics service.
- close(): Shutdown of Metrics service.
- createStatsLogger(): Creates and returns a StatsLogger instance, which is used to retrieve a metric and do metric insertion and collection in Pravega code.
- createDynamicLogger(): Create a dynamic logger.
1.2. Metric Logger: interface StatsLogger
Through this interface we register the required metrics: simple types such as Counter and Gauge, and the complex statistics type OpStatsLogger, through which we provide Timer and Histogram.
    public interface StatsLogger {
        OpStatsLogger createStats(String name);
        Counter createCounter(String name);
        Meter createMeter(String name);
        <T extends Number> Gauge registerGauge(String name, Supplier<T> value);
        StatsLogger createScopeLogger(String scope);
    }
- createStats(): Register and get an OpStatsLogger, which is used for complex types of metrics.
- createCounter(): Register and get a Counter metric.
- createMeter(): Create and register a Meter metric.
- registerGauge(): Register a Gauge metric.
- createScopeLogger(): Create the stats logger under given scope name.
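The relationship between scopes and registered metric names can be sketched as follows. This is a minimal illustration, not Pravega's implementation: `ScopedStatsSketch` is a hypothetical stand-in for StatsLogger that models only counters, the scope and counter names `cache` and `hits` are made up, and joining nested scopes with a dot is an assumption based on names such as `segmentstore.segment_read_latency_ms`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for StatsLogger; only counters and nested scopes are modelled.
public class ScopedStatsSketch {
    private final String scope;
    private final Map<String, AtomicLong> registry;

    public ScopedStatsSketch(String scope) {
        this(scope, new ConcurrentHashMap<>());
    }

    private ScopedStatsSketch(String scope, Map<String, AtomicLong> registry) {
        this.scope = scope;
        this.registry = registry;
    }

    // Assumption: a child scope is appended to the parent scope with a dot.
    public ScopedStatsSketch createScopeLogger(String child) {
        return new ScopedStatsSketch(fullName(child), registry);
    }

    // Registers the counter under its scoped name; repeated calls return the same instance.
    public AtomicLong createCounter(String name) {
        return registry.computeIfAbsent(fullName(name), k -> new AtomicLong());
    }

    public String fullName(String name) {
        return scope.isEmpty() ? name : scope + "." + name;
    }

    public static void main(String[] args) {
        ScopedStatsSketch root = new ScopedStatsSketch("segmentstore");
        ScopedStatsSketch cache = root.createScopeLogger("cache");
        cache.createCounter("hits").incrementAndGet();
        System.out.println(cache.fullName("hits") + " = " + cache.createCounter("hits").get());
    }
}
```

Sharing one registry between parent and child loggers means a metric is identified by its full scoped name, whichever logger it was created through.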
1.3. Metric Sub Logger: OpStatsLogger
OpStatsLogger provides the complex statistics type of metric. It is usually used for operations such as CreateSegment and ReadSegment, where it records the number of operations and the time/duration of each operation.
    public interface OpStatsLogger {
        void reportSuccessEvent(Duration duration);
        void reportFailEvent(Duration duration);
        void reportSuccessValue(long value);
        void reportFailValue(long value);
        OpStatsData toOpStatsData();
        void clear();
    }
- reportSuccessEvent() : Used to track Timer of a successful operation and will record the latency in Nanoseconds in required metric.
- reportFailEvent() : Used to track Timer of a failed operation and will record the latency in Nanoseconds in required metric.
- reportSuccessValue() : Used to track Histogram of a success value.
- reportFailValue() : Used to track Histogram of a failed value.
- toOpStatsData() : Used to support JMX exports and inner test.
- clear() : Used to clear stats for this operation.
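To make the success/fail reporting concrete, here is a minimal in-memory sketch. It is not Pravega's implementation: `InMemoryOpStats` is a hypothetical class that mirrors only `reportSuccessEvent()`, `reportFailEvent()` and `clear()`, and simply stores each latency in nanoseconds, whereas the real logger delegates to the underlying metrics library.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for OpStatsLogger; it only illustrates the success/fail bookkeeping.
public class InMemoryOpStats {
    private final List<Long> successNanos = new ArrayList<>();
    private final List<Long> failNanos = new ArrayList<>();

    // Records the latency of a successful operation in nanoseconds.
    public synchronized void reportSuccessEvent(Duration duration) {
        successNanos.add(duration.toNanos());
    }

    // Records the latency of a failed operation in nanoseconds.
    public synchronized void reportFailEvent(Duration duration) {
        failNanos.add(duration.toNanos());
    }

    public synchronized int successCount() {
        return successNanos.size();
    }

    public synchronized int failCount() {
        return failNanos.size();
    }

    // Mean latency of successful operations, converted to milliseconds.
    public synchronized double meanSuccessMillis() {
        return successNanos.stream().mapToLong(Long::longValue).average().orElse(0.0) / 1_000_000.0;
    }

    public synchronized void clear() {
        successNanos.clear();
        failNanos.clear();
    }

    public static void main(String[] args) {
        InMemoryOpStats createSegment = new InMemoryOpStats();
        createSegment.reportSuccessEvent(Duration.ofMillis(4));
        createSegment.reportSuccessEvent(Duration.ofMillis(8));
        createSegment.reportFailEvent(Duration.ofMillis(20));
        System.out.println("success=" + createSegment.successCount()
                + " fail=" + createSegment.failCount()
                + " meanMs=" + createSegment.meanSuccessMillis());
    }
}
```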
1.4. Metric Logger: interface DynamicLogger
A simple interface that only exposes simple type metrics: Counter/Gauge/Meter.
    public interface DynamicLogger {
        void incCounterValue(String name, long delta);
        void updateCounterValue(String name, long value);
        void freezeCounter(String name);
        <T extends Number> void reportGaugeValue(String name, T value);
        void freezeGaugeValue(String name);
        void recordMeterEvents(String name, long number);
    }
- incCounterValue() : Increase Counter with given value.
- updateCounterValue() : Updates the counter with given value.
- freezeCounter() : Notifies that the counter will not be updated.
- reportGaugeValue() : Reports Gauge value.
- freezeGaugeValue() : Notifies that the gauge value will not be updated.
- recordMeterEvents() : Record the occurrence of a given number of events in Meter.
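The difference between incrementing, updating and freezing a dynamic counter can be sketched with a small in-memory model. `InMemoryDynamicCounters` is a hypothetical class, not part of Pravega, and the freeze behaviour shown here (dropping the counter so it is no longer tracked) is an assumption made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the counter part of DynamicLogger.
public class InMemoryDynamicCounters {
    private final ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

    // Adds delta to the named counter, creating the counter on first use.
    public void incCounterValue(String name, long delta) {
        counters.computeIfAbsent(name, k -> new AtomicLong()).addAndGet(delta);
    }

    // Overwrites the named counter with an absolute value.
    public void updateCounterValue(String name, long value) {
        counters.computeIfAbsent(name, k -> new AtomicLong()).set(value);
    }

    // Assumption: freezing drops the counter so that it is no longer tracked.
    public void freezeCounter(String name) {
        counters.remove(name);
    }

    public long get(String name) {
        AtomicLong counter = counters.get(name);
        return counter == null ? 0L : counter.get();
    }

    public boolean isTracked(String name) {
        return counters.containsKey(name);
    }

    public static void main(String[] args) {
        InMemoryDynamicCounters logger = new InMemoryDynamicCounters();
        String name = "segmentstore.segment_read_bytes.scope.stream.0";
        logger.incCounterValue(name, 100);   // no registration step is needed
        logger.incCounterValue(name, 50);
        logger.updateCounterValue(name, 10); // replaces the accumulated value
        System.out.println(name + " = " + logger.get(name));
        logger.freezeCounter(name);
        System.out.println("tracked after freeze: " + logger.isTracked(name));
    }
}
```

Because counters are created on first use, callers never register them up front, which is what makes this style convenient for per-segment or per-stream names.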
2. Example for starting a Metric service
This example is from the ServiceStarter class. It starts the Pravega SegmentStore service, and a Metrics service is started as a sub-service.
    public final class ServiceStarter {
        ...
        private StatsProvider statsProvider;
        ...
        private void start() {
            ...
            log.info("Initializing metrics provider ...");
            MetricsConfig config = MetricsConfig.builder()
                    .with(MetricsConfig.METRICS_PREFIX, "metrics-prefix")
                    .build();
            MetricsProvider.initialize(config);
            statsProvider = MetricsProvider.getMetricsProvider();
            statsProvider.start(); // Here the metrics service is started as a sub-service
            ...
        }

        private void shutdown() {
            ...
            if (this.statsProvider != null) {
                statsProvider.close();
                statsProvider = null;
                log.info("Metrics statsProvider is now closed.");
            }
            ...
        }
        ...
    }
2.1. Example for Dynamic Counter and OpStatsLogger(Timer)
This is an example from the PravegaRequestProcessor class. In this class, we register two metrics: one timer (createStreamSegment) and one dynamic counter (SEGMENT_READ_BYTES).
    public class PravegaRequestProcessor extends FailingRequestProcessor implements RequestProcessor {

        private static final StatsLogger STATS_LOGGER = MetricsProvider.createStatsLogger("segmentstore");
        private static final DynamicLogger DYNAMIC_LOGGER = MetricsProvider.getDynamicLogger();

        private final OpStatsLogger createStreamSegment = STATS_LOGGER.createStats(SEGMENT_CREATE_LATENCY);

        private void handleReadResult(ReadSegment request, ReadResult result) {
            String segment = request.getSegment();
            ArrayList<ReadResultEntryContents> cachedEntries = new ArrayList<>();
            ReadResultEntry nonCachedEntry = collectCachedEntries(request.getOffset(), result, cachedEntries);

            boolean truncated = nonCachedEntry != null && nonCachedEntry.getType() == Truncated;
            boolean endOfSegment = nonCachedEntry != null && nonCachedEntry.getType() == EndOfStreamSegment;
            boolean atTail = nonCachedEntry != null && nonCachedEntry.getType() == Future;

            if (!cachedEntries.isEmpty() || endOfSegment) {
                // We managed to collect some data. Send it.
                ByteBuffer data = copyData(cachedEntries);
                SegmentRead reply = new SegmentRead(segment, request.getOffset(), atTail, endOfSegment, data);
                connection.send(reply);
                // Increasing the counter value for the counter metric SEGMENT_READ_BYTES
                DYNAMIC_LOGGER.incCounterValue(nameFromSegment(SEGMENT_READ_BYTES, segment), reply.getData().array().length);
            } else if (truncated) {
                // We didn't collect any data, instead we determined that the current read offset was truncated.
                // Determine the current Start Offset and send that back.
                segmentStore.getStreamSegmentInfo(segment, false, TIMEOUT)
                        .thenAccept(info -> connection.send(new SegmentIsTruncated(nonCachedEntry.getStreamSegmentOffset(), segment, info.getStartOffset())))
                        .exceptionally(e -> handleException(nonCachedEntry.getStreamSegmentOffset(), segment, "Read segment", e));
            } else {
                Preconditions.checkState(nonCachedEntry != null, "No ReadResultEntries returned from read!?");
                nonCachedEntry.requestContent(TIMEOUT);
                nonCachedEntry.getContent()
                        .thenAccept(contents -> {
                            ByteBuffer data = copyData(Collections.singletonList(contents));
                            SegmentRead reply = new SegmentRead(segment, nonCachedEntry.getStreamSegmentOffset(), false, endOfSegment, data);
                            connection.send(reply);
                            // Increasing the counter value for the counter metric SEGMENT_READ_BYTES
                            DYNAMIC_LOGGER.incCounterValue(nameFromSegment(SEGMENT_READ_BYTES, segment), reply.getData().array().length);
                        })
                        .exceptionally(e -> {
                            if (Exceptions.unwrap(e) instanceof StreamSegmentTruncatedException) {
                                // The Segment may have been truncated in Storage after we got this entry but before we managed
                                // to make a read. In that case, send the appropriate error back.
                                connection.send(new SegmentIsTruncated(nonCachedEntry.getStreamSegmentOffset(), segment, nonCachedEntry.getStreamSegmentOffset()));
                            } else {
                                handleException(nonCachedEntry.getStreamSegmentOffset(), segment, "Read segment", e);
                            }
                            return null;
                        })
                        .exceptionally(e -> handleException(nonCachedEntry.getStreamSegmentOffset(), segment, "Read segment", e));
            }
        }

        @Override
        public void createSegment(CreateSegment createStreamsSegment) {
            Timer timer = new Timer();
            Collection<AttributeUpdate> attributes = Arrays.asList(
                    new AttributeUpdate(SCALE_POLICY_TYPE, AttributeUpdateType.Replace, ((Byte) createStreamsSegment.getScaleType()).longValue()),
                    new AttributeUpdate(SCALE_POLICY_RATE, AttributeUpdateType.Replace, ((Integer) createStreamsSegment.getTargetRate()).longValue()),
                    new AttributeUpdate(CREATION_TIME, AttributeUpdateType.None, System.currentTimeMillis())
            );

            if (!verifyToken(createStreamsSegment.getSegment(), createStreamsSegment.getRequestId(),
                    createStreamsSegment.getDelegationToken(), READ_UPDATE, "Create Segment")) {
                return;
            }
            log.debug("Creating stream segment {}", createStreamsSegment);
            segmentStore.createStreamSegment(createStreamsSegment.getSegment(), attributes, TIMEOUT)
                    .thenAccept(v -> {
                        // Reporting success event for Timer metric createStreamSegment
                        createStreamSegment.reportSuccessEvent(timer.getElapsed());
                        connection.send(new SegmentCreated(createStreamsSegment.getRequestId(), createStreamsSegment.getSegment()));
                    })
                    .whenComplete((res, e) -> {
                        if (e == null) {
                            if (statsRecorder != null) {
                                statsRecorder.createSegment(createStreamsSegment.getSegment(),
                                        createStreamsSegment.getScaleType(), createStreamsSegment.getTargetRate());
                            }
                        } else {
                            // Reporting fail event for Timer metric createStreamSegment
                            createStreamSegment.reportFailEvent(timer.getElapsed());
                            handleException(createStreamsSegment.getRequestId(), createStreamsSegment.getSegment(), "Create segment", e);
                        }
                    });
        }
        …
    }
- Get a StatsLogger from MetricsProvider:
    StatsLogger STATS_LOGGER = MetricsProvider.getStatsLogger();
- Register all the desired metrics through StatsLogger:
    OpStatsLogger createStreamSegment = STATS_LOGGER.createStats(SEGMENT_CREATE_LATENCY);
- Use these metrics in the code at the places where you would like to collect and record the values.
Here SEGMENT_CREATE_LATENCY is the name of this metric, and createStreamSegment is the name of our metrics logger. It tracks createSegment operations, recording when each operation happened, how long each operation took, and other statistics computed from these values.
2.1.1. Output example of OpStatsLogger
An example output of the OpStatsLogger CREATE_STREAM_SEGMENT, reported through the CSV reporter:
    $ cat CREATE_STREAM_SEGMENT.csv
    t,count,max,mean,min,stddev,p50,p75,p95,p98,p99,p999,mean_rate,m1_rate,m5_rate,m15_rate,rate_unit,duration_unit
    1480928806,1,8.973952,8.973952,8.973952,0.000000,8.973952,8.973952,8.973952,8.973952,8.973952,8.973952,0.036761,0.143306,0.187101,0.195605,calls/second,millisecond
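A file in this format is easy to consume programmatically. The sketch below pairs the header and data row from the sample output into a map keyed by column name. `OpStatsCsvRow` is a hypothetical helper, not part of Pravega, and it assumes that no field contains a comma.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that pairs a CSV header with one data row.
public class OpStatsCsvRow {
    public static Map<String, String> parse(String header, String row) {
        String[] keys = header.split(",");
        String[] values = row.split(",");
        if (keys.length != values.length) {
            throw new IllegalArgumentException(
                    "Header has " + keys.length + " columns but row has " + values.length);
        }
        // LinkedHashMap keeps the columns in file order.
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            fields.put(keys[i].trim(), values[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        String header = "t,count,max,mean,min,stddev,p50,p75,p95,p98,p99,p999,"
                + "mean_rate,m1_rate,m5_rate,m15_rate,rate_unit,duration_unit";
        String row = "1480928806,1,8.973952,8.973952,8.973952,0.000000,8.973952,8.973952,"
                + "8.973952,8.973952,8.973952,8.973952,0.036761,0.143306,0.187101,0.195605,"
                + "calls/second,millisecond";
        Map<String, String> fields = parse(header, row);
        System.out.println("count=" + fields.get("count")
                + " p99=" + fields.get("p99") + " " + fields.get("duration_unit"));
    }
}
```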
2.2. Example for Dynamic Gauge and OpStatsLogger(Histogram)
This is an example from the AbstractStreamMetadataStore class. In this class, we report a Dynamic Gauge which represents the open transactions, and one histogram (CREATE_STREAM).
    public abstract class AbstractStreamMetadataStore implements StreamMetadataStore {
        ...
        // get stats logger from MetricsProvider
        private static final OpStatsLogger CREATE_STREAM = STATS_LOGGER.createStats(MetricsNames.CREATE_STREAM);
        // get dynamic logger from MetricsProvider
        private static final DynamicLogger DYNAMIC_LOGGER = MetricsProvider.getDynamicLogger();
        ...
        @Override
        public CompletableFuture<CreateStreamResponse> createStream(final String scope,
                                                                    final String name,
                                                                    final StreamConfiguration configuration,
                                                                    final long createTimestamp,
                                                                    final OperationContext context,
                                                                    final Executor executor) {
            return withCompletion(getStream(scope, name, context).create(configuration, createTimestamp), executor)
                    .thenApply(result -> {
                        if (result.getStatus().equals(CreateStreamResponse.CreateStatus.NEW)) {
                            // Report success event for histogram metric CREATE_STREAM
                            CREATE_STREAM.reportSuccessValue(1);
                            // Report gauge value for Dynamic Gauge metric OPEN_TRANSACTIONS
                            DYNAMIC_LOGGER.reportGaugeValue(nameFromStream(OPEN_TRANSACTIONS, scope, name), 0);
                        }
                        return result;
                    });
        }
        ...
    }
2.3. Example for Dynamic Meter
This is an example from io.pravega.segmentstore.server.SegmentStoreMetrics. In this class, we report a Dynamic Meter which represents the segments created.
    public final class SegmentStoreMetrics {
        private static final DynamicLogger DYNAMIC_LOGGER = MetricsProvider.getDynamicLogger();

        public void createSegment() {
            // Record event for meter metric createSegmentCount
            DYNAMIC_LOGGER.recordMeterEvents(this.createSegmentCount, 1);
        }
    }
3. Metric reporter and Configurations
Reporters export the measurements collected by the metrics. We currently provide StatsD and CSV output, and it is not difficult to add new output formats such as JMX/SLF4J. The CSV reporter exports each metric to its own file. The StatsD reporter exports metrics over UDP/TCP to a StatsD server. Reporters are configured through MetricsConfig.
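As a rough illustration of how such settings resolve against their defaults, the sketch below reads a few of the MetricsConfig property names from a `java.util.Properties` object. `MetricsSettingsSketch` is a hypothetical helper rather than Pravega's configuration mechanism, and prefixing each key with the component code (`metrics.`) is an assumption.

```java
import java.util.Properties;

// Hypothetical helper, not part of Pravega: resolves metrics settings against defaults.
public class MetricsSettingsSketch {
    // Assumption: keys are prefixed with the component code "metrics".
    static final String PREFIX = "metrics.";

    public static int intSetting(Properties props, String name, int defaultValue) {
        String raw = props.getProperty(PREFIX + name);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static boolean boolSetting(Properties props, String name, boolean defaultValue) {
        String raw = props.getProperty(PREFIX + name);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("metrics.statsOutputFrequencySeconds", "10");
        props.setProperty("metrics.enableCSVReporter", "false");

        // Explicitly set values override the defaults.
        System.out.println(intSetting(props, "statsOutputFrequencySeconds", 60));
        System.out.println(boolSetting(props, "enableCSVReporter", true));
        // Unset values fall back to the default passed by the caller.
        System.out.println(intSetting(props, "statsDPort", 8125));
    }
}
```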
    public class MetricsConfig extends ComponentConfig {
        //region Members
        public static final String COMPONENT_CODE = "metrics";
        public final static String ENABLE_STATISTICS = "enableStatistics"; // enable metrics, or will report nothing, default = true
        public final static Property<Long> DYNAMIC_CACHE_SIZE = "dynamicCacheSize"; // dynamic cache size, default = 10000000L
        public final static Property<Integer> DYNAMIC_CACHE_EVICTION_DURATION_MINUTES = "dynamicCacheEvictionDurationMs"; // dynamic cache eviction duration, default = 30
        public final static String OUTPUT_FREQUENCY = "statsOutputFrequencySeconds"; // reporter output frequency, default = 60
        public final static String METRICS_PREFIX = "metricsPrefix"; // Metrics prefix, default = "pravega"
        public final static String CSV_ENDPOINT = "csvEndpoint"; // CSV reporter output dir, default = "/tmp/csv"
        public final static String STATSD_HOST = "statsDHost"; // StatsD server host for the reporting, default = "localhost"
        public final static String STATSD_PORT = "statsDPort"; // StatsD server port, default = "8125"
        public final static Property<String> GRAPHITE_HOST = "graphiteHost"; // Graphite server host for the reporting, default = "localhost"
        public final static Property<Integer> GRAPHITE_PORT = "graphitePort"; // Graphite server port, default = "2003"
        public final static Property<String> JMX_DOMAIN = "jmxDomain"; // JMX domain for the reporting, default = "domain"
        public final static Property<String> GANGLIA_HOST = "gangliaHost"; // Ganglia server host for the reporting, default = "localhost"
        public final static Property<Integer> GANGLIA_PORT = "gangliaPort"; // Ganglia server port, default = "8649"
        public final static Property<Boolean> ENABLE_CSV_REPORTER = "enableCSVReporter"; // Enables CSV reporter, default = true
        public final static Property<Boolean> ENABLE_STATSD_REPORTER = "enableStatsdReporter"; // Enables StatsD reporter, default = true
        public final static Property<Boolean> ENABLE_GRAPHITE_REPORTER = "enableGraphiteReporter"; // Enables Graphite reporter, default = false
        public final static Property<Boolean> ENABLE_JMX_REPORTER = "enableJMXReporter"; // Enables JMX reporter, default = false
        public final static Property<Boolean> ENABLE_GANGLIA_REPORTER = "enableGangliaReporter"; // Enables Ganglia reporter, default = false
        public final static Property<Boolean> ENABLE_CONSOLE_REPORTER = "enableConsoleReporter"; // Enables Console reporter, default = false
        ...
    }
4. Steps to add your own Metrics
- Step 1. When starting a segment store/controller service, start a Metrics service as a sub-service. See the ServiceStarter.start() example above.
    public class AddMetrics {
        statsProvider = MetricsProvider.getProvider();
        statsProvider.start(metricsConfig);

        // Step 2. In the class that needs Metrics: get a StatsLogger through MetricsProvider,
        // then get Metrics from the StatsLogger, and finally report them at the right place.
        static final StatsLogger STATS_LOGGER = MetricsProvider.getStatsLogger(); // <--- 1
        static final DynamicLogger DYNAMIC_LOGGER = MetricsProvider.getDynamicLogger();

        static class Metrics { // <--- 2
            // Using Stats Logger
            static final String CREATE_STREAM_NAME = "stream_created"; // metric name
            static final OpStatsLogger CREATE_STREAM = STATS_LOGGER.createStats(CREATE_STREAM_NAME);
            static final String SEGMENT_CREATE_LATENCY = "segment_create_latency_ms";
            static final OpStatsLogger createStreamSegment = STATS_LOGGER.createStats(SEGMENT_CREATE_LATENCY);

            // Using Dynamic Logger
            static final String SEGMENT_READ_BYTES = "segmentstore.segment_read_bytes"; // Dynamic Counter
            static final String OPEN_TRANSACTIONS = "controller.transactions_opened";   // Dynamic Gauge
            ...
        }

        // to report success or increment
        Metrics.CREATE_STREAM.reportSuccessValue(1); // <--- 3
        Metrics.createStreamSegment.reportSuccessEvent(timer.getElapsed());
        DYNAMIC_LOGGER.incCounterValue(Metrics.SEGMENT_READ_BYTES, 1);
        DYNAMIC_LOGGER.reportGaugeValue(Metrics.OPEN_TRANSACTIONS, 0);

        // in case of failure
        Metrics.CREATE_STREAM.reportFailValue(1);
        Metrics.createStreamSegment.reportFailEvent(timer.getElapsed());

        // to freeze
        DYNAMIC_LOGGER.freezeCounter(Metrics.SEGMENT_READ_BYTES);
        DYNAMIC_LOGGER.freezeGaugeValue(Metrics.OPEN_TRANSACTIONS);
    }
5. Available Metrics and their names
Metrics in Segment Store Service.
    segmentstore.segment_read_latency_ms
    segmentstore.segment_write_latency_ms
    segmentstore.segment_create_latency_ms
    //Dynamic
    segmentstore.segment_read_bytes.$scope.$stream.$segment.Counter
    segmentstore.segment_write_bytes.$scope.$stream.$segment.Counter
Tier-2 Storage Metrics: Read/Write Latency, Read/Write Rate.
    hdfs.tier2_read_latency_ms
    hdfs.tier2_write_latency_ms
    //Dynamic
    hdfs.tier2_read_bytes.Counter
    hdfs.tier2_write_bytes.Counter
Cache Metrics
    rocksdb.cache_insert_latency
    rocksdb.cache_get_latency
Tier-1 DurableDataLog Metrics: Read/Write Latency, Read/Write Rate.
    bookkeeper.bookkeeper_total_write_latency
    bookkeeper.bookkeeper_write_latency
    bookkeeper.bookkeeper_write_bytes
    bookkeeper.bookkeeper_write_queue_size
    bookkeeper.bookkeeper_write_queue_fill
    //Dynamic
    bookkeeper.bookkeeper_ledger_count.$containerId.Gauge
Container-specific metrics.
    process_operations_latency.$containerId
    process_operations_batch_size.$containerId
    operation_queue_size.$containerId
    operation_processor_in_flight.$containerId
    operation_queue_wait_time.$containerId
    operation_processor_delay_ms.$containerId
    operation_commit_latency_ms.$containerId
    operation_latency_ms.$containerId
    operation_commit_metadata_txn_count.$containerId
    operation_commit_memory_latency_ms.$containerId
    operation_log_size.$containerId
    //Dynamic
    container_append_count.$containerId.Meter
    container_append_offset_count.$containerId.Meter
    container_update_attributes_count.$containerId.Meter
    container_get_attributes_count.$containerId.Meter
    container_read_count.$containerId.Meter
    container_get_info_count.$containerId.Meter
    container_create_segment_count.$containerId.Meter
    container_delete_segment_count.$containerId.Meter
    container_merge_segment_count.$containerId.Meter
    container_seal_count.$containerId.Meter
    container_truncate_count.$containerId.Meter
    active_segments.$containerId.Gauge
Metrics in Controller.
    controller.stream_created
    controller.stream_sealed
    controller.stream_deleted
    //Dynamic
    controller.transactions_created.$scope.$stream.Counter
    controller.transactions_committed.$scope.$stream.Counter
    controller.transactions_aborted.$scope.$stream.Counter
    controller.transactions_opened.$scope.$stream.Gauge
    controller.transactions_timedout.$scope.$stream.Counter
    controller.segments_count.$scope.$stream.Gauge
    controller.$scope.$stream.segments_splits.$scope.$stream.Counter
    controller.$scope.$stream.segments_merges.$scope.$stream.Counter
    controller.retention_frequency.$scope.$stream.Meter
    controller.truncated_size.$scope.$stream.Gauge
General Metrics.
    cache_size_bytes
    cache_gen
    thread_pool_queue_size
    thread_pool_active_threads
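Most dynamic names in this section consist of a base name, identifying parts such as `$scope.$stream.$segment` or `$containerId`, and a type suffix. The sketch below shows one way such a name could be assembled. `DynamicMetricNames` and its `dynamicName()` method are hypothetical and are not the `nameFromSegment()` or `nameFromStream()` helpers used in the examples; the sample scope, stream and segment values are made up.

```java
// Hypothetical helper illustrating the naming pattern of dynamic metrics.
public class DynamicMetricNames {
    // Joins the base name, the identifying parts and the metric type with dots.
    public static String dynamicName(String baseName, String type, String... parts) {
        StringBuilder name = new StringBuilder(baseName);
        for (String part : parts) {
            name.append('.').append(part);
        }
        return name.append('.').append(type).toString();
    }

    public static void main(String[] args) {
        System.out.println(dynamicName("segmentstore.segment_read_bytes", "Counter",
                "myScope", "myStream", "0"));
        System.out.println(dynamicName("bookkeeper.bookkeeper_ledger_count", "Gauge", "3"));
    }
}
```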