Kafka
Batch read

- Configuration options:
Bootstrap Servers
- Type: single-line text
- Required: yes
- Default: none
- Description: A list of host/port pairs used to establish the initial connection to the Kafka cluster. The client uses all servers in the cluster regardless of which ones are listed here; this list only determines the initial hosts used to discover the full set of servers. The list takes the form host1:port1,host2:port2,... and need not contain every server, because these servers are only used for the initial connection that discovers the full cluster membership (which may change dynamically). You may still want more than one entry in case a server is down.
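The host:port list format described above can be checked before it is handed to a client. The following is a minimal sketch, not part of TIS or the Kafka client; the function name `parse_bootstrap_servers` is hypothetical.

```python
from typing import List, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split 'host1:port1,host2:port2' into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray or trailing commas
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    if not servers:
        raise ValueError("at least one bootstrap server is required")
    return servers
```

Listing two or more brokers, for example `parse_bootstrap_servers("kafka1:9092,kafka2:9092")`, keeps the initial connection possible when one of them is down.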
Subscription Method
- Type: single-line text
- Required: yes
- Default: none
- Description: Either manually assign a list of partitions, or subscribe to all topics matching a specified pattern to get dynamically assigned partitions.
Test Topic
- Type: single-line text
- Required: no
- Default: none
- Description: The topic used to test whether the connector can consume messages.
MessageFormat
- Type: single-line text
- Required: yes
- Default: json
- Description: The message format, which determines the serialization used.
Request Timeout, ms
- Type: integer
- Required: no
- Default: 30000
- Description: Controls the maximum amount of time the client will wait for the response to a request. If the response is not received before the timeout elapses, the client will resend the request if necessary, or fail the request if retries are exhausted.
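Guess Field Types
- Type: single-line text
- Required: yes
- Default: on
- Description: TIS uses an internal algorithm to read a certain number of records from the Kafka event stream in advance and guess the type of each column, which speeds up filling in the form. The types obtained from this analysis may not be accurate enough, so the user needs to fine-tune them manually.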
Group ID
- Type: single-line text
- Required: yes
- Default: none
- Description: The Group ID is how you distinguish different consumer groups.
Client DNS Lookup
- Type: single choice
- Required: no
- Default: use_all_dns_ips
- Description: Controls how the client uses DNS lookups. If set to use_all_dns_ips, connect to each returned IP address in sequence until a successful connection is established. After a disconnection, the next IP is used. Once all IPs have been used once, the client resolves the IP(s) from the hostname again. If set to resolve_canonical_bootstrap_servers_only, resolve each bootstrap address into a list of canonical names. After the bootstrap phase, this behaves the same as use_all_dns_ips. If set to default (deprecated), attempt to connect to the first IP address returned by the lookup, even if the lookup returns multiple IP addresses.
Protocol
- Type: single-line text
- Required: yes
- Default: PLAINTEXT
- Description: The protocol used to communicate with brokers.
Enable Auto Commit
- Type: single choice
- Required: no
- Default: true
- Description: If true, the consumer's offset will be periodically committed in the background.
Client ID
- Type: single-line text
- Required: no
- Default: none
- Description: An ID string to pass to the server when making requests. It allows a logical application name to be included in server-side request logging, so the source of requests can be tracked beyond just IP/port.
Retry Backoff, ms
- Type: integer
- Required: no
- Default: 100
- Description: The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.
Auto Commit Interval, ms
- Type: integer
- Required: no
- Default: 5000
- Description: The frequency in milliseconds at which the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.
Max Poll Records
- Type: integer
- Required: no
- Default: 500
- Description: The maximum number of records returned in a single call to poll(). Note that max_poll_records does not impact the underlying fetching behavior. The consumer caches the records from each fetch request and returns them incrementally from each poll.
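The caching behavior described above (fetches fill a buffer, and each poll drains at most a fixed number of records from it) can be illustrated with a small simulation. This is a sketch under stated assumptions, not the Kafka consumer implementation; `BufferedPoller` and its `fetch` callable are hypothetical names.

```python
from collections import deque


class BufferedPoller:
    """Simulates a consumer that caches fetched records and hands them out in capped polls."""

    def __init__(self, fetch, max_poll_records=500):
        self._fetch = fetch            # hypothetical callable returning one fetch response (a list)
        self._max = max_poll_records   # upper bound on records returned per poll()
        self._buffer = deque()

    def poll(self):
        # Only fetch when the cache is empty; the cap does not change what is fetched.
        if not self._buffer:
            self._buffer.extend(self._fetch())
        batch = []
        while self._buffer and len(batch) < self._max:
            batch.append(self._buffer.popleft())
        return batch
```

With a single fetch response of five records and a cap of two, successive polls return two, two, and then one record without triggering another fetch in between.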
Receive Buffer, bytes
- Type: integer
- Required: no
- Default: 32768
- Description: The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
Socket Setup TimeoutMs
- Type: integer
- Required: no
- Default: 10000
- Description: The amount of time the client will wait for the socket connection to be established. If the connection is not established before the timeout elapses, the client closes the socket channel.
Socket Setup Timeout MaxMs
- Type: integer
- Required: no
- Default: 30000
- Description: The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout increases exponentially for each consecutive connection failure, up to this maximum. To avoid connection storms, a randomization factor of 0.2 is applied to the timeout, resulting in a random range between 20% below and 20% above the computed value.
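The interaction between the initial timeout, the maximum, and the 0.2 randomization factor can be sketched as follows. The doubling per failure is an assumption (the text only says "exponentially"), and `connection_setup_timeout_ms` is a hypothetical helper, not a Kafka API.

```python
import random


def connection_setup_timeout_ms(failures, base_ms=10000, max_ms=30000,
                                jitter=0.2, rng=random):
    """Timeout after `failures` consecutive connection failures.

    Assumes the timeout doubles per failure (the multiplier is an assumption),
    is capped at max_ms, and is then scaled by a random factor in
    [1 - jitter, 1 + jitter].
    """
    computed = min(base_ms * (2 ** failures), max_ms)
    factor = rng.uniform(1 - jitter, 1 + jitter)
    return computed * factor
```

With the defaults listed in this section, the first attempt falls between 8000 and 12000 ms, and after two or more consecutive failures the value falls between 24000 and 36000 ms.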
Maximum Records
- Type: integer
- Required: no
- Default: 100000
- Description: The maximum number of records to be processed per execution.
Repeated Calls
- Type: integer
- Required: no
- Default: 3
- Description: The number of repeated calls to poll() if no messages were received.
Polling Time
- Type: integer
- Required: no
- Default: 1000
- Description: The amount of time the Kafka connector should try to poll for messages.
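One plausible way Maximum Records, Repeated Calls, and Polling Time combine in a batch read is sketched below. The exact loop used by the connector is not stated in this document, so the stopping rules here are assumptions: reading stops once the record limit is reached or after the given number of consecutive empty polls. `read_batch` and the `poll` callable are hypothetical.

```python
def read_batch(poll, max_records=100000, repeated_calls=3, polling_time=1000):
    """Collect records until max_records is reached or poll() keeps coming back empty.

    `poll` is a hypothetical callable that takes the polling time and returns a list.
    Resetting the empty-poll counter after a non-empty poll is an assumption.
    """
    records = []
    empty_polls = 0
    while len(records) < max_records and empty_polls < repeated_calls:
        batch = poll(polling_time)
        if batch:
            empty_polls = 0
            # Never exceed the per-execution limit.
            records.extend(batch[: max_records - len(records)])
        else:
            empty_polls += 1
    return records
```

Under these assumptions, a quiet topic ends the read after Repeated Calls empty polls, each waiting up to Polling Time, rather than blocking indefinitely.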
Batch write

- Configuration options:
Bootstrap Servers
- Type: single-line text
- Required: yes
- Default: none
- Description: A list of host/port pairs used to establish the initial connection to the Kafka cluster. The client uses all servers in the cluster regardless of which ones are listed here; this list only determines the initial hosts used to discover the full set of servers. The list takes the form host1:port1,host2:port2,... and need not contain every server, because these servers are only used for the initial connection that discovers the full cluster membership (which may change dynamically). You may still want more than one entry in case a server is down.
Topic
- Type: single-line text
- Required: yes
- Default: none
- Description: The topic pattern to which records will be sent. Use the '{stream}' placeholder to send each message to a specific topic based on that value. Note that the topic name will be transformed to a standard naming convention.
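The '{stream}' placeholder can be pictured as a plain string substitution. This is only a sketch: `resolve_topic` is a hypothetical helper, and the naming-convention transformation mentioned above is not specified in this document, so it is not reproduced here.

```python
def resolve_topic(pattern: str, stream: str) -> str:
    """Replace the '{stream}' placeholder with the stream name.

    The subsequent normalization to a standard naming convention is
    intentionally omitted because its rules are not documented here.
    """
    return pattern.replace("{stream}", stream)
```

A pattern without the placeholder, such as a fixed topic name, is returned unchanged, so all records go to the same topic.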
Test Topic
- Type: single-line text
- Required: no
- Default: none
- Description: The topic used to test whether the connector can produce messages.
Delivery Timeout
- Type: integer
- Required: yes
- Default: 120000
- Description: An upper bound on the time to report success or failure after a call to 'send()' returns.
Protocol
- Type: single-line text
- Required: yes
- Default: PLAINTEXT
- Description: The protocol used to communicate with brokers.
ACKs
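- Type: single choice
- Required: yes
- Default: 1
- Description: The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.
  - all: The leader waits until all replicas have successfully written the record to their logs. This guarantees that no data is lost as long as at least one replica remains alive. This is the strongest guarantee.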
Compression Type
- Type: single choice
- Required: yes
- Default: none
- Description: The compression type for all data generated by the producer.
Send Buffer bytes
- Type: integer
- Required: yes
- Default: 131072
- Description: The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
Client DNS Lookup
- Type: single choice
- Required: yes
- Default: use_all_dns_ips
- Description: Controls how the client uses DNS lookups. If set to use_all_dns_ips, connect to each returned IP address in sequence until a successful connection is established. After a disconnection, the next IP is used. Once all IPs have been used once, the client resolves the IP(s) from the hostname again. If set to resolve_canonical_bootstrap_servers_only, resolve each bootstrap address into a list of canonical names. After the bootstrap phase, this behaves the same as use_all_dns_ips. If set to default (deprecated), attempt to connect to the first IP address returned by the lookup, even if the lookup returns multiple IP addresses.
Request Timeout
- Type: integer
- Required: yes
- Default: 30000
- Description: Controls the maximum amount of time the client will wait for the response to a request. If the response is not received before the timeout elapses, the client will resend the request if necessary, or fail the request if retries are exhausted.
Batch Size
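- Type: integer
- Required: yes
- Default: 16384
- Description: The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This setting controls the number of bytes the sender waits to accumulate in a batch before publishing to Kafka. The producer starts sending as soon as either batch.size or linger.ms is satisfied.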
Linger ms
- Type: single-line text
- Required: yes
- Default: 0
- Description: The producer groups together any records that arrive in between request transmissions into a single batched request.
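The rule that a batch is sent once either the batch size or the linger time is satisfied can be written as a single condition. This is a simplified sketch of that rule only, not the producer's actual sender logic; `should_send` is a hypothetical name.

```python
def should_send(batch_bytes: int, waited_ms: int,
                batch_size: int = 16384, linger_ms: int = 0) -> bool:
    """Return True when a batch is ready: it is full, or it has waited long enough."""
    return batch_bytes >= batch_size or waited_ms >= linger_ms
```

With the default Linger ms of 0 the second condition is always true, so records are sent without waiting. Raising Linger ms trades a little latency for larger batches.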
Client ID
- Type: single-line text
- Required: no
- Default: none
- Description: An ID string to pass to the server when making requests. It allows a logical application name to be included in server-side request logging, so the source of requests can be tracked beyond just IP/port.
Max Request Size
- Type: integer
- Required: yes
- Default: 1048576
- Description: The maximum size of a request in bytes.
Enable Idempotence
- Type: single choice
- Required: yes
- Default: false
- Description: When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream.
Max in Flight Requests per Connection
- Type: integer
- Required: yes
- Default: 5
- Description: The maximum number of unacknowledged requests the client will send on a single connection before blocking. The value can be greater than 1; the maximum value supported with idempotence is 5.
Retries
- Type: integer
- Required: yes
- Default: 100
- Description: Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.
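Enable Idempotence interacts with ACKs, Retries, and Max in Flight Requests per Connection. The limit of 5 in-flight requests comes from the description above; the requirements that acks be 'all' and retries be greater than 0 follow the Kafka producer configuration documentation. The check below is a minimal sketch with a hypothetical function name, not something TIS is known to run.

```python
from typing import List, Union


def idempotence_conflicts(enable_idempotence: bool, acks: Union[str, int],
                          retries: int, max_in_flight: int) -> List[str]:
    """Return human-readable conflicts; an empty list means the settings are compatible."""
    if not enable_idempotence:
        return []  # no extra constraints when idempotence is off
    problems = []
    if str(acks) not in ("all", "-1"):
        problems.append("acks must be 'all' when idempotence is enabled")
    if retries <= 0:
        problems.append("retries must be greater than 0")
    if max_in_flight > 5:
        problems.append("max in-flight requests per connection must be <= 5")
    return problems
```

With the defaults listed in this section (idempotence off, ACKs 1) there is nothing to check; turning idempotence on while leaving ACKs at 1 would be reported as a conflict.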
Socket Connection Setup Timeout
- Type: single-line text
- Required: yes
- Default: 10000
- Description: The amount of time the client will wait for the socket connection to be established.
Socket Connection Setup Max Timeout
- Type: single-line text
- Required: yes
- Default: 30000
- Description: The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout increases exponentially for each consecutive connection failure, up to this maximum.
Buffer Memory
- Type: single-line text
- Required: yes
- Default: 33554432
- Description: The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
Max Block ms
- Type: single-line text
- Required: yes
- Default: 60000
- Description: Controls how long the KafkaProducer's send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction() methods will block.
Sync Producer
- Type: single choice
- Required: no
- Default: false
- Description: Wait synchronously until the record has been sent to Kafka.
Receive Buffer bytes
- Type: integer
- Required: yes
- Default: 32768
- Description: The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
Real-time read

- Configuration options:
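Independent Listener
- Type: single choice
- Required: yes
- Default: false
- Description: While the Flink job runs, the listener is assigned its own Slot and is not mixed with downstream operators. When enabled, the benefit is that compute resources are isolated and do not affect each other. The drawback is that the upstream and downstream operators run in two separate Slots, which incurs extra network transfer overhead.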
Starting Offsets
- Type: single-line text
- Required: yes
- Default: Latest Offset
- Description: The starting position for Kafka consumption. The following strategies are available:
  - Committed Offset: Start from the committed offset of the consuming group, without a reset strategy. An exception is thrown at runtime if there are no committed offsets.
  - Earliest Offset: Start from the earliest offset.
  - Earliest When None Committed Offset: Start from the committed offset, and use EARLIEST as the reset strategy if no committed offset exists.
  - Latest Offset: (default) Start from the latest offset.
  - Timestamp Offset: Start from the first record whose timestamp is greater than or equal to a given timestamp (milliseconds).
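The five strategies can be compared by resolving each one against a small in-memory partition. This is an illustrative sketch, not the Flink connector's implementation; `resolve_start_offset` is hypothetical, the partition is assumed to be non-empty, and falling back to the latest offset when no record matches the timestamp is an assumption.

```python
def resolve_start_offset(strategy, records, committed=None, timestamp_ms=None):
    """Pick the first offset to read.

    records: non-empty list of (offset, timestamp_ms) tuples in offset order.
    committed: the group's committed offset, or None if nothing was committed.
    """
    earliest = records[0][0]
    latest = records[-1][0] + 1  # next offset to be written
    if strategy == "Committed Offset":
        if committed is None:
            raise LookupError("no committed offset and no reset strategy")
        return committed
    if strategy == "Earliest Offset":
        return earliest
    if strategy == "Earliest When None Committed Offset":
        return committed if committed is not None else earliest
    if strategy == "Latest Offset":
        return latest
    if strategy == "Timestamp Offset":
        for offset, ts in records:
            if ts >= timestamp_ms:
                return offset
        return latest  # assumption: nothing new enough, wait for new records
    raise ValueError(f"unknown strategy: {strategy!r}")
```

The difference that matters most in practice is between Committed Offset, which fails for a brand-new consumer group, and Earliest When None Committed Offset, which falls back to the start of the partition.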
Real-time write

- Configuration options:
Transfer Format
- Type: single-line text
- Required: yes
- Default: json
- Description: The text format used for Kafka transport. For parameter settings, see: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/formats/overview/
semantic
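- Type: single choice
- Required: yes
- Default: none
- Description: Whether the sink supports two-phase commit. Note: if this parameter is empty, two-phase commit is disabled by default, that is, the sink does not support exactly_once semantics. Currently only exactly-once and at-least-once are supported.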
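Script Type
- Type: single-line text
- Required: yes
- Default: StreamAPI
- Description: TIS automatically generates the Flink Stream script for you. Two script types are currently supported:
  - SQL: The logic is clear, which makes it easy for users to modify the execution logic themselves.
  - Stream API: Runs on the system's lower-level execution logic, and is lightweight and high-performance.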
parallelism
- Type: integer
- Required: yes
- Default: 1
- Description: The sink parallelism.