The Kafka Connector is used to communicate with an Apache Kafka system. In Kafka, messages are stored in queues that are managed by Kafka brokers. A Kafka broker maintains topics to which processes using Kafka can subscribe. A message is published to a Kafka topic using a Kafka Producer, and messages are read from Kafka using a Kafka Consumer. A topic can be divided into partitions that can exist across multiple brokers. Each partition is identified by an integer value.
A message can be published to a Kafka queue using the PUT command in the following format:
PUT sourcepath <partition>/<key>/
PUT test/testfile.txt
PUT test2/testfile2.txt 1/somekey
Here the partition and key are optional. In the example, the first command publishes the content of the file `test/testfile.txt` to the configured Kafka topic. The next command publishes the content of the file `test2/testfile2.txt` to partition 1 of the configured topic with record key `somekey`.
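The optional `<partition>/<key>` destination can be illustrated with a small parser. This is only a sketch: `parse_put_destination` is a hypothetical helper, not part of the connector, and it assumes that the partition always comes first when a destination is given. How the connector treats a key without a partition is not described here, so that case is not covered.

```python
from typing import Optional, Tuple


def parse_put_destination(destination: str) -> Tuple[Optional[int], Optional[str]]:
    """Split a '<partition>/<key>' destination into (partition, key).

    Hypothetical helper: the connector's real parsing rules may differ.
    """
    # Drop empty segments so a trailing slash ("1/somekey/") is tolerated.
    parts = [p for p in destination.strip().split("/") if p]
    if not parts:
        return None, None  # neither partition nor key was given
    if len(parts) > 2:
        raise ValueError(f"expected '<partition>/<key>', got {destination!r}")
    partition = int(parts[0])  # a partition is identified by an integer value
    key = parts[1] if len(parts) == 2 else None
    return partition, key


print(parse_put_destination(""))           # PUT test/testfile.txt
print(parse_put_destination("1/somekey"))  # PUT test2/testfile2.txt 1/somekey
```

A non-integer first segment raises `ValueError` in this sketch, which mirrors the statement that partitions are identified by integers.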
The Kafka Receiver can be enabled by selecting `Enable Receiver`, setting `PollIntervalMs` to a positive integer value, and setting `Consumer Group Id`. When the Kafka Receiver is enabled, messages are polled and stored in the inbox. The Receiver receives messages from the Kafka Consumer as Strings and then converts them to a byte stream while creating a Receive instance.
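Because received messages pass through a String before becoming bytes, the character encoding can matter for non-text payloads. The sketch below is not the connector's code; it assumes UTF-8 with replacement of invalid sequences (the encoding actually used is not stated here) and shows that text survives such a round trip while arbitrary binary data might not.

```python
def receive_as_bytes(raw: bytes, encoding: str = "utf-8") -> bytes:
    """Mimic a String-based receive path: decode to text, then encode again.

    The encoding and the 'replace' error policy are assumptions for illustration.
    """
    text = raw.decode(encoding, errors="replace")  # message handed over as a String
    return text.encode(encoding)                   # converted back to a byte stream


text_payload = "order-42,shipped".encode("utf-8")
binary_payload = bytes([0xFF, 0xD8, 0xFF, 0xE0])  # bytes that are not valid UTF-8

print(receive_as_bytes(text_payload) == text_payload)      # text is preserved
print(receive_as_bytes(binary_payload) == binary_payload)  # binary data is altered
```

If binary files need to be exchanged, it may be worth testing a small sample end to end before relying on the receiver.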
Kafka supports SSL and SASL authentication and encryption. The Kafka Connector supports both SSL and SASL authentication. For SASL authentication, the Kafka Connector supports the PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 SASL mechanisms. It also supports both one-way and two-way authentication.
To enable basic one-way authentication using SSL, select the Enable SSL check box, upload your client trust store, and enter the password for the client trust store. The client trust store should have the server's root certificate installed in it. The Kafka broker should be started with SSL authentication enabled.
To enable two-way authentication, select the "Enable Client Authentication" check box. Then, upload the client keystore, and enter the keystore password and key password. The client root certificate should be installed in the broker trust store.
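For orientation, the one-way and two-way SSL settings correspond to standard Kafka client properties. The sketch below builds such a property map; `build_ssl_properties` and the file paths are hypothetical, and how the connector translates its uploaded stores into these properties internally is an assumption.

```python
from typing import Dict, Optional


def build_ssl_properties(
    truststore_path: str,
    truststore_password: str,
    keystore_path: Optional[str] = None,
    keystore_password: Optional[str] = None,
    key_password: Optional[str] = None,
) -> Dict[str, str]:
    """Return Kafka client SSL properties for one-way or two-way authentication."""
    # One-way authentication: the client only needs to trust the broker.
    props = {
        "security.protocol": "SSL",
        "ssl.truststore.location": truststore_path,
        "ssl.truststore.password": truststore_password,
    }
    # Two-way authentication: the client also presents its own certificate.
    if keystore_path is not None:
        if keystore_password is None or key_password is None:
            raise ValueError("keystore password and key password are required")
        props["ssl.keystore.location"] = keystore_path
        props["ssl.keystore.password"] = keystore_password
        props["ssl.key.password"] = key_password
    return props


one_way = build_ssl_properties("/path/to/client.truststore.jks", "changeit")
two_way = build_ssl_properties(
    "/path/to/client.truststore.jks", "changeit",
    "/path/to/client.keystore.jks", "changeit", "changeit",
)
print(sorted(two_way))
```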
For SASL authentication, the Username and Password must be entered.
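The Username and Password are used to build the Kafka `sasl.jaas.config` property. A minimal sketch of what that value typically looks like for the supported mechanisms follows; `build_sasl_properties` is a hypothetical helper, the login module class names are the standard Kafka ones, and the exact string the connector generates is an assumption.

```python
from typing import Dict

# Standard Kafka login modules for the mechanisms the connector supports.
LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-256": "org.apache.kafka.common.security.scram.ScramLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def _quote(value: str) -> str:
    """Wrap a value in double quotes, escaping backslashes and quotes."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_sasl_properties(
    mechanism: str, username: str, password: str, use_ssl: bool = True
) -> Dict[str, str]:
    """Return Kafka client properties for SASL authentication."""
    if mechanism not in LOGIN_MODULES:
        raise ValueError(f"unsupported SASL mechanism: {mechanism}")
    jaas = (
        f"{LOGIN_MODULES[mechanism]} required "
        f"username={_quote(username)} password={_quote(password)};"
    )
    return {
        # Must match the scheme of the broker listener being connected to.
        "security.protocol": "SASL_SSL" if use_ssl else "SASL_PLAINTEXT",
        "sasl.mechanism": mechanism,
        "sasl.jaas.config": jaas,
    }


props = build_sasl_properties("SCRAM-SHA-512", "alice", "s3cret")
print(props["sasl.jaas.config"])
```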
For information about how to enable SSL authentication on the broker, see the Kafka security documentation: https://kafka.apache.org/documentation/#security
File size limit
The file size limit in the connector can be configured by setting MessageMaxByteSize (listed as Message Max Size Bytes in the properties below); the default size is 1 MB. By default, the Kafka broker also has a message size limit of 1 MB, which can be configured by setting message.max.bytes (in the server properties). To increase the message size, both of these settings must be changed.
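The interaction of the two limits can be sketched as follows: the effective limit is the smaller of the connector and broker settings. The function names are hypothetical, 1 MB is taken as 1024 * 1024 bytes for illustration, and protocol overhead added by Kafka to each record is ignored.

```python
ONE_MB = 1024 * 1024  # illustrative value for the 1 MB defaults


def effective_message_limit(connector_limit: int = ONE_MB, broker_limit: int = ONE_MB) -> int:
    """Return the largest message size that both sides would accept."""
    return min(connector_limit, broker_limit)


def can_send(file_size: int, connector_limit: int = ONE_MB, broker_limit: int = ONE_MB) -> bool:
    """Check a file size against the connector and broker limits."""
    return file_size <= effective_message_limit(connector_limit, broker_limit)


two_mb_file = 2 * ONE_MB
# Raising only the connector limit is not enough; the broker still rejects the file.
print(can_send(two_mb_file, connector_limit=5 * ONE_MB))
# Raising both limits allows the file through.
print(can_send(two_mb_file, connector_limit=5 * ONE_MB, broker_limit=5 * ONE_MB))
```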
Kafka Connector Properties
Each instance of the Kafka Connector can be configured using the following settings:
|Host Cluster||The host cluster.||Yes|
|Topics||Kafka topics. Comma-separated (topic1,topic2)
|SASL Mechanism||SASL mechanism used to connect (Kafka property 'sasl.mechanism'). Must match one of the values in the server's 'sasl.enabled.mechanisms' setting.||Yes|
|SASL Security Protocol||SASL security protocol used to connect (Kafka property 'security.protocol'). Must match the scheme of the server's 'listeners' (or 'advertised.listeners') entry for the port given in 'Host Cluster'.||Yes|
|Username||Username for SASL connection. Used to build Kafka 'sasl.jaas.config' property.|
|Password||Password for SASL connection. Used to build Kafka 'sasl.jaas.config' property.|
|Enable Receiver||Enable Kafka receiver client. Once set, the Kafka receiver will start polling from the topics.|
|Poll Interval Ms||Interval, in milliseconds, at which the Kafka consumer is polled.|
|Auto Commit Offset||Kafka option for auto commit offset. (Kafka consumer property 'enable.auto.commit')|
|Auto Commit Interval Ms||Auto-commit interval in milliseconds, used when auto commit offset is enabled. (Kafka consumer property 'auto.commit.interval.ms')||Yes|
|Consumer Group Id||Kafka consumer group id. (Kafka consumer property 'group.id')||Yes|
|Poll Time Out Ms||Incoming poll timeout.|
|Fetch Min Bytes||The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.||Yes|
|Heartbeat Interval Ms||The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.||Yes|
|Max Partition Fetch Bytes||The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.||Yes|
|Session Timeout Ms||The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance.||Yes|
|Auto Offset Reset||What to do when there is no initial offset in Kafka or if the current offset does not exist anymore on the server (for example, because that data has been deleted):
|Default Api Timeout Ms||Specifies the timeout (in milliseconds) for consumer APIs that could block. This configuration is used as the default timeout for all consumer operations that do not explicitly accept a timeout parameter.||Yes|
|Exclude Internal Topics||Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to true, the only way to receive records from an internal topic is by subscribing to it.||Yes|
|Fetch Max Bytes||The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined by message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.||Yes|
|Isolation Level||Controls how to read messages written transactionally.
If set to read_committed, consumer.poll() will only return transactional messages that have been committed.
If set to read_uncommitted (the default), consumer.poll() will return all messages, even transactional messages that have been aborted. Non-transactional messages will be returned unconditionally in either mode.
|Max Poll Interval Ms||The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.||Yes|
|Max Poll Records||The maximum number of records returned in a single call to poll().||Yes|
|Partition Assignment Strategy||The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used.||Yes|
|Send Buffer Bytes||The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.||Yes|
|Acks||Controls the criteria under which requests are considered complete. When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0, 1, or all (-1) replicas.||Yes|
|Delivery Timeout Ms||Kafka producer property delivery.timeout.ms. An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retriable send failures.||Yes|
|Compression Type||The compression type for all data generated by the producer. The default is none (that is, no compression). Valid values are none, gzip, snappy, lz4, or zstd. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio -- more batching means better compression.||Yes|
|Linger Ms||Kafka producer property linger.ms. Wait period for buffer space to be filled before sending the message.||Yes|
|Client Id||An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.|
|Max Block Ms||The configuration controls how long KafkaOutgoing.send() and KafkaOutgoing.partitionsFor() will block. These methods can be blocked either because the buffer is full or metadata is unavailable. Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.||Yes|
|Max Request Size Bytes||The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size which may be different from this.||Yes|
|Partitioner Class||Partitioner class that implements the org.apache.kafka.clients.producer.Partitioner interface.||Yes|
|Batch Size Bytes||The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.||Yes|
|Buffer Memory Bytes||The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer will block for max.block.ms, after which it will throw an exception.||Yes|
|Enable Idempotence||The idempotent producer strengthens Kafka's delivery semantics from at-least-once to exactly-once delivery. In particular, producer retries will no longer introduce duplicates. If set, the retries config will default to Integer.MAX_VALUE and the acks config will default to all.||Yes|
|Retries||If the request fails, the producer can automatically retry. Enabling retries also opens up the possibility of duplicates.||Yes|
|Message Max Size Bytes||Message size limit for sending. Increasing this value enables sending larger files from the connector. The broker configuration also needs to be changed to receive larger files. This value should be the same as message.max.bytes in the broker config.||Yes|
|Connections Max Idle Ms||Close idle connections after the number of milliseconds specified by this config.||Yes|
|Receive Buffer Bytes||The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used.||Yes|
|Request Timeout Ms||Controls the maximum amount of time the client will wait for the response to a request. If the response is not received before the timeout elapses, the client will resend the request if necessary or fail the request if retries are exhausted.||Yes|
|Client DNS Lookup||Controls how the client uses DNS lookups. If set to use_all_dns_ips, when the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of them before failing the connection. Applies to both bootstrap and advertised servers. If the value is resolve_canonical_bootstrap_servers_only, each entry will be resolved and expanded into a list of canonical names.||Yes|
|Enable SSL||Enable Authentication and Encryption using SSL protocol.|
|SSL Trust Store||Upload SSL TrustStore file.|
|Trust Store Password|
|Enable Client Authentication||Enables two-way authentication, in which the server authenticates the client.|
|SSL Key Store||Upload the keystore file|
|SSL Keystore Password||Password for keystore|
|SSL Key Password||Password for certificate|
|SSL Trust Store Type||SSL Truststore type|
|SSL Keystore Type||SSL Keystore type|
|SSL Provider||Name of security provider for SSL connection.|
|SSL Cipher Suite||A cipher suite is a named combination of authentication, encryption, MAC, and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.|
|SSL Enabled Protocol||The SSL protocols enabled for the connection. The list should include at least one of the protocols configured on the broker side.|
|SSL Endpoint Identification Algorithm||The endpoint identification algorithm to validate server hostname using server certificate.|
|Command Retries||The number of times the command should be retried when an error or exception occurs.
Valid range: [0-5].
|Command Retry Delay (seconds)||The number of seconds to wait between retries.
Valid range: [0-120].
|Do Not Send Zero Length Files||For PUT, a switch that controls whether to send a file if it is zero-length.|
|Delete Received Zero Length Files||For GET, a switch that controls whether to remove a received file that is zero-length.|
|Retrieve Directory Sort||For PUT, the sorting options for the list of outbound files.|
|Enable Debug||A switch that indicates whether to perform debug logging.|
|System Scheme Name||The URI scheme name used as a shortcut to this host.
|System Public||A switch that indicates whether the connector is public.|
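Several of the settings above constrain one another or have a stated valid range. The following sketch collects those rules into a simple check. `validate_settings` is a hypothetical helper, and representing the settings as a dictionary keyed by the property labels above is an assumption made for illustration.

```python
from typing import Any, Dict, List


def validate_settings(settings: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a dictionary of connector settings."""
    problems: List[str] = []

    # Heartbeat must be lower than the session timeout, typically at most 1/3 of it.
    heartbeat = settings.get("Heartbeat Interval Ms")
    session = settings.get("Session Timeout Ms")
    if heartbeat is not None and session is not None:
        if heartbeat >= session:
            problems.append("Heartbeat Interval Ms must be lower than Session Timeout Ms")
        elif heartbeat * 3 > session:
            problems.append("Heartbeat Interval Ms is typically at most 1/3 of Session Timeout Ms")

    # Documented valid ranges for the command retry settings.
    retries = settings.get("Command Retries")
    if retries is not None and not 0 <= retries <= 5:
        problems.append("Command Retries must be in the range 0-5")
    delay = settings.get("Command Retry Delay (seconds)")
    if delay is not None and not 0 <= delay <= 120:
        problems.append("Command Retry Delay (seconds) must be in the range 0-120")

    # The receiver needs a positive poll interval and a consumer group id.
    if settings.get("Enable Receiver"):
        poll = settings.get("Poll Interval Ms")
        if not isinstance(poll, int) or poll <= 0:
            problems.append("Poll Interval Ms must be a positive integer")
        if not settings.get("Consumer Group Id"):
            problems.append("Consumer Group Id must be set")

    return problems


example = {
    "Heartbeat Interval Ms": 3000,
    "Session Timeout Ms": 10000,
    "Command Retries": 3,
    "Command Retry Delay (seconds)": 30,
    "Enable Receiver": True,
    "Poll Interval Ms": 5000,
    "Consumer Group Id": "example-group",
}
print(validate_settings(example))
```

The values in `example` are illustrative only and are not the connector's defaults.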