001package ca.uhn.fhir.jpa.subscription.module.channel;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2020 University Health Network
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.model.entity.ModelConfig;
024import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
025import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
026import com.google.common.annotations.VisibleForTesting;
027import com.google.common.collect.Multimap;
028import com.google.common.collect.MultimapBuilder;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.springframework.beans.factory.annotation.Autowired;
032import org.springframework.messaging.MessageHandler;
033import org.springframework.messaging.SubscribableChannel;
034import org.springframework.stereotype.Component;
035
036import java.util.Collection;
037import java.util.Optional;
038
039@Component
040public class SubscriptionChannelRegistry {
041        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
042
043        private final SubscriptionChannelCache mySubscriptionChannelCache = new SubscriptionChannelCache();
044        // This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it
045        // Key Channel Name, Value Subscription Id
046        private final Multimap<String, String> myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build();
047
048        @Autowired
049        private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory;
050        @Autowired
051        private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory;
052        @Autowired
053        private ModelConfig myModelConfig;
054
055        public synchronized void add(ActiveSubscription theActiveSubscription) {
056                if (!myModelConfig.isSubscriptionMatchingEnabled()) {
057                        return;
058                }
059                String channelName = theActiveSubscription.getChannelName();
060                ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName);
061                myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId());
062
063                if (mySubscriptionChannelCache.containsKey(channelName)) {
064                        ourLog.info("Channel {} already exists.  Not creating.", channelName);
065                        return;
066                }
067
068                SubscribableChannel deliveryChannel;
069                Optional<MessageHandler> deliveryHandler;
070
071                deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName);
072                deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType());
073
074                SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel);
075                deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler);
076                mySubscriptionChannelCache.put(channelName, subscriptionChannelWithHandlers);
077        }
078
079        public synchronized void remove(ActiveSubscription theActiveSubscription) {
080                if (!myModelConfig.isSubscriptionMatchingEnabled()) {
081                        return;
082                }
083                String channelName = theActiveSubscription.getChannelName();
084                ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
085                boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId());
086                if (!removed) {
087                        ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId() ,channelName);
088                }
089
090                // This was the last one.  Close and remove the channel
091                if (!myActiveSubscriptionByChannelName.containsKey(channelName)) {
092                        SubscriptionChannelWithHandlers channel = mySubscriptionChannelCache.get(channelName);
093                        if (channel != null) {
094                                channel.close();
095                        }
096                        mySubscriptionChannelCache.closeAndRemove(channelName);
097                }
098        }
099
100        public synchronized SubscriptionChannelWithHandlers get(String theChannelName) {
101                return mySubscriptionChannelCache.get(theChannelName);
102        }
103
104        public synchronized int size() {
105                return mySubscriptionChannelCache.size();
106        }
107
108        @VisibleForTesting
109        public void logForUnitTest() {
110                ourLog.info("{} Channels: {}", this, size());
111                mySubscriptionChannelCache.logForUnitTest();
112                for (String key : myActiveSubscriptionByChannelName.keySet()) {
113                        Collection<String> list = myActiveSubscriptionByChannelName.get(key);
114                        for (String value : list) {
115                                ourLog.info("ActiveSubscriptionByChannelName {}: {}", key, value);
116                        }
117                }
118        }
119}