Skip to main content
Version: 3.8.0

Flink On Local

环境准备

  1. 准备一台2核4G以上配置的Linux机器节点(CentOs、macOS等)

  2. Java SE Development Kit 8及以上版本安装

  3. 确保本机节点hostname对应的ip正确

    因为进程启动时候会去拿本机的ip地址,所以要保证/etc/hosts文件中hostname 对应ip地址正确

      hostname
    # 得到上面显示的结果,例如:baisui-host-1
    ping baisui-host-1
    # 确认ip节点是否为本机ip无误,如不正确,请在/etc/hosts将正确的hostname与ip映射关系加上
  4. 确保lsof安装成功,sudo yum install -y lsof

安装

  1. 下载

    wget http://mirror.qlangtech.com/3.8.0/tis/flink-tis-1.13.1-bin.tar.gz && rm -rf flink-tis-1.13.1 && mkdir flink-tis-1.13.1 && tar xvf flink-tis-1.13.1-bin.tar.gz -C ./flink-tis-1.13.1
  1. 启动Flink Cluster

      cd ./flink-tis-1.13.1
    ./bin/start-cluster.sh
  2. 确认端口是否正常

     telnet localhost 8081

总结

恭喜您已经把Flink Standalone模式的集群安装成功了

caution

TIS中使用Flink Standalon模式是经过TIS团队适配过的,修改了Flink JobManager中ClassLoader执行逻辑,所以此处安装不能使用Flink官方发布的安装包

FAQ

1. Could not acquire the minimum required resources

在Flink中已经添加一个任务之后,再向Flink提交新的任务,Flink JM节点日志报告: Could not acquire the minimum required resources 没有足够的资源进行启动

原因:

Flink运行没有足够的slot资源分配给新任务

方法:

修改Flink配置文件:

{FLINK_HOME}/conf/flink-conf.yaml
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1

默认值是1,需要在单个Flink节点上运行多个Flink任务,课修改成大于1的值就行(一般情况slot代表了服务节点的资源并行处理能力,一般配置于节点CPU核数相一致即可)

2. 启动时候由于服务器目录/opt/data/tis没有权限,导致tis-flink启动失败(后台没有对应的进程在运行?)

原因:

tis系统日志默认输出到/opt/data/tis目录,如果当前启动用户没有权限创建此目录时,启动flink-tis时会启动失败,失败日志如下:


Caused by: java.io.IOException: Unable to create directory /opt/data/tis
at org.apache.commons.io.FileUtils.forceMkdir(FileUtils.java:1391) ~[tis-flink-extends-dist-3.6.0-alpha.jar:?]
at com.qlangtech.plugins.incr.flink.TISFlinkClassLoaderFactory.makeDataDirUseable(TISFlinkClassLoaderFactory.java:132) ~[tis-flink-extends-dist-3.6.0-alpha.jar:?]
at com.qlangtech.plugins.incr.flink.TISFlinkClassLoaderFactory.buildServerLoaderFactory(TISFlinkClassLoaderFactory.java:154) ~[tis-flink-extends-dist-3.6.0-alpha.jar:?]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.defaultClassLoaderFactory(BlobLibraryCacheManager.java:196) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.jobmaster.JobManagerSharedServices.fromConfiguration(JobManagerSharedServices.java:124) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.Dispatcher.<init>(Dispatcher.java:171) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.StandaloneDispatcher.<init>(StandaloneDispatcher.java:40) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.SessionDispatcherFactory.createDispatcher(SessionDispatcherFactory.java:44) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.SessionDispatcherFactory.createDispatcher(SessionDispatcherFactory.java:27) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayServiceFactory.create(DefaultDispatcherGatewayServiceFactory.java:62) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.createDispatcher(SessionDispatcherLeaderProcess.java:102) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherIfRunning$0(SessionDispatcherLeaderProcess.java:96) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.runIfState(AbstractDispatcherLeaderProcess.java:222) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.runIfStateIs(AbstractDispatcherLeaderProcess.java:212) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.createDispatcherIfRunning(SessionDispatcherLeaderProcess.java:96) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_151]
... 6 more
2022-11-14 17:23:58,659 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2022-11-14 17:23:58,661 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Shutting down rest endpoint.
2022-11-14 17:23:58,780 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.

修改方式是在一下配置文件加env.java.opts配置项

{FLINK_HOME}/conf/flink-conf.yaml
#具体目录根据自己情况修改

env.java.opts: "-Ddata.dir=/opt/data/tis"

3. 执行增量实例Cancel操作出现超时异常

象限描述:

本Case在创建增量实例时,使用了Chunjun Sink实现了整库表的同步功能,在手动取消(Cancel)实例过程中抛出以下异常,且一旦出现超时异常,Flink会将TM中剩下的全部slot耗尽,导致不能在重新部署新的Flink Job

2023-05-13 16:39:39,532 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Task did not exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1735) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
2023-05-13 16:39:39,536 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1735) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]

原因分析:

由于Chujun Sink 在执行关闭时,需要为每张表对应的StreamLoadManager执行close操作,而该close执行又是比较耗时的,而且从日志执行过程来看每个表对应的close是线性排队执行的,所以当通道本身选择的表 数据量比较多的情况下,会出现超时异常也就不足为奇了。

解决办法:

$FLINK_HOME/conf/flink-conf.yaml配置文件中设置 配置参数 task.cancellation.timeout,详细配置请查阅:https://study.sf.163.com/documents/read/service_support/dsc-t-d-0501