-
Notifications
You must be signed in to change notification settings - Fork 4
/
rows.go
154 lines (136 loc) · 4.43 KB
/
rows.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// mysqlx - MySQL driver for Go's database/sql package and MySQL X Protocol.
// Copyright (c) 2017-2018 Alexey Palazhchenko
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package mysqlx
import (
"context"
"database/sql/driver"
"io"
"math"
"github.com/AlekSi/mysqlx/internal/proto/mysqlx_notice"
"github.com/AlekSi/mysqlx/internal/proto/mysqlx_resultset"
"github.com/AlekSi/mysqlx/internal/proto/mysqlx_sql"
)
// rows is an iterator over an executed query's results.
type rows struct {
c *conn
columns []mysqlx_resultset.ColumnMetaData
rows chan *mysqlx_resultset.Row
readErr error
}
// runReader reads rows and sends then to channel until all rows are read or error is encountered.
func (r *rows) runReader(ctx context.Context) {
defer close(r.rows)
for {
m, err := r.c.readMessage(ctx)
if err != nil {
r.readErr = err
return
}
switch m := m.(type) {
case *mysqlx_resultset.Row:
r.rows <- m
case *mysqlx_resultset.FetchDone:
continue
case *mysqlx_notice.Warning:
// TODO expose warnings?
continue
case *mysqlx_notice.SessionStateChanged:
switch m.GetParam() {
case mysqlx_notice.SessionStateChanged_ROWS_AFFECTED:
continue
default:
r.readErr = bugf("rows.runReader: unhandled session state change %v", m)
return
}
case *mysqlx_sql.StmtExecuteOk:
return
default:
r.readErr = bugf("rows.runReader: unhandled message %T", m)
return
}
}
}
// Columns returns the names of the columns.
// The number of columns of the result is inferred from the length of the slice.
// If a particular column name isn't known, an empty string should be returned for that entry.
func (r *rows) Columns() []string {
res := make([]string, len(r.columns))
for i, c := range r.columns {
res[i] = string(c.Name)
}
return res
}
// Close closes the rows iterator.
func (r *rows) Close() error {
// TODO limit a number of messages to drain there? and close connection? and return driver.ErrBadConn?
// drain messages until r.rows is closed in runReader
for range r.rows {
}
// FIXME should we return r.readErr instead of nil?
return nil
}
// Next is called to populate the next row of data into the provided slice.
// The provided slice will be the same size as the Columns() are wide.
// Next should return io.EOF when there are no more rows.
func (r *rows) Next(dest []driver.Value) error {
row, ok := <-r.rows
if !ok {
if r.readErr != nil {
return r.readErr
}
return io.EOF
}
// unmarshal all values, return first encountered error
var err error
for i, value := range row.Field {
d, e := unmarshalValue(value, &r.columns[i])
dest[i] = d
if err == nil {
err = e
}
}
return err
}
// RowsColumnTypeDatabaseTypeName returns the database system type name without the length.
// Type names should be uppercase. Returned types:
// SINT, UINT, DOUBLE, FLOAT, BYTES, TIME, DATETIME, SET, ENUM, BIT, DECIMAL.
func (r *rows) ColumnTypeDatabaseTypeName(index int) string {
return r.columns[index].Type.String()
}
// RowsColumnTypeLength return the length of the column type if the column is a variable length type.
// If the column is not a variable length type ok should return false.
// If length is not limited other than system limits, it should return math.MaxInt64.
// The following are examples of returned values for various types:
// TODO add examples
func (r *rows) ColumnTypeLength(index int) (length int64, ok bool) {
c := r.columns[index]
length = int64(c.GetLength())
if c.Type.String() == "BYTES" && length == math.MaxUint32 {
length = math.MaxInt64
}
ok = true
return
}
// RowsColumnTypeNullable returns true if it is known the column may be null,
// or false if the column is known to be not nullable.
// If the column nullability is unknown, ok should be false.
func (r *rows) ColumnTypeNullable(index int) (nullable, ok bool) {
nullable = (r.columns[index].GetFlags() & 0x0010) != 0
ok = true
return
}
// check interfaces
var (
_ driver.Rows = (*rows)(nil)
_ driver.RowsColumnTypeDatabaseTypeName = (*rows)(nil)
_ driver.RowsColumnTypeLength = (*rows)(nil)
_ driver.RowsColumnTypeNullable = (*rows)(nil)
// TODO
// _ driver.RowsColumnTypePrecisionScale = (*rows)(nil)
// _ driver.RowsColumnTypeScanType = (*rows)(nil)
// _ driver.RowsNextResultSet = (*rows)(nil)
)