Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added GCS Producer Fix #111 #112

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/jrconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"mongoConfig": "./mongoDB/config.json",
"elasticConfig": "./elastic/config.json",
"s3Config": "./s3/config.json",
"gcsConfig": "./gcs/config.json",
"url": ""
}
}
31 changes: 26 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ module github.com/ugol/jr
go 1.20

require (
cloud.google.com/go/storage v1.35.1
github.com/actgardner/gogen-avro/v10 v10.2.1
github.com/aws/aws-sdk-go v1.45.20
github.com/confluentinc/confluent-kafka-go/v2 v2.2.0
github.com/elastic/go-elasticsearch/v8 v8.10.0
github.com/go-chi/chi/v5 v5.0.10
github.com/google/uuid v1.3.1
github.com/google/uuid v1.4.0
github.com/redis/go-redis/v9 v9.2.1
github.com/spf13/cobra v1.7.0
github.com/spf13/viper v1.16.0
Expand All @@ -18,10 +19,19 @@ require (
)

require (
cloud.google.com/go v0.110.8 // indirect
cloud.google.com/go/compute v1.23.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.3 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -31,8 +41,20 @@ require (
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/sync v0.3.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.150.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)

require (
Expand All @@ -41,7 +63,6 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/heetch/avro v0.4.4 // indirect
github.com/iancoleman/orderedmap v0.3.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/invopop/jsonschema v0.11.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -54,7 +75,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/squeeze69/codicefiscale v1.0.4 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/sys v0.13.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
91 changes: 45 additions & 46 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/cmd/emitterCreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func init() {
emitterCreateCmd.Flags().StringVar(&valueTemplate, "valueTemplate", constants.DEFAULT_VALUE_TEMPLATE, "template name to use for the value")
emitterCreateCmd.Flags().StringVar(&keyTemplate, "keyTemplate", constants.DEFAULT_KEY, "template to use for the key")
emitterCreateCmd.Flags().StringVar(&outputTemplate, "outputTemplate", constants.DEFAULT_OUTPUT_TEMPLATE, "Formatting of K,V on standard output")
emitterCreateCmd.Flags().StringVarP(&output, "output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, redis, mongo, elastic, s3")
emitterCreateCmd.Flags().StringVarP(&output, "output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, redis, mongo, elastic, s3, gcs")
emitterCreateCmd.Flags().StringVar(&topic, "topic", constants.DEFAULT_TOPIC, "Default topic to write to if using output='kafka'")
emitterCreateCmd.Flags().BoolVar(&kcat, "kcat", false, "If you want to pipe jr with kcat, use this flag: it is equivalent to --output stdout --outputTemplate '{{key}},{{value}}' --oneline")
emitterCreateCmd.Flags().BoolVarP(&oneline, "oneline", "l", false, "strips /n from output, for example to be pipelined to tools like kcat")
Expand Down
1 change: 1 addition & 0 deletions pkg/cmd/producerList.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var producerListCmd = &cobra.Command{
fmt.Printf("%sMongodb%s (--output = mongo)\n", Green, Reset)
fmt.Printf("%sElastic%s (--output = elastic)\n", Green, Reset)
fmt.Printf("%sS3%s (--output = s3)\n", Green, Reset)
fmt.Printf("%sGCS%s (--output = gcs)\n", Green, Reset)
fmt.Println()

},
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/templateRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ jr template run --template "{{name}}"
configuration.GlobalCfg.ElasticConfig, _ = cmd.Flags().GetString(f.Name)
case "s3Config":
configuration.GlobalCfg.S3Config, _ = cmd.Flags().GetString(f.Name)
case "gcsConfig":
configuration.GlobalCfg.GCSConfig, _ = cmd.Flags().GetString(f.Name)
}
}
})
Expand Down Expand Up @@ -170,5 +172,6 @@ func init() {
templateRunCmd.Flags().String("mongoConfig", "", "MongoDB configuration")
templateRunCmd.Flags().String("elasticConfig", "", "Elastic Search configuration")
templateRunCmd.Flags().String("s3Config", "", "Amazon S3 configuration")
templateRunCmd.Flags().String("gcsConfig", "", "Google GCS configuration")

}
1 change: 1 addition & 0 deletions pkg/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type GlobalConfiguration struct {
MongoConfig string
ElasticConfig string
S3Config string
GCSConfig string
Url string
EmbeddedTemplate bool
FileNameTemplate bool
Expand Down
13 changes: 13 additions & 0 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ugol/jr/pkg/producers/mongoDB"
"github.com/ugol/jr/pkg/producers/redis"
"github.com/ugol/jr/pkg/producers/s3"
"github.com/ugol/jr/pkg/producers/gcs"
"github.com/ugol/jr/pkg/producers/server"
"github.com/ugol/jr/pkg/tpl"
"log"
Expand Down Expand Up @@ -123,6 +124,11 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) {
return
}

if e.Output == "gcs" {
e.Producer = createGCSProducer(conf.GCSConfig)
return
}

if e.Output == "http" {
e.Producer = &server.JsonProducer{OutputTpl: &o}
return
Expand Down Expand Up @@ -171,6 +177,13 @@ func createS3Producer(s3Config string) Producer {
return sProducer
}

func createGCSProducer(gcsConfig string) Producer {
gProducer := &gcs.GCSProducer{}
gProducer.Initialize(gcsConfig)

return gProducer
}

func createKafkaProducer(conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.KafkaManager {

kManager := &kafka.KafkaManager{
Expand Down
3 changes: 3 additions & 0 deletions pkg/producers/gcs/config.json.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"bucket_name": "your-bucket-name"
}
74 changes: 74 additions & 0 deletions pkg/producers/gcs/gcsProducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package gcs

import (
"context"
"encoding/json"
"io/ioutil"
"log"
"fmt"

"cloud.google.com/go/storage"
"github.com/google/uuid"
)

type Config struct {
Bucket string `json:"bucket_name"`
}

type GCSProducer struct {
client storage.Client
bucket string
}

func (p *GCSProducer) Initialize(configFile string) {
var config Config
file, err := ioutil.ReadFile(configFile)
err = json.Unmarshal(file, &config)
if err != nil {
log.Fatalf("Failed to parse configuration parameters: %s", err)
}

ctx := context.Background()
// Use Google Application Default Credentials to authorize and authenticate the client.
// More information about Application Default Credentials and how to enable is at
// https://developers.google.com/identity/protocols/application-default-credentials.
client, err := storage.NewClient(ctx)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}

p.client = *client
p.bucket = config.Bucket
}

func (p *GCSProducer) Produce(k []byte, v []byte, o any) {
ctx := context.Background()

bucket := p.bucket
var key string

if k == nil || len(k) == 0 {
// generate a UUID as index
id := uuid.New()
key = id.String() + "/.json"
} else {
key = string(k) + "/.json"
}

objectHandle := p.client.Bucket(bucket).Object(key)
writer := objectHandle.NewWriter(ctx)
kvPair := fmt.Sprintf("%s=%s\n", key, v)

_, err := writer.Write([]byte(kvPair))
if err != nil {
log.Fatalf("Failed to write to GCS: %v", err)
}

writer.Close()

}

func (p *GCSProducer) Close() error {
p.client.Close()
return nil
}