Mirror of https://github.com/Snowflake-Labs/dlsync.git, synced 2025-12-18 00:51:27 +00:00
add support for pipes
.gitignore (vendored, 5 lines changed)

@@ -9,4 +9,7 @@ log/
 target/
 .env
 Jenkinsfile-app
 out/
+.vscode/
+Snowflake/
+*.p8
New file (example pipe definition, 9 lines added)

@@ -0,0 +1,9 @@
+create or replace pipe ${EXAMPLE_DB}.${MAIN_SCHEMA}.PIPE_STG_FINANCE_DATA
+auto_ingest=true as
+copy into ${EXAMPLE_DB}.${MAIN_SCHEMA}.STG_FINANCE_DATA (
+BRAND_CODE,
+GROSS,
+NET,
+TAX
+)
+from @${EXAMPLE_DB}.${MAIN_SCHEMA}.STAGE_DEV_S3 (file_format => ${EXAMPLE_DB}.${MAIN_SCHEMA}.FF_S3_CSV, PATTERN => "FINANCE/\d{4}/\d{2}/\d{2}/FINANCEDATA.csv");
pom.xml (17 lines changed)

@@ -9,8 +9,6 @@
     <version>2.0-SNAPSHOT</version>

     <properties>
-        <maven.compiler.source>11</maven.compiler.source>
-        <maven.compiler.target>11</maven.compiler.target>
     </properties>
     <dependencies>
         <dependency>
@@ -50,7 +48,7 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
             <version>1.18.24</version>
-            <scope>provided</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>commons-cli</groupId>
@@ -58,8 +56,8 @@
             <version>1.9.0</version>
         </dependency>
         <dependency>
-            <groupId>antlr4-runtime</groupId>
-            <artifactId>org.antlr</artifactId>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
             <version>4.13.2</version>
         </dependency>

@@ -80,6 +78,15 @@
     </dependencies>
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.11.0</version> <!-- Use the latest stable -->
+                <configuration>
+                    <release>11</release>
+                </configuration>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
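Two notes on the pom.xml changes above. The ANTLR dependency previously had its groupId and artifactId transposed; the corrected coordinates are org.antlr:antlr4-runtime:4.13.2. Lombok's scope moves from provided to compile, which puts it on the runtime classpath and into any bundled artifact instead of keeping it compile-time only. The snippet below is an illustrative sketch, not part of the commit: the class name is hypothetical and it does no real parsing, it only references the ANTLR runtime API so a missing or misnamed dependency surfaces as a compile error.

// Illustrative sketch only (hypothetical class): touches org.antlr:antlr4-runtime
// types to confirm the corrected dependency coordinates resolve.
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CharStreams;

public class AntlrRuntimeCheck {
    public static void main(String[] args) {
        CharStream stream = CharStreams.fromString("create or replace pipe p1 as copy into t1 from @s1;");
        System.out.println("antlr4-runtime resolved, stream length = " + stream.size());
    }
}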
ScriptRepo.java

@@ -210,7 +210,12 @@ public class ScriptRepo {
         try {
             connection.setAutoCommit(false);
             if(!onlyHashes) {
-                statement.executeUpdate(script.getContent());
+                if(script.getObjectType().getSingular() == "PIPE"){
+                    statement.execute(script.getContent());
+                }
+                else{
+                    statement.executeUpdate(script.getContent());
+                }
                 log.debug("Creating object using the SQL: {}", script.getContent());
             }
             updateScriptHash(script);
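The new branch sends pipe scripts through Statement.execute() instead of executeUpdate(), presumably because a CREATE PIPE ... AS COPY INTO statement can hand a result back through the Snowflake JDBC driver, which executeUpdate() would reject; that reasoning is inferred, the commit itself does not say why. Note also that getSingular() == "PIPE" compares String references and only passes because both sides are interned literals. Below is a minimal sketch of the same decision written against the enum constant; the helper class and method names are hypothetical, Script and ScriptObjectType are the DLSync model types (the models package for Script is assumed).

import java.sql.SQLException;
import java.sql.Statement;

import com.snowflake.dlsync.models.Script;
import com.snowflake.dlsync.models.ScriptObjectType;

// Hypothetical helper, not the committed code: same routing as the diff above,
// but comparing the enum constant instead of relying on String interning with "==".
class PipeAwareExecutor {
    static void deploy(Statement statement, Script script) throws SQLException {
        if (script.getObjectType() == ScriptObjectType.PIPES) {
            // CREATE PIPE ... AS COPY INTO may return a result set, so use execute().
            statement.execute(script.getContent());
        } else {
            statement.executeUpdate(script.getContent());
        }
    }
}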
ScriptObjectType.java

@@ -1,7 +1,7 @@
 package com.snowflake.dlsync.models;

 public enum ScriptObjectType {
-    VIEWS("VIEW"),FUNCTIONS("FUNCTION"),PROCEDURES("PROCEDURE"),FILE_FORMATS("FILE FORMAT"),TABLES("TABLE"),STREAMS("STREAM"),SEQUENCES("SEQUENCE"),STAGES("STAGE"),TASKS("TASK"),STREAMLITS("STREAMLIT");
+    VIEWS("VIEW"),FUNCTIONS("FUNCTION"),PROCEDURES("PROCEDURE"),FILE_FORMATS("FILE FORMAT"),TABLES("TABLE"),STREAMS("STREAM"),SEQUENCES("SEQUENCE"),STAGES("STAGE"),TASKS("TASK"),STREAMLITS("STREAMLIT"),PIPES("PIPE");

     private final String singular;
     private ScriptObjectType(String type) {
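The enum maps the plural constant names used for the script folders (and in paths such as .../PIPES/PIPE1.SQL) to the singular keyword used in DDL. Pieced together from this diff and from the getSingular() call in ScriptRepo, the type after the change looks roughly like the sketch below; only the constructor body and getter body are assumed.

package com.snowflake.dlsync.models;

public enum ScriptObjectType {
    VIEWS("VIEW"), FUNCTIONS("FUNCTION"), PROCEDURES("PROCEDURE"),
    FILE_FORMATS("FILE FORMAT"), TABLES("TABLE"), STREAMS("STREAM"),
    SEQUENCES("SEQUENCE"), STAGES("STAGE"), TASKS("TASK"),
    STREAMLITS("STREAMLIT"), PIPES("PIPE");

    // Singular keyword used when talking to Snowflake, e.g. PIPES -> "PIPE".
    private final String singular;

    private ScriptObjectType(String type) {
        this.singular = type; // assumed constructor body
    }

    public String getSingular() {
        return singular; // getter name taken from the ScriptRepo call site
    }
}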
SqlTokenizerTest.java

@@ -505,13 +505,39 @@ class SqlTokenizerTest
         assertEquals(1, scripts.size(), "There should be exactly one script parsed");

         Script script = scripts.iterator().next();
-        assertEquals("STREAMLIT1", script.getObjectName(), "Object name should be VIEW1");
+        assertEquals("STREAMLIT1", script.getObjectName(), "Object name should be STREAMLIT1");
         assertEquals("db1".toUpperCase(), script.getDatabaseName(), "Database name should be db1");
         assertEquals("schema1".toUpperCase(), script.getSchemaName(), "Schema name should be schema1");
-        assertEquals(ScriptObjectType.STREAMLITS, script.getObjectType(), "Object type should be VIEWS");
+        assertEquals(ScriptObjectType.STREAMLITS, script.getObjectType(), "Object type should be STREAMLITS");
         assertEquals(content, script.getContent(), "Script content should match the input content");

     }

+    @Test
+    void parseScriptTypePipe() {
+        String filePath = "db_scripts/db1/schema1/PIPES/PIPE1.SQL";
+        String name = "PIPE1.SQL";
+        String scriptType = "PIPES";
+        String content = "CREATE OR REPLACE PIPE db1.schema1.PIPE1\n" +
+                " AUTO_INGEST = TRUE\n" +
+                " COMMENT = 'Loads data automatically from stage'\n" +
+                " AS COPY INTO db1.schema1.my_table\n" +
+                " FROM @my_stage/file_prefix\n" +
+                " FILE_FORMAT = (TYPE = 'CSV');";
+
+        Set<Script> scripts = SqlTokenizer.parseScript(filePath, name, scriptType, content);
+
+        assertNotNull(scripts, "Scripts should not be null");
+        assertEquals(1, scripts.size(), "There should be exactly one script parsed");
+
+        Script script = scripts.iterator().next();
+        assertEquals("PIPE1", script.getObjectName(), "Object name should be PIPE1");
+        assertEquals("db1".toUpperCase(), script.getDatabaseName(), "Database name should be db1");
+        assertEquals("schema1".toUpperCase(), script.getSchemaName(), "Schema name should be schema1");
+        assertEquals(ScriptObjectType.PIPES, script.getObjectType(), "Object type should be PIPES");
+        assertEquals(content, script.getContent(), "Script content should match the input content");
+    }
+
+
 }
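The new test feeds a script path whose PIPES segment carries the object type and asserts that parsing yields a single Script with ScriptObjectType.PIPES. That segment presumably resolves to the enum through its constant name; the standalone sketch below shows that mapping in isolation (the class is hypothetical, and whether SqlTokenizer resolves it exactly this way is an assumption).

import com.snowflake.dlsync.models.ScriptObjectType;

// Hypothetical illustration, not part of the commit: the PIPES folder name resolves
// to the new enum constant, whose singular form "PIPE" is used in the generated DDL.
class PipeTypeMappingExample {
    public static void main(String[] args) {
        ScriptObjectType type = ScriptObjectType.valueOf("PIPES");
        System.out.println(type + " -> " + type.getSingular()); // prints: PIPES -> PIPE
    }
}

With the surefire plugin already in the build section, the new case can be run on its own with mvn -Dtest=SqlTokenizerTest#parseScriptTypePipe test, assuming a surefire version recent enough to support method selection.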