如在 Process*Function中
//保存操作日志信息到state中ttl = 3day
private transient MapState<String, Map<String, String>> cmsOperateState;
private transient ScheduledExecutorService executor;
private transient volatile int currentSize;
open方法中
cmsOperateState = getRuntimeContext().getMapState(stateDesc);
// 创建异步线程池(建议单线程)
executor = Executors.newSingleThreadScheduledExecutor();
// 启动异步更新任务(300秒间隔)
executor.scheduleAtFixedRate(
() -> {
try {
currentSize = calculateStateSize();
} catch (Exception e) {
e.printStackTrace();
}
},
0, 300, TimeUnit.SECONDS
);
// 注册监控指标
getRuntimeContext().getMetricGroup()
.addGroup("MyStateMetrics").gauge("mapStateSize", () -> currentSize);
再定一一个方法
private int calculateStateSize() throws Exception {
LOG.error("calculateStateSize_start");
// 遍历状态的逻辑(需线程安全)
int size = 0;
Iterator<Map.Entry<String, Map<String, String>>> it = cmsOperateState.iterator();
while (it.hasNext()) {
it.next();
size++;
}
LOG.error("calculateStateSize_end:" + size);
return size;
}
不显示 没成功。
大模型说
在 Flink 中使用自定义 Metric(如 Gauge、Counter)不需要额外开启全局配置即可注册和采集指标,但需通过 Metric Reporter 配置将指标数据导出到外部监控系统(如 Prometheus、JMX)。以下是详细配置指南:
1. 默认行为
Flink 默认会将所有注册的 Metric 保存在内存中,但不会自动导出。需通过以下配置将指标暴露给外部系统:
2. 配置指标导出
在 flink-conf.yaml 中配置 Metric Reporter,支持多种协议:
(1) Prometheus 配置示例
yaml
启用 Prometheus Reporter
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999 # 暴露端口(默认 9249)
metrics.reporter.prom.interval: 5 SECONDS # 推送间隔
(2) JMX 配置示例
yaml
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
(3) 日志输出配置示例
yaml
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 1 MINUTE # 日志输出间隔
3. 代码中注册自定义 Metric
在 RichFunction 的 open() 方法中注册 Gauge:
java
public class MyRichFunction extends RichMapFunction<String, String> {
private transient MapState<String, Integer> mapState;
@Override
public void open(Configuration parameters) {
// 初始化状态
MapStateDescriptor<String, Integer> descriptor =
new MapStateDescriptor<>("mapState", String.class, Integer.class);
mapState = getRuntimeContext().getMapState(descriptor);
// 注册 Gauge
getRuntimeContext().getMetricGroup()
.gauge("mapStateSize", () -> {
int size = 0;
Iterator<Map.Entry<String, Integer>> it = mapState.iterator();
while (it.hasNext()) {
it.next();
size++;
}
return size;
});
}
}
4. 指标暴露验证
(1) Prometheus 指标查看
访问 http://
(2) Flink Web UI
在 Flink Web UI 的 TaskManager Metrics 选项卡中查找自定义指标(需配置 metrics.reporter.*)。
(3) REST API
通过 Flink REST API 获取指标:
bash
curl http://
5. 高级配置
(1) 指标作用域
通过 MetricGroup 定义指标层级:
java
getRuntimeContext().getMetricGroup()
.addGroup("custom_group") // 增加分组层级
.gauge("mapStateSize", ...);
对应指标名称为:custom_group.mapStateSize
(2) 标签支持(Prometheus)
在指标中附加标签:
java
MetricGroup group = getRuntimeContext().getMetricGroup()
.addGroup("region", "us-east");
group.gauge("mapStateSize", ...);
对应 Prometheus 指标格式:region_us_east_mapStateSize
6. 注意事项
性能影响
避免高频同步遍历大状态(如每秒多次),建议使用 异步更新 或 抽样统计。
对于 RocksDB 状态后端,遍历 MapState 会触发磁盘读取,需谨慎优化。
资源清理
在 close() 方法中关闭异步线程或连接池。
集群部署差异
Standalone 集群:直接修改 flink-conf.yaml。
YARN/K8s:通过 -Dmetrics.reporter.prom.port=9999 动态传参。
总结
必选配置:选择并启用至少一个 Metric Reporter(如 Prometheus)。
推荐实践:结合异步更新和 Prometheus + Grafana 实现实时监控。
避坑指南:确保 TaskManager 的防火墙开放 Reporter 端口(如 9999)。
没有评论