
How To Transfer Data From MSSQL To Elasticsearch Using Logstash

In this article we will transfer the following data set from the AdventureWorks database, SQL Server's sample database, to Elasticsearch using Logstash. If you want to move a CSV or TXT file to Elasticsearch, you can read the article below.

How To Move CSV or TXT To Elasticsearch

Source Query in AdventureWorks2014 Database
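The original query and its result screenshot are not reproduced here. Judging by the fields that appear in the index mapping below, a query against the Production.Product table along these lines produces the data set:

    SELECT ProductID,
           Name,
           Color,
           SafetyStockLevel,
           ListPrice,
           Weight,
           SellStartDate
    FROM   Production.Product;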

Create Elasticsearch Index With Mapping

If we transfer data from MS SQL without a mapping in Elasticsearch, all data is transferred as text. To transfer data with the correct data types, we need to create a mapping in Elasticsearch first.

Based on the data types of the columns in the source query, we create an index in Elasticsearch as follows. Note the replica and shard counts; we have not specified them in our example. You may want to read the following article for more information.

How To Create an Elasticsearch Indice using Kibana
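The original request is not shown here, but the mapping can be reconstructed from the field list in the error message quoted in the comments at the end of this article. A sketch, run from the Kibana Dev Tools console and assuming the index name mssql_dataset (on Elasticsearch 7 and later, the mapping must not contain a custom type name):

    PUT mssql_dataset
    {
      "mappings": {
        "properties": {
          "@timestamp":       { "type": "date" },
          "@version":         { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "ProductID":        { "type": "integer" },
          "Name":             { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 25 } } },
          "Color":            { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 15 } } },
          "SafetyStockLevel": { "type": "short" },
          "ListPrice":        { "type": "float" },
          "Weight":           { "type": "float" },
          "SellStartDate":    { "type": "date", "format": "YYYY-MM-dd" }
        }
      }
    }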

We can see the mapping of the index we created with the command below.
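For example, from the Kibana Dev Tools console (using the index name assumed above):

    GET mssql_dataset/_mapping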

In order to transfer data from a source to Elasticsearch with Logstash, we need to prepare a .conf file on the server where Logstash is installed.

Download SQL JDBC Driver

We should download the SQL JDBC Driver before configuring the .conf file.

https://www.microsoft.com/en-us/download/details.aspx?id=54671

After downloading the file, we copy it to the server where Logstash is installed. I copied it to /home/elasticsearch using Xshell's file transfer.

Then go to the /home/elasticsearch directory using the following command.
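    cd /home/elasticsearch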

Extract the file.
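The exact archive name depends on the driver version you downloaded, so the following is only an example:

    tar -xzf sqljdbc_*.tar.gz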

Configure the .conf File on the Server Where Logstash is Installed

Create the .conf file in the /etc/logstash/conf.d/ directory as follows, and save it as mssql_dataset.conf. This directory may change in later versions; you should find the “conf.d” directory and create the .conf file in it.

Explanation of mssql_dataset.conf

jdbc_driver_library

Find sqljdbc42.jar in the archive we extracted above and write its full path here.

jdbc_connection_string

Write the IP address and port of the SQL Server instance you will connect to, and the name of the database.

jdbc_user

The user name that will connect to SQL Server.

jdbc_password

The password of this user.

statement

The SQL query whose result set will be sent to Elasticsearch.

hosts

Write the Elasticsearch IP address here; this is where Logstash will send the data.

user => elasticsearch_authorized_user

password => elasticsearch_authorized_user_password

We can secure Elasticsearch using Search Guard. If you have installed Search Guard, write an authorized Elasticsearch user and its password here.

document_id

This column must be unique so that changed data can be transferred to Elasticsearch continuously. If you do not define this parameter, Logstash cannot update changed records; each scheduled run will insert new documents instead.

Content of mssql_dataset.conf
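The original listing is not reproduced here, so what follows is a sketch assembled from the parameters explained above. The IP addresses, user names, the driver path, and the index name mssql_dataset are placeholders; replace them with your own values.

    input {
      jdbc {
        # Full path to the driver jar extracted above; adjust to where sqljdbc42.jar landed.
        jdbc_driver_library => "/home/elasticsearch/sqljdbc_6.0/enu/sqljdbc42.jar"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        # IP, port, and database name of the SQL Server instance.
        jdbc_connection_string => "jdbc:sqlserver://10.10.10.10:1433;databaseName=AdventureWorks2014"
        jdbc_user => "sql_user"
        jdbc_password => "sql_user_password"
        # Keep the original column case so the fields match the mapping created above.
        lowercase_column_names => false
        # Run the query every minute.
        schedule => "* * * * *"
        statement => "SELECT ProductID, Name, Color, SafetyStockLevel, ListPrice, Weight, SellStartDate FROM Production.Product"
      }
    }
    output {
      elasticsearch {
        # Elasticsearch IP and port.
        hosts => ["10.10.10.11:9200"]
        index => "mssql_dataset"
        user => "elasticsearch_authorized_user"
        password => "elasticsearch_authorized_user_password"
        # Unique column; lets Logstash overwrite changed rows instead of duplicating them.
        document_id => "%{ProductID}"
      }
    }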

We paste the above content into the .conf file and save it.

Start Logstash and Transfer the Data

Then I start the transfer with the help of the following command.
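Assuming a default package installation (the binary path may differ on your system):

    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/mssql_dataset.conf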

Because we scheduled Logstash to run every minute, it will re-run the query and check the records every minute.

If you close the session, Logstash will stop working. You can run the above command to start Logstash again. You can also run this command with nohup, or use the screen method.

nohup:
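A sketch, reusing the same paths as above:

    nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/mssql_dataset.conf &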

When I ran the above command and checked nohup.out, I saw the error below.

Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting.

Close all the sessions and run the command with nohup, or find the running Logstash processes and kill them.
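For example (replace <PID> with the process id you find in the output):

    ps -ef | grep logstash
    kill -9 <PID>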

Then you can run the command with nohup without error.

How To See Transferred Data on Kibana

Let’s see our data through Kibana by creating an index pattern.

We connect to Kibana to create an index pattern and then click on Discover. You can install Kibana using the following article.

How To Install Kibana On Centos

Go to the Management tab and click “Create index pattern”.

In the index pattern section, write the name of your index and click Next.

On the next screen, we select a date field from the “Time filter field name” section. The data can be filtered according to the date field we select here. @timestamp is the date on which the data was transferred to Elasticsearch.

Finally, we click on Create Index Pattern.

After creating the index pattern, we can see our data by selecting the date range.

Or we can narrow the results down with filters.



3 comments

  1. Hi, could you elaborate on “doc”? I see it in the mapping, but I am missing something, because I cannot use it or figure out what it means.

  2. Sendrayanto Efendi

    Please help, I got this error instead:

    {
      "error" : {
        "root_cause" : [
          {
            "type" : "mapper_parsing_exception",
            "reason" : "Root mapping definition has unsupported parameters: [type_name : {properties={@timestamp={type=date}, SellStartDate={format=YYYY-MM-dd, type=date}, Color={type=text, fields={keyword={ignore_above=15, type=keyword}}}, @version={type=text, fields={keyword={ignore_above=256, type=keyword}}}, SafetyStockLevel={type=short}, ProductID={type=integer}, ListPrice={type=float}, Weight={type=float}, Name={type=text, fields={keyword={ignore_above=25, type=keyword}}}}}]"
          }
        ],
        "type" : "mapper_parsing_exception",
        "reason" : "Failed to parse mapping [_doc]: Root mapping definition has unsupported parameters: [type_name : {properties={@timestamp={type=date}, SellStartDate={format=YYYY-MM-dd, type=date}, Color={type=text, fields={keyword={ignore_above=15, type=keyword}}}, @version={type=text, fields={keyword={ignore_above=256, type=keyword}}}, SafetyStockLevel={type=short}, ProductID={type=integer}, ListPrice={type=float}, Weight={type=float}, Name={type=text, fields={keyword={ignore_above=25, type=keyword}}}}}]",
        "caused_by" : {
          "type" : "mapper_parsing_exception",
          "reason" : "Root mapping definition has unsupported parameters: [type_name : {properties={@timestamp={type=date}, SellStartDate={format=YYYY-MM-dd, type=date}, Color={type=text, fields={keyword={ignore_above=15, type=keyword}}}, @version={type=text, fields={keyword={ignore_above=256, type=keyword}}}, SafetyStockLevel={type=short}, ProductID={type=integer}, ListPrice={type=float}, Weight={type=float}, Name={type=text, fields={keyword={ignore_above=25, type=keyword}}}}}]"
        }
      },
      "status" : 400
    }
