/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.connector.sink.Committer;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;

/**
 * Stream operator that collects {@link Committable}s, combines them per checkpoint into
 * {@link ManifestCommittable}s and passes those to a {@link Committer}.
 *
 * <p>Every incoming record is forwarded downstream unchanged and additionally buffered. On each
 * state snapshot the buffer is grouped by checkpoint id and persisted in operator state; the
 * grouped committables are committed when the checkpoint completes, or at end of input when
 * streaming checkpoints are disabled.
 */
public class CommitterOperator
extends AbstractStreamOperator<Committable>
implements OneInputStreamOperator<Committable, Committable>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;

    /** Committables received by {@link #processElement} and not yet grouped by checkpoint. */
    private final Deque<Committable> inputs = new ArrayDeque<>();

    /**
     * When {@code true}, {@link #endInput()} does not commit; remaining data is committed from
     * {@link #notifyCheckpointComplete(long)} instead.
     */
    private final boolean streamingCheckpointEnabled;

    /** Combined committables that are waiting to be committed, sorted by checkpoint id. */
    private final NavigableMap<Long, ManifestCommittable> committablesPerCheckpoint;

    /** Supplies the serializer used to store {@link ManifestCommittable}s in operator state. */
    private final SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>> committableSerializer;

    /** Operator state holding the not-yet-committed {@link ManifestCommittable}s. */
    private ListState<ManifestCommittable> streamingCommitterState;

    /** Creates the {@link Committer} from the commit user name. */
    private final SerializableFunction<String, Committer> committerFactory;

    /** Created in {@link #initializeState}; performs the actual commits. */
    private Committer committer;

    /** Set once {@link #endInput()} has been called. */
    private boolean endInput = false;

    /**
     * @param streamingCheckpointEnabled whether commits are driven by checkpoint completion
     *     rather than by {@link #endInput()}
     * @param committerFactory creates the committer for a given commit user name; must not be
     *     {@code null}
     * @param committableSerializer supplies the serializer for the committable state; must not
     *     be {@code null}
     */
    public CommitterOperator(boolean streamingCheckpointEnabled, SerializableFunction<String, Committer> committerFactory, SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>> committableSerializer) {
        this.streamingCheckpointEnabled = streamingCheckpointEnabled;
        // Fail fast on both injected dependencies; the serializer supplier is always
        // dereferenced in initializeState, so a null would only surface later otherwise.
        this.committableSerializer = Preconditions.checkNotNull(committableSerializer);
        this.committablesPerCheckpoint = new TreeMap<>();
        this.committerFactory = Preconditions.checkNotNull(committerFactory);
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Restores (or creates) the commit user name, builds the committer and commits any
     * committables found in the restored state.
     *
     * @throws IllegalStateException presumably raised by {@code Preconditions.checkState} when
     *     the number of stored commit user names does not match the restore status
     * @throws RuntimeException intentionally, after restored committables were committed
     *     (see {@link #commit})
     */
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        ListState<String> commitUserState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("commit_user_state", String.class));
        List<String> commitUsers = new ArrayList<>();
        commitUserState.get().forEach(commitUsers::add);
        if (context.isRestored()) {
            Preconditions.checkState(commitUsers.size() == 1, "Expecting 1 commit user name when recovering from checkpoint but found " + commitUsers.size() + ". This is unexpected.");
        } else {
            Preconditions.checkState(commitUsers.isEmpty(), "Expecting 0 commit user name for a fresh sink state but found " + commitUsers.size() + ". This is unexpected.");
            // Fresh state: generate a commit user name and store it in state.
            String commitUser = UUID.randomUUID().toString();
            commitUserState.add(commitUser);
            commitUsers.add(commitUser);
        }
        this.committer = this.committerFactory.apply(commitUsers.get(0));

        // Committables are stored as raw bytes and (de)serialized by the versioned serializer.
        this.streamingCommitterState = new SimpleVersionedListState<>(
                context.getOperatorStateStore().getListState(
                        new ListStateDescriptor<>("streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE)),
                this.committableSerializer.get());
        List<ManifestCommittable> restored = new ArrayList<>();
        this.streamingCommitterState.get().forEach(restored::add);
        this.streamingCommitterState.clear();
        this.commit(true, restored);
    }

    /**
     * Commits the given committables.
     *
     * <p>In recovery mode the committables are first passed through
     * {@code Committer#filterRecoveredCommittables}; if anything remains it is committed and a
     * {@link RuntimeException} is thrown on purpose to force a job restart.
     *
     * @param isRecover whether the committables come from restored state
     * @param committables committables to commit
     */
    private void commit(boolean isRecover, List<ManifestCommittable> committables) throws Exception {
        if (!isRecover) {
            this.committer.commit(committables);
            return;
        }
        List<ManifestCommittable> pending = this.committer.filterRecoveredCommittables(committables);
        if (!pending.isEmpty()) {
            this.committer.commit(pending);
            throw new RuntimeException("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
    }

    /** Combines the committables of one checkpoint via {@code Committer#combine}. */
    private ManifestCommittable toCommittables(long checkpoint, List<Committable> inputs) throws Exception {
        return this.committer.combine(checkpoint, inputs);
    }

    /** Groups the buffered inputs by checkpoint and writes all pending committables to state. */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.pollInputs();
        this.streamingCommitterState.update(this.committables(this.committablesPerCheckpoint));
    }

    /** Returns a copy of the map's values, in the map's iteration order. */
    private List<ManifestCommittable> committables(NavigableMap<Long, ManifestCommittable> map) {
        return new ArrayList<>(map.values());
    }

    /**
     * Marks the input as finished. Without streaming checkpoints, everything buffered so far is
     * committed immediately; otherwise the commit is left to
     * {@link #notifyCheckpointComplete(long)}.
     */
    public void endInput() throws Exception {
        this.endInput = true;
        if (this.streamingCheckpointEnabled) {
            return;
        }
        this.pollInputs();
        this.commitUpToCheckpoint(Long.MAX_VALUE);
    }

    /**
     * Commits pending committables up to and including {@code checkpointId}; once the input has
     * ended, all pending committables are committed regardless of their checkpoint id.
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        this.commitUpToCheckpoint(this.endInput ? Long.MAX_VALUE : checkpointId);
    }

    /** Commits and then removes every pending committable whose checkpoint id is {@code <= checkpointId}. */
    private void commitUpToCheckpoint(long checkpointId) throws Exception {
        NavigableMap<Long, ManifestCommittable> headMap = this.committablesPerCheckpoint.headMap(checkpointId, true);
        this.commit(false, this.committables(headMap));
        // headMap is a view: clearing it removes the committed entries from the backing map.
        headMap.clear();
    }

    /** Forwards the record downstream and buffers its value for a later commit. */
    public void processElement(StreamRecord<Committable> element) {
        this.output.collect(element);
        this.inputs.add(element.getValue());
    }

    /** Drops all buffered and pending committables, then closes the base operator. */
    public void close() throws Exception {
        this.committablesPerCheckpoint.clear();
        this.inputs.clear();
        super.close();
    }

    /**
     * Drains {@link #inputs}: groups the buffered committables by checkpoint id, combines each
     * group and registers the result in {@link #committablesPerCheckpoint}.
     *
     * @throws RuntimeException if a checkpoint id already has a pending committable
     */
    private void pollInputs() throws Exception {
        Map<Long, List<Committable>> grouped = new HashMap<>();
        for (Committable committable : this.inputs) {
            grouped.computeIfAbsent(committable.checkpointId(), k -> new ArrayList<>()).add(committable);
        }
        for (Map.Entry<Long, List<Committable>> entry : grouped.entrySet()) {
            Long cp = entry.getKey();
            List<Committable> committables = entry.getValue();
            if (this.committablesPerCheckpoint.containsKey(cp)) {
                throw new RuntimeException(String.format("Repeatedly commit the same checkpoint files. \nThe previous files is %s, \nand the subsequent files is %s", this.committablesPerCheckpoint.get(cp), committables));
            }
            this.committablesPerCheckpoint.put(cp, this.toCommittables(cp, committables));
        }
        this.inputs.clear();
    }
}

