Ingest data from Snowflake to Elasticsearch

To take advantage of the powerful search capabilities offered by Elasticsearch®, many businesses keep a copy of their searchable data in Elasticsearch. Elasticsearch is a proven technology for traditional text search, as well as for the vector search that powers semantic search use cases. The Elasticsearch Relevance Engine™ (ESRE) enables you to add semantic search on proprietary data, which can be integrated with generative AI technologies to build modern search experiences.

Snowflake is a fully managed SaaS (software as a service) that provides a single platform for data warehousing, data lakes, data engineering, data science, data application development, and secure sharing and consumption of real-time/shared data.

In this blog, we will see how to bring your Snowflake data into Elasticsearch using the following methods:

  1. Using Logstash® (periodic sync)

  2. Using the Snowflake-Elasticsearch Python script (one-time sync)

Prerequisites

Snowflake credentials

You will have received the following credentials after signing up; you can also get them from the Snowflake panel.

  • Account username

  • Account password

  • Account Identifier

Elastic® credentials

  1. Visit https://cloud.elastic.co and sign up.

  2. Click on Create deployment. In the pop-up, you can change the settings or keep the default settings.

  3. Download or copy the deployment credentials (both username and password).

  4. Also copy the Cloud ID.

  5. Once the deployment is ready, click on Continue (or click on Open Kibana). You will be redirected to the Kibana® dashboard.

Using Logstash

Logstash is a free and open ETL tool that reads from multiple input sources, transforms (modifies) the data, and pushes it to your favorite stash. One of its best-known use cases is reading logs from a file and pushing them to Elasticsearch. You can also modify the data on the fly using a filter plugin, and Logstash will push the updated data to the output.

We’re going to use the JDBC input plugin to pull the data from Snowflake and the Elasticsearch output plugin to push it to Elasticsearch.

  1. Install Logstash by referring to the documentation.

  2. Go to the Snowflake JDBC driver directory in the Maven Central Repository: https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc.

  3. Click on the directory for the version that you need and download the snowflake-jdbc-#.#.#.jar file. In my case, I have downloaded snowflake-jdbc-3.9.2.jar. (Refer to official documentation to learn more about the Snowflake JDBC Driver.)

  4. Create a pipeline by creating a file named sf-es.conf. Add the snippet below and replace all credentials with your own.

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash_external_configs/driver/snowflake-jdbc-3.9.2.jar"
    jdbc_driver_class => "net.snowflake.client.jdbc.SnowflakeDriver"
    jdbc_connection_string => "jdbc:snowflake://<account_identifier>.snowflakecomputing.com/?db=SNOWFLAKE_SAMPLE_DATA&warehouse=COMPUTE_WH&schema=TPCH_SF1"
    jdbc_user => "<snowflake_username>"
    jdbc_password => "<snowflake_password>"
    schedule => "* * * * *"
    statement => "select * from customer limit 10;"
  }
}

filter {}

output {
  elasticsearch {
    cloud_id => "<elastic cloud_id>"
    cloud_auth => "<elastic_username>:<elastic_password>"
    index => "sf_customer"
  }
}

jdbc_connection_string: The connection string in the snippet above sets the following parameters:

db=SNOWFLAKE_SAMPLE_DATA
warehouse=COMPUTE_WH
schema=TPCH_SF1
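
As a rough illustration of how these parameters fit into the connection string, the sketch below assembles the URL in Python. The helper name build_snowflake_jdbc_url is hypothetical, and only the parameters used in this post are shown.

```python
from urllib.parse import urlencode


def build_snowflake_jdbc_url(account_identifier: str, **params: str) -> str:
    """Assemble a Snowflake JDBC URL (hypothetical helper, for illustration)."""
    base = f"jdbc:snowflake://{account_identifier}.snowflakecomputing.com/"
    # urlencode escapes characters that are not safe in a query string.
    query = urlencode(params)
    return f"{base}?{query}" if query else base


url = build_snowflake_jdbc_url(
    "<account_identifier>",
    db="SNOWFLAKE_SAMPLE_DATA",
    warehouse="COMPUTE_WH",
    schema="TPCH_SF1",
)
print(url)
```

Swapping in your own database, warehouse, and schema names yields the value to paste into jdbc_connection_string.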

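Schedule: Use cron syntax to run this flow periodically. The value "* * * * *" in the snippet runs the statement once every minute. Note that the statement as written is executed in full on every run; to move only new rows on each run, the JDBC input plugin provides the :sql_last_value parameter together with the use_column_value and tracking_column options. See the JDBC input plugin documentation for more on scheduling.

Change the schedule according to your requirements.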
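
For a scheduled pipeline to move data incrementally, each run generally needs to remember how far the previous run got. The Python sketch below illustrates only that idea; it is not how Logstash is implemented. SQLite stands in for Snowflake, and run_once is a hypothetical name.

```python
import sqlite3

# In-memory SQLite stands in for Snowflake here (assumption, for illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer (c_custkey INTEGER PRIMARY KEY, c_name TEXT)")
conn.executemany(
    "INSERT INTO customer VALUES (?, ?)",
    [(1, "Customer#1"), (2, "Customer#2"), (3, "Customer#3")],
)


def run_once(conn, last_value):
    """One scheduled run: fetch only rows newer than the last tracked value."""
    rows = conn.execute(
        "SELECT c_custkey, c_name FROM customer "
        "WHERE c_custkey > ? ORDER BY c_custkey",
        (last_value,),
    ).fetchall()
    # Advance the tracked value to the highest key seen in this run.
    new_last = rows[-1][0] if rows else last_value
    return rows, new_last


first_batch, last_value = run_once(conn, 0)             # picks up keys 1 to 3
conn.execute("INSERT INTO customer VALUES (4, 'Customer#4')")
second_batch, last_value = run_once(conn, last_value)   # picks up only key 4
print(len(first_batch), len(second_batch))
```

The second run returns only the row added after the first run, which is the behavior a tracking column is meant to provide.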

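JDBC Paging (Optional): Paging breaks a SQL statement up into multiple queries. Each query uses limits and offsets, and together they retrieve the full result set. You can use this to move all data in a single run.

Enable JDBC paging by adding the following settings to the jdbc input. With jdbc_paging_mode set to "explicit", the statement must contain its own paging conditions using the :size and :offset parameters (for example, limit :size offset :offset); with the default "auto" mode, Logstash generates the paged queries for you.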

jdbc_paging_enabled => true
jdbc_paging_mode => "explicit"
jdbc_page_size => 100000
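
To make the paging idea concrete, here is a minimal Python sketch of limit/offset paging. It only illustrates the technique and is not Logstash's implementation; SQLite stands in for Snowflake, and fetch_in_pages is a hypothetical name.

```python
import sqlite3

# In-memory SQLite stands in for Snowflake (assumption, for illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer (c_custkey INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO customer VALUES (?)", [(i,) for i in range(1, 26)])


def fetch_in_pages(conn, page_size):
    """Yield pages of rows using LIMIT/OFFSET until a page comes back empty."""
    offset = 0
    while True:
        page = conn.execute(
            "SELECT c_custkey FROM customer ORDER BY c_custkey LIMIT ? OFFSET ?",
            (page_size, offset),
        ).fetchall()
        if not page:
            break
        yield page
        offset += page_size


pages = list(fetch_in_pages(conn, page_size=10))
print([len(p) for p in pages])
```

The ORDER BY clause keeps the pages stable between queries; without a deterministic order, offset-based paging can skip or repeat rows.
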

  5. Run Logstash.

bin/logstash -f sf-es.conf

Snowflake-Elasticsearch Python script

If you do not have Logstash in place, I have written a small Python utility, available on GitHub, that pulls data from Snowflake and pushes it to Elasticsearch. It pulls all your data in one go, so it suits a small amount of data that needs to be migrated once rather than periodically.

Note: This utility is not part of the official Elastic connectors. Elastic connectors provide support for various data sources; use them if you need to sync data from any of their supported data sources.

  1. Installation

git clone https://github.com/ashishtiwari1993/snowflake-elasticsearch-connector.git
cd snowflake-elasticsearch-connector

  2. Installing dependencies

pip install -r requirements.txt

  3. Change configs

  • Open config/connector.yml.

  • Replace the credentials and connection details with your own values:

snowflake:
  username: <sf_username>
  password: <sf_password>
  account: <sf_account_identifier>
  database: <db_name>
  table: <table_name>
  columns: ""
  warehouse: ""
  scheme: ""
  limit: 50

elasticsearch:
  host: https://localhost:9200
  username: elastic
  password: elastic@123
  ca_cert: /path/to/elasticsearch/config/certs/http_ca.crt
  index: <sf_customer>

  4. Run connector

python __main__.py
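
For readers curious about what a one-time sync of this kind involves, the following is a minimal sketch and not the code of the utility above. It assumes the snowflake-connector-python package and the 8.x elasticsearch Python client; rows_to_actions and sync are hypothetical names, and the config dictionaries loosely mirror config/connector.yml.

```python
from typing import Any, Dict, Iterable, Iterator, Sequence


def rows_to_actions(
    columns: Sequence[str], rows: Iterable[Sequence[Any]], index: str
) -> Iterator[Dict[str, Any]]:
    """Turn result rows into bulk-index actions, one document per row."""
    for row in rows:
        yield {"_index": index, "_source": dict(zip(columns, row))}


def sync(sf_config: Dict[str, str], es_config: Dict[str, str],
         table: str, index: str) -> int:
    """Copy every row of `table` into `index`; returns the number indexed."""
    # Third-party packages, imported lazily so rows_to_actions works without them:
    #   pip install snowflake-connector-python elasticsearch
    import snowflake.connector
    from elasticsearch import Elasticsearch, helpers

    # sf_config is assumed to hold user, password, account, database, etc.
    sf = snowflake.connector.connect(**sf_config)
    es = Elasticsearch(
        es_config["host"],
        basic_auth=(es_config["username"], es_config["password"]),
        ca_certs=es_config["ca_cert"],
    )
    try:
        cur = sf.cursor()
        # The table name is assumed to come from trusted configuration.
        cur.execute(f"SELECT * FROM {table}")
        columns = [col[0] for col in cur.description]
        indexed, _ = helpers.bulk(es, rows_to_actions(columns, cur, index))
        return indexed
    finally:
        sf.close()


# The row-to-document step can be tried without any server:
actions = list(
    rows_to_actions(["C_CUSTKEY", "C_NAME"], [(1, "Customer#000000001")], "sf_customer")
)
print(actions)
```

Streaming rows from the cursor into the bulk helper avoids holding the whole table in memory, which matters even for a one-time sync.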

Verify data

  1. Log in to Kibana and go to ☰ > Management > Dev Tools.

  2. Copy and paste the following API GET request into the Console pane, and then click the ▶ (play) button. This queries all records in the new index.

GET sf_customer/_search
{
  "query": {
    "match_all": {}
  }
}
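
The same check can be scripted. The sketch below assumes the 8.x elasticsearch Python client; count_documents and fetch_sample are hypothetical helper names, and the client is passed in so that the connection details stay outside the helpers.

```python
from typing import Any, Dict, List


def count_documents(client: Any, index: str) -> int:
    """Return how many documents the index holds, using the count API."""
    return client.count(index=index)["count"]


def fetch_sample(client: Any, index: str, size: int = 10) -> List[Dict[str, Any]]:
    """Return the source of up to `size` documents using a match_all query."""
    resp = client.search(index=index, query={"match_all": {}}, size=size)
    return [hit["_source"] for hit in resp["hits"]["hits"]]


# Usage, assuming an Elastic Cloud deployment (placeholders as in this post):
#   from elasticsearch import Elasticsearch
#   es = Elasticsearch(cloud_id="<elastic cloud_id>",
#                      basic_auth=("<elastic_username>", "<elastic_password>"))
#   print(count_documents(es, "sf_customer"))
#   print(fetch_sample(es, "sf_customer", size=3))
```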

Conclusion

We have successfully migrated the data from Snowflake to Elastic Cloud. You can achieve the same on any Elasticsearch instance, whether it is in the cloud or on-premises.

Start leveraging full-text and semantic search capabilities on your data set. You can also connect your data with LLMs to build question-answering capabilities.
