package com.readboy.live.education.mq;

import com.google.android.exoplayer2.text.ttml.TtmlNode;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.readboy.live.education.AppConf;
import com.readboy.live.education.IDManager;
import com.readboy.live.education.extension.ReactiveXExtKt;
import com.readboy.live.education.fragment.LiveRoomFragment;
import com.readboy.live.education.mq.AMQPManager;
import com.readboy.live.education.util.AppHelper;
import com.readboy.live.education.util.FastSafeIterableMap;
import com.readboy.live.education.util.Helper;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.rxkotlin.SubscribersKt;
import io.reactivex.schedulers.Schedulers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kotlin.Lazy;
import kotlin.LazyKt;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.PropertyReference1Impl;
import kotlin.jvm.internal.Reflection;
import kotlin.reflect.KProperty;
import kotlin.text.Charsets;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import timber.log.Timber;

/* compiled from: AMQPManager.kt */
@Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0088\u0001\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000b\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\t\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0003\n\u0002\b\u0006\n\u0002\u0010\u0012\n\u0002\b\u000b\u0018\u0000 M2\u00020\u0001:\u0003LMNB\u0005¢\u0006\u0002\u0010\u0002J\u000e\u0010(\u001a\u00020\u001b2\u0006\u0010)\u001a\u00020\u001aJ0\u0010*\u001a\b\u0012\u0004\u0012\u00020\u00060+2\u0006\u0010,\u001a\u00020\u00052\u0006\u0010-\u001a\u00020\u00052\b\b\u0002\u0010.\u001a\u00020\u000f2\b\b\u0002\u0010/\u001a\u00020\u000fJ&\u00100\u001a\b\u0012\u0004\u0012\u0002010+2\u0006\u0010,\u001a\u00020\u00052\u0006\u0010-\u001a\u00020\u00052\u0006\u0010/\u001a\u00020\u000fH\u0002J\f\u00102\u001a\b\u0012\u0004\u0012\u00020\u001b0+J\u000e\u00103\u001a\b\u0012\u0004\u0012\u00020\u001b0+H\u0002J\u0016\u00104\u001a\b\u0012\u0004\u0012\u00020\u001b0+2\b\b\u0002\u00105\u001a\u00020\u000fJ\u000f\u00106\u001a\u0004\u0018\u00010\u001bH\u0002¢\u0006\u0002\u00107J$\u00108\u001a\u00020\u001b\"\u0004\b\u0000\u001092\f\u0010:\u001a\b\u0012\u0004\u0012\u0002H90;2\u0006\u0010<\u001a\u00020=H\u0002J)\u0010>\u001a\u00020\u001b\"\u0004\b\u0000\u001092\f\u0010:\u001a\b\u0012\u0004\u0012\u0002H90;2\u0006\u0010?\u001a\u0002H9H\u0002¢\u0006\u0002\u0010@J\b\u0010A\u001a\u00020\u0005H\u0002J\u0014\u0010B\u001a\b\u0012\u0004\u0012\u00020\u001b0+2\u0006\u0010C\u001a\u00020DJ\u001c\u0010E\u001a\b\u0012\u0004\u0012\u00020\u001b0+2\u0006\u0010-\u001a\u00020\u00052\u0006\u0010C\u001a\u00020DJ\u000e\u0010F\u001a\b\u0012\u0004\u0012\u00020\
u001b0+H\u0002J\u000e\u0010G\u001a\u00020\u001b2\u0006\u0010)\u001a\u00020\u001aJ\b\u0010H\u001a\u00020\u001bH\u0002J\u001e\u0010I\u001a\b\u0012\u0004\u0012\u00020\u001b0+2\u0006\u0010,\u001a\u00020\u00052\b\b\u0002\u00105\u001a\u00020\u000fJ\u0016\u0010J\u001a\b\u0012\u0004\u0012\u00020\u00060+2\u0006\u0010K\u001a\u00020\u0006H\u0002R7\u0010\u0003\u001a\u001e\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00020\u00060\u0004j\u000e\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00020\u0006`\u00078BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\n\u0010\u000b\u001a\u0004\b\b\u0010\tR\u000e\u0010\f\u001a\u00020\rX\u0082\u0004¢\u0006\u0002\n\u0000R\u001e\u0010\u0010\u001a\u00020\u000f2\u0006\u0010\u000e\u001a\u00020\u000f@BX\u0086\u000e¢\u0006\b\n\u0000\u001a\u0004\b\u0010\u0010\u0011R\u001e\u0010\u0012\u001a\u00020\u000f2\u0006\u0010\u000e\u001a\u00020\u000f@BX\u0086\u000e¢\u0006\b\n\u0000\u001a\u0004\b\u0012\u0010\u0011R\u000e\u0010\u0013\u001a\u00020\u000fX\u0082\u000e¢\u0006\u0002\n\u0000R\u0010\u0010\u0014\u001a\u0004\u0018\u00010\u0015X\u0082\u000e¢\u0006\u0002\n\u0000R\u0010\u0010\u0016\u001a\u0004\u0018\u00010\u0017X\u0082\u000e¢\u0006\u0002\n\u0000R'\u0010\u0018\u001a\u000e\u0012\u0004\u0012\u00020\u001a\u0012\u0004\u0012\u00020\u001b0\u00198BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u001e\u0010\u000b\u001a\u0004\b\u001c\u0010\u001dR\u001b\u0010\u001f\u001a\u00020 8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b#\u0010\u000b\u001a\u0004\b!\u0010\"R6\u0010$\u001a*\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00020\u00050&0%j\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0005\u0012\u0004\u0012\u00020\u00050&`'X\u0082\u0004¢\u0006\u0002\n\u0000¨\u0006O"}, d2 = {"Lcom/readboy/live/education/mq/AMQPManager;", "", "()V", "chanMap", "Ljava/util/HashMap;", "", "Lcom/readboy/live/education/mq/AMQPChannel;", "Lkotlin/collections/HashMap;", "getChanMap", "()Ljava/util/HashMap;", "chanMap$delegate", "Lkotlin/Lazy;", "connectionShutdownListener", 
"Lcom/rabbitmq/client/ShutdownListener;", "<set-?>", "", "isConnected", "()Z", "isConnecting", "isReconnecting", "keepAliveChecker", "Lio/reactivex/disposables/Disposable;", "mConnection", "Lcom/rabbitmq/client/Connection;", "mOnAMQPManagerListener", "Lcom/readboy/live/education/util/FastSafeIterableMap;", "Lcom/readboy/live/education/mq/AMQPManager$OnAMQPManagerListener;", "", "getMOnAMQPManagerListener", "()Lcom/readboy/live/education/util/FastSafeIterableMap;", "mOnAMQPManagerListener$delegate", "mScheduler", "Lio/reactivex/Scheduler;", "getMScheduler", "()Lio/reactivex/Scheduler;", "mScheduler$delegate", "pendingChans", "Ljava/util/ArrayList;", "Lkotlin/Pair;", "Lkotlin/collections/ArrayList;", "addOnAMQPManagerListener", "listener", "bindQueue", "Lio/reactivex/Observable;", "queueName", "routingKey", "ignoreExpiredRedeliver", "autoDeleted", "bindQueueInternal", "Lcom/rabbitmq/client/Channel;", "connect", "connectInternal", LiveRoomFragment.ACTION_DISCONNECT, "force", "disposeKeepAliveChecker", "()Lkotlin/Unit;", "emitError", "T", "emitter", "Lio/reactivex/ObservableEmitter;", "e", "", "emitSuccess", "item", "(Lio/reactivex/ObservableEmitter;Ljava/lang/Object;)V", "getConnectionName", "publishInteractiveMessage", IDManager.ACTION_MESSAGE, "", "publishMessage", "reconnect", "removeOnAMQPManagerListener", "scheduleKeepAliveChecker", "unbindQueue", "unbindQueueInternal", "chan", "AMQPConsumer", "Companion", "OnAMQPManagerListener", "Education_release"}, k = 1, mv = {1, 1, 13})
/* loaded from: classes2.dex */
public final class AMQPManager {
    // Kotlin-compiler-generated property metadata for the lazy-delegated members of this class.
    // Entry order: [0] mScheduler, [1] chanMap, [2] mOnAMQPManagerListener.
    static final /* synthetic */ KProperty[] $$delegatedProperties = {Reflection.property1(new PropertyReference1Impl(Reflection.getOrCreateKotlinClass(AMQPManager.class), "mScheduler", "getMScheduler()Lio/reactivex/Scheduler;")), Reflection.property1(new PropertyReference1Impl(Reflection.getOrCreateKotlinClass(AMQPManager.class), "chanMap", "getChanMap()Ljava/util/HashMap;")), Reflection.property1(new PropertyReference1Impl(Reflection.getOrCreateKotlinClass(AMQPManager.class), "mOnAMQPManagerListener", "getMOnAMQPManagerListener()Lcom/readboy/live/education/util/FastSafeIterableMap;"))};

    /* renamed from: Companion, reason: from kotlin metadata */
    // Single instance of the nested Companion class; the decompiler renamed the field from "Companion" to "INSTANCE".
    public static final Companion INSTANCE = new Companion(null);
    /**
     * Lazy holder for the shared RabbitMQ {@link ConnectionFactory}.
     *
     * <p>The factory is built on first access: host, user name, password, port and virtual host
     * are read from {@code AppConf}, automatic recovery is disabled, and a heartbeat of 15
     * (seconds, per the RabbitMQ client API) is requested.
     */
    private static final Lazy DEFAULT_CONNECTION_FACTORY$delegate = LazyKt.lazy(new Function0<ConnectionFactory>() { // from class: com.readboy.live.education.mq.AMQPManager$Companion$DEFAULT_CONNECTION_FACTORY$2
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // kotlin.jvm.functions.Function0
        @NotNull
        public final ConnectionFactory invoke() {
            final ConnectionFactory factory = new ConnectionFactory();

            // Endpoint and credentials.
            factory.setHost(AppConf.INSTANCE.getRabbitMQDomain());
            factory.setUsername(AppConf.INSTANCE.getRabbitMQUsername());
            factory.setPassword(AppConf.INSTANCE.getRabbitMQPassword());
            factory.setPort(AppConf.INSTANCE.getRabbitMQPort());
            factory.setVirtualHost(AppConf.INSTANCE.getRabbitMQVHost());

            // Recovery is driven by this class (shutdown listener + keep-alive checker), not by the client library.
            factory.setAutomaticRecoveryEnabled(false);
            factory.setRequestedHeartbeat(15);
            return factory;
        }
    });
    // Period of the keep-alive check; passed to Observable.interval together with TimeUnit.SECONDS.
    private static final long KEEP_ALIVE_CHECK_INTERVAL = 10;
    // Routing key constant; its use is not visible in this part of the file.
    private static final String ROUTING_KEY_INTERACTIVE = "interactive";
    // Connection state shared between threads: connectInternal() writes these on the mScheduler
    // thread, while the keep-alive interval callback and the RabbitMQ shutdown listener read them
    // from other threads. `volatile` guarantees that a write made by one thread is visible to the
    // others; it does NOT make compound check-then-act sequences (e.g. in reconnect()) atomic.

    // True once a connection has been created successfully by connectInternal().
    private volatile boolean isConnected;
    // True only while connectInternal() is in the middle of opening a connection.
    private volatile boolean isConnecting;
    // True from the moment reconnect() starts a reconnect until that pipeline completes or fails.
    private volatile boolean isReconnecting;
    // Subscription of the periodic keep-alive check; null until the first check is scheduled.
    private volatile Disposable keepAliveChecker;
    // Current RabbitMQ connection; null until connectInternal() succeeds.
    private volatile Connection mConnection;

    /* renamed from: mScheduler$delegate, reason: from kotlin metadata */
    // Lazily created RxJava scheduler (Schedulers.newThread()); connectInternal() subscribes on it.
    private final Lazy mScheduler = LazyKt.lazy(new Function0<Scheduler>() { // from class: com.readboy.live.education.mq.AMQPManager$mScheduler$2
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // kotlin.jvm.functions.Function0
        @NotNull
        public final Scheduler invoke() {
            return Schedulers.newThread();
        }
    });

    /* renamed from: chanMap$delegate, reason: from kotlin metadata */
    // Lazily created map of String key -> AMQPChannel. reconnect() iterates it and re-binds each
    // entry, passing the key as the queue name.
    private final Lazy chanMap = LazyKt.lazy(new Function0<HashMap<String, AMQPChannel>>() { // from class: com.readboy.live.education.mq.AMQPManager$chanMap$2
        @Override // kotlin.jvm.functions.Function0
        @NotNull
        public final HashMap<String, AMQPChannel> invoke() {
            return new HashMap<>();
        }
    });
    // List of String pairs; not used in the visible part of this file.
    // NOTE(review): presumably (queueName, routingKey) pairs waiting for a connection — confirm against the callers.
    private final ArrayList<Pair<String, String>> pendingChans = new ArrayList<>();

    /* renamed from: mOnAMQPManagerListener$delegate, reason: from kotlin metadata */
    // Lazily created registry of OnAMQPManagerListener instances. The listeners are the map keys;
    // the values are always Unit. connectionShutdownListener iterates it to deliver onShutdown().
    private final Lazy mOnAMQPManagerListener = LazyKt.lazy(new Function0<FastSafeIterableMap<OnAMQPManagerListener, Unit>>() { // from class: com.readboy.live.education.mq.AMQPManager$mOnAMQPManagerListener$2
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // kotlin.jvm.functions.Function0
        @NotNull
        public final FastSafeIterableMap<AMQPManager.OnAMQPManagerListener, Unit> invoke() {
            return new FastSafeIterableMap<>();
        }
    });
    // Shutdown hook that connectInternal() attaches to each connection it opens.
    // On shutdown it (1) logs the cause, (2) notifies every registered OnAMQPManagerListener,
    // (3) runs disconnect() with its default argument, waits 5 seconds, and then reconnects —
    // but only if the app is currently in the foreground.
    // NOTE(review): steps 2-3 run regardless of isInitiatedByApplication(); whether disconnect()
    // detaches this listener before closing is not visible here — confirm that a deliberate close
    // cannot trigger an unwanted reconnect.
    private final ShutdownListener connectionShutdownListener = new ShutdownListener() { // from class: com.readboy.live.education.mq.AMQPManager$connectionShutdownListener$1
        /* JADX WARN: Unreachable blocks removed: 1, instructions: 1 */
        @Override // com.rabbitmq.client.ShutdownListener
        public final void shutdownCompleted(ShutdownSignalException it) {
            FastSafeIterableMap mOnAMQPManagerListener;
            // Log whether this was a connection-level (hard) error, whether the app initiated it, and the reason.
            StringBuilder sb = new StringBuilder();
            sb.append("shutdown by connection's error: ");
            Intrinsics.checkExpressionValueIsNotNull(it, "it");
            sb.append(it.isHardError());
            sb.append(", by application action:");
            sb.append(it.isInitiatedByApplication());
            sb.append(", reason: ");
            sb.append(it.getReason());
            Timber.e(sb.toString(), new Object[0]);
            // Listener callbacks are invoked while holding the AMQPManager monitor.
            synchronized (AMQPManager.this) {
                mOnAMQPManagerListener = AMQPManager.this.getMOnAMQPManagerListener();
                Iterator<Map.Entry<K, V>> it2 = mOnAMQPManagerListener.iterator();
                while (it2.hasNext()) {
                    ((AMQPManager.OnAMQPManagerListener) ((Map.Entry) it2.next()).getKey()).onShutdown();
                }
                Unit unit = Unit.INSTANCE;
            }
            // disconnect() (mask bit 1 set => default `force` = false), then a 5-second back-off, then reconnect.
            Observable concatMap = AMQPManager.disconnect$default(AMQPManager.this, false, 1, null).delay(5L, TimeUnit.SECONDS).concatMap(new Function<T, ObservableSource<? extends R>>() { // from class: com.readboy.live.education.mq.AMQPManager$connectionShutdownListener$1.2
                @Override // io.reactivex.functions.Function
                public final Observable<Unit> apply(@NotNull Unit it3) {
                    Observable<Unit> reconnect;
                    Intrinsics.checkParameterIsNotNull(it3, "it");
                    // In the background no reconnect is attempted; the stream just completes with Unit.
                    if (!AppHelper.INSTANCE.getAppInForeground()) {
                        return Observable.just(Unit.INSTANCE);
                    }
                    reconnect = AMQPManager.this.reconnect();
                    return reconnect;
                }
            });
            Intrinsics.checkExpressionValueIsNotNull(concatMap, "disconnect().delay(5, Ti…)\n            }\n        }");
            // Fire-and-forget subscription: errors are only logged, completion is ignored,
            // and the returned Disposable is not retained.
            SubscribersKt.subscribeBy$default(concatMap, new Function1<Throwable, Unit>() { // from class: com.readboy.live.education.mq.AMQPManager$connectionShutdownListener$1.3
                @Override // kotlin.jvm.functions.Function1
                public /* bridge */ /* synthetic */ Unit invoke(Throwable th) {
                    invoke2(th);
                    return Unit.INSTANCE;
                }

                /* renamed from: invoke, reason: avoid collision after fix types in other method */
                public final void invoke2(@NotNull Throwable it3) {
                    Intrinsics.checkParameterIsNotNull(it3, "it");
                    Timber.e("re-connect error: " + it3.getLocalizedMessage(), new Object[0]);
                }
            }, new Function0<Unit>() { // from class: com.readboy.live.education.mq.AMQPManager$connectionShutdownListener$1.4
                @Override // kotlin.jvm.functions.Function0
                public /* bridge */ /* synthetic */ Unit invoke() {
                    invoke2();
                    return Unit.INSTANCE;
                }

                /* renamed from: invoke, reason: avoid collision after fix types in other method */
                public final void invoke2() {
                }
            }, (Function1) null, 4, (Object) null);
        }
    };

    /* compiled from: AMQPManager.kt */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u00006\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0012\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0010\u000e\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\b\u0002\u0018\u00002\u00020\u0001B\r\u0012\u0006\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\u0014\u0010\u0007\u001a\u0004\u0018\u00010\b2\b\u0010\t\u001a\u0004\u0018\u00010\nH\u0002J,\u0010\u000b\u001a\u00020\f2\u0006\u0010\r\u001a\u00020\u000e2\u0006\u0010\u000f\u001a\u00020\u00102\b\u0010\u0011\u001a\u0004\u0018\u00010\u00122\b\u0010\t\u001a\u0004\u0018\u00010\nH\u0016R\u0011\u0010\u0002\u001a\u00020\u0003¢\u0006\b\n\u0000\u001a\u0004\b\u0005\u0010\u0006¨\u0006\u0013"}, d2 = {"Lcom/readboy/live/education/mq/AMQPManager$AMQPConsumer;", "Lcom/rabbitmq/client/DefaultConsumer;", "chan", "Lcom/readboy/live/education/mq/AMQPChannel;", "(Lcom/readboy/live/education/mq/AMQPChannel;)V", "getChan", "()Lcom/readboy/live/education/mq/AMQPChannel;", "formatMessage", "Lcom/readboy/live/education/mq/LiveCmd;", TtmlNode.TAG_BODY, "", "handleDelivery", "", "consumerTag", "", "envelope", "Lcom/rabbitmq/client/Envelope;", "properties", "Lcom/rabbitmq/client/AMQP$BasicProperties;", "Education_release"}, k = 1, mv = {1, 1, 13})
    /* loaded from: classes2.dex */
    public static final class AMQPConsumer extends DefaultConsumer {

        /**
         * Shared JSON parser. Gson instances are thread-safe, so a single one is reused instead of
         * allocating a new parser for every delivery.
         */
        private static final Gson GSON = new Gson();

        /** Channel wrapper this consumer belongs to; parsed commands are published through it. */
        @NotNull
        private final AMQPChannel chan;

        /**
         * Creates a consumer attached to the RabbitMQ channel wrapped by {@code chan}.
         *
         * @param chan channel wrapper; must not be null
         */
        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public AMQPConsumer(@NotNull AMQPChannel chan) {
            super(chan.getChannel());
            Intrinsics.checkParameterIsNotNull(chan, "chan");
            this.chan = chan;
        }

        /**
         * Decodes a UTF-8 JSON payload into a {@code LiveCmd}.
         *
         * @param body raw message bytes, may be null
         * @return the parsed command, or null when {@code body} is null or is not valid JSON
         *         (the parse failure is logged)
         */
        private final LiveCmd formatMessage(byte[] body) {
            if (body == null) {
                return null;
            }
            try {
                return GSON.fromJson(new String(body, Charsets.UTF_8), LiveCmd.class);
            } catch (JsonSyntaxException e) {
                Timber.e("unable to format body: " + e.getLocalizedMessage(), new Object[0]);
                return null;
            }
        }

        /** Returns the channel wrapper this consumer was created for. */
        @NotNull
        public final AMQPChannel getChan() {
            return this.chan;
        }

        /**
         * Handles one delivery: parses it, forwards it to the channel's message publisher unless it
         * is a stale redelivery, and acknowledges it.
         *
         * <p>A redelivered message is dropped when the channel has ignoreExpiredRedeliver enabled
         * and the message time is older than the last delivered time. Every delivery — including
         * dropped and unparseable ones — is acknowledged as long as the channel is still open.
         *
         * @param consumerTag consumer tag of this consumer
         * @param envelope    delivery metadata (delivery tag, redeliver flag)
         * @param properties  message properties; not used
         * @param body        raw message bytes, may be null
         * @throws IOException if acknowledging the delivery fails; propagated to the RabbitMQ client
         */
        @Override // com.rabbitmq.client.DefaultConsumer, com.rabbitmq.client.Consumer
        public void handleDelivery(@NotNull String consumerTag, @NotNull Envelope envelope, @Nullable AMQP.BasicProperties properties, @Nullable byte[] body) throws IOException {
            Intrinsics.checkParameterIsNotNull(consumerTag, "consumerTag");
            Intrinsics.checkParameterIsNotNull(envelope, "envelope");
            String bodyText = body != null ? new String(body, Charsets.UTF_8) : "";
            Timber.d("handleDelivery====> consumerTag: " + consumerTag + ", envelope: " + envelope + ", body: " + bodyText, new Object[0]);

            AMQPChannel target = this.chan;
            LiveCmd cmd = formatMessage(body);
            if (cmd != null) {
                Timber.d("ignoreExpiredRedeliver: " + target.getIgnoreExpiredRedeliver() + " lastDeliverTime: " + target.getLastDeliverTime() + " time: " + cmd.getTime(), new Object[0]);
                boolean staleRedelivery = envelope.isRedeliver()
                        && target.getIgnoreExpiredRedeliver()
                        && target.getLastDeliverTime() > cmd.getTime();
                if (staleRedelivery) {
                    Timber.i("drop the redeliver message because ignoreExpiredRedeliver is on", new Object[0]);
                } else {
                    target.setLastDeliverTime(cmd.getTime());
                    target.getMsgPublisher().onNext(cmd);
                }
            }

            // Ack only while the channel is open; basicAck on a closed channel would throw.
            if (target.getChannel().isOpen()) {
                target.getChannel().basicAck(envelope.getDeliveryTag(), false);
                Timber.d("basicAck", new Object[0]);
            }
        }
    }

    /* compiled from: AMQPManager.kt */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000 \n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010\t\n\u0000\n\u0002\u0010\u000e\n\u0000\b\u0086\u0003\u0018\u00002\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u001b\u0010\u0003\u001a\u00020\u00048BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u0007\u0010\b\u001a\u0004\b\u0005\u0010\u0006R\u000e\u0010\t\u001a\u00020\nX\u0082T¢\u0006\u0002\n\u0000R\u000e\u0010\u000b\u001a\u00020\fX\u0082T¢\u0006\u0002\n\u0000¨\u0006\r"}, d2 = {"Lcom/readboy/live/education/mq/AMQPManager$Companion;", "", "()V", "DEFAULT_CONNECTION_FACTORY", "Lcom/rabbitmq/client/ConnectionFactory;", "getDEFAULT_CONNECTION_FACTORY", "()Lcom/rabbitmq/client/ConnectionFactory;", "DEFAULT_CONNECTION_FACTORY$delegate", "Lkotlin/Lazy;", "KEEP_ALIVE_CHECK_INTERVAL", "", "ROUTING_KEY_INTERACTIVE", "", "Education_release"}, k = 1, mv = {1, 1, 13})
    /* loaded from: classes2.dex */
    public static final class Companion {
        static final /* synthetic */ KProperty[] $$delegatedProperties = {Reflection.property1(new PropertyReference1Impl(Reflection.getOrCreateKotlinClass(Companion.class), "DEFAULT_CONNECTION_FACTORY", "getDEFAULT_CONNECTION_FACTORY()Lcom/rabbitmq/client/ConnectionFactory;"))};

        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }

        public final ConnectionFactory getDEFAULT_CONNECTION_FACTORY() {
            Lazy lazy = AMQPManager.DEFAULT_CONNECTION_FACTORY$delegate;
            Companion companion = AMQPManager.INSTANCE;
            KProperty kProperty = $$delegatedProperties[0];
            return (ConnectionFactory) lazy.getValue();
        }
    }

    /* compiled from: AMQPManager.kt */
    @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u0018\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0010\b\n\u0002\b\u0002\bf\u0018\u00002\u00020\u0001J\u0010\u0010\u0002\u001a\u00020\u00032\u0006\u0010\u0004\u001a\u00020\u0005H&J\b\u0010\u0006\u001a\u00020\u0003H&¨\u0006\u0007"}, d2 = {"Lcom/readboy/live/education/mq/AMQPManager$OnAMQPManagerListener;", "", "onConnected", "", "heartbeatFrame", "", "onShutdown", "Education_release"}, k = 1, mv = {1, 1, 13})
    /* loaded from: classes2.dex */
    /** Callbacks for connection lifecycle events reported by the enclosing AMQPManager. */
    public interface OnAMQPManagerListener {
        // The call site is not visible in this part of the file.
        // NOTE(review): presumably fired once a connection is established, with the heartbeat value in use — confirm.
        void onConnected(int heartbeatFrame);

        // Invoked on every registered listener by connectionShutdownListener.shutdownCompleted(),
        // while the AMQPManager monitor is held.
        void onShutdown();
    }

    /**
     * Kotlin default-argument bridge for {@code bindQueue}.
     *
     * <p>Bit 4 of {@code i} means the third argument was omitted and bit 8 means the fourth was
     * omitted; an omitted argument falls back to {@code false}.
     *
     * @param aMQPManager receiver the call is forwarded to
     * @param str         first argument, forwarded unchanged
     * @param str2        second argument, forwarded unchanged
     * @param z           third argument, ignored when bit 4 of {@code i} is set
     * @param z2          fourth argument, ignored when bit 8 of {@code i} is set
     * @param i           bit mask of omitted arguments
     * @param obj         unused marker parameter
     * @return whatever {@code bindQueue} returns
     */
    @NotNull
    public static /* synthetic */ Observable bindQueue$default(AMQPManager aMQPManager, String str, String str2, boolean z, boolean z2, int i, Object obj) {
        final boolean thirdArg = (i & 4) == 0 && z;
        final boolean fourthArg = (i & 8) == 0 && z2;
        return aMQPManager.bindQueue(str, str2, thirdArg, fourthArg);
    }

    /**
     * Builds a cold observable that, on subscription, opens a new channel on the current
     * connection, declares {@code queueName} (durable, non-exclusive), binds it to the configured
     * exchange with {@code routingKey}, limits prefetch to one message, and emits the channel.
     *
     * <p>If the subscriber has already disposed by the time setup finishes, the channel is closed
     * instead of being dropped: nobody would receive it, so it would otherwise stay open until the
     * whole connection is closed.
     *
     * @param queueName   name of the queue to declare and bind
     * @param routingKey  routing key used for the binding
     * @param autoDeleted passed as the queue's auto-delete flag
     * @return observable emitting the ready channel once, or an error if the channel or connection
     *         is already closed or an I/O failure occurs
     */
    public final Observable<Channel> bindQueueInternal(final String queueName, final String routingKey, final boolean autoDeleted) {
        Observable<Channel> create = Observable.create(new ObservableOnSubscribe<Channel>() { // from class: com.readboy.live.education.mq.AMQPManager$bindQueueInternal$1
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(@NotNull ObservableEmitter<Channel> emitter) {
                Intrinsics.checkParameterIsNotNull(emitter, "emitter");
                Timber.i("bind queue...", new Object[0]);
                try {
                    Connection connection = AMQPManager.this.mConnection;
                    if (connection == null) {
                        // No connection yet: fails with an NPE, which Observable.create routes to onError.
                        Intrinsics.throwNpe();
                    }
                    Channel createChannel = connection.createChannel();
                    createChannel.queueDeclare(queueName, true, false, autoDeleted, null);
                    Timber.i("queueDeclare", new Object[0]);
                    createChannel.queueBind(queueName, AppConf.INSTANCE.getRabbitMQExchange(), routingKey);
                    Timber.i("queueBind", new Object[0]);
                    createChannel.basicQos(0, 1, false);
                    Timber.i("basicQos", new Object[0]);
                    Timber.i("bind queue success", new Object[0]);
                    Timber.d("bind queue(" + queueName + ") to exchange.", new Object[0]);
                    if (emitter.isDisposed()) {
                        // emitSuccess() would silently drop the channel; release it instead of leaking it.
                        closeAbandonedChannel(createChannel);
                        return;
                    }
                    AMQPManager.this.emitSuccess(emitter, createChannel);
                } catch (AlreadyClosedException e) {
                    AMQPManager.this.emitError(emitter, e);
                } catch (IOException e2) {
                    AMQPManager.this.emitError(emitter, e2);
                }
            }

            /** Best-effort close of a channel nobody will receive; failures are only logged. */
            private void closeAbandonedChannel(Channel channel) {
                try {
                    if (channel.isOpen()) {
                        channel.close();
                    }
                } catch (AlreadyClosedException | IOException | TimeoutException closeError) {
                    Timber.i("unable to close abandoned channel: " + closeError.getLocalizedMessage(), new Object[0]);
                }
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(create, "Observable.create<Channe…itter, e)\n        }\n    }");
        return create;
    }

    /**
     * Builds an observable that opens a new RabbitMQ connection when subscribed.
     *
     * <p>The work runs on {@code mScheduler}. On success the connection gets the shutdown listener
     * attached, is stored in {@code mConnection}, {@code isConnected} is set, the keep-alive
     * checker is (re)scheduled and a single {@code Unit} is emitted. An IOException or
     * TimeoutException is logged and forwarded as an error. {@code isConnecting} is true for the
     * duration of the attempt and is always cleared afterwards.
     *
     * @return observable emitting {@code Unit} once the connection is up
     */
    private final Observable<Unit> connectInternal() {
        Observable<Unit> subscribeOn = Observable.create(new ObservableOnSubscribe<Unit>() { // from class: com.readboy.live.education.mq.AMQPManager$connectInternal$1
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(@NotNull ObservableEmitter<Unit> emitter) {
                Intrinsics.checkParameterIsNotNull(emitter, "emitter");
                Timber.i("connectInternal to amqp server...", new Object[0]);
                AMQPManager.this.isConnecting = true;
                try {
                    Connection opened = AMQPManager.INSTANCE.getDEFAULT_CONNECTION_FACTORY().newConnection(AMQPManager.this.getConnectionName());
                    opened.addShutdownListener(AMQPManager.this.connectionShutdownListener);
                    AMQPManager.this.mConnection = opened;
                    AMQPManager.this.isConnected = true;

                    Connection current = AMQPManager.this.mConnection;
                    String clientName = current != null ? current.getClientProvidedName() : null;
                    Timber.d("create connection(" + clientName + ") to amqp server.", new Object[0]);
                    Timber.i("create connection success.", new Object[0]);

                    AMQPManager.this.scheduleKeepAliveChecker();
                    AMQPManager.this.emitSuccess(emitter, Unit.INSTANCE);
                } catch (IOException ioError) {
                    Timber.i("create connection fail with IOException: " + ioError.getLocalizedMessage(), new Object[0]);
                    AMQPManager.this.emitError(emitter, ioError);
                } catch (TimeoutException timeoutError) {
                    Timber.i("create connection fail with TimeoutException: " + timeoutError.getLocalizedMessage(), new Object[0]);
                    AMQPManager.this.emitError(emitter, timeoutError);
                } finally {
                    // Cleared on every exit path, successful or not.
                    AMQPManager.this.isConnecting = false;
                }
            }
        }).subscribeOn(getMScheduler());
        Intrinsics.checkExpressionValueIsNotNull(subscribeOn, "Observable.create<Unit> …}.subscribeOn(mScheduler)");
        return subscribeOn;
    }

    /**
     * Kotlin default-argument bridge for {@code disconnect}: when bit 1 of {@code i} is set the
     * boolean argument was omitted and {@code false} is used instead.
     *
     * @param aMQPManager receiver the call is forwarded to
     * @param z           explicit argument, ignored when bit 1 of {@code i} is set
     * @param i           bit mask of omitted arguments
     * @param obj         unused marker parameter
     * @return whatever {@code disconnect} returns
     */
    @NotNull
    public static /* synthetic */ Observable disconnect$default(AMQPManager aMQPManager, boolean z, int i, Object obj) {
        final boolean force = (i & 1) == 0 && z;
        return aMQPManager.disconnect(force);
    }

    /**
     * Disposes the keep-alive subscription if one exists.
     *
     * @return {@code Unit.INSTANCE} when a checker was present, {@code null} when there was none
     */
    public final Unit disposeKeepAliveChecker() {
        final Disposable checker = this.keepAliveChecker;
        if (checker != null) {
            ReactiveXExtKt.disposeIfNot(checker);
            return Unit.INSTANCE;
        }
        return null;
    }

    /**
     * Forwards {@code e} to the emitter unless the emitter has already been disposed.
     *
     * @param emitter target emitter
     * @param e       error to signal
     */
    public final <T> void emitError(ObservableEmitter<T> emitter, Throwable e) {
        if (!emitter.isDisposed()) {
            emitter.onError(e);
        }
    }

    /**
     * Emits {@code item} followed by completion unless the emitter has already been disposed.
     *
     * @param emitter target emitter
     * @param item    the single value to emit
     */
    public final <T> void emitSuccess(ObservableEmitter<T> emitter, T item) {
        if (!emitter.isDisposed()) {
            emitter.onNext(item);
            emitter.onComplete();
        }
    }

    /** Returns the lazily created channel map, creating it on first access. */
    @SuppressWarnings("unchecked") // the lazy initializer only ever produces HashMap<String, AMQPChannel>
    public final HashMap<String, AMQPChannel> getChanMap() {
        return (HashMap<String, AMQPChannel>) this.chanMap.getValue();
    }

    /** Builds the client-provided connection name: a fixed prefix followed by the value of {@code Helper.getUUID()}. */
    public final String getConnectionName() {
        final String prefix = "amqp-client-";
        return prefix + Helper.INSTANCE.getUUID();
    }

    /** Returns the lazily created listener registry, creating it on first access. */
    @SuppressWarnings("unchecked") // the lazy initializer only ever produces this map type
    public final FastSafeIterableMap<OnAMQPManagerListener, Unit> getMOnAMQPManagerListener() {
        return (FastSafeIterableMap<OnAMQPManagerListener, Unit>) this.mOnAMQPManagerListener.getValue();
    }

    /** Returns the lazily created scheduler, creating it on first access. */
    private final Scheduler getMScheduler() {
        return (Scheduler) this.mScheduler.getValue();
    }

    /**
     * Re-establishes the connection and re-binds every queue recorded in the channel map.
     *
     * <p>If a connection exists, is open, and {@code isConnected} is set, nothing is done and a
     * single {@code Unit} is emitted. Otherwise {@code isReconnecting} is raised immediately,
     * {@code connect()} is run, and each channel-map entry is re-bound in order with its stored
     * routing key and auto-delete flag (ignoreExpiredRedeliver falls back to its default). The
     * flag is lowered again when the pipeline completes or fails.
     *
     * @return observable emitting {@code Unit} per re-bound queue (or once when already connected)
     */
    public final Observable<Unit> reconnect() {
        final Connection current = this.mConnection;
        final boolean alreadyUp = current != null && current.isOpen() && this.isConnected;
        if (alreadyUp) {
            Observable<Unit> nothingToDo = Observable.just(Unit.INSTANCE);
            Intrinsics.checkExpressionValueIsNotNull(nothingToDo, "Observable.just(Unit)");
            return nothingToDo;
        }

        Timber.i("begin to reconnect to amqp server", new Object[0]);
        this.isReconnecting = true;

        // Step 2 of the pipeline: re-bind one recorded queue.
        final Function<Map.Entry<String, AMQPChannel>, ObservableSource<AMQPChannel>> rebindEntry = new Function<Map.Entry<String, AMQPChannel>, ObservableSource<AMQPChannel>>() { // from class: com.readboy.live.education.mq.AMQPManager$reconnect$1.1
            @Override // io.reactivex.functions.Function
            @NotNull
            @SuppressWarnings("unchecked") // bindQueue$default is a raw-typed synthetic bridge
            public final Observable<AMQPChannel> apply(@NotNull Map.Entry<String, AMQPChannel> entry) {
                Intrinsics.checkParameterIsNotNull(entry, "<name for destructuring parameter 0>");
                String queue = entry.getKey();
                AMQPChannel previous = entry.getValue();
                return AMQPManager.bindQueue$default(AMQPManager.this, queue, previous.getRoutingKey(), false, previous.getAutoDelete(), 4, null);
            }
        };

        // Step 3: discard the re-bound channel, keeping only a Unit signal.
        final Function<AMQPChannel, Unit> toUnit = new Function<AMQPChannel, Unit>() { // from class: com.readboy.live.education.mq.AMQPManager$reconnect$1.2
            @Override // io.reactivex.functions.Function
            public final Unit apply(@NotNull AMQPChannel it2) {
                Intrinsics.checkParameterIsNotNull(it2, "it");
                return Unit.INSTANCE;
            }
        };

        Observable<Unit> doOnError = connect().concatMap(new Function<Unit, ObservableSource<Unit>>() { // from class: com.readboy.live.education.mq.AMQPManager$reconnect$1
            @Override // io.reactivex.functions.Function
            public final Observable<Unit> apply(@NotNull Unit it) {
                Intrinsics.checkParameterIsNotNull(it, "it");
                // Step 1 done (connected): walk the recorded channels sequentially.
                HashMap<String, AMQPChannel> recorded = AMQPManager.this.getChanMap();
                return Observable.fromIterable(recorded.entrySet()).concatMap(rebindEntry).map(toUnit);
            }
        }).doOnComplete(new Action() { // from class: com.readboy.live.education.mq.AMQPManager$reconnect$2
            @Override // io.reactivex.functions.Action
            public final void run() {
                Timber.i("reconnect completed.", new Object[0]);
                AMQPManager.this.isReconnecting = false;
            }
        }).doOnError(new Consumer<Throwable>() { // from class: com.readboy.live.education.mq.AMQPManager$reconnect$3
            @Override // io.reactivex.functions.Consumer
            public final void accept(Throwable th) {
                Timber.i("reconnect error", new Object[0]);
                AMQPManager.this.isReconnecting = false;
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(doOnError, "connect().concatMap {\n  …ing = false\n            }");
        return doOnError;
    }

    /**
     * Starts (or restarts) the periodic keep-alive check.
     *
     * <p>Any previously scheduled checker is disposed first. A new interval observable ticking
     * every {@code KEEP_ALIVE_CHECK_INTERVAL} seconds is then subscribed and the resulting
     * disposable is stored in {@code keepAliveChecker}. On each tick a reconnect is started unless
     * a reconnect is already in progress or the app is not in the foreground.
     */
    public final void scheduleKeepAliveChecker() {
        Timber.i("schedule a checker for keeping connection connectInternal state", new Object[0]);
        disposeKeepAliveChecker();
        Observable<Long> ticker = Observable.interval(KEEP_ALIVE_CHECK_INTERVAL, TimeUnit.SECONDS);
        Intrinsics.checkExpressionValueIsNotNull(ticker, "Observable.interval(KEEP…TERVAL, TimeUnit.SECONDS)");
        // Mask 3: the onError and onComplete callbacks are left at their defaults; only onNext is supplied.
        this.keepAliveChecker = SubscribersKt.subscribeBy$default(ticker, (Function1) null, (Function0) null, new Function1<Long, Unit>() {
            {
                super(1);
            }

            @Override // kotlin.jvm.functions.Function1
            public /* bridge */ /* synthetic */ Unit invoke(Long tick) {
                invoke2(tick);
                return Unit.INSTANCE;
            }

            /** Handles one keep-alive tick. */
            public final void invoke2(Long tick) {
                // Only act when no reconnect is running and the app is visible to the user.
                if (!AMQPManager.this.isReconnecting && AppHelper.INSTANCE.getAppInForeground()) {
                    Observable attempt = AMQPManager.this.reconnect();
                    // Mask 4: onNext is left at its default; onError and onComplete are supplied.
                    SubscribersKt.subscribeBy$default(attempt, new Function1<Throwable, Unit>() {
                        @Override // kotlin.jvm.functions.Function1
                        public /* bridge */ /* synthetic */ Unit invoke(Throwable failure) {
                            invoke2(failure);
                            return Unit.INSTANCE;
                        }

                        /** Logs a failed reconnect attempt. */
                        public final void invoke2(@NotNull Throwable it) {
                            Intrinsics.checkParameterIsNotNull(it, "it");
                            Timber.e("re-connect error: " + it.getLocalizedMessage(), new Object[0]);
                        }
                    }, new Function0<Unit>() {
                        @Override // kotlin.jvm.functions.Function0
                        public /* bridge */ /* synthetic */ Unit invoke() {
                            invoke2();
                            return Unit.INSTANCE;
                        }

                        /** Completion needs no handling here. */
                        public final void invoke2() {
                        }
                    }, (Function1) null, 4, (Object) null);
                }
            }
        }, 3, (Object) null);
    }

    /**
     * Synthetic bridge that applies the default value of the {@code force} argument before
     * delegating to {@link #unbindQueue(String, boolean)}.
     *
     * @param aMQPManager receiver of the call
     * @param str         queue name
     * @param z           explicit {@code force} value; ignored when the mask marks it as omitted
     * @param i           default-argument bit mask; bit value 2 marks {@code force} as omitted
     * @param obj         unused marker argument
     * @return the observable returned by {@code unbindQueue}
     */
    @NotNull
    public static /* synthetic */ Observable unbindQueue$default(AMQPManager aMQPManager, String str, boolean z, int i, Object obj) {
        // An omitted "force" argument defaults to true; otherwise the caller's value is used.
        boolean forceOmitted = (i & 2) != 0;
        return aMQPManager.unbindQueue(str, forceOmitted || z);
    }

    /**
     * Builds an observable that unbinds the queue of {@code chan} from the exchange configured in
     * {@code AppConf} and then closes the underlying channel. The work is subscribed on the
     * scheduler returned by {@code getMScheduler()}.
     *
     * <p>An {@link AlreadyClosedException} is reported as success; {@link IOException} and
     * {@link TimeoutException} are reported through {@code emitError}.
     *
     * @param chan the channel wrapper whose queue binding should be removed
     * @return an observable reporting {@code chan} on success
     */
    private final Observable<AMQPChannel> unbindQueueInternal(final AMQPChannel chan) {
        Observable<AMQPChannel> result = Observable.create(new ObservableOnSubscribe<T>() {
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(@NotNull ObservableEmitter<AMQPChannel> emitter) {
                Intrinsics.checkParameterIsNotNull(emitter, "emitter");
                Timber.i("remove channel from connection...", new Object[0]);
                try {
                    chan.getChannel().queueUnbind(chan.getQueueName(), AppConf.INSTANCE.getRabbitMQExchange(), chan.getRoutingKey());
                    Timber.d("unbind queue(" + chan.getQueueName() + ") from current connection.", new Object[0]);
                    chan.getChannel().close();
                    Timber.i("close channel from connection success.", new Object[0]);
                    AMQPManager.this.emitSuccess(emitter, chan);
                } catch (AlreadyClosedException alreadyClosed) {
                    // The channel is gone already, which is the state this call was asked to reach.
                    AMQPManager.this.emitSuccess(emitter, chan);
                } catch (IOException | TimeoutException failure) {
                    AMQPManager.this.emitError(emitter, failure);
                }
            }
        }).subscribeOn(getMScheduler());
        Intrinsics.checkExpressionValueIsNotNull(result, "Observable.create<AMQPCh…}.subscribeOn(mScheduler)");
        return result;
    }

    /**
     * Registers a manager listener. The listener map is updated with {@code putIfAbsent} while
     * holding this manager's monitor, so an already registered listener is left untouched.
     *
     * @param listener the listener to register; must not be null
     */
    public final void addOnAMQPManagerListener(@NotNull OnAMQPManagerListener listener) {
        Intrinsics.checkParameterIsNotNull(listener, "listener");
        final Unit marker = Unit.INSTANCE;
        synchronized (this) {
            getMOnAMQPManagerListener().putIfAbsent(listener, marker);
        }
    }

    /**
     * Binds {@code queueName} with {@code routingKey} and starts consuming from it.
     *
     * <p>The returned pipeline first unbinds any existing binding of the queue (without forcing
     * removal from the channel map). If there is no connection, the queue/routing-key pair is
     * recorded in {@code pendingChans} and the observable fails with
     * {@code AMQPNotConnectException}. Otherwise the queue is bound through
     * {@code bindQueueInternal}, the resulting channel is stored in (or attached to the existing
     * entry of) the channel map, and a consumer is started with manual acknowledgement
     * ({@code autoAck = false}).
     *
     * @param queueName              name of the queue to bind
     * @param routingKey             routing key to bind with
     * @param ignoreExpiredRedeliver passed to a newly created {@code AMQPChannel}; not applied when
     *                               an existing entry for the queue is reused
     * @param autoDeleted            passed to {@code bindQueueInternal} and to a newly created
     *                               {@code AMQPChannel}
     * @return an observable emitting the bound {@code AMQPChannel}
     */
    @NotNull
    public final Observable<AMQPChannel> bindQueue(@NotNull final String queueName, @NotNull final String routingKey, final boolean ignoreExpiredRedeliver, final boolean autoDeleted) {
        Intrinsics.checkParameterIsNotNull(queueName, "queueName");
        Intrinsics.checkParameterIsNotNull(routingKey, "routingKey");
        Observable<AMQPChannel> pipeline = unbindQueue(queueName, false).concatMap((Function) new Function<T, ObservableSource<? extends R>>() {
            @Override // io.reactivex.functions.Function
            public final Observable<Unit> apply(@NotNull Unit it) {
                Intrinsics.checkParameterIsNotNull(it, "it");
                if (AMQPManager.this.mConnection == null) {
                    // Not connected: remember the request and fail this attempt.
                    Timber.i("add a pending channel", new Object[0]);
                    AMQPManager.this.pendingChans.add(new Pair(queueName, routingKey));
                    return Observable.error(new AMQPNotConnectException());
                }
                return Observable.just(Unit.INSTANCE);
            }
        }).concatMap(new Function<T, ObservableSource<? extends R>>() {
            @Override // io.reactivex.functions.Function
            @NotNull
            public final Observable<Channel> apply(@NotNull Unit it) {
                Intrinsics.checkParameterIsNotNull(it, "it");
                return AMQPManager.this.bindQueueInternal(queueName, routingKey, autoDeleted);
            }
        }).map(new Function<T, R>() {
            @Override // io.reactivex.functions.Function
            @NotNull
            public final AMQPChannel apply(@NotNull Channel it) {
                Intrinsics.checkParameterIsNotNull(it, "it");
                Timber.d("new bind chanel(queueName: " + queueName + ", routingKey: " + routingKey + ").", new Object[0]);
                HashMap channels = AMQPManager.this.getChanMap();
                Object known = channels.get(queueName);
                AMQPChannel bound;
                if (known == null) {
                    // First binding of this queue: create and register a new wrapper.
                    bound = new AMQPChannel(queueName, routingKey, it, ignoreExpiredRedeliver, autoDeleted);
                    AMQPManager.this.getChanMap().put(queueName, bound);
                } else {
                    // Reuse the registered wrapper and point it at the fresh channel.
                    bound = (AMQPChannel) known;
                    bound.setChannel(it);
                    bound.setRoutingKey(routingKey);
                }
                bound.getChannel().basicConsume(queueName, false, new AMQPManager.AMQPConsumer(bound));
                Timber.i("basicConsume", new Object[0]);
                return bound;
            }
        }).subscribeOn(getMScheduler());
        Intrinsics.checkExpressionValueIsNotNull(pipeline, "unbindQueue(queueName, f…}.subscribeOn(mScheduler)");
        return pipeline;
    }

    /**
     * Creates the connect observable.
     *
     * <p>If {@code isConnecting} is already set when this method is called, the returned
     * observable fails with {@code AMQPConnectingException}. Otherwise it runs
     * {@code connectInternal()} with up to three retries on the scheduler returned by
     * {@code getMScheduler()}. When the observable completes, every registered listener receives
     * {@code onConnected} with the requested heartbeat of the default connection factory; the
     * notification loop runs while holding this manager's monitor.
     *
     * @return the connect observable
     */
    @NotNull
    public final Observable<Unit> connect() {
        Observable<Unit> attempt;
        if (this.isConnecting) {
            attempt = Observable.error(new AMQPConnectingException());
        } else {
            attempt = connectInternal().retry(3L).subscribeOn(getMScheduler());
        }
        Observable<Unit> notifying = attempt.doOnComplete(new Action() {
            @Override // io.reactivex.functions.Action
            public final void run() {
                synchronized (AMQPManager.this) {
                    for (Iterator cursor = AMQPManager.this.getMOnAMQPManagerListener().iterator(); cursor.hasNext(); ) {
                        AMQPManager.OnAMQPManagerListener target = (AMQPManager.OnAMQPManagerListener) ((Map.Entry) cursor.next()).getKey();
                        target.onConnected(AMQPManager.INSTANCE.getDEFAULT_CONNECTION_FACTORY().getRequestedHeartbeat());
                    }
                }
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(notifying, "if (isConnecting) {\n    …}\n            }\n        }");
        return notifying;
    }

    /**
     * Builds an observable that closes the current connection, subscribed on the scheduler
     * returned by {@code getMScheduler()}.
     *
     * <p>On a clean close the keep-alive checker is disposed and success is reported. An
     * {@link AlreadyClosedException} is also reported as success; an {@link IOException} is
     * reported through {@code emitError}. Whatever the outcome, the connection reference is
     * cleared, {@code isConnected} is reset and the pending channel list is emptied afterwards.
     *
     * @param z when {@code true}, the channel map is cleared as well
     * @return the disconnect observable
     */
    @NotNull
    public final Observable<Unit> disconnect(final boolean z) {
        Observable<Unit> result = Observable.create(new ObservableOnSubscribe<T>() {
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(@NotNull ObservableEmitter<Unit> emitter) {
                Intrinsics.checkParameterIsNotNull(emitter, "emitter");
                Timber.i("disconnect the connection of amqp server...", new Object[0]);
                try {
                    Connection active = AMQPManager.this.mConnection;
                    if (active != null) {
                        // Detach the shutdown listener first so this close is not reported through it.
                        active.removeShutdownListener(AMQPManager.this.connectionShutdownListener);
                        active.close();
                    }
                    Timber.d("disconnect connection success.", new Object[0]);
                    AMQPManager.this.disposeKeepAliveChecker();
                    AMQPManager.this.emitSuccess(emitter, Unit.INSTANCE);
                } catch (AlreadyClosedException alreadyClosed) {
                    Timber.i("disconnection the connection which is already closed.", new Object[0]);
                    AMQPManager.this.emitSuccess(emitter, Unit.INSTANCE);
                } catch (IOException e) {
                    Timber.e("disconnect connection fail with IOException: " + e.getLocalizedMessage(), new Object[0]);
                    AMQPManager.this.emitError(emitter, e);
                } finally {
                    // State reset shared by every outcome, including unexpected throwables.
                    AMQPManager.this.mConnection = (Connection) null;
                    AMQPManager.this.isConnected = false;
                    AMQPManager.this.pendingChans.clear();
                    if (z) {
                        AMQPManager.this.getChanMap().clear();
                    }
                }
            }
        }).subscribeOn(getMScheduler());
        Intrinsics.checkExpressionValueIsNotNull(result, "Observable.create<Unit> …}.subscribeOn(mScheduler)");
        return result;
    }

    /**
     * Getter for the {@code isConnected} flag.
     *
     * @return the current value of {@code isConnected}
     */
    public final boolean getIsConnected() {
        return isConnected;
    }

    /**
     * Getter for the {@code isConnecting} flag.
     *
     * @return the current value of {@code isConnecting}
     */
    public final boolean getIsConnecting() {
        return isConnecting;
    }

    /**
     * Publishes a message using the {@code ROUTING_KEY_INTERACTIVE} routing key.
     *
     * @param r2 the message payload; must not be null
     * @return the observable returned by {@link #publishMessage(String, byte[])}
     */
    @NotNull
    public final Observable<Unit> publishInteractiveMessage(@NotNull byte[] r2) {
        Intrinsics.checkParameterIsNotNull(r2, "message");
        final Observable<Unit> publication = publishMessage(ROUTING_KEY_INTERACTIVE, r2);
        return publication;
    }

    /**
     * Builds an observable that publishes {@code r3} to the exchange configured in {@code AppConf}
     * under {@code routingKey}, using a temporary channel that is closed afterwards. The work is
     * subscribed on {@code Schedulers.io()}.
     *
     * <p>Without a connection the observable reports {@code AMQPConnectingException} through
     * {@code emitError}. An {@link IOException} raised while publishing is logged and reported
     * through {@code emitError}; a missing channel is reported the same way.
     *
     * @param routingKey routing key to publish with; must not be null
     * @param r3         the message payload; must not be null
     * @return the publish observable
     */
    @NotNull
    public final Observable<Unit> publishMessage(@NotNull final String routingKey, @NotNull final byte[] r3) {
        Intrinsics.checkParameterIsNotNull(routingKey, "routingKey");
        Intrinsics.checkParameterIsNotNull(r3, "message");
        Observable<Unit> subscribeOn = Observable.create(new ObservableOnSubscribe<T>() { // from class: com.readboy.live.education.mq.AMQPManager$publishMessage$1
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(@NotNull ObservableEmitter<Unit> emitter) {
                Intrinsics.checkParameterIsNotNull(emitter, "emitter");
                Timber.d(TtmlNode.START, new Object[0]);
                // Read the field exactly once: a concurrent disconnect may null it between a
                // check and a second read, which previously surfaced as a NullPointerException.
                final Connection connection = AMQPManager.this.mConnection;
                if (connection == null) {
                    AMQPManager.this.emitError(emitter, new AMQPConnectingException());
                    return;
                }
                final Channel channel = connection.createChannel();
                if (channel == null) {
                    // createChannel() returns null when the connection has no channel left to hand out.
                    AMQPManager.this.emitError(emitter, new IOException("no AMQP channel available"));
                    return;
                }
                Timber.d("create tmp channel", new Object[0]);
                boolean published = false;
                try {
                    channel.basicPublish(AppConf.INSTANCE.getRabbitMQExchange(), routingKey, new AMQP.BasicProperties(), r3);
                    Timber.d("basic publish", new Object[0]);
                    published = true;
                } catch (IOException e) {
                    Timber.e("basic publish exception: " + e, new Object[0]);
                    AMQPManager.this.emitError(emitter, e);
                } finally {
                    try {
                        channel.close();
                    } catch (IOException | TimeoutException ignored) {
                        // Best-effort cleanup of the temporary channel; the publish outcome is
                        // what gets reported to the subscriber.
                    }
                }
                if (published) {
                    // Success is reported only after the temporary channel has been closed.
                    AMQPManager.this.emitSuccess(emitter, Unit.INSTANCE);
                    Timber.i("publish message success", new Object[0]);
                }
            }
        }).subscribeOn(Schedulers.io());
        Intrinsics.checkExpressionValueIsNotNull(subscribeOn, "Observable.create<Unit> …scribeOn(Schedulers.io())");
        return subscribeOn;
    }

    /**
     * Unregisters a manager listener. The listener map is modified while holding this manager's
     * monitor.
     *
     * @param listener the listener to remove; must not be null
     */
    public final void removeOnAMQPManagerListener(@NotNull OnAMQPManagerListener listener) {
        Intrinsics.checkParameterIsNotNull(listener, "listener");
        synchronized (this) {
            FastSafeIterableMap registry = getMOnAMQPManagerListener();
            registry.remove(listener);
        }
    }

    /**
     * Drops every pending bind request for {@code queueName} and, if the queue currently has an
     * entry in the channel map, returns an observable that unbinds it and closes its channel.
     *
     * <p>The pending list is cleaned immediately when this method is called, not when the
     * returned observable is subscribed. If the queue has no entry in the channel map, the
     * returned observable simply emits {@code Unit}.
     *
     * @param queueName name of the queue to unbind; must not be null
     * @param force     when {@code true}, the queue's entry is also removed from the channel map
     *                  once the unbind has been reported
     * @return an observable emitting {@code Unit} when the unbind has been processed
     */
    @NotNull
    public final Observable<Unit> unbindQueue(@NotNull String queueName, final boolean force) {
        Intrinsics.checkParameterIsNotNull(queueName, "queueName");
        // Remove matches through the iterator. Collecting indices and removing them in ascending
        // order shifts the remaining elements after each removal, so a second match would remove
        // the wrong entry or throw IndexOutOfBoundsException.
        Iterator<?> pending = this.pendingChans.iterator();
        while (pending.hasNext()) {
            if (Intrinsics.areEqual((String) ((Pair) pending.next()).getFirst(), queueName)) {
                pending.remove();
            }
        }
        AMQPChannel aMQPChannel = getChanMap().get(queueName);
        if (aMQPChannel != null) {
            Observable map = unbindQueueInternal(aMQPChannel).doOnNext(new Consumer<AMQPChannel>() { // from class: com.readboy.live.education.mq.AMQPManager$unbindQueue$3
                @Override // io.reactivex.functions.Consumer
                public final void accept(AMQPChannel aMQPChannel2) {
                    HashMap chanMap;
                    // Without "force" the entry stays registered so it can be reused by a later bind.
                    if (force) {
                        chanMap = AMQPManager.this.getChanMap();
                        chanMap.remove(aMQPChannel2.getQueueName());
                    }
                }
            }).map(new Function<T, R>() { // from class: com.readboy.live.education.mq.AMQPManager$unbindQueue$4
                @Override // io.reactivex.functions.Function
                public /* bridge */ /* synthetic */ Object apply(Object obj2) {
                    apply((AMQPChannel) obj2);
                    return Unit.INSTANCE;
                }

                // Discards the channel; callers only need the completion signal.
                public final void apply(@NotNull AMQPChannel it2) {
                    Intrinsics.checkParameterIsNotNull(it2, "it");
                }
            });
            Intrinsics.checkExpressionValueIsNotNull(map, "unbindQueueInternal(chan…       Unit\n            }");
            return map;
        }
        Observable<Unit> just = Observable.just(Unit.INSTANCE);
        Intrinsics.checkExpressionValueIsNotNull(just, "Observable.just(Unit)");
        return just;
    }
}
