001package ca.uhn.fhir.jpa.subscription.module.channel; 002 003/*- 004 * #%L 005 * HAPI FHIR Subscription Server 006 * %% 007 * Copyright (C) 2014 - 2020 University Health Network 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.jpa.model.entity.ModelConfig; 024import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; 025import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; 026import com.google.common.annotations.VisibleForTesting; 027import com.google.common.collect.Multimap; 028import com.google.common.collect.MultimapBuilder; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031import org.springframework.beans.factory.annotation.Autowired; 032import org.springframework.messaging.MessageHandler; 033import org.springframework.messaging.SubscribableChannel; 034import org.springframework.stereotype.Component; 035 036import java.util.Collection; 037import java.util.Optional; 038 039@Component 040public class SubscriptionChannelRegistry { 041 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class); 042 043 private final SubscriptionChannelCache mySubscriptionChannelCache = new SubscriptionChannelCache(); 044 // This map is a reference count so we know to destroy the channel when there are no more active subscriptions using it 045 // Key Channel Name, Value Subscription Id 046 private final Multimap<String, String> myActiveSubscriptionByChannelName = MultimapBuilder.hashKeys().arrayListValues().build(); 047 048 @Autowired 049 private SubscriptionDeliveryHandlerFactory mySubscriptionDeliveryHandlerFactory; 050 @Autowired 051 private SubscriptionChannelFactory mySubscriptionDeliveryChannelFactory; 052 @Autowired 053 private ModelConfig myModelConfig; 054 055 public synchronized void add(ActiveSubscription theActiveSubscription) { 056 if (!myModelConfig.isSubscriptionMatchingEnabled()) { 057 return; 058 } 059 String channelName = theActiveSubscription.getChannelName(); 060 ourLog.info("Adding subscription {} to channel {}", theActiveSubscription.getId(), channelName); 061 myActiveSubscriptionByChannelName.put(channelName, theActiveSubscription.getId()); 062 063 if (mySubscriptionChannelCache.containsKey(channelName)) { 064 ourLog.info("Channel {} already exists. Not creating.", channelName); 065 return; 066 } 067 068 SubscribableChannel deliveryChannel; 069 Optional<MessageHandler> deliveryHandler; 070 071 deliveryChannel = mySubscriptionDeliveryChannelFactory.newDeliveryChannel(channelName); 072 deliveryHandler = mySubscriptionDeliveryHandlerFactory.createDeliveryHandler(theActiveSubscription.getChannelType()); 073 074 SubscriptionChannelWithHandlers subscriptionChannelWithHandlers = new SubscriptionChannelWithHandlers(channelName, deliveryChannel); 075 deliveryHandler.ifPresent(subscriptionChannelWithHandlers::addHandler); 076 mySubscriptionChannelCache.put(channelName, subscriptionChannelWithHandlers); 077 } 078 079 public synchronized void remove(ActiveSubscription theActiveSubscription) { 080 if (!myModelConfig.isSubscriptionMatchingEnabled()) { 081 return; 082 } 083 String channelName = theActiveSubscription.getChannelName(); 084 ourLog.info("Removing subscription {} from channel {}", theActiveSubscription.getId() ,channelName); 085 boolean removed = myActiveSubscriptionByChannelName.remove(channelName, theActiveSubscription.getId()); 086 if (!removed) { 087 ourLog.warn("Failed to remove subscription {} from channel {}", theActiveSubscription.getId() ,channelName); 088 } 089 090 // This was the last one. Close and remove the channel 091 if (!myActiveSubscriptionByChannelName.containsKey(channelName)) { 092 SubscriptionChannelWithHandlers channel = mySubscriptionChannelCache.get(channelName); 093 if (channel != null) { 094 channel.close(); 095 } 096 mySubscriptionChannelCache.closeAndRemove(channelName); 097 } 098 } 099 100 public synchronized SubscriptionChannelWithHandlers get(String theChannelName) { 101 return mySubscriptionChannelCache.get(theChannelName); 102 } 103 104 public synchronized int size() { 105 return mySubscriptionChannelCache.size(); 106 } 107 108 @VisibleForTesting 109 public void logForUnitTest() { 110 ourLog.info("{} Channels: {}", this, size()); 111 mySubscriptionChannelCache.logForUnitTest(); 112 for (String key : myActiveSubscriptionByChannelName.keySet()) { 113 Collection<String> list = myActiveSubscriptionByChannelName.get(key); 114 for (String value : list) { 115 ourLog.info("ActiveSubscriptionByChannelName {}: {}", key, value); 116 } 117 } 118 } 119}