Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
xBis7 committed Nov 12, 2024
1 parent e1f721d commit fdb92df
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 13 deletions.
2 changes: 1 addition & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,7 @@ traces:
All the children spans will be created using context propagation.
This doesn't replace any of the existing spans, but it creates additional ones.
The default value is False.
version_added: 2.10.2
version_added: 2.10.3
type: string
example: ~
default: "False"
Expand Down
12 changes: 4 additions & 8 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,11 +872,7 @@ def is_effective_leaf(task):
return leaf_tis

@staticmethod
def _set_dagrun_span_attrs(span: Span, dag_run: DagRun):
# This is necessary to avoid an error in case of testing a paused dag.
if dag_run.queued_at is None and dag_run.start_date is not None:
dag_run.queued_at = dag_run.start_date

def _set_dagrun_span_attrs(span: Span, dag_run: DagRun, dagv: DagVersion):
if dag_run._state is DagRunState.FAILED:
span.set_attribute("airflow.dag_run.error", True)
attributes = {
Expand All @@ -897,7 +893,7 @@ def _set_dagrun_span_attrs(span: Span, dag_run: DagRun):
"airflow.dag_run.run_type": str(dag_run.run_type),
"airflow.dag_run.data_interval_start": str(dag_run.data_interval_start),
"airflow.dag_run.data_interval_end": str(dag_run.data_interval_end),
"airflow.dag_run.dag_hash": str(dag_run.dag_hash),
"airflow.dag_version.version": str(dagv.version if dagv else None),
"airflow.dag_run.conf": str(dag_run.conf),
}
if span.is_recording():
Expand Down Expand Up @@ -1101,7 +1097,7 @@ def recalculate(self) -> _UnfinishedStates:
self.state,
)

self._set_dagrun_span_attrs(span=active_span, dag_run=self)
self._set_dagrun_span_attrs(span=active_span, dag_run=self, dagv=dagv)
active_span.end()
# Remove the span from the dict.
self.active_spans.delete(self.run_id)
Expand All @@ -1114,7 +1110,7 @@ def recalculate(self) -> _UnfinishedStates:
)

with Trace.start_span_from_dagrun(dagrun=self) as span:
self._set_dagrun_span_attrs(span=span, dag_run=self)
self._set_dagrun_span_attrs(span=span, dag_run=self, dagv=dagv)

session.flush()

Expand Down
12 changes: 9 additions & 3 deletions airflow/traces/otel_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def __init__(
use_simple_processor: bool = False,
):
self.span_exporter = span_exporter
if use_simple_processor:
self.use_simple_processor = use_simple_processor
if self.use_simple_processor:
# With a BatchSpanProcessor, spans are exported at an interval.
# A task can run fast and finish before spans have enough time to get exported to the collector.
# When creating spans from inside a task, a SimpleSpanProcessor needs to be used because
Expand Down Expand Up @@ -105,9 +106,14 @@ def get_otel_tracer_provider(
debug = conf.getboolean("traces", "otel_debugging_on")
if debug is True:
log.info("[ConsoleSpanExporter] is being used")
tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
if self.use_simple_processor:
span_processor_for_tracer_prov = SimpleSpanProcessor(ConsoleSpanExporter())
else:
span_processor_for_tracer_prov = BatchSpanProcessor(ConsoleSpanExporter())
else:
tracer_provider.add_span_processor(self.span_processor)
span_processor_for_tracer_prov = self.span_processor

tracer_provider.add_span_processor(span_processor_for_tracer_prov)
return tracer_provider

def get_tracer(
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/otel/dags/otel_test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from opentelemetry import trace

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.traces import otel_tracer
from airflow.traces.otel_tracer import CTX_PROP_SUFFIX
from airflow.traces.tracer import Trace
Expand Down

0 comments on commit fdb92df

Please sign in to comment.