Skip to content

Commit

Permalink
[Improve][mysql-cdc] Support mysql 5.5 versions (apache#6710)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Apr 24, 2024
1 parent e310353 commit 058f559
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 92 deletions.
23 changes: 20 additions & 3 deletions docs/en/connector-v2/source/MySQL-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL

## Supported DataSource Info

| Datasource | Supported versions | Driver | Url | Maven |
|------------|-------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------------------------------|----------------------------------------------------------------------|
| MySQL | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x </li><li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x </li> | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |
| Datasource | Supported versions | Driver | Url | Maven |
|------------|------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------------------------------|----------------------------------------------------------------------|
| MySQL | <li> [MySQL](https://dev.mysql.com/doc): 5.5, 5.6, 5.7, 8.0.x </li><li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x </li> | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |

## Using Dependency

Expand Down Expand Up @@ -92,9 +92,11 @@ server-id = 223344
log_bin = mysql-bin
expire_logs_days = 10
binlog_format = row
# mysql 5.6+ requires binlog_row_image to be set to FULL
binlog_row_image = FULL
# enable gtid mode
# mysql 5.6+ requires gtid_mode to be set to ON
gtid_mode = on
enforce_gtid_consistency = on
```
Expand All @@ -107,6 +109,21 @@ enforce_gtid_consistency = on

4. Confirm your changes by checking the binlog status once more:

MySQL 5.5:

```sql
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name | Value |
+--------------------------+----------------+
| binlog_format | ROW |
| log_bin | ON |
+--------------------------+----------------+
5 rows in set (0.00 sec)
```

MySQL 5.6+:

```sql
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@ public static org.apache.seatunnel.api.table.catalog.Column convertToSeaTunnelCo
default:
break;
}
return MySqlTypeConverter.INSTANCE.convert(builder.build());
return MySqlTypeConverter.DEFAULT_INSTANCE.convert(builder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlVersion;

import com.google.common.base.Preconditions;
import com.mysql.cj.MysqlType;
Expand All @@ -39,6 +40,7 @@
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
Expand All @@ -56,9 +58,14 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
SYS_DATABASES.add("sys");
}

private MySqlVersion version;
private MySqlTypeConverter typeConverter;

public MySqlCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
super(catalogName, username, pwd, urlInfo, null);
this.version = resolveVersion();
this.typeConverter = new MySqlTypeConverter(version);
}

@Override
Expand Down Expand Up @@ -130,7 +137,8 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
// e.g. `varchar(10)` is 40
long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
// e.g. `timestamp(3)` is 3
int timePrecision = resultSet.getInt("DATETIME_PRECISION");
int timePrecision =
MySqlVersion.V_5_5.equals(version) ? 0 : resultSet.getInt("DATETIME_PRECISION");

Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 0));
Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));
Expand All @@ -152,12 +160,13 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
.defaultValue(defaultValue)
.comment(comment)
.build();
return MySqlTypeConverter.INSTANCE.convert(typeDefine);
return typeConverter.convert(typeDefine);
}

@Override
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
return MysqlCreateTableSqlBuilder.builder(tablePath, table).build(table.getCatalogName());
return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter)
.build(table.getCatalogName());
}

@Override
Expand All @@ -179,7 +188,8 @@ protected String getDropDatabaseSql(String databaseName) {
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new MySqlTypeMapper());
return CatalogUtils.getCatalogTable(
defaultConnection, sqlQuery, new MySqlTypeMapper(typeConverter));
}

@Override
Expand All @@ -193,4 +203,18 @@ public String getExistDataSql(TablePath tablePath) {
"SELECT * FROM `%s`.`%s` LIMIT 1;",
tablePath.getDatabaseName(), tablePath.getTableName());
}

private MySqlVersion resolveVersion() {
try (Statement statement = getConnection(defaultUrl).createStatement();
ResultSet resultSet = statement.executeQuery("SELECT VERSION()")) {
resultSet.next();
return MySqlVersion.parse(resultSet.getString(1));
} catch (Exception e) {
log.info(
"Failed to get mysql version, fallback to default version: {}",
MySqlVersion.V_5_7,
e);
return MySqlVersion.V_5_7;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
Expand Down Expand Up @@ -57,20 +58,23 @@ public class MysqlCreateTableSqlBuilder {

private String fieldIde;

private MysqlCreateTableSqlBuilder(String tableName) {
private final MySqlTypeConverter typeConverter;

private MysqlCreateTableSqlBuilder(String tableName, MySqlTypeConverter typeConverter) {
checkNotNull(tableName, "tableName must not be null");
this.tableName = tableName;
this.typeConverter = typeConverter;
}

public static MysqlCreateTableSqlBuilder builder(
TablePath tablePath, CatalogTable catalogTable) {
TablePath tablePath, CatalogTable catalogTable, MySqlTypeConverter typeConverter) {
checkNotNull(tablePath, "tablePath must not be null");
checkNotNull(catalogTable, "catalogTable must not be null");

TableSchema tableSchema = catalogTable.getTableSchema();
checkNotNull(tableSchema, "tableSchema must not be null");

return new MysqlCreateTableSqlBuilder(tablePath.getTableName())
return new MysqlCreateTableSqlBuilder(tablePath.getTableName(), typeConverter)
.comment(catalogTable.getComment())
// todo: set charset and collate
.engine(null)
Expand Down Expand Up @@ -167,10 +171,16 @@ private String buildColumnIdentifySql(Column column, String catalogName) {
final List<String> columnSqls = new ArrayList<>();
columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
boolean isSupportDef = true;
if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {

if ((SqlType.TIME.equals(column.getDataType().getSqlType())
|| SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
&& column.getScale() != null) {
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
columnSqls.add(typeDefine.getColumnType());
} else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
columnSqls.add(column.getSourceType());
} else {
BasicTypeDefine<MysqlType> typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
columnSqls.add(typeDefine.getColumnType());
}
// nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public SeaTunnelDataType<?> toSeaTunnelType(
.scale(scale)
.build();

return MySqlTypeConverter.INSTANCE.convert(typeDefine).getDataType();
return MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine).getDataType();
}

@Override
Expand All @@ -122,7 +122,8 @@ public MysqlType toConnectorType(
.nullable(true)
.build();

BasicTypeDefine<MysqlType> typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
BasicTypeDefine<MysqlType> typeDefine =
MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
return typeDefine.getNativeType();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ public class MySqlTypeConverter implements TypeConverter<BasicTypeDefine<MysqlTy
public static final long POWER_2_24 = (long) Math.pow(2, 24);
public static final long POWER_2_32 = (long) Math.pow(2, 32);
public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
public static final MySqlTypeConverter INSTANCE = new MySqlTypeConverter();
public static final MySqlTypeConverter DEFAULT_INSTANCE =
new MySqlTypeConverter(MySqlVersion.V_5_7);

private final MySqlVersion version;

public MySqlTypeConverter(MySqlVersion version) {
this.version = version;
}

@Override
public String identifier() {
Expand Down Expand Up @@ -462,7 +469,9 @@ public BasicTypeDefine<MysqlType> reconvert(Column column) {
case TIME:
builder.nativeType(MysqlType.TIME);
builder.dataType(MYSQL_TIME);
if (column.getScale() != null && column.getScale() > 0) {
if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
builder.columnType(MYSQL_TIME);
} else if (column.getScale() != null && column.getScale() > 0) {
int timeScale = column.getScale();
if (timeScale > MAX_TIME_SCALE) {
timeScale = MAX_TIME_SCALE;
Expand All @@ -484,7 +493,9 @@ public BasicTypeDefine<MysqlType> reconvert(Column column) {
case TIMESTAMP:
builder.nativeType(MysqlType.DATETIME);
builder.dataType(MYSQL_DATETIME);
if (column.getScale() != null && column.getScale() > 0) {
if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
builder.columnType(MYSQL_DATETIME);
} else if (column.getScale() != null && column.getScale() > 0) {
int timestampScale = column.getScale();
if (timestampScale > MAX_TIMESTAMP_SCALE) {
timestampScale = MAX_TIMESTAMP_SCALE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,19 @@

public class MySqlTypeMapper implements JdbcDialectTypeMapper {

private MySqlTypeConverter typeConverter;

public MySqlTypeMapper() {
this(MySqlTypeConverter.DEFAULT_INSTANCE);
}

public MySqlTypeMapper(MySqlTypeConverter typeConverter) {
this.typeConverter = typeConverter;
}

@Override
public Column mappingColumn(BasicTypeDefine typeDefine) {
return MySqlTypeConverter.INSTANCE.convert(typeDefine);
return typeConverter.convert(typeDefine);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;

public enum MySqlVersion {
V_5_5,
V_5_6,
V_5_7,
V_8;

public static MySqlVersion parse(String version) {
if (version != null) {
if (version.startsWith("5.5")) {
return V_5_5;
}
if (version.startsWith("5.6")) {
return V_5_6;
}
if (version.startsWith("5.7")) {
return V_5_7;
}
if (version.startsWith("8.0")) {
return V_8;
}
}
throw new UnsupportedOperationException("Unsupported MySQL version: " + version);
}

public boolean isBefore(MySqlVersion version) {
return this.compareTo(version) < 0;
}

public boolean isAtOrBefore(MySqlVersion version) {
return this.compareTo(version) <= 0;
}

public boolean isAfter(MySqlVersion version) {
return this.compareTo(version) > 0;
}

public boolean isAtOrAfter(MySqlVersion version) {
return this.compareTo(version) >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -91,7 +92,8 @@ public void testBuild() {
"User table");

String createTableSql =
MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable)
MysqlCreateTableSqlBuilder.builder(
tablePath, catalogTable, MySqlTypeConverter.DEFAULT_INSTANCE)
.build(DatabaseIdentifier.MYSQL);
// create table sql is change; The old unit tests are no longer applicable
String expect =
Expand Down
Loading

0 comments on commit 058f559

Please sign in to comment.