Skip to content

Commit

Permalink
feat: support batch points write method (openGemini#26)
Browse files Browse the repository at this point in the history
Signed-off-by: PennyYoon <525296438@qq.com>
  • Loading branch information
Chenxulin97 authored Dec 5, 2023
1 parent 24ab006 commit ab1de2e
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 0 deletions.
2 changes: 2 additions & 0 deletions opengemini/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Client interface {
Ping(idx int) error
Query(query Query) (*QueryResult, error)

// WriteBatchPoints batch points to assigned database
WriteBatchPoints(database string, bp *BatchPoints) error
// CreateDatabase Create database
CreateDatabase(database string) error
// CreateDatabaseWithRp Create database with retention policy
Expand Down
1 change: 1 addition & 0 deletions opengemini/url_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
UrlPing = "/ping"
UrlQuery = "/query"
UrlStatus = "/status"
UrlWrite = "/write"
)

var noAuthRequired = map[string]map[string]struct{}{
Expand Down
58 changes: 58 additions & 0 deletions opengemini/write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package opengemini

import (
"bytes"
"compress/gzip"
"errors"
"io"
"net/http"
"net/url"
)

func (c *client) WriteBatchPoints(database string, bp *BatchPoints) error {
var buffer bytes.Buffer

var writer io.Writer

if c.config.GzipEnabled {
writer = gzip.NewWriter(&buffer)
} else {
writer = &buffer
}
for _, p := range bp.Points {
if p == nil {
continue
}
if _, err := io.WriteString(writer, p.String()); err != nil {
return err
}
if _, err := writer.Write([]byte{'\n'}); err != nil {
return err
}
}
if closer, ok := writer.(io.Closer); ok {
if err := closer.Close(); err != nil {
return err
}
}

req := requestDetails{
queryValues: make(url.Values),
body: &buffer,
}
req.queryValues.Add("db", database)
resp, err := c.executeHttpPost(UrlWrite, req)
if err != nil {
return err
}

defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
reason, err := io.ReadAll(resp.Body)
if err != nil {
return errors.New("write failed and couldn't get the error for " + err.Error())
}
return errors.New("write failed for " + string(reason))
}
return nil
}
62 changes: 62 additions & 0 deletions opengemini/write_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package opengemini

import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestClient_Write(t *testing.T) {
c := testDefaultClient(t)

// create a test database with rand suffix
database := randomDatabaseName()
err := c.CreateDatabase(database)
assert.Nil(t, err)

// delete test database before exit test case
defer func() {
err := c.DropDatabase(database)
assert.Nil(t, err)
}()

bp := &BatchPoints{}
testMeasurement := randomMeasurement()
// point1 will write success with four kinds variant type field
point1 := &Point{}
point1.SetMeasurement(testMeasurement)
point1.AddTag("Tag", "Test1")
point1.AddField("stringField", "test1")
point1.AddField("intField", 897870)
point1.AddField("doubleField", 834.5433)
point1.AddField("boolField", true)
bp.AddPoint(point1)

// point2 will parse fail for having no field
point2 := &Point{}
point2.SetMeasurement(testMeasurement)
point2.AddTag("Tag", "Test2")
bp.AddPoint(point2)

// point3 will write success with timestamp
point3 := &Point{}
point3.SetMeasurement(testMeasurement)
point3.AddTag("Tag", "Test3")
point3.AddField("stringField", "test3")
point3.AddField("boolField", false)
point3.Time = time.Now()
bp.AddPoint(point3)

err = c.WriteBatchPoints(database, bp)
assert.Nil(t, err)

// check whether write success
q := Query{
Database: database,
Command: "select * from " + testMeasurement,
}
time.Sleep(time.Second * 5)
result, err := c.Query(q)
assert.Nil(t, err)
assert.Equal(t, 2, len(result.Results[0].Series[0].Values))
}

0 comments on commit ab1de2e

Please sign in to comment.