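Is there a problem with hologres-connector-flink-1.13? Connected to open-source Flink, it can insert data into a Holo table, and the new rows can be queried on the Holo side, but reading from the Holo table always fails with an error: #5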

Open
hecheng64 opened this issue Jan 17, 2022 · 1 comment

Comments

@hecheng64

Flink SQL> create TEMPORARY table holo_source(

id INTEGER,
item INTEGER
) with (
'connector'='hologres',
'dbname'='dps_test',
'tablename'='flink_test',
'username'='',
'password'='',
'endpoint'='hgpostcn-cn-2r42go4bp00c-cn-hangzhou.hologres.aliyuncs.com:80'
);
[INFO] Execute statement succeed.

Flink SQL> select id,item from holo_source;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is FlinkLogicalTableSourceScan[convention: LOGICAL -> BATCH_PHYSICAL]
There is 1 empty subset: rel#133:RelSubset#2.BATCH_PHYSICAL.any.[], the relevant part of the original plan is as follows
121:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, holo_source]], fields=[id, item])

Root: rel#131:RelSubset#3.BATCH_PHYSICAL.any.[]
Original rel:
FlinkLogicalSink(subset=[rel#119:RelSubset#1.LOGICAL.any.[]], table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[id, item]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 123
FlinkLogicalTableSourceScan(subset=[rel#122:RelSubset#0.LOGICAL.any.[]], table=[[default_catalog, default_database, holo_source]], fields=[id, item]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 8.0E8 io, 0.0 network, 0.0 memory}, id = 121

Sets:
Set#2, type: RecordType(INTEGER id, INTEGER item)
rel#128:RelSubset#2.LOGICAL.any.[], best=rel#121
rel#121:FlinkLogicalTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, holo_source],fields=id, item), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 8.0E8 io, 0.0 network, 0.0 memory}
rel#133:RelSubset#2.BATCH_PHYSICAL.any.[], best=null
Set#3, type: RecordType(INTEGER id, INTEGER item)
rel#130:RelSubset#3.LOGICAL.any.[], best=rel#129
rel#129:FlinkLogicalSink.LOGICAL.any.[](input=RelSubset#128,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=id, item), rowcount=1.0E8, cumulative cost={2.0E8 rows, 2.0E8 cpu, 8.0E8 io, 0.0 network, 0.0 memory}
rel#131:RelSubset#3.BATCH_PHYSICAL.any.[], best=null
rel#132:AbstractConverter.BATCH_PHYSICAL.any., rowcount=1.0E8, cumulative cost={inf}
rel#134:BatchPhysicalSink.BATCH_PHYSICAL.any.[](input=RelSubset#133,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=id, item), rowcount=1.0E8, cumulative cost={inf}

Graphviz:
digraph G {
root [style=filled,label="Root"];
subgraph cluster2{
label="Set 2 RecordType(INTEGER id, INTEGER item)";
rel121 [label="rel#121:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, holo_source],fields=id, item\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 8.0E8 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
subset128 [label="rel#128:RelSubset#2.LOGICAL.any.[]"]
subset133 [label="rel#133:RelSubset#2.BATCH_PHYSICAL.any.[]",color=red]
}
subgraph cluster3{
label="Set 3 RecordType(INTEGER id, INTEGER item)";
rel129 [label="rel#129:FlinkLogicalSink\ninput=RelSubset#128,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=id, item\nrows=1.0E8, cost={2.0E8 rows, 2.0E8 cpu, 8.0E8 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
rel132 [label="rel#132:AbstractConverter\ninput=RelSubset#130,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8, cost={inf}",shape=box]
rel134 [label="rel#134:BatchPhysicalSink\ninput=RelSubset#133,table=default_catalog.default_database.Unregistered_Collect_Sink_1,fields=id, item\nrows=1.0E8, cost={inf}",shape=box]
subset130 [label="rel#130:RelSubset#3.LOGICAL.any.[]"]
subset131 [label="rel#131:RelSubset#3.BATCH_PHYSICAL.any.[]"]
}
root -> subset131;
subset128 -> rel121[color=blue];
subset130 -> rel129[color=blue]; rel129 -> subset128[color=blue];
subset131 -> rel132; rel132 -> subset130;
subset131 -> rel134; rel134 -> subset133;
}

@huafengw
Collaborator

@hecheng64 hologres-connector does not currently support being used as a Flink source.
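
For illustration only, here is a minimal sketch of the write path that the report says does work, with the Hologres table declared purely as a sink. The `WITH` options are copied from the DDL in the report (credentials left blank as they were there). The table names `holo_sink` and `demo_source` are hypothetical, and Flink's built-in `datagen` connector is an assumed stand-in for a real upstream source. This has not been verified against any particular connector release.

```sql
-- Sketch under stated assumptions: the Hologres table is used only as a sink.
-- Options mirror the DDL in the original report; 'holo_sink' is a hypothetical name.
CREATE TEMPORARY TABLE holo_sink (
  id INTEGER,
  item INTEGER
) WITH (
  'connector' = 'hologres',
  'dbname' = 'dps_test',
  'tablename' = 'flink_test',
  'username' = '',
  'password' = '',
  'endpoint' = 'hgpostcn-cn-2r42go4bp00c-cn-hangzhou.hologres.aliyuncs.com:80'
);

-- Assumed stand-in for a real upstream source: Flink's built-in datagen
-- connector, bounded to 10 rows so the job finishes on its own.
CREATE TEMPORARY TABLE demo_source (
  id INTEGER,
  item INTEGER
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '10'
);

-- Write into Hologres; no Flink query reads from holo_sink.
INSERT INTO holo_sink SELECT id, item FROM demo_source;
```

With this layout, the inserted rows would be checked on the Hologres side, as the report describes, rather than with a `SELECT` from the Flink SQL client.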
