chore(tvix/tracing): bump to opentelemetry 0.28
According to https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-2612865575, we can now remove the spawn_blocking calls and channel logic for flushing / shutdown. Change-Id: I0fabc1fe0968a736e7d44dfde41f4a6adac5e272 Reviewed-on: https://cl.tvl.fyi/c/depot/+/13154 Tested-by: BuildkiteCI Reviewed-by: edef <edef@edef.eu>
This commit is contained in:
parent
9d5a556533
commit
661fe698a1
10 changed files with 235 additions and 245 deletions
|
|
@ -13,7 +13,7 @@ thiserror.workspace = true
|
|||
|
||||
tracing-opentelemetry = { workspace = true, optional = true }
|
||||
opentelemetry = { workspace = true, optional = true }
|
||||
opentelemetry-otlp = { workspace = true, optional = true }
|
||||
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic"], optional = true }
|
||||
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"], optional = true }
|
||||
tracing-tracy = { workspace = true, features = ["flush-on-exit"], optional = true }
|
||||
opentelemetry-http = { workspace = true, optional = true }
|
||||
|
|
@ -35,7 +35,7 @@ otlp = [
|
|||
"dep:opentelemetry_sdk",
|
||||
"dep:opentelemetry-http",
|
||||
"dep:opentelemetry-semantic-conventions",
|
||||
"reqwest-tracing?/opentelemetry_0_27",
|
||||
"reqwest-tracing?/opentelemetry_0_28",
|
||||
]
|
||||
tracy = [
|
||||
"dep:tracing-tracy"
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
use indicatif::ProgressStyle;
|
||||
use std::sync::LazyLock;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing_indicatif::{
|
||||
filter::IndicatifFilter, util::FilteredFormatFields, writer, IndicatifLayer, IndicatifWriter,
|
||||
|
|
@ -11,13 +10,9 @@ use tracing_subscriber::{
|
|||
EnvFilter, Layer, Registry,
|
||||
};
|
||||
|
||||
#[cfg(feature = "otlp")]
|
||||
use opentelemetry::{trace::Tracer, KeyValue};
|
||||
#[cfg(feature = "otlp")]
|
||||
use opentelemetry_sdk::{
|
||||
propagation::TraceContextPropagator,
|
||||
resource::{ResourceDetector, SdkProvidedResourceDetector},
|
||||
Resource,
|
||||
propagation::TraceContextPropagator, resource::SdkProvidedResourceDetector, Resource,
|
||||
};
|
||||
#[cfg(feature = "tracy")]
|
||||
use tracing_tracy::TracyLayer;
|
||||
|
|
@ -48,22 +43,21 @@ pub enum Error {
|
|||
#[error(transparent)]
|
||||
Init(#[from] tracing_subscriber::util::TryInitError),
|
||||
|
||||
#[cfg(feature = "otlp")]
|
||||
#[error(transparent)]
|
||||
MpscSend(#[from] mpsc::error::SendError<oneshot::Sender<()>>),
|
||||
|
||||
#[error(transparent)]
|
||||
OneshotRecv(#[from] oneshot::error::RecvError),
|
||||
OTEL(#[from] opentelemetry_sdk::error::OTelSdkError),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TracingHandle {
|
||||
#[cfg(feature = "otlp")]
|
||||
/// A channel that can be sent to whenever traces/metrics should be flushed.
|
||||
/// Once flushing is finished, the sent oneshot::Sender will get triggered.
|
||||
flush_tx: Option<mpsc::Sender<oneshot::Sender<()>>>,
|
||||
|
||||
stdout_writer: IndicatifWriter<writer::Stdout>,
|
||||
stderr_writer: IndicatifWriter<writer::Stderr>,
|
||||
|
||||
#[cfg(feature = "otlp")]
|
||||
meter_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
|
||||
|
||||
#[cfg(feature = "otlp")]
|
||||
tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
|
||||
}
|
||||
|
||||
impl TracingHandle {
|
||||
|
|
@ -91,52 +85,31 @@ impl TracingHandle {
|
|||
/// It will wait until the flush is complete.
|
||||
pub async fn flush(&self) -> Result<(), Error> {
|
||||
#[cfg(feature = "otlp")]
|
||||
if let Some(flush_tx) = &self.flush_tx {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
// Request the flush.
|
||||
flush_tx.send(tx).await?;
|
||||
|
||||
// Wait for it to be done.
|
||||
rx.await?;
|
||||
{
|
||||
if let Some(tracer_provider) = &self.tracer_provider {
|
||||
tracer_provider.force_flush()?;
|
||||
}
|
||||
if let Some(meter_provider) = &self.meter_provider {
|
||||
meter_provider.force_flush()?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This will flush all all attached tracing providers and will wait until the flush is completed.
|
||||
/// This will flush all attached tracing providers and will wait until the flush is completed, then call shutdown.
|
||||
/// If no tracing providers like otlp are attached then this will be a noop.
|
||||
///
|
||||
/// This should only be called on a regular shutdown.
|
||||
/// If you correctly need to shutdown tracing on ctrl_c use [force_shutdown](#method.force_shutdown)
|
||||
/// otherwise you will get otlp errors.
|
||||
pub async fn shutdown(&self) -> Result<(), Error> {
|
||||
self.flush().await
|
||||
}
|
||||
|
||||
/// This will flush all all attached tracing providers and will wait until the flush is completed.
|
||||
/// After this it will do some other necessary cleanup.
|
||||
/// If no tracing providers like otlp are attached then this will be a noop.
|
||||
///
|
||||
/// This should only be used if the tool received an ctrl_c otherwise you will get otlp errors.
|
||||
/// If you need to shutdown tracing on a regular exit, you should use the [shutdown](#method.shutdown)
|
||||
/// method.
|
||||
pub async fn force_shutdown(&self) -> Result<(), Error> {
|
||||
self.flush().await?;
|
||||
|
||||
#[cfg(feature = "otlp")]
|
||||
{
|
||||
// Because of a bug within otlp we currently have to use spawn_blocking otherwise
|
||||
// calling `shutdown_tracer_provider` can block forever. See
|
||||
// https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
|
||||
//
|
||||
// This still throws an error, if the tool exits regularly: "OpenTelemetry trace error
|
||||
// occurred. oneshot canceled", but not having this leads to errors if we cancel with
|
||||
// ctrl_c.
|
||||
// So this should right now only be used on ctrl_c, for a regular exit use the
|
||||
// [shutdown](#shutdown) method
|
||||
let _ = tokio::task::spawn_blocking(move || {
|
||||
opentelemetry::global::shutdown_tracer_provider();
|
||||
})
|
||||
.await;
|
||||
if let Some(tracer_provider) = &self.tracer_provider {
|
||||
tracer_provider.shutdown()?;
|
||||
}
|
||||
if let Some(meter_provider) = &self.meter_provider {
|
||||
meter_provider.shutdown()?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
@ -216,26 +189,34 @@ impl TracingBuilder {
|
|||
let layered = layered.and_then(TracyLayer::default());
|
||||
|
||||
#[cfg(feature = "otlp")]
|
||||
let mut flush_tx: Option<mpsc::Sender<oneshot::Sender<()>>> = None;
|
||||
let mut g_tracer_provider = None;
|
||||
#[cfg(feature = "otlp")]
|
||||
let mut g_meter_provider = None;
|
||||
|
||||
// Setup otlp if a service_name is configured
|
||||
#[cfg(feature = "otlp")]
|
||||
let layered = layered.and_then({
|
||||
if let Some(service_name) = self.service_name {
|
||||
if let Some(service_name) = self.service_name.map(String::from) {
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
|
||||
// register a text map propagator for trace propagation
|
||||
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
|
||||
|
||||
let (tracer, meter_provider, sender) =
|
||||
gen_otlp_tracer_meter_provider(service_name.to_string());
|
||||
let tracer_provider = gen_tracer_provider(service_name.clone())
|
||||
.expect("Unable to configure trace provider");
|
||||
|
||||
flush_tx = Some(sender);
|
||||
let meter_provider =
|
||||
gen_meter_provider(service_name).expect("Unable to configure meter provider");
|
||||
|
||||
// Register the returned meter provider as the global one.
|
||||
// FUTUREWORK: store in the struct and provide getter instead?
|
||||
opentelemetry::global::set_meter_provider(meter_provider);
|
||||
// FUTUREWORK: store in the struct and provide getter too?
|
||||
opentelemetry::global::set_meter_provider(meter_provider.clone());
|
||||
|
||||
g_tracer_provider = Some(tracer_provider.clone());
|
||||
g_meter_provider = Some(meter_provider.clone());
|
||||
|
||||
// Create a tracing layer with the configured tracer
|
||||
Some(tracing_opentelemetry::layer().with_tracer(tracer))
|
||||
Some(tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("tvix")))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
|
@ -256,10 +237,13 @@ impl TracingBuilder {
|
|||
.try_init()?;
|
||||
|
||||
Ok(TracingHandle {
|
||||
#[cfg(feature = "otlp")]
|
||||
flush_tx,
|
||||
stdout_writer,
|
||||
stderr_writer,
|
||||
|
||||
#[cfg(feature = "otlp")]
|
||||
meter_provider: g_meter_provider,
|
||||
#[cfg(feature = "otlp")]
|
||||
tracer_provider: g_tracer_provider,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -269,18 +253,11 @@ fn gen_resources(service_name: String) -> Resource {
|
|||
// use SdkProvidedResourceDetector.detect to detect resources,
|
||||
// but replace the default service name with our default.
|
||||
// https://github.com/open-telemetry/opentelemetry-rust/issues/1298
|
||||
|
||||
let resources = SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
|
||||
// SdkProvidedResourceDetector currently always sets
|
||||
// `service.name`, but we don't like its default.
|
||||
if resources.get("service.name".into()).unwrap() == "unknown_service".into() {
|
||||
resources.merge(&Resource::new([KeyValue::new(
|
||||
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
|
||||
service_name,
|
||||
)]))
|
||||
} else {
|
||||
resources
|
||||
}
|
||||
//
|
||||
Resource::builder()
|
||||
.with_service_name(service_name)
|
||||
.with_detector(Box::new(SdkProvidedResourceDetector))
|
||||
.build()
|
||||
}
|
||||
|
||||
/// Returns an OTLP tracer, and the TX part of a channel, which can be used
|
||||
|
|
@ -288,17 +265,18 @@ fn gen_resources(service_name: String) -> Resource {
|
|||
#[cfg(feature = "otlp")]
|
||||
fn gen_tracer_provider(
|
||||
service_name: String,
|
||||
) -> Result<opentelemetry_sdk::trace::TracerProvider, opentelemetry::trace::TraceError> {
|
||||
use opentelemetry_otlp::SpanExporter;
|
||||
use opentelemetry_sdk::{runtime, trace::TracerProvider};
|
||||
) -> Result<opentelemetry_sdk::trace::SdkTracerProvider, opentelemetry::trace::TraceError> {
|
||||
use opentelemetry_otlp::{ExportConfig, SpanExporter, WithExportConfig};
|
||||
|
||||
let exporter = SpanExporter::builder().with_tonic().build()?;
|
||||
let exporter = SpanExporter::builder()
|
||||
.with_tonic()
|
||||
.with_export_config(ExportConfig::default())
|
||||
.build()?;
|
||||
|
||||
let tracer_provider = TracerProvider::builder()
|
||||
.with_batch_exporter(exporter, runtime::Tokio)
|
||||
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
|
||||
.with_batch_exporter(exporter)
|
||||
.with_resource(gen_resources(service_name))
|
||||
.build();
|
||||
|
||||
// Unclear how to configure this
|
||||
// let batch_config = BatchConfigBuilder::default()
|
||||
// // the default values for `max_export_batch_size` is set to 512, which we will fill
|
||||
|
|
@ -321,6 +299,14 @@ fn gen_tracer_provider(
|
|||
Ok(tracer_provider)
|
||||
}
|
||||
|
||||
// Metric export interval should be less than or equal to 15s
|
||||
// if the metrics may be converted to Prometheus metrics.
|
||||
// Prometheus' query engine and compatible implementations
|
||||
// require ~4 data points / interval for range queries,
|
||||
// so queries ranging over 1m requre <= 15s scrape intervals.
|
||||
// OTEL SDKS also respect the env var `OTEL_METRIC_EXPORT_INTERVAL` (no underscore prefix).
|
||||
const _OTEL_METRIC_EXPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[cfg(feature = "otlp")]
|
||||
fn gen_meter_provider(
|
||||
service_name: String,
|
||||
|
|
@ -328,76 +314,18 @@ fn gen_meter_provider(
|
|||
use std::time::Duration;
|
||||
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_sdk::{
|
||||
metrics::{PeriodicReader, SdkMeterProvider},
|
||||
runtime,
|
||||
};
|
||||
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
|
||||
let exporter = opentelemetry_otlp::MetricExporter::builder()
|
||||
.with_tonic()
|
||||
.with_timeout(Duration::from_secs(10))
|
||||
.build()?;
|
||||
|
||||
let reader = PeriodicReader::builder(exporter)
|
||||
.with_interval(_OTEL_METRIC_EXPORT_INTERVAL)
|
||||
.build();
|
||||
|
||||
Ok(SdkMeterProvider::builder()
|
||||
.with_reader(
|
||||
PeriodicReader::builder(exporter, runtime::Tokio)
|
||||
.with_interval(Duration::from_secs(3))
|
||||
.with_timeout(Duration::from_secs(10))
|
||||
.build(),
|
||||
)
|
||||
.with_reader(reader)
|
||||
.with_resource(gen_resources(service_name))
|
||||
.build())
|
||||
}
|
||||
|
||||
/// Returns an OTLP tracer, and a meter provider, as well as the TX part
|
||||
/// of a channel, which can be used to request flushes (and signal back the
|
||||
/// completion of the flush).
|
||||
#[cfg(feature = "otlp")]
|
||||
fn gen_otlp_tracer_meter_provider(
|
||||
service_name: String,
|
||||
) -> (
|
||||
impl Tracer + tracing_opentelemetry::PreSampledTracer,
|
||||
opentelemetry_sdk::metrics::SdkMeterProvider,
|
||||
mpsc::Sender<oneshot::Sender<()>>,
|
||||
) {
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
let tracer_provider =
|
||||
gen_tracer_provider(service_name.clone()).expect("Unable to configure trace provider");
|
||||
let meter_provider =
|
||||
gen_meter_provider(service_name).expect("Unable to configure meter provider");
|
||||
|
||||
// tracer_provider needs to be kept around so we can request flushes later.
|
||||
let tracer = tracer_provider.tracer("tvix");
|
||||
|
||||
// Set up a channel for flushing trace providers later
|
||||
let (flush_tx, mut flush_rx) = mpsc::channel::<oneshot::Sender<()>>(16);
|
||||
|
||||
// Spawning a task that listens on rx for any message. Once we receive a message we
|
||||
// correctly call flush on the tracer_provider.
|
||||
tokio::spawn({
|
||||
let meter_provider = meter_provider.clone();
|
||||
|
||||
async move {
|
||||
while let Some(m) = flush_rx.recv().await {
|
||||
// Because of a bug within otlp we currently have to use spawn_blocking
|
||||
// otherwise will calling `force_flush` block forever, especially if the
|
||||
// tool was closed with ctrl_c. See
|
||||
// https://github.com/open-telemetry/opentelemetry-rust/issues/1395#issuecomment-1953280335
|
||||
let _ = tokio::task::spawn_blocking({
|
||||
let tracer_provider = tracer_provider.clone();
|
||||
let meter_provider = meter_provider.clone();
|
||||
|
||||
move || {
|
||||
tracer_provider.force_flush();
|
||||
if let Err(e) = meter_provider.force_flush() {
|
||||
eprintln!("failed to flush meter provider: {}", e);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
let _ = m.send(());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
(tracer, meter_provider, flush_tx)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue