如在 Process*Function中

//保存操作日志信息到state中ttl = 3day
    private transient MapState<String, Map<String, String>> cmsOperateState;
    private transient ScheduledExecutorService executor;
    private transient volatile int currentSize;

open方法中

 cmsOperateState = getRuntimeContext().getMapState(stateDesc);

            // 创建异步线程池(建议单线程)
            executor = Executors.newSingleThreadScheduledExecutor();

            // 启动异步更新任务(300秒间隔)
            executor.scheduleAtFixedRate(
                    () -> {
                        try {
                            currentSize = calculateStateSize();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    },
                    0, 300, TimeUnit.SECONDS
            );
            // 注册监控指标
            getRuntimeContext().getMetricGroup()
                    .addGroup("MyStateMetrics").gauge("mapStateSize", () -> currentSize);

再定一一个方法

    private int calculateStateSize() throws Exception {
        LOG.error("calculateStateSize_start");
        // 遍历状态的逻辑(需线程安全)
        int size = 0;
        Iterator<Map.Entry<String, Map<String, String>>> it = cmsOperateState.iterator();
        while (it.hasNext()) {
            it.next();
            size++;
        }
        LOG.error("calculateStateSize_end:" + size);
        return size;
    }

不显示 没成功。
大模型说
在 Flink 中使用自定义 Metric(如 Gauge、Counter)​不需要额外开启全局配置即可注册和采集指标,但需通过 ​Metric Reporter 配置将指标数据导出到外部监控系统(如 Prometheus、JMX)。以下是详细配置指南:

​1. 默认行为
Flink 默认会将所有注册的 Metric 保存在内存中,但不会自动导出。需通过以下配置将指标暴露给外部系统:

​2. 配置指标导出
在 flink-conf.yaml 中配置 Metric Reporter,支持多种协议:

(1) Prometheus 配置示例
yaml

启用 Prometheus Reporter

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999 # 暴露端口(默认 9249)
metrics.reporter.prom.interval: 5 SECONDS # 推送间隔
(2) JMX 配置示例
yaml
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
(3) 日志输出配置示例
yaml
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 1 MINUTE # 日志输出间隔
​3. 代码中注册自定义 Metric
在 RichFunction 的 open() 方法中注册 Gauge:

java
public class MyRichFunction extends RichMapFunction<String, String> {

private transient MapState<String, Integer> mapState;

@Override
public void open(Configuration parameters) {
    // 初始化状态
    MapStateDescriptor<String, Integer> descriptor = 
        new MapStateDescriptor<>("mapState", String.class, Integer.class);
    mapState = getRuntimeContext().getMapState(descriptor);

    // 注册 Gauge
    getRuntimeContext().getMetricGroup()
        .gauge("mapStateSize", () -> {
            int size = 0;
            Iterator<Map.Entry<String, Integer>> it = mapState.iterator();
            while (it.hasNext()) {
                it.next();
                size++;
            }
            return size;
        });
}

}
​4. 指标暴露验证
(1) Prometheus 指标查看
访问 http://:9999/metrics 查看指标数据,搜索 mapStateSize。

(2) Flink Web UI
在 Flink Web UI 的 ​TaskManager Metrics 选项卡中查找自定义指标(需配置 metrics.reporter.*)。

(3) REST API
通过 Flink REST API 获取指标:

bash
curl http://:8081/jobs//metrics?get=mapStateSize
​5. 高级配置
(1) 指标作用域
通过 MetricGroup 定义指标层级:

java
getRuntimeContext().getMetricGroup()

.addGroup("custom_group")  // 增加分组层级
.gauge("mapStateSize", ...);

对应指标名称为:custom_group.mapStateSize

(2) 标签支持(Prometheus)​
在指标中附加标签:

java
MetricGroup group = getRuntimeContext().getMetricGroup()

.addGroup("region", "us-east");

group.gauge("mapStateSize", ...);
对应 Prometheus 指标格式:region_us_east_mapStateSize

​6. 注意事项
​性能影响

避免高频同步遍历大状态(如每秒多次),建议使用 ​异步更新 或 ​抽样统计。
对于 RocksDB 状态后端,遍历 MapState 会触发磁盘读取,需谨慎优化。
​资源清理

在 close() 方法中关闭异步线程或连接池。
​集群部署差异

​Standalone 集群:直接修改 flink-conf.yaml。
​YARN/K8s:通过 -Dmetrics.reporter.prom.port=9999 动态传参。
​总结
​必选配置:选择并启用至少一个 Metric Reporter(如 Prometheus)。
​推荐实践:结合异步更新和 Prometheus + Grafana 实现实时监控。
​避坑指南:确保 TaskManager 的防火墙开放 Reporter 端口(如 9999)。