Skip to content

Commit

Permalink
mongo:执行批量insert命令时需要处理记录的字段个数不一致或字段顺序不同的情况 lealone#211
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower authored and qm committed May 14, 2024
1 parent b82c187 commit 1b603b1
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.bson.io.ByteBufferBsonInput;
import org.lealone.common.exceptions.DbException;
import org.lealone.common.util.StatementBuilder;
import org.lealone.common.util.Utils;
import org.lealone.db.api.ErrorCode;
import org.lealone.db.session.ServerSession;
import org.lealone.db.table.Column;
Expand Down Expand Up @@ -46,9 +45,8 @@ private static void addRows(BsonDocument topDoc, MongoServerConnection conn,
Table table = getTable(topDoc, documents.get(0), "insert", session);
Insert insert = new Insert(session);
insert.setTable(table);
for (int i = 0; i < size; i++) {
ArrayList<Column> columns = Utils.newSmallArrayList();
ArrayList<Expression> values = Utils.newSmallArrayList();
outer: for (int i = 0; i < size; i++) {
Expression values[] = new Expression[table.getColumns().length];
HashSet<Column> set = new HashSet<>();
BsonDocument document = documents.get(i);
for (Entry<String, BsonValue> e : document.entrySet()) {
Expand All @@ -57,43 +55,45 @@ private static void addRows(BsonDocument topDoc, MongoServerConnection conn,
Column column = parseColumn(table, columnName);
if (column == null) {
// 如果后续写入的字段不存在,动态增加新的
column = addColumn(session, table, columnName, v);
addColumn(session, table, columnName, v);
insert.clearRows();
i = -1;
continue outer;
}
if (!set.add(column)) {
throw DbException.get(ErrorCode.DUPLICATE_COLUMN_NAME_1, column.getSQL());
}
try {
Value columnValuue = toValue(v);
columnValuue = column.convert(columnValuue);
values.add(ValueExpression.get(columnValuue));
values[column.getColumnId()] = ValueExpression.get(columnValuue);
} catch (Throwable t) {
// 如果后续写入的字段值的类型跟字段的类型不匹配,将字段的类型改成通用的varchar类型
column = alterColumnType(session, table, columnName);
values.add(toValueExpression(v));
alterColumnType(session, table, columnName);
insert.clearRows();
i = -1;
continue outer;
}
columns.add(column);
}
insert.setColumns(columns.toArray(new Column[columns.size()]));
insert.addRow(values.toArray(new Expression[values.size()]));
insert.addRow(values);
}
insert.setColumns(table.getColumns());
insert.prepare();
createAndSubmitYieldableUpdate(task, insert);
}

private static Column addColumn(ServerSession session, Table table, String columnName, BsonValue v) {
private static void addColumn(ServerSession session, Table table, String columnName, BsonValue v) {
StatementBuilder sql = new StatementBuilder();
sql.append("ALTER TABLE ").append(table.getName()).append(" ADD COLUMN ").append(columnName)
.append(" ");
appendColumnType(sql, v);
session.executeUpdateLocal(sql.toString());
return parseColumn(table, columnName);
}

private static Column alterColumnType(ServerSession session, Table table, String columnName) {
private static void alterColumnType(ServerSession session, Table table, String columnName) {
StatementBuilder sql = new StatementBuilder();
sql.append("ALTER TABLE ").append(table.getName()).append(" ALTER COLUMN ").append(columnName)
.append(" varchar");
session.executeUpdateLocal(sql.toString());
return parseColumn(table, columnName);
}
}
4 changes: 4 additions & 0 deletions lealone-sql/src/main/java/org/lealone/sql/dml/MerSert.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public void addRow(Expression[] expr) {
list.add(expr);
}

public void clearRows() {
list.clear();
}

@Override
public int getPriority() {
if (getCurrentRowNumber() > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ void insert() {
ArrayList<Document> documents = new ArrayList<>();
documents.add(createDocument(11, 21));
documents.add(createDocument(12, 22));
// 测试列不匹配的情况
documents.add(new Document().append("_id", ++id).append("f1", 111));
collection.insertMany(documents);

long count = collection.countDocuments();
System.out.println("total document count: " + count);
assertEquals(4, count);
assertEquals(5, count);
}

void delete() {
Expand Down

0 comments on commit 1b603b1

Please sign in to comment.