Skip to content

Commit

Permalink
try to sketch out an ooni/data-inspired pipeline
Browse files Browse the repository at this point in the history
I'm doing this mainly to explore whether we could have more
robust webconnectivity v0.5 analysis code
  • Loading branch information
bassosimone committed Nov 23, 2023
1 parent 41fbd3f commit e5e4c37
Show file tree
Hide file tree
Showing 9 changed files with 756 additions and 0 deletions.
42 changes: 42 additions & 0 deletions internal/pipeline/canonical.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package pipeline

import (
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/optional"
)

// CanonicalMeasurement is the canonical measurement structure assumed
// by the probe-engine data processing pipeline.
type CanonicalMeasurement struct {
// Input contains the input we measured.
Input string `json:"input"`

// TestKeys contains the test-specific measurements.
TestKeys optional.Value[*CanonicalTestKeys] `json:"test_keys"`
}

// CanonicalTestKeys is the canonical container for observations
// generated by most OONI experiments. This is the data format ingested
// by this package for generating, e.g., [*WebEndpointObservations]
type CanonicalTestKeys struct {
// Control contains the OPTIONAL TH response.
Control optional.Value[*model.THResponse] `json:"control"`

// NetworkEvents contains I/O events.
NetworkEvents []*model.ArchivalNetworkEvent `json:"network_events"`

// Queries contains the DNS queries results.
Queries []*model.ArchivalDNSLookupResult `json:"queries"`

// Requests contains HTTP request results.
Requests []*model.ArchivalHTTPRequestResult `json:"requests"`

// TCPConnect contains the TCP connect results.
TCPConnect []*model.ArchivalTCPConnectResult `json:"tcp_connect"`

// TLSHandshakes contains the TLS handshakes results.
TLSHandshakes []*model.ArchivalTLSOrQUICHandshakeResult `json:"tls_handshakes"`

// QUICHandshakes contains the QUIC handshakes results.
QUICHandshakes []*model.ArchivalTLSOrQUICHandshakeResult `json:"quic_handshakes"`
}
185 changes: 185 additions & 0 deletions internal/pipeline/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package pipeline

import (
"errors"
"net/url"

"github.com/ooni/probe-cli/v3/internal/optional"
)

// DB is a database containing observations.
//
// This struct is not goroutine safe. The zero value is invalid. Use the
// [NewDB] to construct a valid instance.
type DB struct {
dnsByTxID map[int64]*DNSObservation
thDNSAddrs map[string]bool
thDNSFailure *string
thEpntByEpnt map[string]*EndpointObservationTH
thWeb optional.Value[*WebObservationTH]
urlHostname string
webByTxID map[int64]*WebEndpointObservation
webFinalRequest optional.Value[*WebEndpointObservation]
}

// NewObservationsDB constructs a new [*DB] instance.
func NewDB() *DB {
return &DB{
dnsByTxID: map[int64]*DNSObservation{},
thDNSAddrs: map[string]bool{},
thDNSFailure: nil,
thEpntByEpnt: map[string]*EndpointObservationTH{},
thWeb: optional.None[*WebObservationTH](),
urlHostname: "",
webByTxID: map[int64]*WebEndpointObservation{},
webFinalRequest: optional.None[*WebEndpointObservation](),
}
}

// Ingest ingests measurement results and populates the database.
func (db *DB) Ingest(m *CanonicalMeasurement) error {
// Extra the hostname from the input URL
URL, err := url.Parse(m.Input)
if err != nil {
return err
}
db.urlHostname = URL.Hostname()

// Obtain the test keys or stop early
tk := m.TestKeys.UnwrapOr(nil)
if tk == nil {
return nil
}

// Build knowledge about existing TCP endpoints
if err := db.addNetworkEventsTCPConnect(tk.NetworkEvents...); err != nil {
return err
}
if err := db.addTLSHandshakeEvents(tk.TLSHandshakes...); err != nil {
return err
}

// Build knowledge about QUIC endpoints
if err := db.addQUICHandshakeEvents(tk.QUICHandshakes...); err != nil {
return err
}

// Enrich dataset with HTTP round trips information
if err := db.addHTTPRoundTrips(tk.Requests...); err != nil {
return err
}

// Build knowledge about DNS lookups.
if err := db.addDNSLookups(tk.Queries...); err != nil {
return err
}

// Process a control response if available
if thResp := tk.Control.UnwrapOr(nil); thResp != nil {
// Add DNS results first
if err := db.thAddDNS(thResp); err != nil {
return err
}

// Then create TCP connect entries
if err := db.thAddTCPConnect(thResp); err != nil {
return err
}

// Then extend information using TLS
if err := db.thAddTLSHandshake(thResp); err != nil {
return err
}

// Finally, add information about HTTP
if err := db.thAddHTTPResponse(thResp); err != nil {
return err
}
}

// Cross reference data structures.
db.buildXrefsDNS()
db.buildXrefTH()
if err := db.maybeFindFinalRequest(); err != nil {
return err
}

return nil
}

func (db *DB) buildXrefsDNS() {
// map addresses to who resolved them
addrToGetaddrinfo := make(map[string][]*DNSObservation)
addrToUDP := make(map[string][]*DNSObservation)
addrToHTTPS := make(map[string][]*DNSObservation)
for _, dobs := range db.dnsByTxID {
switch dobs.Engine {
case "system", "getaddrinfo", "golang_net_resolver", "go":
for _, addr := range dobs.IPAddrs {
addrToGetaddrinfo[addr] = append(addrToGetaddrinfo[addr], dobs)
}

case "udp":
for _, addr := range dobs.IPAddrs {
addrToUDP[addr] = append(addrToUDP[addr], dobs)
}

case "doh":
for _, addr := range dobs.IPAddrs {
addrToHTTPS[addr] = append(addrToHTTPS[addr], dobs)
}
}
}

// create cross references inside the endpoints
for _, wobs := range db.webByTxID {
wobs.DNSLookupGetaddrinfoXref = addrToGetaddrinfo[wobs.IPAddress]
wobs.DNSLookupUDPXref = addrToUDP[wobs.IPAddress]
wobs.DNSLookupHTTPSXref = addrToHTTPS[wobs.IPAddress]
}
}

func (db *DB) buildXrefTH() {
for _, wobs := range db.webByTxID {
// create cross references with TH DNS lookups
_, ok := db.thDNSAddrs[wobs.IPAddress]
wobs.DNSLookupTHXref = ok

// create cross references with TH endpoints
if xref, ok := db.thEpntByEpnt[wobs.Endpoint]; ok {
wobs.THEndpointXref = optional.Some(xref)
}
}
}

var errMultipleFinalRequests = errors.New("analysis: multiple final requests")

func (db *DB) maybeFindFinalRequest() error {
// find all the possible final request candidates
var finals []*WebEndpointObservation
for _, wobs := range db.webByTxID {
switch code := wobs.HTTPResponseStatusCode.UnwrapOr(0); code {
case 0, 301, 302, 307, 308:
// this is a redirect or a nonexisting response in the case of zero

default:
// found candidate
finals = append(finals, wobs)
}
}

// Implementation note: the final request is a request that is not a redirect and
// we expect to see just one of them. This code is written assuming we will have
// more than a final request in the future and to fail in such a case.
switch {
case len(finals) > 1:
return errMultipleFinalRequests

case len(finals) == 1:
db.webFinalRequest = optional.Some(finals[0])
return nil

default:
return nil
}
}
78 changes: 78 additions & 0 deletions internal/pipeline/dns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package pipeline

import (
"github.com/ooni/probe-cli/v3/internal/model"
)

// DNSObservation is a DNS observation made by the probe.
//
// Optional values represent data that may not be there if we do not
// find the expected events. Non-optional data should always be there.
//
// This type is inspired by and adapted from https://github.com/ooni/data
// and adapts the WebObservation type to probe-engine.
type DNSObservation struct {
// TransactionID is the ID of the transaction.
TransactionID int64

// QueryType is the DNS query type (e.g., "A").
QueryType string

// Failure is the failure that occurred.
Failure *string

// Engine is the engined used by the probe to lookup.
Engine string

// ResolverAddress contains the resolver endpoint address.
ResolverAddress string

// IPAddrs contains the resolved IP addresses.
IPAddrs []string

// T0 is when we started performing the lookup.
T0 float64

// T is when the lookup completed.
T float64
}

func (db *DB) addDNSLookups(evs ...*model.ArchivalDNSLookupResult) error {
for _, ev := range evs {
dobs, err := db.newDNSObservation(ev.TransactionID)
if err != nil {
return err
}
dobs.QueryType = ev.QueryType
dobs.Failure = ev.Failure
dobs.Engine = ev.Engine
dobs.ResolverAddress = ev.ResolverAddress
dobs.T0 = ev.T0
dobs.T = ev.T

for _, ans := range ev.Answers {
switch ans.AnswerType {
case "A":
dobs.IPAddrs = append(dobs.IPAddrs, ans.IPv4)

case "AAAA":
dobs.IPAddrs = append(dobs.IPAddrs, ans.IPv6)

default:
// nothing
}
}
}
return nil
}

func (db *DB) newDNSObservation(txid int64) (*DNSObservation, error) {
if _, good := db.webByTxID[txid]; good {
return nil, errTransactionAlreadyExists
}
dobs := &DNSObservation{
TransactionID: txid,
}
db.dnsByTxID[txid] = dobs
return dobs, nil
}
12 changes: 12 additions & 0 deletions internal/pipeline/headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package pipeline

// HeaderOrigin indicates the header origin
type HeaderOrigin int64

const (
// HeaderOriginProbe indicates that the header was seen by the probe
HeaderOriginProbe = HeaderOrigin(1 << iota)

// HeaderOriginTH indicates that the header was seen by the TH
HeaderOriginTH
)
39 changes: 39 additions & 0 deletions internal/pipeline/qa_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package pipeline

import (
"fmt"
"path/filepath"

"github.com/ooni/probe-cli/v3/internal/must"
"github.com/ooni/probe-cli/v3/internal/optional"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)

type flatDB struct {
XdnsByTxID map[int64]*DNSObservation
XthEpntByEpnt map[string]*EndpointObservationTH
XthWeb optional.Value[*WebObservationTH]
XwebByTxID map[int64]*WebEndpointObservation
}

func Example() {
rawJSON := must.ReadFile(filepath.Join("testdata", "youtube.json"))
var meas CanonicalMeasurement
must.UnmarshalJSON(rawJSON, &meas)

db := NewDB()
runtimex.Try0(db.Ingest(&meas))

fdb := &flatDB{
XdnsByTxID: db.dnsByTxID,
XthEpntByEpnt: db.thEpntByEpnt,
XthWeb: db.thWeb,
XwebByTxID: db.webByTxID,
}
rawDB := must.MarshalJSON(fdb)
must.WriteFile(filepath.Join("testdata", "youtube_db.json"), rawDB, 0600)

fmt.Printf("true\n")
// Output:
// true
}
1 change: 1 addition & 0 deletions internal/pipeline/testdata/youtube.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions internal/pipeline/testdata/youtube_db.json

Large diffs are not rendered by default.

Loading

0 comments on commit e5e4c37

Please sign in to comment.