/*
Package dbsql implements the Go driver for Databricks SQL.
# Usage
Clients should use the database/sql package in conjunction with the driver:
	import (
		"database/sql"
		"log"

		_ "github.com/databricks/databricks-sql-go"
	)

	func main() {
		db, err := sql.Open("databricks", "token:<token>@<hostname>:<port>/<endpoint_path>")
		if err != nil {
			log.Fatal(err)
		}
		defer db.Close()
	}
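To verify connectivity, the handle can be pinged before use. A minimal sketch, assuming the db handle opened above and the standard context package:

	// Ping forces a connection to be established and returns an error if it fails
	if err := db.PingContext(context.Background()); err != nil {
		log.Fatal(err)
	}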
# Connection via DSN (Data Source Name)
Use sql.Open() to create a database handle via a data source name string:
	db, err := sql.Open("databricks", "<dsn_string>")
The DSN format is:
	token:[my_token]@[hostname]:[port]/[endpoint http path]?param=value
Supported optional connection parameters can be specified in param=value and include:

  - catalog: Sets the initial catalog name in the session
  - schema: Sets the initial schema name in the session
  - maxRows: Sets the maximum number of rows fetched per request. Default is 100000
  - timeout: Sets the timeout (in seconds) for server-side query execution. Default is no timeout
  - userAgentEntry: Used to identify partners. Set as a string with format <isv-name+product-name>
Supported optional session parameters can be specified in param=value and include:

  - ansi_mode: (Boolean string). Session statements will adhere to the rules defined by the ANSI SQL specification
  - timezone: (e.g. "America/Los_Angeles"). Sets the timezone of the session
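For example, a DSN that sets the initial namespace and a query timeout (all values below are placeholders) might look like:

	token:dapi1234567890abcdef@example.cloud.databricks.com:443/sql/1.0/endpoints/a1b2c3d4e5f6?catalog=main&schema=default&timeout=60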
# Connection via new connector object
Use sql.OpenDB() to create a database handle via a new connector object created with dbsql.NewConnector():
	import (
		"database/sql"
		"log"

		dbsql "github.com/databricks/databricks-sql-go"
	)

	func main() {
		connector, err := dbsql.NewConnector(
			dbsql.WithServerHostname(<hostname>),
			dbsql.WithPort(<port>),
			dbsql.WithHTTPPath(<http_path>),
			dbsql.WithAccessToken(<my_token>),
		)
		if err != nil {
			log.Fatal(err)
		}
		db := sql.OpenDB(connector)
		defer db.Close()
		...
	}
Supported functional options include:

  - WithServerHostname(<hostname> string): Sets the server hostname. Mandatory
  - WithPort(<port> int): Sets the server port. Mandatory
  - WithAccessToken(<my_token> string): Sets the Personal Access Token. Mandatory
  - WithHTTPPath(<http_path> string): Sets the endpoint to the warehouse. Mandatory
  - WithInitialNamespace(<catalog> string, <schema> string): Sets the initial catalog and schema name in the session. Optional
  - WithMaxRows(<max_rows> int): Sets the maximum number of rows fetched per request. Default is 100000. Optional
  - WithSessionParams(<params_map> map[string]string): Sets session parameters including "timezone" and "ansi_mode". Optional
  - WithTimeout(<timeout> time.Duration): Sets the timeout for server-side query execution. Default is no timeout. Optional
  - WithUserAgentEntry(<isv-name+product-name> string): Used to identify partners. Optional
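For example, a connector combining the mandatory options above with several of the optional ones might be built as follows. This is a sketch; all values are placeholders:

	connector, err := dbsql.NewConnector(
		dbsql.WithServerHostname("example.cloud.databricks.com"), // placeholder hostname
		dbsql.WithPort(443),
		dbsql.WithHTTPPath("/sql/1.0/endpoints/a1b2c3d4e5f6"), // placeholder endpoint path
		dbsql.WithAccessToken("<my_token>"),
		dbsql.WithInitialNamespace("main", "default"),
		dbsql.WithMaxRows(10000),
		dbsql.WithSessionParams(map[string]string{"timezone": "America/Los_Angeles"}),
		dbsql.WithTimeout(time.Minute),
	)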
# Query cancellation and timeout
Cancelling a query via context cancellation or timeout is supported.
	// Set up a context with a 30 second timeout
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Execute the query. It will be cancelled after 30 seconds if still running
	res, err := db.ExecContext(ctx, "CREATE TABLE example(id int, message string)")
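If the deadline is hit, the returned error can be inspected with the standard errors package. A minimal sketch; whether the error wraps context.DeadlineExceeded depends on where the cancellation is observed:

	// May report a timeout if err wraps context.DeadlineExceeded
	if errors.Is(err, context.DeadlineExceeded) {
		log.Println("query timed out")
	}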
# CorrelationId and ConnId
Use the driverctx package under driverctx/ctx.go to add CorrelationId and ConnId to the context.
CorrelationId and ConnId make it convenient to parse logs and create metrics from them.
  - Connection Id: Internal id used to track what happens under a connection. Connections can be reused, so this tracks across queries.
  - Query Id: Internal id used to track what happens under a query. Useful because the same query can be used with multiple connections.
  - Correlation Id: External id, such as a request ID, used to track what happens under a request. Useful for tracking multiple connections in the same request.
	ctx := dbsqlctx.NewContextWithCorrelationId(context.Background(), "workflow-example")
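The correlation id then appears in driver log output for any query executed with that context. For example, assuming a db handle from one of the earlier examples:

	rows, err := db.QueryContext(ctx, "SELECT 1")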
# Logging
Use the logger package under logger.go to set up logging (based on zerolog).
By default, the logging level is `warn`. To disable logging entirely, use the level `disabled`.
Track() and Duration() can also be used to log the elapsed time of any tracked operation.
	import (
		"context"
		"database/sql"
		"log"
		"os"

		dbsqlctx "github.com/databricks/databricks-sql-go/driverctx"
		dbsqllog "github.com/databricks/databricks-sql-go/logger"
	)

	func main() {
		// Optional. Set the logging level with SetLogLevel()
		if err := dbsqllog.SetLogLevel("debug"); err != nil {
			log.Fatal(err)
		}

		// Optional. Set the logging output with SetLogOutput()
		// Default is os.Stderr. If running in a terminal, the logger will use ConsoleWriter to prettify logs
		dbsqllog.SetLogOutput(os.Stdout)

		// Optional. Set a correlation id with NewContextWithCorrelationId
		ctx := dbsqlctx.NewContextWithCorrelationId(context.Background(), "workflow-example")

		// Optional. Track time spent and log the elapsed time
		msg, start := dbsqllog.Track("Run Main")
		defer dbsqllog.Duration(msg, start)

		db, err := sql.Open("databricks", "<dsn_string>")
		...
	}
The resulting log line may look like this:

	{"level":"debug","connId":"01ed6545-5669-1ec7-8c7e-6d8a1ea0ab16","corrId":"workflow-example","queryId":"01ed6545-57cc-188a-bfc5-d9c0eaf8e189","time":1668558402,"message":"Run Main elapsed time: 1.298712292s"}
# Supported Data Types
	==========================================
	Databricks Type          --> Golang Type
	==========================================
	BOOLEAN                  --> bool
	TINYINT                  --> int8
	SMALLINT                 --> int16
	INT                      --> int32
	BIGINT                   --> int64
	FLOAT                    --> float32
	DOUBLE                   --> float64
	VOID                     --> nil
	STRING                   --> string
	DATE                     --> time.Time
	TIMESTAMP                --> time.Time
	DECIMAL(p,s)             --> sql.RawBytes
	BINARY                   --> sql.RawBytes
	ARRAY<elementType>       --> sql.RawBytes
	STRUCT                   --> sql.RawBytes
	MAP<keyType, valueType>  --> sql.RawBytes
	INTERVAL (year-month)    --> string
	INTERVAL (day-time)      --> string
For ARRAY, STRUCT, and MAP types, the value is returned to sql.Scan as sql.RawBytes holding a JSON string, which can then be
unmarshalled into Go slices, maps, and structs. For example:
	type structVal struct {
		StringField string `json:"string_field"`
		ArrayField  []int  `json:"array_field"`
	}

	type row struct {
		arrayVal  []int
		mapVal    map[string]int
		structVal structVal
	}

	res := []row{}

	for rows.Next() {
		r := row{}
		tempArray := []byte{}
		tempStruct := []byte{}
		tempMap := []byte{}

		if err := rows.Scan(&tempArray, &tempMap, &tempStruct); err != nil {
			log.Fatal(err)
		}
		if err := json.Unmarshal(tempArray, &r.arrayVal); err != nil {
			log.Fatal(err)
		}
		if err := json.Unmarshal(tempMap, &r.mapVal); err != nil {
			log.Fatal(err)
		}
		if err := json.Unmarshal(tempStruct, &r.structVal); err != nil {
			log.Fatal(err)
		}
		res = append(res, r)
	}
This may generate the following row:

	{arrayVal:[1,2,3] mapVal:{"key1":1} structVal:{"string_field":"string_val","array_field":[4,5,6]}}
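The rows handle above would come from an ordinary query over the three columns, matching the Scan order. A sketch with hypothetical table and column names:

	// Hypothetical names, for illustration only
	rows, err := db.QueryContext(ctx, "SELECT array_col, map_col, struct_col FROM example_table")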
*/
package dbsql