Skip to content

Commit

Permalink
InsertOpts option to reconcile metadata where multiple are encountered
Browse files Browse the repository at this point in the history
Here, attempt to resolve #165 by better defining how metadata that may
be inserted via `InsertOpts` on `Insert` will interact with metadata
that may be inserted via a job implementing `InsertOpts()`. Currently, a
job's metadata is always ignored.

The behavior to reconcile two metadata objects has some nuance, and I
don't think any one solution will fit everyone's desires. Here, I
propose that we let users defined their own preferred behavior by
implementing a new `InsertOptsMetadataReconcile` option:

    type InsertOpts struct {
        ...

        // MetadataReconcile defines how, in the presence of multiple InsertOpts
        // like one from the job itself and another one from a Client.Insert param,
        // how metadata will be reconciled between the two. For example, two
        // metadata can be merged together, or one can exclude the other.
        //
        // Defaults to MetadataReconcileExclude.
        MetadataReconcile MetadataReconcile

The possible reconciliation modes:

* `MetadataReconcileExclude`: The more specific metadata (from `Insert`
  params) completely excludes the less specific metadata (from a job'
  opts), and the latter is discarded.

* `MetadataReconcileMerge`: Carries out a shallow merge. In case of
  conflict, keys from the more specific metadata win out.

* `MetadataReconcileMergeDeep`: Carries out a deep merge, recursing into
  every object to see if we can use it as a `map[string]any`. In case of
  conflict, keys from the more specific metadata win out.

The default setting is `MetadataReconcileExclude`. This is partly a
safety feature because it is possible to create oddly formed metadata
that might have trouble merging, and partly a performance feature.
Exclude doesn't require trying to parse any of the metadata so that we
can try to merge it.

A note on typing: I tried to marshal/unmarshal maps as `map[any]any`
instead of `map[string]any` so we could support all kinds of wild key
types simultaneously, but `encoding/json` is pretty strict about working
with only one type of key at a time, and failed when I tried. We may
have to keep it as an invariant that if you want to use one of the merge
modes, your metadata must be parseable as `map[string]any`. This
shouldn't be a problem because a struct with `json` tags on it will
always produce a compatible object. I've tried to document this.

Fixes #165.
  • Loading branch information
brandur committed May 2, 2024
1 parent 7f87768 commit 6c62db6
Show file tree
Hide file tree
Showing 3 changed files with 491 additions and 10 deletions.
86 changes: 84 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,8 +1256,38 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive
return nil, nil, err
}

metadata := insertOpts.Metadata
if len(metadata) == 0 {
// As with other insertion options, a params metadata from an insert's
// InsertOpts will generally take precedence over one coming from a job's
// InsertOpts, but metdata may be merged if MetadataReconcile indicates
// that they should be.
var metadata []byte
switch {
case len(insertOpts.Metadata) > 0 && len(jobInsertOpts.Metadata) > 0:
var deepMerge bool

switch insertOpts.MetadataReconcile {
case MetadataReconcileExclude:
metadata = insertOpts.Metadata

case MetadataReconcileMergeDeep:
deepMerge = true
fallthrough

case MetadataReconcileMerge:
var err error
metadata, err = mergeMetadata(insertOpts.Metadata, jobInsertOpts.Metadata, deepMerge)
if err != nil {
return nil, nil, err
}
}

case len(insertOpts.Metadata) > 0:
metadata = insertOpts.Metadata

case len(jobInsertOpts.Metadata) > 0:
metadata = jobInsertOpts.Metadata

default:
metadata = []byte("{}")
}

Expand Down Expand Up @@ -1796,3 +1826,55 @@ func defaultClientIDWithHost(startedAt time.Time, host string) string {

return host + "_" + startedAt.Format(rfc3339Compact)
}

// Merge two metadata objects (expected to be in JSON) and return the result.
//
// Where duplicate keys are present, values from primary metadata supersede
// those of the secondary. If deepMerge is true, nested hashes will be further
// merged.
func mergeMetadata(primaryMetadata, secondaryMetadata []byte, deepMerge bool) ([]byte, error) {
var primaryMetadataMap map[string]any
if err := json.Unmarshal(primaryMetadata, &primaryMetadataMap); err != nil {
return nil, fmt.Errorf("error unmarshing primary metadata: %w", err)
}

var secondaryMetadataMap map[string]any
if err := json.Unmarshal(secondaryMetadata, &secondaryMetadataMap); err != nil {
return nil, fmt.Errorf("error unmarshing secondary metadata: %w", err)
}

mergeMetadataMaps(primaryMetadataMap, secondaryMetadataMap, deepMerge)

mergedMetadata, err := json.Marshal(primaryMetadataMap)
if err != nil {
return nil, fmt.Errorf("error marshaling merged metadata: %w", err)
}

return mergedMetadata, nil
}

// Merges two unmarshaled metadata maps. The merge is destructive as the primary
// map is mutated to contain values from the secondary map (if applicable).
// Returns the primary map for convenience.
//
// Where duplicate keys are present, values from primary metadata supersede
// those of the secondary. If deepMerge is true, nested hashes will be further
// merged.
func mergeMetadataMaps(primaryMetadataMap, secondaryMetadataMap map[string]any, deepMerge bool) map[string]any {
for key, secondaryVal := range secondaryMetadataMap {
primaryVal, primaryValOK := primaryMetadataMap[key]
if primaryValOK && deepMerge {
var (
primaryValMap, primaryMapOK = primaryVal.(map[string]any)
secondaryValMap, secondaryMapOK = secondaryVal.(map[string]any)
)

if primaryMapOK && secondaryMapOK {
mergeMetadataMaps(primaryValMap, secondaryValMap, deepMerge)
}
} else if !primaryValOK {
primaryMetadataMap[key] = secondaryVal
}
}
return primaryMetadataMap
}
Loading

0 comments on commit 6c62db6

Please sign in to comment.