Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
Expand Down Expand Up @@ -88,7 +89,9 @@ public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {

sourceAttributes.put("source.realtime.mode", "log");
sourceAttributes.put("capture.table", "true");
sourceAttributes.put("__system.sql-dialect", "table");
sourceAttributes.put("capture.tree", "true");
sourceAttributes.put("mode.double-living", "true");
sourceAttributes.put("user", "root");

sinkAttributes.put("sink", "iotdb-thrift-sink");
Expand Down Expand Up @@ -173,7 +176,9 @@ private void testSinkFormat(final String format, final boolean isAsyncLoad) thro
final Map<String, String> sinkAttributes = new HashMap<>();

sourceAttributes.put("capture.table", "true");
sourceAttributes.put("__system.sql-dialect", "table");
sourceAttributes.put("capture.tree", "true");
sourceAttributes.put("mode.double-living", "true");
sourceAttributes.put("user", "root");

sinkAttributes.put("sink", "iotdb-thrift-sink");
Expand Down Expand Up @@ -213,7 +218,11 @@ private void testSinkFormat(final String format, final boolean isAsyncLoad) thro
handleFailure);

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq("testPipe").setIsTableModel(false)).getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq("testPipe").setIsTableModel(true)).getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
Expand Down Expand Up @@ -260,7 +269,9 @@ public void testWriteBackSink() throws Exception {
final Map<String, String> sinkAttributes = new HashMap<>();

sourceAttributes.put("capture.table", "true");
sourceAttributes.put("__system.sql-dialect", "table");
sourceAttributes.put("capture.tree", "true");
sourceAttributes.put("mode.double-living", "true");
sourceAttributes.put("forwarding-pipe-requests", "false");
sourceAttributes.put("source.database-name", "test.*");
sourceAttributes.put("source.table-name", "test.*");
Expand Down Expand Up @@ -377,7 +388,9 @@ private void doTest(BiConsumer<Map<String, List<Tablet>>, Map<String, List<Table
final Map<String, String> sinkAttributes = new HashMap<>();

sourceAttributes.put("capture.table", "true");
sourceAttributes.put("__system.sql-dialect", "table");
sourceAttributes.put("capture.tree", "true");
sourceAttributes.put("mode.double-living", "true");
sourceAttributes.put("source.database-name", "test.*");
sourceAttributes.put("source.table-name", "test.*");
sourceAttributes.put("user", "root");
Expand Down Expand Up @@ -734,7 +747,9 @@ public void testLoadTsFileWithoutVerify() throws Exception {

sourceAttributes.put("source.realtime.mode", "batch");
sourceAttributes.put("capture.table", "true");
sourceAttributes.put("__system.sql-dialect", "table");
sourceAttributes.put("capture.tree", "true");
sourceAttributes.put("mode.double-living", "true");
sourceAttributes.put("user", "root");

sinkAttributes.put("sink", "iotdb-thrift-sink");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@
package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic;

import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
Expand All @@ -42,6 +47,9 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.fail;

Expand Down Expand Up @@ -245,7 +253,7 @@ public void testReadPipeIsolation() {
}

@Test
public void testCaptureTreeAndTableIsolation() throws Exception {
public void testCaptureTreeAndTableIgnoredByDialectIsolation() throws Exception {
final String treePipeName = "tree_a2b";
final String tablePipeName = "table_a2b";

Expand All @@ -272,7 +280,7 @@ public void testCaptureTreeAndTableIsolation() throws Exception {
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

// 2. Create table pipe by table session
try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
Expand All @@ -292,30 +300,105 @@ public void testCaptureTreeAndTableIsolation() throws Exception {
}

// Show tree pipe by tree session
Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

// 3. Drop pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(true)).getCode());
client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(false)).getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(false))
.getCode());
client.dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(true)).getCode());
}
}

@Test
public void testSameNameTreeOnlyAndTableOnlyPipeIsolation() throws Exception {
final String pipeName = "same_name_pipe";
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

try (final Connection connection = senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}

try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}

Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("stop pipe " + pipeName);
}

Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

try (final Connection connection = senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("drop pipe " + pipeName);
}

Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("start pipe " + pipeName);
statement.execute("drop pipe " + pipeName);
}

Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
}

@Test
public void testCaptureCornerCases() {
public void testSameNamePipeWithCaptureAttributesStillIsolated() throws Exception {
final String pipeName = "same_name_conflict_pipe";
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

// 1. Create tree pipe but capture table data
try (final Connection connection = senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}

try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s"
+ " with source ('capture.tree'='true','capture.table'='true')"
+ " with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}

Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
}

@Test
public void testCaptureAttributesAreIgnoredByDialect() {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

// 1. Create tree pipe with capture attributes pointing to table data
try (final Connection connection = senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
Expand All @@ -333,12 +416,12 @@ public void testCaptureCornerCases() {
}

// Show tree pipe by tree session
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

// 2. Create table pipe but capture tree data
// 2. Create table pipe with capture attributes pointing to tree data
try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
Expand Down Expand Up @@ -373,14 +456,114 @@ public void testCaptureCornerCases() {
+ " with sink ("
+ "'node-urls'='%s')",
"p3", receiverDataNode.getIpAndPortString()));
fail();
} catch (final SQLException ignored) {
} catch (final SQLException e) {
e.printStackTrace();
fail(e.getMessage());
}

// Show tree pipe by tree session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
}

@Test
public void testDirectRpcCreationDialectCompatibility() throws Exception {
final String pipeName = "rpc_same_name_pipe";
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final Map<String, String> sinkAttributes = new HashMap<>();
sinkAttributes.put("sink", "iotdb-thrift-sink");
sinkAttributes.put("sink.ip", receiverDataNode.getIp());
sinkAttributes.put("sink.port", String.valueOf(receiverDataNode.getPort()));

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.createPipe(new TCreatePipeReq(pipeName, sinkAttributes)).getCode());

Assert.assertEquals(1, showPipes(client, false).size());
Assert.assertEquals(0, showPipes(client, true).size());
Assert.assertTrue(
showPipes(client, false)
.get(0)
.pipeExtractor
.contains(
SystemConstant.SQL_DIALECT_KEY + "=" + SystemConstant.SQL_DIALECT_TREE_VALUE));

final Map<String, String> tableSourceAttributes = new HashMap<>();
tableSourceAttributes.put(
SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq(pipeName, sinkAttributes)
.setExtractorAttributes(tableSourceAttributes))
.getCode());

Assert.assertEquals(1, showPipes(client, false).size());
Assert.assertEquals(1, showPipes(client, true).size());
Assert.assertTrue(
showPipes(client, true)
.get(0)
.pipeExtractor
.contains(
SystemConstant.SQL_DIALECT_KEY + "=" + SystemConstant.SQL_DIALECT_TABLE_VALUE));

Assert.assertNotEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.createPipe(new TCreatePipeReq(pipeName, sinkAttributes)).getCode());
}
}

@Test
public void testLegacyLifecycleRpcPrefersTreePipeThenTablePipe() throws Exception {
final String pipeName = "legacy_same_name_pipe";
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

try (final Connection connection = senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}
try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
Assert.assertEquals(1, showPipes(client, false).size());
Assert.assertEquals(1, showPipes(client, true).size());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe(pipeName).getCode());
Assert.assertEquals(0, showPipes(client, false).size());
Assert.assertEquals(1, showPipes(client, true).size());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe(pipeName).getCode());
Assert.assertEquals(0, showPipes(client, false).size());
Assert.assertEquals(0, showPipes(client, true).size());
}
}

private List<TShowPipeInfo> showPipes(
final SyncConfigNodeIServiceClient client, final boolean isTableModel) throws Exception {
final List<TShowPipeInfo> showPipeResult =
client.showPipe(
new TShowPipeReq()
.setIsTableModel(isTableModel)
.setUserName(SessionConfig.DEFAULT_USER))
.pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
return showPipeResult;
}
}
Loading
Loading