diff --git a/client.go b/client.go index 1d2e24d5..f84ff1ae 100644 --- a/client.go +++ b/client.go @@ -1256,8 +1256,38 @@ func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*rive return nil, nil, err } - metadata := insertOpts.Metadata - if len(metadata) == 0 { + // As with other insertion options, a params metadata from an insert's + // InsertOpts will generally take precedence over one coming from a job's + // InsertOpts, but metdata may be merged if MetadataReconcile indicates + // that they should be. + var metadata []byte + switch { + case len(insertOpts.Metadata) > 0 && len(jobInsertOpts.Metadata) > 0: + var deepMerge bool + + switch insertOpts.MetadataReconcile { + case MetadataReconcileExclude: + metadata = insertOpts.Metadata + + case MetadataReconcileMergeDeep: + deepMerge = true + fallthrough + + case MetadataReconcileMerge: + var err error + metadata, err = mergeMetadata(insertOpts.Metadata, jobInsertOpts.Metadata, deepMerge) + if err != nil { + return nil, nil, err + } + } + + case len(insertOpts.Metadata) > 0: + metadata = insertOpts.Metadata + + case len(jobInsertOpts.Metadata) > 0: + metadata = jobInsertOpts.Metadata + + default: metadata = []byte("{}") } @@ -1796,3 +1826,55 @@ func defaultClientIDWithHost(startedAt time.Time, host string) string { return host + "_" + startedAt.Format(rfc3339Compact) } + +// Merge two metadata objects (expected to be in JSON) and return the result. +// +// Where duplicate keys are present, values from primary metadata supersede +// those of the secondary. If deepMerge is true, nested hashes will be further +// merged. +func mergeMetadata(primaryMetadata, secondaryMetadata []byte, deepMerge bool) ([]byte, error) { + var primaryMetadataMap map[string]any + if err := json.Unmarshal(primaryMetadata, &primaryMetadataMap); err != nil { + return nil, fmt.Errorf("error unmarshing primary metadata: %w", err) + } + + var secondaryMetadataMap map[string]any + if err := json.Unmarshal(secondaryMetadata, &secondaryMetadataMap); err != nil { + return nil, fmt.Errorf("error unmarshing secondary metadata: %w", err) + } + + mergeMetadataMaps(primaryMetadataMap, secondaryMetadataMap, deepMerge) + + mergedMetadata, err := json.Marshal(primaryMetadataMap) + if err != nil { + return nil, fmt.Errorf("error marshaling merged metadata: %w", err) + } + + return mergedMetadata, nil +} + +// Merges two unmarshaled metadata maps. The merge is destructive as the primary +// map is mutated to contain values from the secondary map (if applicable). +// Returns the primary map for convenience. +// +// Where duplicate keys are present, values from primary metadata supersede +// those of the secondary. If deepMerge is true, nested hashes will be further +// merged. +func mergeMetadataMaps(primaryMetadataMap, secondaryMetadataMap map[string]any, deepMerge bool) map[string]any { + for key, secondaryVal := range secondaryMetadataMap { + primaryVal, primaryValOK := primaryMetadataMap[key] + if primaryValOK && deepMerge { + var ( + primaryValMap, primaryMapOK = primaryVal.(map[string]any) + secondaryValMap, secondaryMapOK = secondaryVal.(map[string]any) + ) + + if primaryMapOK && secondaryMapOK { + mergeMetadataMaps(primaryValMap, secondaryValMap, deepMerge) + } + } else if !primaryValOK { + primaryMetadataMap[key] = secondaryVal + } + } + return primaryMetadataMap +} diff --git a/client_test.go b/client_test.go index b80fae19..e5c1d26e 100644 --- a/client_test.go +++ b/client_test.go @@ -4187,7 +4187,16 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { t.Run("WorkerInsertOptsOverrides", func(t *testing.T) { t.Parallel() - insertParams, _, err := insertParamsFromArgsAndOptions(&customInsertOptsJobArgs{}, nil) + args := &customInsertOptsJobArgs{ + insertOpts: InsertOpts{ + MaxAttempts: 42, + Priority: 2, + Queue: "other", + Tags: []string{"tag1", "tag2"}, + }, + } + + insertParams, _, err := insertParamsFromArgsAndOptions(args, nil) require.NoError(t, err) // All these come from overrides in customInsertOptsJobArgs's definition: require.Equal(t, 42, insertParams.MaxAttempts) @@ -4244,6 +4253,164 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) { require.EqualError(t, err, "JobUniqueOpts.ByPeriod should not be less than 1 second") require.Nil(t, insertParams) }) + + t.Run("MetadataReconcileExclude", func(t *testing.T) { + t.Parallel() + + args := &customInsertOptsJobArgs{ + insertOpts: InsertOpts{ + Metadata: mustMarshal(t, map[string]any{ + "key1": map[string]any{ + "subkey2": "subval2", + }, + "key_ignored": "ignored", + }), + }, + } + + insertParams, _, err := insertParamsFromArgsAndOptions(args, &InsertOpts{ + Metadata: mustMarshal(t, map[string]any{ + "key1": map[string]any{ + "subkey1": "subval1", + }, + "key2": "val2", + }), + }) + require.NoError(t, err) + require.Equal(t, map[string]any{ + "key1": map[string]any{ // subhash does not merge + "subkey1": "subval1", + }, + "key2": "val2", + }, mustUnmarshal(t, insertParams.Metadata)) + }) + + t.Run("MetadataReconcileMerge", func(t *testing.T) { + t.Parallel() + + args := &customInsertOptsJobArgs{ + insertOpts: InsertOpts{ + Metadata: mustMarshal(t, map[string]any{ + "key1": map[string]any{ + "subkey2": "subval2", + }, + "key_merged": "merged", + }), + }, + } + + insertParams, _, err := insertParamsFromArgsAndOptions(args, &InsertOpts{ + Metadata: mustMarshal(t, map[string]any{ + "key1": map[string]any{ + "subkey1": "subval1", + }, + "key2": "val2", + }), + MetadataReconcile: MetadataReconcileMerge, + }) + require.NoError(t, err) + require.Equal(t, map[string]any{ + "key1": map[string]any{ // subhash does not merge + "subkey1": "subval1", + }, + "key2": "val2", + "key_merged": "merged", + }, mustUnmarshal(t, insertParams.Metadata)) + }) + + t.Run("MetadataReconcileMergeDeep", func(t *testing.T) { + t.Parallel() + + args := &customInsertOptsJobArgs{ + insertOpts: InsertOpts{ + Metadata: mustMarshal(t, map[string]any{ + "key1": map[string]any{ + "subkey2": "subval2", + }, + "key_merged": "merged", + }), + }, + } + + insertParams, _, err := insertParamsFromArgsAndOptions(args, &InsertOpts{ + Metadata: mustMarshal(t, map[string]any{ + "key1": map[string]any{ + "subkey1": "subval1", + }, + "key2": "val2", + }), + MetadataReconcile: MetadataReconcileMergeDeep, + }) + require.NoError(t, err) + require.Equal(t, map[string]any{ + "key1": map[string]any{ // subhash does not merge + "subkey1": "subval1", + "subkey2": "subval2", + }, + "key2": "val2", + "key_merged": "merged", + }, mustUnmarshal(t, insertParams.Metadata)) + }) + + t.Run("MetadataFromJobOnly", func(t *testing.T) { + t.Parallel() + + args := &customInsertOptsJobArgs{ + insertOpts: InsertOpts{ + Metadata: mustMarshal(t, map[string]any{ + "key1": "key2", + "key2": "val2", + }), + }, + } + + insertParams, _, err := insertParamsFromArgsAndOptions(args, &InsertOpts{ + Metadata: nil, + }) + require.NoError(t, err) + require.Equal(t, map[string]any{ + "key1": "key2", + "key2": "val2", + }, mustUnmarshal(t, insertParams.Metadata)) + }) + + t.Run("MetadataFromParamsOnly", func(t *testing.T) { + t.Parallel() + + args := &customInsertOptsJobArgs{ + insertOpts: InsertOpts{ + Metadata: nil, + }, + } + + insertParams, _, err := insertParamsFromArgsAndOptions(args, &InsertOpts{ + Metadata: mustMarshal(t, map[string]any{ + "key1": "key2", + "key2": "val2", + }), + }) + require.NoError(t, err) + require.Equal(t, map[string]any{ + "key1": "key2", + "key2": "val2", + }, mustUnmarshal(t, insertParams.Metadata)) + }) + + t.Run("MetadataDefault", func(t *testing.T) { + t.Parallel() + + args := &customInsertOptsJobArgs{ + insertOpts: InsertOpts{ + Metadata: nil, + }, + } + + insertParams, _, err := insertParamsFromArgsAndOptions(args, &InsertOpts{ + Metadata: nil, + }) + require.NoError(t, err) + require.Equal(t, map[string]any{}, mustUnmarshal(t, insertParams.Metadata)) + }) } func TestID(t *testing.T) { @@ -4267,17 +4434,14 @@ func TestID(t *testing.T) { }) } -type customInsertOptsJobArgs struct{} +type customInsertOptsJobArgs struct { + insertOpts InsertOpts +} func (w *customInsertOptsJobArgs) Kind() string { return "customInsertOpts" } func (w *customInsertOptsJobArgs) InsertOpts() InsertOpts { - return InsertOpts{ - MaxAttempts: 42, - Priority: 2, - Queue: "other", - Tags: []string{"tag1", "tag2"}, - } + return w.insertOpts } func (w *customInsertOptsJobArgs) Work(context.Context, *Job[noOpArgs]) error { return nil } @@ -4511,3 +4675,198 @@ func TestDefaultClientIDWithHost(t *testing.T) { require.Equal(t, strings.Repeat("a", 60)+"_2024_03_07T04_39_12", defaultClientIDWithHost(startedAt, strings.Repeat("a", 60))) require.Equal(t, strings.Repeat("a", 60)+"_2024_03_07T04_39_12", defaultClientIDWithHost(startedAt, strings.Repeat("a", 61))) } + +func TestMergeMetadata(t *testing.T) { + t.Parallel() + + t.Run("Simple", func(t *testing.T) { + t.Parallel() + + mergedMetadata, err := mergeMetadata( + mustMarshal(t, map[string]any{ + "key1": "val1", + "key2": "val2", + }), mustMarshal(t, map[string]any{ + "key3": "val3", + "key4": "val4", + }), false, + ) + require.NoError(t, err) + + require.Equal(t, map[string]any{ + "key1": "val1", + "key2": "val2", + "key3": "val3", + "key4": "val4", + }, mustUnmarshal(t, mergedMetadata)) + }) + + t.Run("ShallowMerge", func(t *testing.T) { + t.Parallel() + + mergedMetadata, err := mergeMetadata( + mustMarshal(t, map[string]any{ + "key1": map[string]any{ + "subkey1": "subval1", + }, + }), mustMarshal(t, map[string]any{ + "key1": map[string]any{ + "subkey2": "subval2", + }, + }), false, + ) + require.NoError(t, err) + + require.Equal(t, map[string]any{ + "key1": map[string]any{ // subhash does not merge + "subkey1": "subval1", + }, + }, mustUnmarshal(t, mergedMetadata)) + }) + + t.Run("DeepMerge", func(t *testing.T) { + t.Parallel() + + mergedMetadata, err := mergeMetadata( + mustMarshal(t, map[string]any{ + "key1": map[string]any{ + "subkey1": "subval1", + "subkey2": map[string]any{ + "subsubkey1": "subsubval1", + }, + }, + "key2": "val2", + }), mustMarshal(t, map[string]any{ + "key1": map[string]any{ + "subkey2": map[string]any{ + "subsubkey2": "subsubval2", + }, + "subkey3": "subval3", + }, + }), true, + ) + require.NoError(t, err) + + require.Equal(t, map[string]any{ + "key1": map[string]any{ + "subkey1": "subval1", + "subkey2": map[string]any{ + "subsubkey1": "subsubval1", + "subsubkey2": "subsubval2", + }, + "subkey3": "subval3", + }, + "key2": "val2", + }, mustUnmarshal(t, mergedMetadata)) + }) +} + +func TestMergeMetadataMaps(t *testing.T) { + t.Parallel() + + t.Run("NoConflictingKeys", func(t *testing.T) { + t.Parallel() + + require.Equal(t, map[string]any{ + "key1": "val1", + "key2": "val2", + "key3": "val3", + "key4": "val4", + }, + mergeMetadataMaps(map[string]any{ + "key1": "val1", + "key2": "val2", + }, map[string]any{ + "key3": "val3", + "key4": "val4", + }, false), + ) + }) + + t.Run("PrimaryValuesPreferred", func(t *testing.T) { + t.Parallel() + + require.Equal(t, map[string]any{ + "key1": "val1", + "key2": "val2", + }, + mergeMetadataMaps(map[string]any{ + "key1": "val1", + "key2": "val2", + }, map[string]any{ + "key1": "val2_1", + "key2": "val2_2", + }, false), + ) + }) + + t.Run("ShallowMerge", func(t *testing.T) { + t.Parallel() + + require.Equal(t, map[string]any{ + "key1": map[string]any{ // subhash does not merge + "subkey1": "subval1", + }, + }, + mergeMetadataMaps(map[string]any{ + "key1": map[string]any{ + "subkey1": "subval1", + }, + }, map[string]any{ + "key1": map[string]any{ + "subkey2": "subval2", + }, + }, false), + ) + }) + + t.Run("DeepMerge", func(t *testing.T) { + t.Parallel() + + require.Equal(t, map[string]any{ + "key1": map[string]any{ + "subkey1": "subval1", + "subkey2": map[string]any{ + "subsubkey1": "subsubval1", + "subsubkey2": "subsubval2", + }, + "subkey3": "subval3", + }, + "key2": "val2", + }, + mergeMetadataMaps(map[string]any{ + "key1": map[string]any{ + "subkey1": "subval1", + "subkey2": map[string]any{ + "subsubkey1": "subsubval1", + }, + }, + "key2": "val2", + }, map[string]any{ + "key1": map[string]any{ + "subkey2": map[string]any{ + "subsubkey2": "subsubval2", + }, + "subkey3": "subval3", + }, + }, true), + ) + }) +} + +func mustMarshal(t *testing.T, v any) []byte { + t.Helper() + + data, err := json.Marshal(v) + require.NoError(t, err) + return data +} + +func mustUnmarshal(t *testing.T, data []byte) map[string]any { + t.Helper() + + var dataMap map[string]any + err := json.Unmarshal(data, &dataMap) + require.NoError(t, err) + return dataMap +} diff --git a/insert_opts.go b/insert_opts.go index 4cdd5124..13c40e50 100644 --- a/insert_opts.go +++ b/insert_opts.go @@ -23,6 +23,14 @@ type InsertOpts struct { // field by River. Metadata []byte + // MetadataReconcile defines how, in the presence of multiple InsertOpts + // like one from the job itself and another one from a Client.Insert param, + // how metadata will be reconciled between the two. For example, two + // metadata can be merged together, or one can exclude the other. + // + // Defaults to MetadataReconcileExclude. + MetadataReconcile MetadataReconcile + // Pending indicates that the job should be inserted in the `pending` state. // Pending jobs are not immediately available to be worked and are never // deleted, but they can be used to indicate work which should be performed in @@ -67,6 +75,38 @@ type InsertOpts struct { UniqueOpts UniqueOpts } +// MetadataReconcile defines how, in the presence of multiple InsertOpts +// like one from the job itself and another one from a Client.Insert param, +// how metadata will bereconciled between the two. For example, two metadata +// can be merged together, or one can exclude the other. +type MetadataReconcile int + +const ( + // MetadataReconcileExclude prefers the dominant metadata as is. Secondary + // metadata is ignored/excluded. + // + // This most is faster than any other reconciliation mode because no + // metadata objects need to be parsed. + MetadataReconcileExclude MetadataReconcile = iota + + // MetadataReconcileMerge attempts to do a shallow merge on the dominant + // and secondary metadatas. If duplicate values are present, the one from + // the dominant metadata is preferred. + // + // Metadata must be unmarshable to a map with string keys for merging to be + // successful. + MetadataReconcileMerge + + // MetadataReconcileMergeDeep attempts to do a deep merge on the dominant + // and secondary metadatas. Each value in an object is checked to see if + // it's an object itself, and if so, merged recursively. If values aren't + // subobject, the one from the dominant metadata is preferred. + // + // Metadata must be unmarshable to a map with string keys for merging to be + // successful. + MetadataReconcileMergeDeep +) + // UniqueOpts contains parameters for uniqueness for a job. // // When the options struct is uninitialized (its zero value) no uniqueness at is