Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
bassosimone committed Oct 25, 2023
1 parent 6f46404 commit 1ea176e
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 21 deletions.
11 changes: 8 additions & 3 deletions internal/dslx/fxcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,24 +116,29 @@ func NewStats[T any]() *Stats[T] {
}

// Observer returns a Func that observes the results of the previous pipeline stage. This function
// converts any error that it sees to [ErrSkip]. This function does not account for [ErrSkip].
// converts any error that it sees to [ErrSkip]. This function does not account for [ErrSkip], meaning
// that you will never see [ErrSkip] in the stats returned by [Stats.Export].
func (s *Stats[T]) Observer() Func[T, T] {
return FuncAdapter[T, T](func(ctx context.Context, minput *Maybe[T]) *Maybe[T] {
defer s.mu.Unlock()
s.mu.Lock()
var r string
if err := minput.Error; err != nil {
if errors.Is(err, ErrSkip) {
return NewMaybeWithError[T](ErrSkip) // as documented
}
r = err.Error()
}
s.m[r]++
if r != "" {
return NewMaybeWithError[T](ErrSkip)
return NewMaybeWithError[T](ErrSkip) // as documented
}
return minput
})
}

// Export exports the current stats without clearing the internally used map.
// Export exports the current stats without clearing the internally used map such that
// statistics accumulate over time and never reset for the [*Stats] lifecycle.
func (s *Stats[T]) Export() (out map[string]int64) {
out = make(map[string]int64)
defer s.mu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions internal/dslx/fxstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func Collect[T any](c <-chan T) (v []T) {
// StreamList creates a channel out of static values. This function will
// close the channel when it has streamed all the available elements.
func StreamList[T any](ts ...T) <-chan T {
c := make(chan T, len(ts))
defer close(c) // as documented
c := make(chan T, len(ts)) // buffer so writing does not block
defer close(c) // as documented
for _, t := range ts {
c <- t
}
Expand Down
47 changes: 33 additions & 14 deletions internal/dslx/qa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/apex/log"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/gopacket/layers"
"github.com/ooni/netem"
"github.com/ooni/probe-cli/v3/internal/dslx"
"github.com/ooni/probe-cli/v3/internal/model"
Expand Down Expand Up @@ -179,15 +180,16 @@ func TestMeasureResolvedAddressesQA(t *testing.T) {
ServerIPAddress: "8.8.8.8",
ServerPort: 443,
})
dpi.AddRule(&netem.DPICloseConnectionForServerEndpoint{
Logger: log.Log,
ServerIPAddress: "8.8.4.4",
ServerPort: 443,
})
},
expectTCP: map[string]int64{
"": 1,
"connection_refused": 1,
},
expectTLS: map[string]int64{
"": 1,
"dslx: error already processed by a previous stage": 1,
"connection_refused": 2,
},
expectTLS: map[string]int64{},
expectQUIC: map[string]int64{"": 2},
}, {
name: "TLS handshake reset with minimal runtime",
Expand All @@ -205,12 +207,34 @@ func TestMeasureResolvedAddressesQA(t *testing.T) {
"connection_reset": 2,
},
expectQUIC: map[string]int64{"": 2},
}, {
name: "QUIC handshake timeout with minimal runtime",
newRuntime: func(netx model.MeasuringNetwork) dslx.Runtime {
return dslx.NewMinimalRuntime(log.Log, time.Now(), dslx.MinimalRuntimeOptionMeasuringNetwork(netx))
},
configureDPI: func(dpi *netem.DPIEngine) {
dpi.AddRule(&netem.DPIDropTrafficForServerEndpoint{
Logger: log.Log,
ServerIPAddress: "8.8.8.8",
ServerPort: 443,
ServerProtocol: layers.IPProtocolUDP,
})
dpi.AddRule(&netem.DPIDropTrafficForServerEndpoint{
Logger: log.Log,
ServerIPAddress: "8.8.4.4",
ServerPort: 443,
ServerProtocol: layers.IPProtocolUDP,
})
},
expectTCP: map[string]int64{"": 2},
expectTLS: map[string]int64{"": 2},
expectQUIC: map[string]int64{
"generic_timeout_error": 2,
},
}}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// define the scenario characteristics with multiple IP addresses per host

// create an internet testing scenario
env := netemx.MustNewScenario(netemx.InternetScenario)
defer env.Close()
Expand Down Expand Up @@ -247,12 +271,7 @@ func TestMeasureResolvedAddressesQA(t *testing.T) {
// measure 443/udp
dslx.Compose5(
dslx.MakeEndpoint("udp", 443),
dslx.QUICHandshake(
rt,
// TODO(???): understand why certificate verification always
// fails when we're using netem along with HTTP/3
dslx.TLSHandshakeOptionInsecureSkipVerify(true),
),
dslx.QUICHandshake(rt),
quicHandshakeStats.Observer(),
dslx.HTTPRequestOverQUIC(rt),
dslx.Discard[*dslx.HTTPResponse](),
Expand Down
3 changes: 1 addition & 2 deletions internal/dslx/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/ooni/probe-cli/v3/internal/logx"
"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/quic-go/quic-go"
)

Expand All @@ -35,7 +34,7 @@ func QUICHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*Endpoint, *Q
)

// setup
udpListener := netxlite.NewUDPListener()
udpListener := trace.NewUDPListener()
quicDialer := trace.NewQUICDialerWithoutResolver(udpListener, rt.Logger())
const timeout = 10 * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
Expand Down
5 changes: 5 additions & 0 deletions internal/dslx/runtimeminimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (tx *minimalTrace) NewTLSHandshakerStdlib(dl model.DebugLogger) model.TLSHa
return tx.netx.NewTLSHandshakerStdlib(dl)
}

// NewUDPListener implements Trace
func (tx *minimalTrace) NewUDPListener() model.UDPListener {
return tx.netx.NewUDPListener()
}

// QUICHandshakes implements Trace.
func (tx *minimalTrace) QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult) {
return []*model.ArchivalTLSOrQUICHandshakeResult{}
Expand Down
3 changes: 3 additions & 0 deletions internal/dslx/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type Trace interface {
// NewStdlibResolver returns a possibly-trace-ware system resolver.
NewStdlibResolver(logger model.DebugLogger) model.Resolver

// NewUDPListener implements model.Measuring Network.
NewUDPListener() model.UDPListener

// QUICHandshakes collects all the QUIC handshake results collected so far.
QUICHandshakes() (out []*model.ArchivalTLSOrQUICHandshakeResult)

Expand Down
1 change: 1 addition & 0 deletions internal/measurexlite/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package measurexlite

import "github.com/ooni/probe-cli/v3/internal/model"

// NewUDPListener implements model.Measuring Network.
func (tx *Trace) NewUDPListener() model.UDPListener {
return tx.Netx.NewUDPListener()
}

0 comments on commit 1ea176e

Please sign in to comment.