From 5c06909d8283ab07cafb0948857c7356e97fff50 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= <kamil.bregula@snowflake.com>
Date: Tue, 15 Mar 2022 22:36:07 +0100
Subject: [PATCH] Merge pull request #17052 from [BEAM-13818] [SnowflakeIO] Add
 support for unencrypted private keys

* [BEAM-13818] [SnowflakeIO] Add support for unencrypted private keys

* fixup! [BEAM-13818] [SnowflakeIO] Add support for unencrypted private keys
---
 build.gradle.kts                              |   3 +-
 sdks/java/io/snowflake/build.gradle           |   1 +
 .../beam/sdk/io/snowflake/KeyPairUtils.java   | 113 ++++++++++++--
 .../beam/sdk/io/snowflake/SnowflakeIO.java    |  66 ++++++--
 .../beam/sdk/io/snowflake/test/TestUtils.java |  22 ++-
 .../unit/DataSourceConfigurationTest.java     | 142 ++++++++++++++++--
 .../test/unit/write/StreamingWriteTest.java   |   4 +-
 ...key.p8 => valid_encrypted_test_rsa_key.p8} |   0
 .../valid_unencrypted_test_rsa_key.p8         |  28 ++++
 sdks/python/apache_beam/io/snowflake.py       |  12 +-
 .../en/documentation/io/built-in/snowflake.md |   4 -
 11 files changed, 338 insertions(+), 57 deletions(-)
 rename sdks/java/io/snowflake/src/test/resources/{valid_test_rsa_key.p8 => valid_encrypted_test_rsa_key.p8} (100%)
 create mode 100644 sdks/java/io/snowflake/src/test/resources/valid_unencrypted_test_rsa_key.p8

diff --git a/build.gradle.kts b/build.gradle.kts
index c52201eb05e..207c2c694a9 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -108,7 +108,8 @@ tasks.rat {
 
     // test p8 file for SnowflakeIO
     "sdks/java/io/snowflake/src/test/resources/invalid_test_rsa_key.p8",
-    "sdks/java/io/snowflake/src/test/resources/valid_test_rsa_key.p8",
+    "sdks/java/io/snowflake/src/test/resources/valid_encrypted_test_rsa_key.p8",
+    "sdks/java/io/snowflake/src/test/resources/valid_unencrypted_test_rsa_key.p8",
 
     // Mockito extensions
     "sdks/java/io/amazon-web-services2/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker",
diff --git a/sdks/java/io/snowflake/build.gradle b/sdks/java/io/snowflake/build.gradle
index 3fe925046ef..59115cef969 100644
--- a/sdks/java/io/snowflake/build.gradle
+++ b/sdks/java/io/snowflake/build.gradle
@@ -33,6 +33,7 @@ dependencies {
   implementation group: 'net.snowflake', name: 'snowflake-jdbc', version: '3.12.11'
   implementation group: 'com.opencsv', name: 'opencsv', version: '5.0'
   implementation 'net.snowflake:snowflake-ingest-sdk:0.9.9'
+  implementation "org.bouncycastle:bcprov-jdk15on:1.70"
   implementation library.java.joda_time
   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
   testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/KeyPairUtils.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/KeyPairUtils.java
index ca5753cc396..ee8098fa3e6 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/KeyPairUtils.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/KeyPairUtils.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.snowflake;
 
 import java.io.IOException;
+import java.io.StringReader;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -27,32 +28,112 @@ import java.security.NoSuchAlgorithmException;
 import java.security.PrivateKey;
 import java.security.spec.InvalidKeySpecException;
 import java.security.spec.PKCS8EncodedKeySpec;
-import java.util.Base64;
 import javax.crypto.EncryptedPrivateKeyInfo;
 import javax.crypto.SecretKeyFactory;
 import javax.crypto.spec.PBEKeySpec;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.bouncycastle.util.encoders.Base64;
+import org.bouncycastle.util.encoders.DecoderException;
+import org.bouncycastle.util.io.pem.PemObject;
+import org.bouncycastle.util.io.pem.PemReader;
 
 public class KeyPairUtils {
+  private static final String ENCRYPTED_PRIVATE_KEY = "ENCRYPTED PRIVATE KEY";
+  private static final String UNENCRYPTED_PRIVATE_KEY = "PRIVATE KEY";
+
+  private enum KeyEncryptionState {
+    ENCRYPT,
+    UNENCRYPTED,
+    UNKNOWN
+  }
 
   public static PrivateKey preparePrivateKey(String privateKey, String privateKeyPassphrase) {
+    try {
+      KeyFactory keyFactory = KeyFactory.getInstance("RSA");
+      KeyEncryptionState encryptionState = guessKeyEncryptionState(privateKey);
+      if (encryptionState == KeyEncryptionState.ENCRYPT
+          && Strings.isNullOrEmpty(privateKeyPassphrase)) {
+        throw new RuntimeException(
+            "The private key is encrypted but no private key key passphrase has been provided.");
+      }
 
-    privateKey = privateKey.replace("-----BEGIN ENCRYPTED PRIVATE KEY-----", "");
-    privateKey = privateKey.replace("-----END ENCRYPTED PRIVATE KEY-----", "");
+      if (encryptionState == KeyEncryptionState.UNENCRYPTED
+          && !Strings.isNullOrEmpty(privateKeyPassphrase)) {
+        throw new RuntimeException(
+            "The private key is unencrypted but private key key passphrase has been provided.");
+      }
 
-    try {
-      EncryptedPrivateKeyInfo pkInfo =
-          new EncryptedPrivateKeyInfo(Base64.getMimeDecoder().decode(privateKey));
-      PBEKeySpec keySpec = new PBEKeySpec(privateKeyPassphrase.toCharArray());
-      SecretKeyFactory pbeKeyFactory = SecretKeyFactory.getInstance(pkInfo.getAlgName());
-      PKCS8EncodedKeySpec encodedKeySpec = pkInfo.getKeySpec(pbeKeyFactory.generateSecret(keySpec));
+      byte[] decoded;
 
-      KeyFactory keyFactory = KeyFactory.getInstance("RSA");
-      return keyFactory.generatePrivate(encodedKeySpec);
-    } catch (IOException
-        | NoSuchAlgorithmException
-        | InvalidKeySpecException
-        | InvalidKeyException ex) {
-      throw new RuntimeException("Can't create private key");
+      if (encryptionState == KeyEncryptionState.UNKNOWN) {
+        decoded = Base64.decode(privateKey);
+      } else {
+        PemReader pr = new PemReader(new StringReader(privateKey));
+        PemObject pemObject = pr.readPemObject();
+        decoded = pemObject.getContent();
+        pr.close();
+      }
+
+      if (Strings.isNullOrEmpty(privateKeyPassphrase)) {
+        // unencrypted private key file
+        PKCS8EncodedKeySpec encodedKeySpec = new PKCS8EncodedKeySpec(decoded);
+        return keyFactory.generatePrivate(encodedKeySpec);
+      } else {
+        // encrypted private key file
+        EncryptedPrivateKeyInfo pkInfo = new EncryptedPrivateKeyInfo(decoded);
+        PBEKeySpec keySpec = new PBEKeySpec(privateKeyPassphrase.toCharArray());
+        SecretKeyFactory pbeKeyFactory = SecretKeyFactory.getInstance(pkInfo.getAlgName());
+        PKCS8EncodedKeySpec encodedKeySpec =
+            pkInfo.getKeySpec(pbeKeyFactory.generateSecret(keySpec));
+        return keyFactory.generatePrivate(encodedKeySpec);
+      }
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(
+          "Private key encryption algorithm not supported. This may mean that the private key was generated by OpenSSL 1.1.1g or newer "
+              + "which uses an encryption algorithm by default which has compatibility issues in some JVM environments. "
+              + "For details, see: "
+              + "https://community.snowflake.com/s/article/Private-key-provided-is-invalid-or-not-supported-rsa-key-p8--data-isn-t-an-object-ID"
+              + " "
+              + e.getMessage());
+    } catch (InvalidKeySpecException
+        | IOException
+        | IllegalArgumentException
+        | NullPointerException
+        | InvalidKeyException
+        | DecoderException e) {
+      throw new RuntimeException("Can't create private key: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Tries to determine whether the private key is encrypted or not based on the file headers.
+   *
+   * <p>If this is not possible (e.g. there are no headers), returns {@link
+   * KeyEncryptionState#UNKNOWN}
+   */
+  private static KeyEncryptionState guessKeyEncryptionState(String privateKey) {
+    PemReader pr = new PemReader(new StringReader(privateKey));
+    try {
+      PemObject pemObject = pr.readPemObject();
+      if (pemObject == null) {
+        // If it is not a PEM file then it is not possible to determine the encryption state
+        return KeyEncryptionState.UNKNOWN;
+      }
+      if (ENCRYPTED_PRIVATE_KEY.equals(pemObject.getType())) {
+        return KeyEncryptionState.ENCRYPT;
+      } else if (UNENCRYPTED_PRIVATE_KEY.equals(pemObject.getType())) {
+        return KeyEncryptionState.UNENCRYPTED;
+      } else {
+        throw new RuntimeException(
+            "Invalid type of PEM file: "
+                + pemObject.getType()
+                + ". Supported types: "
+                + ENCRYPTED_PRIVATE_KEY
+                + ", "
+                + UNENCRYPTED_PRIVATE_KEY);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Can't read parse private key");
     }
   }
 
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
index 1071997c852..90bc9ee188b 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java
@@ -1409,9 +1409,6 @@ public class SnowflakeIO {
     @Nullable
     public abstract PrivateKey getPrivateKey();
 
-    @Nullable
-    public abstract String getPrivateKeyPath();
-
     @Nullable
     public abstract ValueProvider<String> getRawPrivateKey();
 
@@ -1463,8 +1460,6 @@ public class SnowflakeIO {
 
       abstract Builder setPrivateKey(PrivateKey privateKey);
 
-      abstract Builder setPrivateKeyPath(String privateKeyPath);
-
       abstract Builder setRawPrivateKey(ValueProvider<String> rawPrivateKey);
 
       abstract Builder setPrivateKeyPassphrase(ValueProvider<String> privateKeyPassphrase);
@@ -1584,6 +1579,21 @@ public class SnowflakeIO {
           .build();
     }
 
+    /**
+     * Sets key pair authentication.
+     *
+     * @param username - Snowflake username.
+     * @param privateKeyPath - Private key path.
+     */
+    public DataSourceConfiguration withKeyPairPathAuth(
+        ValueProvider<String> username, String privateKeyPath) {
+      String privateKey = KeyPairUtils.readPrivateKeyFile(privateKeyPath);
+      return builder()
+          .setUsername(username)
+          .setRawPrivateKey(ValueProvider.StaticValueProvider.of(privateKey))
+          .build();
+    }
+
     /**
      * Sets key pair authentication.
      *
@@ -1602,6 +1612,21 @@ public class SnowflakeIO {
           .build();
     }
 
+    /**
+     * Sets key pair authentication.
+     *
+     * @param username - Snowflake username.
+     * @param privateKeyPath - Private key path.
+     */
+    public DataSourceConfiguration withKeyPairPathAuth(String username, String privateKeyPath) {
+      String privateKey = KeyPairUtils.readPrivateKeyFile(privateKeyPath);
+
+      return builder()
+          .setUsername(ValueProvider.StaticValueProvider.of(username))
+          .setRawPrivateKey(ValueProvider.StaticValueProvider.of(privateKey))
+          .build();
+    }
+
     /**
      * Sets key pair authentication.
      *
@@ -1620,6 +1645,17 @@ public class SnowflakeIO {
           .build();
     }
 
+    /**
+     * Sets key pair authentication.
+     *
+     * @param username - Snowflake username.
+     * @param rawPrivateKey - Raw private key.
+     */
+    public DataSourceConfiguration withKeyPairRawAuth(
+        ValueProvider<String> username, ValueProvider<String> rawPrivateKey) {
+      return builder().setUsername(username).setRawPrivateKey(rawPrivateKey).build();
+    }
+
     /**
      * Sets key pair authentication.
      *
@@ -1636,6 +1672,19 @@ public class SnowflakeIO {
           .build();
     }
 
+    /**
+     * Sets key pair authentication.
+     *
+     * @param username - Snowflake username.
+     * @param rawPrivateKey - Raw private key.
+     */
+    public DataSourceConfiguration withKeyPairRawAuth(String username, String rawPrivateKey) {
+      return builder()
+          .setUsername(ValueProvider.StaticValueProvider.of(username))
+          .setRawPrivateKey(ValueProvider.StaticValueProvider.of(rawPrivateKey))
+          .build();
+    }
+
     /**
      * Sets URL of Snowflake server in following format:
      * jdbc:snowflake://<account_name>.snowflakecomputing.com
@@ -1782,15 +1831,12 @@ public class SnowflakeIO {
         } else if (isNotEmpty(getUsername()) && getPrivateKey() != null) {
           basicDataSource.setUser(getUsername().get());
           basicDataSource.setPrivateKey(getPrivateKey());
-        } else if (isNotEmpty(getUsername())
-            && isNotEmpty(getPrivateKeyPassphrase())
-            && isNotEmpty(getRawPrivateKey())) {
+        } else if (isNotEmpty(getUsername()) && isNotEmpty(getRawPrivateKey())) {
           PrivateKey privateKey =
               KeyPairUtils.preparePrivateKey(
-                  getRawPrivateKey().get(), getPrivateKeyPassphrase().get());
+                  getRawPrivateKey().get(), getValueOrNull(getPrivateKeyPassphrase()));
           basicDataSource.setPrivateKey(privateKey);
           basicDataSource.setUser(getUsername().get());
-
         } else if (isNotEmpty(getUsername()) && isNotEmpty(getPassword())) {
           basicDataSource.setUser(getUsername().get());
           basicDataSource.setPassword(getPassword().get());
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
index 2dbceacef9f..8807a972eaf 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/TestUtils.java
@@ -55,7 +55,10 @@ public class TestUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);
 
-  private static final String VALID_PRIVATE_KEY_FILE_NAME = "valid_test_rsa_key.p8";
+  private static final String VALID_ENCRYPTED_PRIVATE_KEY_FILE_NAME =
+      "valid_encrypted_test_rsa_key.p8";
+  private static final String VALID_UNENCRYPTED_PRIVATE_KEY_FILE_NAME =
+      "valid_unencrypted_test_rsa_key.p8";
   private static final String INVALID_PRIVATE_KEY_FILE_NAME = "invalid_test_rsa_key.p8";
   private static final String PRIVATE_KEY_PASSPHRASE = "snowflake";
 
@@ -164,16 +167,25 @@ public class TestUtils {
     return lines;
   }
 
+  public static String getValidEncryptedPrivateKeyPath(Class c) {
+    return getPrivateKeyPath(c, VALID_ENCRYPTED_PRIVATE_KEY_FILE_NAME);
+  }
+
+  public static String getValidUnencryptedPrivateKeyPath(Class c) {
+    return getPrivateKeyPath(c, VALID_UNENCRYPTED_PRIVATE_KEY_FILE_NAME);
+  }
+
   public static String getInvalidPrivateKeyPath(Class c) {
     return getPrivateKeyPath(c, INVALID_PRIVATE_KEY_FILE_NAME);
   }
 
-  public static String getValidPrivateKeyPath(Class c) {
-    return getPrivateKeyPath(c, VALID_PRIVATE_KEY_FILE_NAME);
+  public static String getRawValidEncryptedPrivateKey(Class c) throws IOException {
+    byte[] keyBytes = Files.readAllBytes(Paths.get(getValidEncryptedPrivateKeyPath(c)));
+    return new String(keyBytes, StandardCharsets.UTF_8);
   }
 
-  public static String getRawValidPrivateKey(Class c) throws IOException {
-    byte[] keyBytes = Files.readAllBytes(Paths.get(getValidPrivateKeyPath(c)));
+  public static String getRawValidUnencryptedPrivateKey(Class c) throws IOException {
+    byte[] keyBytes = Files.readAllBytes(Paths.get(getValidUnencryptedPrivateKeyPath(c)));
     return new String(keyBytes, StandardCharsets.UTF_8);
   }
 
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java
index 317020627f2..37d04ed5926 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java
@@ -17,10 +17,15 @@
  */
 package org.apache.beam.sdk.io.snowflake.test.unit;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
 import javax.sql.DataSource;
 import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
 import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
@@ -162,7 +167,13 @@ public class DataSourceConfigurationTest {
   @Test
   public void testSettingUsernamePasswordAuth() {
 
-    configuration = configuration.withUsernamePasswordAuth(USERNAME, PASSWORD);
+    configuration =
+        configuration.withUsernamePasswordAuth(USERNAME, PASSWORD).withServerName(SERVER_NAME);
+
+    // Make sure DataSource can be built
+    DataSource dataSource = configuration.buildDatasource();
+    assertEquals(SnowflakeBasicDataSource.class, dataSource.getClass());
+
     assertEquals(USERNAME, configuration.getUsername().get());
     assertEquals(PASSWORD, configuration.getPassword().get());
   }
@@ -180,13 +191,18 @@ public class DataSourceConfigurationTest {
   public void testSettingOAuth() {
     String token = "token";
 
-    configuration = configuration.withOAuth(token);
+    configuration = configuration.withOAuth(token).withServerName(SERVER_NAME);
+
+    // Make sure DataSource can be built
+    DataSource dataSource = configuration.buildDatasource();
+    assertEquals(SnowflakeBasicDataSource.class, dataSource.getClass());
+
     assertEquals(token, configuration.getOauthToken().get());
   }
 
   @Test
   public void testSettingKeyPairAuthWithProperPathToKey() {
-    String privateKeyPath = TestUtils.getValidPrivateKeyPath(getClass());
+    String privateKeyPath = TestUtils.getValidEncryptedPrivateKeyPath(getClass());
     String keyPassphrase = TestUtils.getPrivateKeyPassphrase();
 
     configuration =
@@ -194,8 +210,12 @@ public class DataSourceConfigurationTest {
             .withServerName(SERVER_NAME)
             .withKeyPairPathAuth(USERNAME, privateKeyPath, keyPassphrase);
 
+    // Make sure DataSource can be built
+    DataSource dataSource = configuration.buildDatasource();
+    assertEquals(SnowflakeBasicDataSource.class, dataSource.getClass());
+
     assertEquals(USERNAME, configuration.getUsername().get());
-    //  TODO  assertEquals(privateKeyPath, configuration.getPrivateKeyPath());
+    assertNotNull(configuration.getRawPrivateKey());
     assertEquals(keyPassphrase, configuration.getPrivateKeyPassphrase().get());
   }
 
@@ -210,12 +230,57 @@ public class DataSourceConfigurationTest {
             .withKeyPairPathAuth(USERNAME, privateKeyPath, keyPassphrase);
 
     Exception ex = assertThrows(RuntimeException.class, () -> configuration.buildDatasource());
-    assertEquals("Can't create private key", ex.getMessage());
+    assertThat(ex.getMessage(), containsString("Can't create private key: "));
+  }
+
+  @Test
+  public void testSettingKeyPairAuthWithProperPathToUnencryptedKeyAndPassphrase() {
+    String privateKeyPath = TestUtils.getValidUnencryptedPrivateKeyPath(getClass());
+    String keyPassphrase = "test-passphrase";
+
+    configuration =
+        configuration
+            .withServerName(SERVER_NAME)
+            .withKeyPairPathAuth(USERNAME, privateKeyPath, keyPassphrase);
+
+    Exception ex = assertThrows(RuntimeException.class, () -> configuration.buildDatasource());
+    assertThat(
+        ex.getMessage(),
+        containsString(
+            "The private key is unencrypted but private key key passphrase has been provided."));
+  }
+
+  @Test
+  public void testSettingKeyPairAuthWithProperPathToEncryptedKeyAndNoPassphrase() {
+    String privateKeyPath = TestUtils.getValidEncryptedPrivateKeyPath(getClass());
+
+    configuration =
+        configuration.withServerName(SERVER_NAME).withKeyPairPathAuth(USERNAME, privateKeyPath);
+
+    Exception ex = assertThrows(RuntimeException.class, () -> configuration.buildDatasource());
+    assertThat(
+        ex.getMessage(),
+        containsString(
+            "The private key is encrypted but no private key key passphrase has been provided."));
+  }
+
+  @Test
+  public void testSettingKeyPairAuthWithProperPathToEncryptedKeyAndInvalidPassphrase() {
+    String privateKeyPath = TestUtils.getValidEncryptedPrivateKeyPath(getClass());
+    String keyPassphrase = "invalid-passphrase";
+
+    configuration =
+        configuration
+            .withServerName(SERVER_NAME)
+            .withKeyPairPathAuth(USERNAME, privateKeyPath, keyPassphrase);
+
+    Exception ex = assertThrows(RuntimeException.class, () -> configuration.buildDatasource());
+    assertThat(ex.getMessage(), containsString("Can't create private key: "));
   }
 
   @Test
   public void testSettingKeyPairAuthWithProperPathToKeyAndMissingUsername() {
-    String privateKeyPath = TestUtils.getValidPrivateKeyPath(getClass());
+    String privateKeyPath = TestUtils.getValidEncryptedPrivateKeyPath(getClass());
     String keyPassphrase = TestUtils.getPrivateKeyPassphrase();
 
     configuration =
@@ -229,7 +294,29 @@ public class DataSourceConfigurationTest {
 
   @Test
   public void testSettingKeyPairAuthWithProperRawKey() throws IOException {
-    String rawPrivateKey = TestUtils.getRawValidPrivateKey(getClass());
+    String rawPrivateKey = TestUtils.getRawValidEncryptedPrivateKey(getClass());
+    String keyPassphrase = TestUtils.getPrivateKeyPassphrase();
+
+    configuration =
+        configuration
+            .withServerName(SERVER_NAME)
+            .withKeyPairRawAuth(USERNAME, rawPrivateKey, keyPassphrase);
+
+    // Make sure DataSource can be built
+    DataSource dataSource = configuration.buildDatasource();
+    assertEquals(SnowflakeBasicDataSource.class, dataSource.getClass());
+
+    assertEquals(USERNAME, configuration.getUsername().get());
+    assertEquals(rawPrivateKey, configuration.getRawPrivateKey().get());
+    assertEquals(keyPassphrase, configuration.getPrivateKeyPassphrase().get());
+  }
+
+  @Test
+  public void testSettingKeyPairAuthWithProperRawEncryptedKeyWithoutHeaders() throws IOException {
+    String rawPrivateKey =
+        Arrays.stream(TestUtils.getRawValidEncryptedPrivateKey(getClass()).split("\n"))
+            .filter(d -> !d.contains("---"))
+            .collect(Collectors.joining("\n"));
     String keyPassphrase = TestUtils.getPrivateKeyPassphrase();
 
     configuration =
@@ -237,14 +324,51 @@ public class DataSourceConfigurationTest {
             .withServerName(SERVER_NAME)
             .withKeyPairRawAuth(USERNAME, rawPrivateKey, keyPassphrase);
 
+    // Make sure DataSource can be built
+    DataSource dataSource = configuration.buildDatasource();
+    assertEquals(SnowflakeBasicDataSource.class, dataSource.getClass());
+
     assertEquals(USERNAME, configuration.getUsername().get());
     assertEquals(rawPrivateKey, configuration.getRawPrivateKey().get());
     assertEquals(keyPassphrase, configuration.getPrivateKeyPassphrase().get());
   }
 
+  @Test
+  public void testSettingKeyPairAuthWithProperRawUnencryptedKeyWithoutHeaders() throws IOException {
+    String rawPrivateKey =
+        Arrays.stream(TestUtils.getRawValidUnencryptedPrivateKey(getClass()).split("\n"))
+            .filter(d -> !d.contains("---"))
+            .collect(Collectors.joining("\n"));
+
+    configuration =
+        configuration.withServerName(SERVER_NAME).withKeyPairRawAuth(USERNAME, rawPrivateKey);
+
+    // Make sure DataSource can be built
+    DataSource dataSource = configuration.buildDatasource();
+    assertEquals(SnowflakeBasicDataSource.class, dataSource.getClass());
+
+    assertEquals(USERNAME, configuration.getUsername().get());
+    assertEquals(rawPrivateKey, configuration.getRawPrivateKey().get());
+  }
+
+  @Test
+  public void testSettingKeyPairAuthWithProperRawUnencryptedKey() throws IOException {
+    String rawPrivateKey = TestUtils.getRawValidUnencryptedPrivateKey(getClass());
+
+    configuration =
+        configuration.withServerName(SERVER_NAME).withKeyPairRawAuth(USERNAME, rawPrivateKey);
+
+    // Make sure DataSource can be built
+    DataSource dataSource = configuration.buildDatasource();
+    assertEquals(SnowflakeBasicDataSource.class, dataSource.getClass());
+
+    assertEquals(USERNAME, configuration.getUsername().get());
+    assertEquals(rawPrivateKey, configuration.getRawPrivateKey().get());
+  }
+
   @Test
   public void testSettingKeyPairAuthWithProperRawKeyAndMissingUsername() throws IOException {
-    String rawPrivateKey = TestUtils.getRawValidPrivateKey(getClass());
+    String rawPrivateKey = TestUtils.getRawValidEncryptedPrivateKey(getClass());
     String keyPassphrase = TestUtils.getPrivateKeyPassphrase();
 
     configuration =
@@ -282,7 +406,7 @@ public class DataSourceConfigurationTest {
             .withKeyPairRawAuth(USERNAME, rawPrivateKey, keyPassphrase);
 
     Exception ex = assertThrows(RuntimeException.class, () -> configuration.buildDatasource());
-    assertEquals("Can't create private key", ex.getMessage());
+    assertThat(ex.getMessage(), containsString("Can't create private key: "));
   }
 
   @Test
diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java
index 7c68fc2e506..a7f2cbe17e9 100644
--- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java
+++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/write/StreamingWriteTest.java
@@ -192,7 +192,7 @@ public class StreamingWriteTest {
         SnowflakeIO.DataSourceConfiguration.create()
             .withKeyPairPathAuth(
                 options.getUsername(),
-                TestUtils.getValidPrivateKeyPath(getClass()),
+                TestUtils.getValidEncryptedPrivateKeyPath(getClass()),
                 TestUtils.getPrivateKeyPassphrase())
             .withServerName(options.getServerName())
             .withSchema("PUBLIC")
@@ -251,7 +251,7 @@ public class StreamingWriteTest {
         SnowflakeIO.DataSourceConfiguration.create()
             .withKeyPairPathAuth(
                 options.getUsername(),
-                TestUtils.getValidPrivateKeyPath(getClass()),
+                TestUtils.getValidEncryptedPrivateKeyPath(getClass()),
                 TestUtils.getPrivateKeyPassphrase())
             .withServerName(options.getServerName())
             .withSchema("PUBLIC")
diff --git a/sdks/java/io/snowflake/src/test/resources/valid_test_rsa_key.p8 b/sdks/java/io/snowflake/src/test/resources/valid_encrypted_test_rsa_key.p8
similarity index 100%
rename from sdks/java/io/snowflake/src/test/resources/valid_test_rsa_key.p8
rename to sdks/java/io/snowflake/src/test/resources/valid_encrypted_test_rsa_key.p8
diff --git a/sdks/java/io/snowflake/src/test/resources/valid_unencrypted_test_rsa_key.p8 b/sdks/java/io/snowflake/src/test/resources/valid_unencrypted_test_rsa_key.p8
new file mode 100644
index 00000000000..d6035b56cfb
--- /dev/null
+++ b/sdks/java/io/snowflake/src/test/resources/valid_unencrypted_test_rsa_key.p8
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCagQWoNCBerdfV
+Ua+VX7RaFMWOzBW66NU5YW3LTpsRKFEApDxrtgEzaKZMu6zw2Roqt/wXG5ce7wCk
+YlYD7aL/geOZ8WCEvLfZyXvrWP2yanu1WMmtSIvpZVf2Xqx7NgO5f6Je8/wiyQPR
+W8l8uHtX8YeYoPBAdpIu5KdNSezzJQ4jss5u0d+AW1LJrqQxdApFdX6K8FHPpBOr
+z/IXsqDs1CK0la59Y0N43XpxHyx0xOlYp/4z8AVoud83AnuwHPlH8mlWmbpISYwL
+yPq8H1XzTTP7UtP+fiQZwynS5bmdC2+wvaBlFydIHB8LQstpspO0x+xGt/f0D01k
+m2GHzlmJAgMBAAECggEAHUjzT4yBzjBZRp+zDjcTsjdZstVQERtsvvGOpAiEL95a
+bCq1IR1gYOQnEUhJYWVlJk+1QpQCMuTOVFonP5tdV9zAzT8JMl+q4WblTBiUDatN
+Q3RO1b94dYa2A4ayXrBASHq+xgys79L2HEqviCpXhrXLI8ztfOp8jtLcOzQV64gB
+HeE3JoY5pPb+VyfVkFcN24r7HgJEMYxFRnvGjEhLOE2QaPnKKc9RbSw/2V0Z+lNO
+UIVZDA4wdAsklXQL0RvlhRepP9G9tmK2ZBvQaCPDxXVfA8+BdhDIA7fSvvgM8qiK
+N/YWMecZOgBj9NA+eZFCKtKIiPLxUt9fPTpJw1c/mQKBgQDJcb23s1YE8fLaS6aI
+T0YsJifaHRkoXu+pi02ukqzFhbMJk0xwGgv/TsTfuQT+SrREV/YICn8XbNWz1xTz
+VDeP7g2zL2cOKBXzs9yXm1JU49gpmfOwx9ZnXkWFaFxTtPHHLj9YMUhCNim2hmdF
+NmmKKm1mKx+UoJ8qSb8oXpiGKwKBgQDEWOFTFGb0AQUcPsJ6bqJ3j8SKUXRcLrTZ
+2exgc/ENeC1TMaUtAK13l+fIOxy1ST1vN8Su3lFjUxLwIiFbksMbGbwwICEfnR9C
+TmXiiAvcuQra9Mt4qmNbM8QSp+SH7/u3a4dt1XoBXRf/zW/WnnSb+8TWFG8qSP7p
+Q8x1ONoZGwKBgCa/aByztDITUAFJV+kURo0rcbL0seggCr1z/Cc+M1lzFDMdUDCn
+8fLT9e6Kqjk3qWEUAlEdo8CfwKNtFayQYHeg9KWo0ovYjhBoYmAPElEd2nB2GnfA
+mK8dZQX6Qvge7/q+HJUcNaf2vyNKywZ3Junaq1xhoxt1oegCc3CScYgtAoGAHp5t
+pPMNiweNsHXpBgayt0poL9m795QhB8gAIJp0ANG437xiMb5lTr0VnBf7xhmBxxI/
+TbXFo2OVFHV+FjWwz00YF7kq8S8jbMelA8qLbwQ8OSDrigmoLNTJws8DyF8YNeZC
+3FHqEMRftbhntGRJcvm0HF5bZ5Cyj9XO0IjpGfkCgYBlLqG0kx1N9/wQRuWEPn2+
+NHlhROYFeyMQBZknU3o5/gG/Z2sFQ9g8tJdXYOrypXcTVQHTn9PKb4ux6ouScKd/
+A3tuhxPCswUu70bK0HI+FHHoQlINK4JJ9PIcvJSWsOw86kBmyjThdjesanoZYhNL
+Kum8E8X8hFnpAF34O+id/Q==
+-----END PRIVATE KEY-----
diff --git a/sdks/python/apache_beam/io/snowflake.py b/sdks/python/apache_beam/io/snowflake.py
index 39522e88f65..d789a72001d 100644
--- a/sdks/python/apache_beam/io/snowflake.py
+++ b/sdks/python/apache_beam/io/snowflake.py
@@ -197,7 +197,6 @@ class ReadFromSnowflake(beam.PTransform):
         password=password,
         private_key_path=private_key_path,
         raw_private_key=raw_private_key,
-        private_key_passphrase=private_key_passphrase,
         o_auth_token=o_auth_token,
     )
 
@@ -401,7 +400,6 @@ class WriteToSnowflake(beam.PTransform):
         password=password,
         private_key_path=private_key_path,
         raw_private_key=raw_private_key,
-        private_key_passphrase=private_key_passphrase,
         o_auth_token=o_auth_token,
     )
     WriteDisposition.VerifyParam(write_disposition)
@@ -483,13 +481,7 @@ class WriteDisposition:
 
 
 def verify_credentials(
-    username,
-    password,
-    private_key_path,
-    raw_private_key,
-    private_key_passphrase,
-    o_auth_token):
+    username, password, private_key_path, raw_private_key, o_auth_token):
   if not (o_auth_token or (username and password) or
-          (username and
-           (private_key_path or raw_private_key) and private_key_passphrase)):
+          (username and (private_key_path or raw_private_key))):
     raise RuntimeError('Snowflake credentials are not set correctly.')
diff --git a/website/www/site/content/en/documentation/io/built-in/snowflake.md b/website/www/site/content/en/documentation/io/built-in/snowflake.md
index b28c4bf861d..5e7ff9a85d6 100644
--- a/website/www/site/content/en/documentation/io/built-in/snowflake.md
+++ b/website/www/site/content/en/documentation/io/built-in/snowflake.md
@@ -112,8 +112,6 @@ To use key pair authentication with SnowflakeIO, invoke your pipeline with one o
 
   {{< /highlight >}}
 
-**Important notice**: Only encrypted private key are supported. Unencrypted (without pasphrase) private key are not supported. For details, see: [BEAM-13818](https://issues.apache.org/jira/browse/BEAM-13818).
-
 ### OAuth token
 SnowflakeIO also supports OAuth token.
 
@@ -956,6 +954,4 @@ SnowflakeIO currently has the following limitations.
 
 1. Streaming writing supports only pair key authentication. For details, see: [BEAM-13817](https://issues.apache.org/jira/browse/BEAM-13817).
 
-1. Only encrypted private key are supported. Unencrypted private key are not supported. For details, see: [BEAM-13818](https://issues.apache.org/jira/browse/BEAM-13818).
-
 1. The role parameter configured in `SnowflakeIO.DataSourceConfiguration` object is ignored for streaming writing. For details, see: [BEAM-13819](https://issues.apache.org/jira/browse/BEAM-13819)
\ No newline at end of file
-- 
GitLab