diff --git a/src/proxy/query.go b/src/proxy/query.go index 9fdb0fc2..c56ffdcb 100644 --- a/src/proxy/query.go +++ b/src/proxy/query.go @@ -19,6 +19,7 @@ import ( "github.com/xelabs/go-mysqlstack/sqldb" "github.com/xelabs/go-mysqlstack/sqlparser" + "github.com/xelabs/go-mysqlstack/sqlparser/depends/common" querypb "github.com/xelabs/go-mysqlstack/sqlparser/depends/query" "github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes" ) @@ -233,8 +234,20 @@ func (spanner *Spanner) ComQuery(session *driver.Session, query string, bindVari spanner.auditLog(session, W, xbase.UPDATE, query, qr, status) return returnQuery(qr, callback, err) case *sqlparser.Select: + streamingFetch := false txSession := spanner.sessions.getTxnSession(session) if txSession.getStreamingFetchVar() { + streamingFetch = true + } else { + if len(node.Comments) > 0 { + comment := strings.Replace(common.BytesToString(node.Comments[0]), " ", "", -1) + if comment == "/*+streaming*/" { + streamingFetch = true + } + } + } + + if streamingFetch { if err = spanner.handleSelectStream(session, query, node, callback); err != nil { log.Error("proxy.select.for.backup:[%s].error:%+v", xbase.TruncateQuery(query, 256), err) return err diff --git a/src/proxy/query_test.go b/src/proxy/query_test.go index ac2c814d..3f36db9f 100644 --- a/src/proxy/query_test.go +++ b/src/proxy/query_test.go @@ -223,6 +223,14 @@ func TestProxyQuerys(t *testing.T) { _, err = client.FetchAll(query, -1) assert.Nil(t, err) } + { // select * from t1 as ...; + query := "select /*+ streaming */ * from test.t1 as aliaseTable" + qr, err := client.FetchAll(query, -1) + assert.Nil(t, err) + want := 60510 + got := int(qr.RowsAffected) + assert.Equal(t, want, got) + } { // select 1 from dual query := "select 1 from dual" qr, err := client.FetchAll(query, -1) @@ -315,6 +323,13 @@ func TestProxyQuerys(t *testing.T) { _, err = client.FetchAll(query, -1) assert.Nil(t, err) } + { + query := "select /*+ streaming */ a from test.dual" + _, err = client.FetchAll(query, -1) + want := "Table 'dual' doesn't exist (errno 1146) (sqlstate 42S02)" + got := err.Error() + assert.Equal(t, want, got) + } } }