Kafka sink azure kusto

Kafka sink for Kusto

From Azure · Updated May 26, 2026

This repository contains the source code of the Kafka Connect Kusto sink connector. "Kusto" is the Microsoft internal project code name for Azure Data Explorer, Microsoft Azure's big data analytical database PaaS offering. The project is written primarily in Java, is distributed under the MIT License, and was first published in 2018. Key topics include: azure-data-explorer, kafka-connector, kusto.

Latest release: v5.3.1 (May 20, 2026)

Azure Data Explorer Kafka Connect Kusto Sink Connector

This repository contains the source code of the Kafka Connect Kusto sink connector.<br>
"Kusto" is the Microsoft internal project code name for Azure Data Explorer, Microsoft Azure's big data analytical
database PaaS offering.

Breaking changes are documented in the 16.1 Breaking changes from version 5 section below.<br>

<hr>

1. Overview

Azure Data Explorer is a first party Microsoft big data analytical database PaaS, purpose built for low latency
analytics of all manner of logs, telemetry and time series data.<br>

Kafka ingestion to Azure Data Explorer leverages Kafka Connect. Kafka Connect is an open source Apache Kafka ETL service
for code-free, configuration based, scalable, fault tolerant integration with Kafka from/to any system through
development of connector plugins (integration code). The Azure Data Explorer team has developed a sink connector that
sinks from Kafka to Azure Data Explorer.<br>

Our connector is gold certified by Confluent: it has gone through comprehensive review and testing for quality,
feature completeness, compliance with standards, and performance.<br>

The connector is open source and we welcome community contribution.

This connector sends metrics, logs and trace data to Azure Data Explorer, Azure Synapse Data Explorer and
Real time analytics in Fabric.

<hr>

2. Integration design

The connector integrates with Azure Data Explorer through queued or streaming ingestion, leveraging the Azure Data
Explorer Java SDK. Events are dequeued from Kafka and, per the flush* sink properties, batched or streamed and shipped
to Azure Data Explorer as gzipped files. For queued (batch) ingestion, Azure Data Explorer has a configurable
table ingest batching policy, based on which ingestion into tables occurs automatically. For streaming ingestion, set
'streaming' to 'true' in 'kusto.tables.topics.mapping'; by default, streaming is set to false.
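As a rough illustration of the per-topic streaming switch, the sketch below assembles a value for kusto.tables.topics.mapping with one streamed topic and one queued topic. The topic, database and table names are hypothetical, the helper is not part of the connector, and treating topic, db, table and format as mandatory keys is an assumption of this sketch. It emits standard double-quoted JSON, whereas the examples in this document use single quotes.

```python
import json


def build_topic_mapping(entries):
    """Serialize topic-to-table entries into a kusto.tables.topics.mapping value."""
    # Assumption: these four keys are treated as mandatory by this sketch.
    required = ("topic", "db", "table", "format")
    normalized = []
    for entry in entries:
        missing = [key for key in required if key not in entry]
        if missing:
            raise ValueError(f"mapping entry is missing {missing}: {entry}")
        item = dict(entry)
        # Streaming stays off unless an entry asks for it explicitly.
        item.setdefault("streaming", False)
        normalized.append(item)
    return json.dumps(normalized)


# Hypothetical topics and tables, for illustration only.
mapping = build_topic_mapping([
    {"topic": "telemetry", "db": "logsdb", "table": "Telemetry",
     "format": "json", "streaming": True},
    {"topic": "audit", "db": "logsdb", "table": "Audit", "format": "csv"},
])
print(f"kusto.tables.topics.mapping={mapping}")
```

The first topic would be streamed, while the second falls back to the default queued ingestion.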

<hr>

3. Features supported

3.1. Validation of required properties on start-up and fail-fast

  • The connector fails fast if any of the "required" sink properties are unavailable
  • The connector checks for access to the cluster and database table and shuts down if inaccessible
  • The connector checks for any dependent properties that are selectively required and shuts down if unavailable (e.g. a
    converter specified that relies on schema registry but missing a schema registry URL)
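The fail-fast checks above can be mimicked before a configuration is ever submitted. The following is a minimal sketch, not the connector's own validation code: the list of required keys is taken from the properties marked "Required" in section 5, and the function name and error wording are hypothetical.

```python
# Properties marked "Required" in the sink properties table of this document.
REQUIRED_PROPERTIES = (
    "connector.class", "topics", "kusto.ingestion.url",
    "kusto.tables.topics.mapping", "key.converter", "value.converter",
    "tasks.max", "flush.size.bytes", "flush.interval.ms",
)

# Converters that this document describes as relying on a schema registry.
SCHEMA_REGISTRY_CONVERTERS = (
    "io.confluent.connect.avro.AvroConverter",
    "io.confluent.connect.json.JsonSchemaConverter",
    "io.confluent.connect.protobuf.ProtobufConverter",
)


def validate_sink_properties(props):
    """Raise ValueError listing every problem found, instead of failing later at runtime."""
    problems = [f"missing required property: {key}"
                for key in REQUIRED_PROPERTIES if not props.get(key)]
    # Dependent property: a schema-registry-based converter needs a registry URL.
    if (props.get("value.converter") in SCHEMA_REGISTRY_CONVERTERS
            and not props.get("value.converter.schema.registry.url")):
        problems.append("value.converter requires value.converter.schema.registry.url")
    if problems:
        raise ValueError("; ".join(problems))
```

Running such a check in a deployment pipeline surfaces a missing schema registry URL before the connector task starts and shuts down.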

3.2. Configurable behavior on errors

  • The connector supports configurable behavior on error:
    • shut down in the event of an error
      <br>OR
    • log the error and process as many events as possible

3.3. Configurable retries

  • The connector supports retries for transient errors, with the ability to provide relevant parameters
  • Retries use exponential backoff
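To make the interplay of a retry budget and an exponential backoff concrete, here is a small sketch that computes a possible wait schedule. It is only an illustration: the doubling factor and the stopping rule are assumptions and are not taken from the connector source. The two arguments correspond conceptually to errors.retry.max.time.ms and errors.retry.backoff.time.ms.

```python
def backoff_schedule(max_time_ms, initial_backoff_ms):
    """Return the waits (in ms) between retries, doubling each time within a total budget."""
    if initial_backoff_ms <= 0:
        raise ValueError("initial_backoff_ms must be positive")
    delays = []
    elapsed = 0
    delay = initial_backoff_ms
    # Stop as soon as the next wait would exceed the overall retry budget.
    while elapsed + delay <= max_time_ms:
        delays.append(delay)
        elapsed += delay
        delay *= 2  # assumed growth factor
    return delays


# Using the documented defaults of 300 ms total and 10 ms initial backoff.
print(backoff_schedule(300, 10))  # [10, 20, 40, 80]
```

Under these assumptions the default values allow only four short retries, so longer outages call for a larger retry budget.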

3.4. Serialization formats

  • The connector supports Avro, JSON, CSV formats
  • It supports Parquet and ORC file formats with the ByteArrayConverter specified below

3.5. Schema registry

  • The connector supports schema registry for Avro and JSON

3.6. Schema evolution

  • The connector does not support schema evolution currently

3.7. Kafka Connect converters

  • The connector supports the following converters:
| # | Converter | Details |
|---|-----------|---------|
| 1 | org.apache.kafka.connect.storage.StringConverter | Use with csv/json |
| 2 | org.apache.kafka.connect.json.JsonConverter | Use with schemaless json |
| 3 | io.confluent.connect.avro.AvroConverter | Use with avro |
| 4 | io.confluent.connect.json.JsonSchemaConverter | Use with json with schema registry |
| 5 | org.apache.kafka.connect.converters.ByteArrayConverter | Use with ORC, Parquet files written as messages to Kafka |
| 6 | io.confluent.connect.protobuf.ProtobufConverter | Use with protobuf format with schema registry |

3.8. Kafka Connect transformers

  • The connector does not support transformers. Prefer transformation on the server side in Kafka, or at ingestion time
    in Azure Data Explorer with update policies.

3.9. Topics to tables mapping

  • The connector supports multiple topics to multiple tables configuration per Kafka Connect worker.
  • It also supports wildcard mapping, where a topic name of * in the mapping will be used for any topic not explicitly listed.
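The wildcard rule can be pictured as a lookup that prefers an explicit topic entry and only then falls back to the * entry. The sketch below is a simplified model of that behavior, not the connector's implementation; the function name and the sample topics and tables are hypothetical.

```python
def resolve_mapping(mappings, topic):
    """Return the mapping entry for a topic, falling back to the '*' entry if present."""
    wildcard = None
    for entry in mappings:
        if entry["topic"] == topic:
            return entry  # an explicit mapping always wins
        if entry["topic"] == "*":
            wildcard = entry
    if wildcard is None:
        raise KeyError(f"no mapping for topic {topic!r} and no wildcard entry")
    return wildcard


# Hypothetical mappings: one explicit topic plus a catch-all.
mappings = [
    {"topic": "*", "db": "logsdb", "table": "Misc", "format": "json"},
    {"topic": "orders", "db": "salesdb", "table": "Orders", "format": "avro"},
]
print(resolve_mapping(mappings, "orders")["table"])    # Orders
print(resolve_mapping(mappings, "payments")["table"])  # Misc
```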

3.10. Kafka Connect Dead Letter Queue

  • The connector supports a user provided "Dead Letter Queue", a Kafka Connect construct. E.g. if Avro messages are
    written to a topic that is expecting JSON, the Avro messages are written to a configurable dead letter
    queue instead of just being dropped. This helps prevent data loss and also enables data integrity checks for
    messages that did not make it to the destination. Note that for a secure cluster, in addition to the bootstrap
    server list and topic name, the security mechanism, the security protocol and the JAAS config have to be provided
    for the Kafka Connect worker and in the sink properties

3.11. Miscellaneous Dead Letter Queue

  • The connector supports a user provided miscellaneous "Dead Letter Queue" for transient and non-deserialization
    errors (those not managed by Kafka Connect). E.g. if network connectivity to Azure Data Explorer is lost, the
    connector retries and eventually writes the queued up messages to the miscellaneous "Dead Letter Queue". Note that
    for a secure cluster, in addition to the bootstrap server list and topic name, the security mechanism, the security
    protocol and the JAAS config have to be provided for the Kafka Connect worker and in the sink properties

3.12. Delivery semantics

  • Azure Data Explorer is an append only immutable database. Infrastructure failures and unavoidable external variables
    that can lead to duplicates can't be remediated via upsert commands as upserts are not supported. <br>

Therefore, the connector supports "At least once" delivery guarantees.

3.13. Overrides

  • The connector supports overrides at the sink level if overrides are specified at a Kafka Connect worker level. This
    is a Kafka Connect feature, not specific to the Kusto connector plugin.

3.14. Parallelism

  • As with all Kafka Connect connectors, parallelism comes through the setting of connector tasks count, a sink property

3.15. Authentication & Authorization to Azure Data Explorer

  • Azure Data Explorer supports Azure Active Directory authentication. For the Kusto Kafka connector, we need an Azure
    Active Directory Service Principal created and "admin" permissions granted to the Azure Data Explorer database.
  • The Service Principal can either be
    • an Enterprise Application, authenticated using the OAuth2 endpoint of Active Directory, using the Tenant ID,
      Application ID and Application Secret
    • a Managed Identity, using the private Instance MetaData Service accessible from within Azure VMs
  • Kafka Connect supports all security protocols supported by Kafka, as does our connector
  • See below for some security related config that needs to be applied at Kafka Connect worker level as well as in the
    sink properties

3.17. JMX Metrics

The connector exposes JMX metrics for monitoring ingestion performance and tracking failures. Metrics are registered
automatically when the sink task starts and unregistered when the task stops.

MBean name: com.microsoft.azure.kusto.kafka.connect.sink:type=KustoSinkMetrics

| Metric | Type | Description |
|--------|------|-------------|
| RecordsWritten | Counter | Total number of records successfully written to staging files |
| RecordsFailed | Counter | Total number of records that failed during write |
| IngestionAttempts | Counter | Total number of file ingestion attempts to Azure Data Explorer |
| IngestionSuccesses | Counter | Total number of successful file ingestions |
| IngestionFailures | Counter | Total number of failed file ingestions (after retries are exhausted) |
| DlqRecordsSent | Counter | Total number of records sent to the dead letter queue |

How to monitor:

These metrics are accessible via any JMX-compatible monitoring tool (JConsole, VisualVM, Prometheus JMX Exporter, Datadog, etc.). To enable JMX remote access on your Kafka Connect workers, add the following JVM options:

KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

For Prometheus-based monitoring, use the JMX Exporter agent with a
configuration that scrapes the com.microsoft.azure.kusto.kafka.connect.sink domain.
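Once the counters have been collected by a monitoring tool, simple ratios are often easier to alert on than raw totals. The sketch below derives two such ratios from a snapshot of the counters listed above. How the snapshot is obtained is out of scope, and the function name, the sample values and the choice of ratios are assumptions made for illustration.

```python
def ingestion_health(counters):
    """Derive failure ratios from a snapshot of the connector's JMX counters."""
    attempts = counters.get("IngestionAttempts", 0)
    failures = counters.get("IngestionFailures", 0)
    written = counters.get("RecordsWritten", 0)
    failed = counters.get("RecordsFailed", 0)
    total_records = written + failed
    return {
        # Share of file ingestions that failed after retries were exhausted.
        "ingestion_failure_ratio": failures / attempts if attempts else 0.0,
        # Share of records that failed while being written to staging files.
        "record_failure_ratio": failed / total_records if total_records else 0.0,
    }


# Hypothetical snapshot values.
snapshot = {"IngestionAttempts": 200, "IngestionFailures": 5,
            "RecordsWritten": 9990, "RecordsFailed": 10}
print(ingestion_health(snapshot))
```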

<hr>

4. Connect worker properties

  • Some core configs need to be set at the Kafka Connect worker level, including security configs and the (consumer)
    override policy. These need to be baked into, for example, the Docker image covered further on in this document.

4.1. Confluent Cloud

The below covers Confluent Cloud-<br>
Link to end to end sample

ENV CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY=All

ENV CONNECT_SASL_MECHANISM=PLAIN
ENV CONNECT_SECURITY_PROTOCOL=SASL_SSL
ENV CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https
ENV CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<yourConfluentCloudAPIKey>\" password=\"<yourConfluentCloudAPISecret>\";"
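Because the same security details also have to appear in the sink properties, it can help to generate the consumer.override.* entries from one place. The following sketch builds them for the Confluent Cloud settings shown above (SASL_SSL with the PLAIN mechanism). The function name and the sample bootstrap server and credentials are hypothetical, and the sketch deliberately refuses credentials containing double quotes rather than guessing at escaping rules.

```python
def confluent_cloud_overrides(bootstrap_servers, api_key, api_secret):
    """Build consumer.override.* sink properties for a SASL_SSL / PLAIN cluster."""
    for value in (api_key, api_secret):
        if '"' in value:
            # Escaping inside a JAAS string is not attempted in this sketch.
            raise ValueError("credentials containing double quotes are not supported here")
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )
    return {
        "consumer.override.bootstrap.servers": bootstrap_servers,
        "consumer.override.security.protocol": "SASL_SSL",
        "consumer.override.sasl.mechanism": "PLAIN",
        "consumer.override.sasl.jaas.config": jaas,
    }


# Placeholder values only; real keys should come from a secret store.
overrides = confluent_cloud_overrides("broker.example.com:9092", "MY_KEY", "MY_SECRET")
for name, value in overrides.items():
    print(f"{name}={value}")
```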

4.2. HDInsight Kafka with Enterprise Security Package

The below covers HDInsight Kafka with Enterprise Security Package (Kerberos)-<br>
You will need the HDI privileged user keytab to consume from Kafka, the krb5.conf and jaas conf as well<br>
Link to end to end sample<br>
<br>

1. Keytab:<br>
Generate the keytab for the privileged user authorized to consume from Kafka in your HDI cluster as follows; let's call
the keytab kafka-client-hdi.keytab-<br>

ktutil
addent -password -p <UPN>@<REALM> -k 1 -e RC4-HMAC
wkt kafka-client-hdi.keytab

scp this to your local machine to copy onto the Kafka Connect worker Docker image.

<br>

2. krb5.conf:<br>

The following is sample krb5.conf content; Create this on your local machine, modify to reflect your krb5 details; we
will use this to copy onto the Kafka Connect worker Docker image-<br>

[libdefaults]
        default_realm = <yourKerberosRealm>


[realms]
    <yourKerberosRealmName> = {
                admin_server = <yourKerberosRealmNameInLowerCase>
                kdc = <yourKerberosRealmNameInLowerCase>
                default_domain = <yourDomainInLowerCase>
        }

[domain_realm]
    <yourKerberosRealmNameInLowerCase> = <yourKerberosRealm>
    .<yourKerberosRealmNameInLowerCase> = <yourKerberosRealm>


[login]
        krb4_convert = true
        krb4_get_tickets = false

E.g. review this sample.
<br>

3. jaas.conf:<br>

The following is sample jaas.conf content-<br>

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka-client-hdi.keytab"
    principal="<yourUserPrincipal>@<yourKerberosRealm>";
};
<br>

4. Configs to add to the Docker image:<br>
This is covered in detail further on. It is specified here for the purpose of completeness of defining what goes onto
the worker config.<br>

COPY krb5.conf /etc/krb5.conf
COPY hdi-esp-jaas.conf /etc/hdi-esp-jaas.conf 
COPY kafka-client-hdi.keytab /etc/security/keytabs/kafka-client-hdi.keytab

ENV KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf"

ENV CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY=All

ENV CONNECT_SASL_MECHANISM=GSSAPI
ENV CONNECT_SASL_KERBEROS_SERVICE_NAME=kafka
ENV CONNECT_SECURITY_PROTOCOL=SASL_PLAINTEXT
ENV CONNECT_SASL_JAAS_CONFIG="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/security/keytabs/kafka-client-hdi.keytab\" principal=\"<yourKerberosUPN>@<yourKerberosRealm>\";"
<hr>

5. Sink properties

The following is the complete set of connector sink properties-

| # | Property | Purpose | Details |
|---|----------|---------|---------|
| 1 | connector.class | Classname of the Kusto sink | Hard code to com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector<br>Required |
| 2 | topics | Kafka topic specification | List of topics separated by commas<br>Required |
| 3 | kusto.ingestion.url | Kusto ingestion endpoint URL | Provide the ingest URL of your ADX cluster<br>Use the following construct for the private URL - https://ingest-private-[cluster].kusto.windows.net<br>Required |
| 4 | kusto.query.url | Kusto query endpoint URL | Provide the engine URL of your ADX cluster<br>Optional |
| 5 | aad.auth.strategy | Credentials for Kusto | Strategy to authenticate against Azure Active Directory, either application (default) or managed_identity or workload_identity.<br>Optional, application by default |
| 6 | aad.auth.authority | Credentials for Kusto | Provide the tenant ID of your Azure Active Directory<br>Required when authentication is done with an application or when kusto.validation.table.enable is set to true |
| 7 | aad.auth.appid | Credentials for Kusto | Provide Azure Active Directory Service Principal Name<br>Required when authentication is done with an application or when kusto.validation.table.enable is set to true |
| 8 | aad.auth.appkey | Credentials for Kusto | Provide Azure Active Directory Service Principal secret<br>Required when authentication is done with an application |
| 9 | kusto.tables.topics.mapping | Mapping of topics to tables | Provide 1..many topic-table comma-separated mappings as follows-<br>`[{'topic': '<topicName1>','db': '<databaseName>', 'table': '<tableName1>','format': '<format-e.g.avro/csv/json>', 'mapping':'<tableMappingName1>','streaming':'false'},{'topic': '<topicName2>','db': '<databaseName>', 'table': '<tableName2>','format': '<format-e.g.avro/csv/json>', 'mapping':'<tableMappingName2>','streaming':'false'}]`<br>Required<br>Note: The attribute mapping (e.g. `'mapping':'<tableMappingName1>'`) is optional. During ingestion, Azure Data Explorer automatically maps columns according to the ingestion format. The topic field also supports `*` as a wildcard for any topic not explicitly mapped. |
| 10 | key.converter | Deserialization | One of the below supported-<br>org.apache.kafka.connect.storage.StringConverter<br>org.apache.kafka.connect.json.JsonConverter<br>io.confluent.connect.avro.AvroConverter<br>io.confluent.connect.json.JsonSchemaConverter<br>org.apache.kafka.connect.converters.ByteArrayConverter<br><br>Required |
| 11 | value.converter | Deserialization | One of the below supported-<br>org.apache.kafka.connect.storage.StringConverter<br>org.apache.kafka.connect.json.JsonConverter<br>io.confluent.connect.avro.AvroConverter<br>io.confluent.connect.json.JsonSchemaConverter<br>org.apache.kafka.connect.converters.ByteArrayConverter<br><br>Required |
| 12 | value.converter.schema.registry.url | Schema validation | URI of the Kafka schema registry<br>Optional |
| 13 | value.converter.schemas.enable | Schema validation | Set to true if you have embedded schema with payload but are not leveraging the schema registry<br>Applicable for avro and json<br><br>Optional |
| 14 | tasks.max | Connector parallelism | Specify the number of connector copy/sink tasks<br>Required |
| 15 | flush.size.bytes | Performance knob for batching | Maximum buffer byte size per topic+partition combination that in combination with flush.interval.ms (whichever is reached first) should result in sinking to Kusto<br>Default - 1 MB<br>Required |
| 16 | flush.interval.ms | Performance knob for batching | Minimum time interval per topic+partition combo that in combination with flush.size.bytes (whichever is reached first) should result in sinking to Kusto<br>Default - 30 seconds<br>Required |
| 17 | tempdir.path | Local directory path on Kafka Connect worker to buffer files to before shipping to Kusto | Default is value returned by System.getProperty("java.io.tmpdir") with a GUID attached to it<br><br>Optional |
| 18 | behavior.on.error | Configurable behavior in response to errors encountered | Possible values - log, ignore, fail<br><br>log - log the error, send record to dead letter queue, and continue processing<br>ignore - log the error, send record to dead letter queue, proceed with processing despite errors encountered<br>fail - shut down connector task upon encountering<br><br>Default - fail<br>Optional |
| 19 | errors.retry.max.time.ms | Configurable retries for transient errors | Period of time in milliseconds to retry for transient errors<br><br>Default - 300 ms<br>Optional |
| 20 | errors.retry.backoff.time.ms | Configurable retries for transient errors | Period of time in milliseconds to backoff before retry for transient errors<br><br>Default - 10 ms<br>Optional |
| 21 | errors.deadletterqueue.bootstrap.servers | Channel to write records that failed deserialization | CSV of kafkaBroker:port<br>Optional |
| 22 | errors.deadletterqueue.topic.name | Channel to write records that failed deserialization | Pre-created topic name<br>Optional |
| 23 | errors.deadletterqueue.security.protocol | Channel to write records that failed deserialization | Security protocol of secure Kafka cluster<br>Optional but when feature is used with secure cluster, is required |
| 24 | errors.deadletterqueue.sasl.mechanism | Channel to write records that failed deserialization | SASL mechanism of secure Kafka cluster<br>Optional but when feature is used with secure cluster, is required |
| 25 | errors.deadletterqueue.sasl.jaas.config | Channel to write records that failed deserialization | JAAS config of secure Kafka cluster<br>Optional but when feature is used with secure cluster, is required |
| 26 | misc.deadletterqueue.bootstrap.servers | Channel to write records that failed due to reasons other than deserialization | CSV of kafkaBroker:port<br>Optional |
| 27 | misc.deadletterqueue.topic.name | Channel to write records that failed due to reasons other than deserialization | Pre-created topic name<br>Optional |
| 28 | misc.deadletterqueue.security.protocol | Channel to write records that failed due to reasons other than deserialization | Security protocol of secure Kafka cluster<br>Optional but when feature is used with secure cluster, is required |
| 29 | misc.deadletterqueue.sasl.mechanism | Channel to write records that failed due to reasons other than deserialization | SASL mechanism of secure Kafka cluster<br>Optional but when feature is used with secure cluster, is required |
| 30 | misc.deadletterqueue.sasl.jaas.config | Channel to write records that failed due to reasons other than deserialization | JAAS config of secure Kafka cluster<br>Optional but when feature is used with secure cluster, is required |
| 31 | consumer.override.bootstrap.servers | Security details explicitly required for secure Kafka clusters | Bootstrap server:port CSV of secure Kafka cluster<br>Required for secure Kafka clusters |
| 32 | consumer.override.security.protocol | Security details explicitly required for secure Kafka clusters | Security protocol of secure Kafka cluster<br>Required for secure Kafka clusters |
| 33 | consumer.override.sasl.mechanism | Security details explicitly required for secure Kafka clusters | SASL mechanism of secure Kafka cluster<br>Required for secure Kafka clusters |
| 34 | consumer.override.sasl.jaas.config | Security details explicitly required for secure Kafka clusters | JAAS config of secure Kafka cluster<br>Required for secure Kafka clusters |
| 35 | consumer.override.sasl.kerberos.service.name | Security details explicitly required for secure Kafka clusters, specifically kerberized Kafka | Kerberos service name of kerberized Kafka cluster<br>Required for kerberized Kafka clusters |
| 36 | consumer.override.auto.offset.reset | Configurable consuming from offset | Possible values are - earliest or latest<br>Optional |
| 37 | consumer.override.max.poll.interval.ms | Config to prevent duplication | Set to a value to avoid consumer leaving the group while the Connector is retrying<br>Optional |
| 38 | kusto.validation.table.enable | Validation config to verify the target table exists & the role of user has ingestion privileges | If true, validates existence of table & that the principal has ingestor role. Defaults to false, has to be explicitly set to true to enable this check<br>Optional |
| 39 | proxy.host | Host details of proxy server | Host details of proxy server configuration<br>Optional |
| 40 | proxy.port | Port details of proxy server | Port details of proxy server configuration<br>Optional |
<hr>

6. Streaming ingestion

Kusto supports streaming ingestion in order to achieve sub-second latency.

This connector supports this using the Managed streaming client.

Usage: configure per topic-table that streaming should be used. For example:

kusto.tables.topics.mapping=[{'topic': 't1','db': 'db', 'table': 't1','format': 'json', 'mapping':'map', 'streaming': true}]

Requirements: Streaming enabled on the cluster. Streaming policy configured on the table or database.

Additional configurations: flush.size.bytes and flush.interval.ms are still used to batch
records together before ingestion - flush.size.bytes should not be over 4MB, flush.interval.ms
is suggested to be low (hundreds of milliseconds).
We still recommend configuring ingestion batching policy at the table or database level, as the client falls back to
queued ingestion in case of failure and retry-exhaustion.
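The flush guidance for streaming can be turned into a quick sanity check. The sketch below flags values that go against the recommendations above. It is not part of the connector, the function name is hypothetical, and the 1000 ms threshold is an assumed reading of "hundreds of milliseconds".

```python
FOUR_MB = 4 * 1024 * 1024


def check_streaming_flush(flush_size_bytes, flush_interval_ms):
    """Return warnings for flush settings that do not suit streaming ingestion."""
    warnings = []
    if flush_size_bytes > FOUR_MB:
        warnings.append("flush.size.bytes should not be over 4MB for streaming")
    # Assumption: anything from one second upward no longer counts as "hundreds of milliseconds".
    if flush_interval_ms >= 1000:
        warnings.append("flush.interval.ms is suggested to be in the hundreds of milliseconds")
    return warnings


print(check_streaming_flush(1024 * 1024, 200))           # []
print(check_streaming_flush(8 * 1024 * 1024, 30000))     # two warnings
```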

7. Roadmap

The following is the roadmap-<br>

| # | Roadmap item |
|---|--------------|
| 1 | Schema evolution support |
<hr>

8. Deployment overview

Kafka Connect connectors can be deployed in standalone mode (just for development) or in distributed mode
(production).<br>

8.1. Standalone Kafka Connect deployment mode

This involves having the connector plugin jar in /usr/share/java of a Kafka Connect worker, reference to the same plugin
path in connect-standalone.properties, and launching of the connector from command line. This is not scalable, not fault
tolerant, and is not recommended for production.

8.2. Distributed Kafka Connect deployment mode

Distributed Kafka Connect essentially involves creation of a KafkaConnect worker cluster as shown in the diagram
below.<br>

  • Azure Kubernetes Service is a great infrastructure for the connect cluster, due to its managed and scalable nature
  • Kubernetes is a great platform for the connect cluster, due to its scalable nature and self-healing
  • Each orange polygon is a Kafka Connect worker and each green polygon is a sink connector instance
  • A Kafka Connect worker can have 1..many task instances which helps with scale
  • When a Kafka Connect worker is maxed out from a resource perspective (CPU, RAM), you can scale horizontally: add more
    Kafka Connect workers, and tasks within them
  • Kafka Connect service manages rebalancing of tasks to Kafka topic partitions automatically without pausing the
    connector tasks in recent versions of Kafka
  • A Docker image needs to be created to deploy the Kusto sink connector in a distributed mode. This is detailed below.

[Image: CONNECTOR]

<hr>

9. Connector download/build from source

Multiple options are available-

9.1. Download a ready-to-use uber jar from our Github repo releases listing

https://github.com/Azure/kafka-sink-azure-kusto/releases

9.2. Download the connector from Confluent Connect Hub

https://www.confluent.io/hub/microsoftcorporation/kafka-sink-azure-kusto/

9.3. Build uber jar from source

The dependencies are-

1. Clone the repo<br>

```bash
git clone https://github.com/Azure/kafka-sink-azure-kusto.git
cd ./kafka-sink-azure-kusto
```

2. Build with maven<br>
For an Uber jar, run the below-

```bash
mvn clean compile assembly:single
```

For the connector jar along with jars of associated dependencies, run the below-

```bash
mvn clean install
```

Look within the
target/components/packages/microsoftcorporation-kafka-sink-azure-kusto-<version>/microsoftcorporation-kafka-sink-azure-kusto-<version>/lib/
folder.

<hr>

10. Test drive the connector - standalone mode

In a standalone mode (not recommended for production), the connector can be test driven in any of the following ways-

10.1. Self-contained Dockerized setup

Review this hands on lab. It includes dockerized Kafka, the connector and a Kafka producer to take away complexities
and allow you to focus on the connector aspect.

10.2. HDInsight Kafka, on an edge node

Review this hands on lab. The lab referenced may have a slightly outdated list of sink properties. Modify them to make
them current, leveraging the latest sink properties detailed in section 5.

<hr>

11. Distributed deployment details

The following are the components and configuration needed in place.<br>

[Image: CONNECTOR]

<hr>

The following section strives to explain further, pictorially, what's involved with distributed deployment of the
connector-<br>

11.1. Docker image creation

  1. Create a Docker Hub account if it does not exist
  2. Install Docker desktop on your machine
  3. Build a Docker image for the KafkaConnect worker that includes any connect worker level configurations, and the
    Kusto connector jar
  4. Push the image to Docker Hub

[Image: CONNECTOR]

<hr>

11.2. Provision Kafka Connect workers on an Azure Kubernetes Service cluster

  1. Provision KafkaConnect workers on our Azure Kubernetes Service cluster

When we start off, all we have is an empty Kubernetes cluster-

[Image: CONNECTOR]

<hr>

When we are done, we have a live KafkaConnect cluster that is integrated with Confluent Cloud-

[Image: CONNECTOR]

<hr>

Note: This still does not have copy tasks (connector tasks) running yet

11.3. Postman for Kafka Connect REST APIs or REST calls from your CLI

  1. Install Postman on our local machine<br>
  2. Import KafkaConnect REST call JSON collection from Github into Postman<br>
    https://github.com/Azure/azure-kusto-labs/blob/confluent-clound-hol/kafka-integration/confluent-cloud/rest-calls/Confluent-Cloud-ADX-HoL-1-STUB.postman_collection.json<br>

OR<br>

  1. Find the REST calls here to call from CLI

Note: Depending on your Kafka security configuration, update the security configuration in the sink properties.

11.4. Launch the connector tasks using the Kafka Connect REST API

  1. Launch the Kafka-ADX copy tasks (otherwise called connector tasks) with a REST call from Postman or via a curl
    command
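For readers who prefer a script over Postman or curl, the sketch below prepares the equivalent request against the Kafka Connect REST API, which creates a connector through a POST to /connectors with a name and a config object. The worker address, connector name and property values are placeholders, and the config shown is deliberately incomplete; the full list of properties is in section 5.

```python
import json
import urllib.request


def build_create_connector_request(connect_url, name, config):
    """Prepare (but do not send) the POST request that creates a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder values; replace them with your own worker address and sink properties.
request = build_create_connector_request(
    "http://localhost:8083",
    "kusto-sink-demo",
    {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "topics": "telemetry",
        "tasks.max": "1",
    },
)
print(request.get_method(), request.full_url)
# To actually submit it: urllib.request.urlopen(request)
```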

This is what we will see, a Kusto sink connector cluster with copy tasks running.

[Image: CONNECTOR]

<hr>

Note: The diagram below depicts just one connector task per Kafka Connect worker. You can actually run 1..many connector
tasks till you max out the capacity.

[Image: CONNECTOR]

<hr>

Section 12, below, links to hands-on labs to test drive the deployment. The labs are end to end. Prefer
the Confluent Cloud lab for the simplest deployment.

<hr>

12. Test drive the connector - distributed mode

The labs referenced below may have a slightly outdated list of sink properties. Modify them to make them current,
leveraging the latest sink properties detailed in section 5.

12.1. HDInsight Kafka

For a non-secured HDInsight Kafka cluster

For a secured HDInsight Kafka cluster (Kerberised)

12.2. Confluent Cloud

Run through this end-to-end hands-on-lab

12.3. Confluent IaaS (operator based)

Run through this end-to-end hands-on-lab

<hr>

13.1. Docker image

We have deliberately not published a Docker image due to the multitude of versions of Apache Kafka bundled into the
Confluent platform versions, and multiple versions of our Kafka connector, not to mention the security configs specific
to each customer's Kafka deployment. We therefore recommend that a custom image be developed using
the appropriate version compatible with the Apache Kafka version.

13.2. Helm chart

Similarly, we recommend leveraging the right version of the Helm chart: the Confluent base Helm chart for Kafka
Connect.

<hr>

14. Other

14.1. Feedback, issues and contribution

The connector plugin is open source. We welcome feedback and contribution. Log an issue in the issues tab as needed.
See section 15.

14.2. Scaling out/in

  • Connector tasks can be scaled out per Kafka Connect worker by pausing, editing and resuming the connector cluster
  • When tasks per worker are maxed out, Azure Kubernetes Service cluster can be scaled out for more nodes, and Kafka
    Connect workers can be provisioned on the same

14.3. Sizing

  • Confluent recommends Kafka Connect workers with minimum of 4 cores and 16 GB of RAM
  • Start with 3 workers (3 AKS nodes), and scale horizontally as needed
  • Number of tasks should ideally be equal to the number of Kafka topic partitions, not more
  • Play with the number of tasks, workers and nodes till you see the performance you desire
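The rule of thumb about tasks and partitions can be written down as a tiny helper. This is only a sketch of the sizing guidance above: the function name is hypothetical, and summing partitions across all subscribed topics is an assumption about how the guidance applies to multi-topic connectors.

```python
def choose_tasks_max(partitions_per_topic, desired_tasks):
    """Cap the requested task count at the total number of topic partitions."""
    total_partitions = sum(partitions_per_topic.values())
    # More tasks than partitions would leave tasks idle; never go below one task.
    return max(1, min(total_partitions, desired_tasks))


# Hypothetical topics with their partition counts.
print(choose_tasks_max({"telemetry": 6, "audit": 3}, desired_tasks=12))  # 9
print(choose_tasks_max({"telemetry": 6, "audit": 3}, desired_tasks=4))   # 4
```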

14.4. Performance tuning

  • Kafka topic: number of partitions should be tuned for performance
  • Connectors: AKS right-sizing, connector tasks right-sizing, configure the right values for flush.size.bytes and
    flush.interval.ms
  • Kusto: Right-size Kusto cluster for ingestion (SKU and node count), tune the table or database
    ingestion batching policy
  • Format: Avro (with schema registry) and CSV perform more-or-less similarly from tests done

14.5. Upgrading to version 1.x from prior versions

To upgrade, you would have to stop the connector tasks, recreate your connect worker Docker image to include the latest
jar, update the sink properties to leverage the renamed and latest sink properties, reprovision the connect workers,
then launch the copy tasks. You can use the consumer.override* feature to manipulate offset to read from.

<hr>

15. Need Support?

  • Found a bug? Please help us fix it by thoroughly documenting it and filing an issue.
  • Have a feature request? Please post it on User Voice to help us prioritize
  • Have a technical question? Ask on Stack Overflow with tag "azure-data-explorer"
  • Need Support? Every customer with an active Azure subscription has access to support with
    guaranteed response time. Consider submitting a ticket to get assistance from the Microsoft support team

16. Major version specifics

With version 1.0, we overhauled the connector. The following are the changes-

  1. We renamed some properties for consistency with standards
  2. Added support for schema registry
  3. Added support for more converters - we supported only StringConverter and ByteArrayConverter previously
  4. Improved upfront validation and fail fast
  5. Added support for configurable behavior on error
  6. Added support for configurable retries
  7. Added support for Kafka Connect dead letter queues
  8. Introduced additional dead letter queue property for those errors that are not handled by Kafka Connect through its
    dead letter queue feature
  9. Improved the delivery guarantees to "at least once" (no data loss)

Here is our blog post.

To upgrade, you would have to stop the connector tasks, recreate your connect worker Docker image to include the latest
jar, update the sink properties to leverage the renamed and latest sink properties, reprovision the connect workers,
then launch the copy tasks.

16.1 Breaking changes from version 5.0.0

BREAKING CHANGES

From version 5.0.0 onward, the Java version used by the connector has changed. The Kusto SDK and the open source
connectors are being upgraded continually to move over to Java 17; these upgrades fix a set of CVE vulnerabilities.

Confluent runtimes and interop with Java are documented here.
In line with these changes, version 5.0.0 of the connector is compatible with Confluent Platform 8.0.0 and
standardizes on JDK 21.

Version 5.1.2 uses JDK 17. This was rolled out to support users on JDK 17 and up (5.1.0 was on JDK 21).

<hr>

For information about what changes are included in each release, please see
the Release History section of this document.

17. Release History

| Release Version | Release Date | Changes Included |
|---|---|---|
| 0.1.0 | 2020-03-05 | <ul><li>Initial release</li></ul> |
| 1.0.1 | 2020-08-04 | <ul><li>New feature: flush interval - stop aggregation by timer</li><li>New feature: Support orc avro and parquet via 1 file per message. kusto java sdk version</li><li>Bug fix: Connector didn't work well with the New java version</li><li>Bug fix: Fixed usage of compressed files and binary types</li><li>Bug fix: Client was closed when kafka task was called close() certain partitions. Now closing only on stop. Issue resulted in no refresh of the ingestion resources and caused failure on ingest when trying to add message to the azure queue.</li><li>Bug fix: In certain kafka pipelines - the connector files were deleted before ingestion.</li><li>New feature: Support for dlq</li><li>New feature: Support json and avro schema registry</li><li>New feature: Support json and avro converters</li><li>Bug fix: Correct committed offset value to be (+ 1) so as not to ingest last record twice</li></ul> |
| 1.0.2 | 2020-10-06 | <ul><li>Bug fix: Cast of count of records to long instead of int, to accommodate larger databases.</li></ul> |
| 1.0.3 | 2020-10-13 | <ul><li>Bug fix: Fix Multijson usage</li></ul> |
| 2.0.0 | 2020-11-12 | <ul><li>Bug fix: Trying to create a new directory failed probably because it was already created due to a race condition.</li><li>Bug fix: Resetting the timer was not behind lock, which could result in a race condition of it being destroyed by other code.</li><li>New feature: Added required kusto.query.url parameter so that we can now specify a Kusto Query URL that isn't simply the default of the Kusto Ingestion URL prepended with "ingest-".</li><li>New feature: Renamed the kusto.url parameter to kusto.ingestion.url for clarity and consistency.</li></ul> |
| 2.1.0 | 2021-07-11 | <ul><li>Upgrade Kusto Java SDK to 2.8.2.</li></ul> |
| 2.2.0 | 2021-09-13 | <ul><li>New feature: Streaming ingestion has been added</li></ul> |
| 3.0.0 | 2022-06-06 | <ul><li>New feature: Internal default batch set to 30 seconds</li><li>New feature: Update kusto sdk to latest version 3.1.1</li><li>Bug fix: Flush timer close / fix NPE</li></ul> |
| 3.0.1 | 2022-06-13 | <ul><li>Bug fix: Close could ingest a file after offsets commit - causing duplications</li></ul> |
| 3.0.2 | 2022-07-20 | <ul><li>New feature: Changes to support protobuf data ingestion</li></ul> |
| 3.0.3 | 2022-08-09 | <ul><li>Bug fix: Library upgrade to fix CVE-2020-36518 Out-of-bounds Write</li></ul> |
| 3.0.4 | 2022-09-05 | <ul><li>New feature: Make mapping optional, fixes Issue#76</li><li>New feature: Make table validation optional when the connector starts up (Refer: kusto.validation.table.enable)</li><li>Bug fix: Stop collecting messages when DLQ is not enabled. Provides better scaling & reduces GC pressure</li></ul> |
| 3.0.5 | 2022-09-07 | <ul><li>New feature: Support authentication with Managed Identities</li></ul> |
| 3.0.6 | 2022-11-28 | <ul><li>Upgrade Kusto Java SDK to 3.2.1 and fix failing unit test case (mitigate text4shell RCE vulnerability)</li></ul> |
| 3.0.7 | 2022-12-06 | <ul><li>Upgrade Jackson version to the latest security version</li><li>Filter tombstone records & records that fail JSON serialization</li></ul> |
| 3.0.8 | 2022-12-15 | <ul><li>New feature: Added Proxy support to KustoSinkTask</li></ul> |
| 3.0.9 | 2022-12-19 | <ul><li>Bug fix: Restrict file permissions on file created for ingestion</li><li>Canonicalize file names</li><li>Refactor tests</li></ul> |
| 4.0.0 | 2023-03-07 | <ul><li>Upgrade Kusto Java SDK to 4.0.3 and Kafka Clients to 3.4.0</li><li>Disable table access validation at start up by default</li></ul> |
| 4.0.1 | 2023-03-26 | <ul><li>Upgrade Kusto Java SDK to 4.0.4</li></ul> |
| 4.0.2 | 2023-06-28 | <ul><li>Upgrade Kusto Java SDK to 5.0.0</li><li>Fix vulnerabilities in libs</li></ul> |
| 4.0.3 | 2023-07-23 | <ul><li>Upgrade Kusto Java SDK to 5.0.1</li><li>Fix vulnerabilities in libs</li></ul> |
| 4.0.4 | 2023-09-27 | <ul><li>Upgrade Kusto Java SDK to 5.0.2</li><li>Fix vulnerabilities in snappy-java</li></ul> |
| 4.0.5 | 2023-10-27 | <ul><li>Fix vulnerabilities by upgrading io.netty</li></ul> |
| 4.0.7 | 2024-03-19 | <ul><li>Fix vulnerability CVE-2023-52428 and upgrade Kusto SDK libs</li></ul> |
| 4.0.8 | 2024-04-22 | <ul><li>Fix vulnerability CVE-2024-29025 by upgrading io.netty</li></ul> |
| 4.0.9 | 2024-04-22 | <ul><li>Fix vulnerability CVE-2024-29025 by upgrading io.netty referenced indirectly</li></ul> |
| 4.0.10 | 2024-06-25 | <ul><li>Fix vulnerability CVE-2024-35255 by upgrading azure libs referenced indirectly & update Java SDK</li></ul> |
| 4.1.1 | 2024-08-07 | <ul><li>Upgrade SDK dependencies</li></ul> |
| 4.1.2 | 2024-08-07 | <ul><li>Upgrade SDK dependencies, Support for WIF based authentication</li></ul> |
| 5.0.0 | 2025-07-28 | <ul><li>Bump Kusto SDK, Upgrade to latest confluent versions</li></ul> |
| 5.1.0 | 2025-08-05 | <ul><li>Upgrade to Java 21</li></ul> |
| 5.1.2 | 2025-10-05 | <ul><li>Provide support for JDK 17 and up. Update versions of libs</li></ul> |
| 5.1.3 | 2025-12-26 | <ul><li>Fix vulnerability CVE-2025-67735 by upgrading io.netty to 4.1.129.Final</li></ul> |
| 5.1.5 | 2026-01-21 | <ul><li>Update Kusto SDK and other dependencies</li><li>Fix tests and dependencies</li></ul> |
| 5.1.6 | 2026-02-02 | <ul><li>Fix CVE by upgrading dependencies</li></ul> |
| 5.2.0 | 2026-02-27 | <ul><li>Add JMX metrics support for monitoring connector performance</li><li>Add Docker quickstart with Grafana dashboards</li><li>Update dependency versions</li></ul> |
| 5.2.1 | 2026-03-02 | <ul><li>Fix Jackson vulnerability by upgrading dependencies</li></ul> |
| 5.2.2 | 2026-04-09 | <ul><li>Security fix: Add endpoint URL validation to prevent SSRF attacks via kusto.ingestion.url and kusto.query.url configuration properties. URLs are now validated against the SDK's trusted Azure Data Explorer endpoints (WellKnownKustoEndpoints.json) at configuration time and as a defense-in-depth check before creating connection strings.</li><li>Add comprehensive test coverage for endpoint URL validation across all Azure clouds and sovereign regions</li><li>Fix CVE-2026-33870 and CVE-2026-33871 by upgrading Netty to 4.1.132.Final</li></ul> |
| 5.2.3 | 2026-04-17 | <ul><li>Security fix: Mitigate KQL command injection (CWE-943) in kusto.tables.topics.mapping by validating db, table, mapping, and format fields against an allowlist before interpolation into KQL commands</li></ul> |
| 5.3.0 | 2026-04-27 | <ul><li>Add wildcard (*) topic-to-table mapping support in kusto.tables.topics.mapping to provide a default ingestion configuration for topics that are not explicitly mapped</li></ul> |
| 5.3.1 | 2026-05-20 | <ul><li>Security fix: Bump Netty to 4.2.13.Final to remediate CVE-2026-42583, CVE-2026-42579, CVE-2026-42584, CVE-2026-42587, CVE-2026-41417, CVE-2026-42580, CVE-2026-42581, CVE-2026-42585, and CVE-2026-42578</li></ul> |
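
The wildcard behavior added in 5.3.0 can be pictured as an exact-match lookup with a fallback. The sketch below illustrates that idea only and is not the connector's implementation: the class `TopicMappingLookup`, the simplified topic-to-table map, and the topic and table names are all assumptions made for the example (the real kusto.tables.topics.mapping entries also carry db, format and mapping fields).

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of wildcard fallback; not taken from the connector source. */
final class TopicMappingLookup {

    // The wildcard key described in the 5.3.0 release note.
    static final String WILDCARD = "*";

    /** Prefers an explicit mapping for the topic, then falls back to the wildcard entry if present. */
    static Optional<String> resolveTable(Map<String, String> topicToTable, String topic) {
        String exact = topicToTable.get(topic);
        if (exact != null) {
            return Optional.of(exact);
        }
        return Optional.ofNullable(topicToTable.get(WILDCARD));
    }

    public static void main(String[] args) {
        // Illustrative names only.
        Map<String, String> mapping = Map.of("orders", "OrdersTable", WILDCARD, "DefaultTable");
        System.out.println(resolveTable(mapping, "orders")); // explicit mapping wins
        System.out.println(resolveTable(mapping, "clicks")); // unmapped topic uses the wildcard entry
    }
}
```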

18. Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide
a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions
provided by the bot. You will only need to do this once across all repos using our CLA.

To make the PR process efficient, please follow the checklist below:

  • There is an issue open concerning the code added - Either a bug or enhancement. Preferably the issue includes an
    agreed upon approach.
  • PR comment explains the changes done - This should be a TL;DR; as the rest of it should be documented in the
    related issue.
  • PR is concise - Try to avoid making drastic changes in a single PR. Split it into multiple changes if possible. If
    you feel a major change is needed, make sure the commit history is clear and maintainers can comfortably review both
    the code and the logic behind the change.
  • Please provide any related information needed to understand the change - Especially in the form of unit tests, but
    also docs, guidelines, use-case, best practices, etc as appropriate.
  • Checks should pass
  • Run mvn dependency-check:check. This should return no High/Medium vulnerabilities in any libraries/dependencies

This project has adopted the Microsoft Open Source Code of Conduct.
For more information see the Code of Conduct FAQ or
contact opencode@microsoft.com with any additional questions or comments.

This article is auto-generated from Azure/kafka-sink-azure-kusto via the GitHub API. Last fetched: 6/14/2026