Flink 自定义窗口触发器

Flink 自定义窗口触发器

·

2 min read

按元素数量或超时时间触发的自定义触发器 CountWithTimeTrigger:窗口内元素达到指定数量,或距离本批第一个元素超过指定时间时,触发一次计算。

package vehicle.excavate.operator;


import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A window trigger that fires when either a maximum number of elements has been
 * collected or a processing-time timeout has elapsed since the first element of the
 * current batch, whichever happens first.
 *
 * <p>The element count and the current deadline are kept in a partitioned
 * {@link MapState} under the keys {@code "count"} and {@code "time"}. A deadline of
 * {@link Long#MAX_VALUE} means that no timeout timer is currently pending.
 *
 * <p>This trigger only returns {@link TriggerResult#FIRE} or
 * {@link TriggerResult#CONTINUE}; it never purges the window contents itself.
 *
 * @param <T> type of the elements this trigger observes
 * @param <W> type of the windows this trigger operates on
 */
public class CountWithTimeTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;

    /** Sentinel deadline meaning "no timeout timer is registered". */
    private static final long NO_DEADLINE = Long.MAX_VALUE;

    private final long maxCount;
    private final long timeoutMs;
    private final MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<>("mapState", String.class, Long.class);
    private final String countStr = "count";
    private final String timeStr = "time";

    private CountWithTimeTrigger(long maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /**
     * Called for each element that is added to a window. Fires when the element count
     * reaches {@code maxCount} or the current deadline has already passed; otherwise
     * records the element and, for the first element of a batch, registers the timeout
     * timer.
     */
    @Override
    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        MapState<String, Long> mapState = ctx.getPartitionedState(mapStateDesc);
        final long count = getOrDefault(mapState, countStr, 0L);
        final long deadline = getOrDefault(mapState, timeStr, NO_DEADLINE);

        // Use the trigger context's clock so the deadline is measured on the same
        // time source that fires the registered processing-time timers.
        final long currentTimeMs = ctx.getCurrentProcessingTime();
        final long newCount = count + 1;

        if (currentTimeMs >= deadline || newCount >= maxCount) {
            // The batch is being emitted now, so its timeout timer is no longer needed.
            if (deadline != NO_DEADLINE) {
                ctx.deleteProcessingTimeTimer(deadline);
            }
            return fire(mapState);
        }

        if (deadline == NO_DEADLINE) {
            // First element of a new batch: start the timeout.
            final long nextDeadline = currentTimeMs + timeoutMs;
            mapState.put(timeStr, nextDeadline);
            ctx.registerProcessingTimeTimer(nextDeadline);
        }

        mapState.put(countStr, newCount);

        return TriggerResult.CONTINUE;
    }

    /**
     * Called when a registered event-time timer fires. This trigger registers no
     * event-time timers and always continues.
     */
    @Override
    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when a registered processing-time timer fires. Fires only if the timer
     * matches the deadline currently stored in state; any other timer is ignored.
     */
    @Override
    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        MapState<String, Long> mapState = ctx.getPartitionedState(mapStateDesc);
        // Read null-safely: an absent entry is treated as "no deadline pending".
        final long deadline = getOrDefault(mapState, timeStr, NO_DEADLINE);
        if (deadline == time) {
            return fire(mapState);
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Called upon removal of the corresponding window. Deletes the pending timeout
     * timer, if any, and clears the trigger state.
     */
    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        MapState<String, Long> mapState = ctx.getPartitionedState(mapStateDesc);
        final long deadlineValue = getOrDefault(mapState, timeStr, NO_DEADLINE);
        if (deadlineValue != NO_DEADLINE) {
            ctx.deleteProcessingTimeTimer(deadlineValue);
        }
        mapState.clear();
    }

    /** Resets the count and the deadline for the next batch and returns FIRE. */
    private TriggerResult fire(MapState<String, Long> mapState) throws Exception {
        mapState.put(timeStr, Long.MAX_VALUE);
        mapState.put(countStr, 0L);
        return TriggerResult.FIRE;
    }

    /** Returns the value stored under {@code key}, or {@code defaultValue} if the lookup yields null. */
    private static long getOrDefault(MapState<String, Long> state, String key, long defaultValue) throws Exception {
        final Long value = state.get(key);
        return value == null ? defaultValue : value;
    }

    /**
     * Creates a trigger that fires after {@code maxCount} elements or after
     * {@code intervalMs} milliseconds of processing time, whichever comes first.
     *
     * @param maxCount   number of elements at which the trigger fires
     * @param intervalMs timeout in milliseconds, counted from the first element of a batch
     * @param <T>        type of the elements
     * @param <W>        type of the windows
     * @return a new trigger instance
     */
    public static <T, W extends Window> CountWithTimeTrigger<T, W> of(long maxCount, long intervalMs) {
        return new CountWithTimeTrigger<>(maxCount, intervalMs);
    }
}

调用:

// All elements go into one global window. CountWithTimeTrigger.of(10, 1000) is built
// with maxCount = 10 and a timeout of 1000 ms; it is wrapped in PurgingTrigger so the
// window contents are cleared each time the trigger fires.
source.windowAll(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountWithTimeTrigger.of(10, 1000)))
    .process(...)

GlobalWindows 始终使用同一个全局窗口。PurgingTrigger 的作用是在每次触发计算后清空窗口内容;如果不清空,窗口中的数据会一直累加,每次触发都会重复处理之前的数据。

// Complete pipeline: the same global window and purging count/timeout trigger
// (maxCount = 10, timeout = 1000 ms), processed by FakePlateOperator and written to MySink.
source.windowAll(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountWithTimeTrigger.of(10, 1000)))
    .process(new FakePlateOperator())
    .addSink(new MySink());

或者不使用 PurgingTrigger 包装,而是在触发计算时直接返回 FIRE_AND_PURGE:

/**
 * Starts a fresh batch and emits the current one: the element counter goes back to
 * zero, the stored deadline is set to the Long.MAX_VALUE sentinel, and the window is
 * fired and purged in one step.
 *
 * @param state trigger state holding the counter and the deadline
 * @return {@link TriggerResult#FIRE_AND_PURGE}
 */
private TriggerResult fire(MapState<String, Long> state) throws Exception {
    // The two entries are independent, so the order of the writes does not matter.
    state.put(countStr, 0L);
    state.put(timeStr, Long.MAX_VALUE);
    return TriggerResult.FIRE_AND_PURGE;
}