001package ca.uhn.fhir.jpa.subscription.module.cache; 002 003/*- 004 * #%L 005 * HAPI FHIR Subscription Server 006 * %% 007 * Copyright (C) 2014 - 2020 University Health Network 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.interceptor.api.HookParams; 024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 025import ca.uhn.fhir.interceptor.api.Pointcut; 026import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; 027import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer; 028import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry; 029import org.apache.commons.lang3.Validate; 030import org.hl7.fhir.instance.model.api.IBaseResource; 031import org.hl7.fhir.instance.model.api.IIdType; 032import org.hl7.fhir.r4.model.Subscription; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035import org.springframework.beans.factory.annotation.Autowired; 036import org.springframework.stereotype.Component; 037 038import javax.annotation.PreDestroy; 039import java.util.Collection; 040import java.util.Collections; 041import java.util.List; 042import java.util.Optional; 043 044/** 045 * Cache of active subscriptions. When a new subscription is added to the cache, a new Spring Channel is created 046 * and a new MessageHandler for that subscription is subscribed to that channel. These subscriptions, channels, and 047 * handlers are all caches in this registry so they can be removed it the subscription is deleted. 048 */ 049 050// TODO KHS Does jpa need a subscription registry if matching is disabled? 051@Component 052public class SubscriptionRegistry { 053 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class); 054 private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache(); 055 @Autowired 056 private SubscriptionCanonicalizer mySubscriptionCanonicalizer; 057 @Autowired 058 private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer; 059 @Autowired 060 private SubscriptionChannelRegistry mySubscriptionChannelRegistry; 061 @Autowired 062 private IInterceptorBroadcaster myInterceptorBroadcaster; 063 064 /** 065 * Constructor 066 */ 067 public SubscriptionRegistry() { 068 super(); 069 } 070 071 public ActiveSubscription get(String theIdPart) { 072 return myActiveSubscriptionCache.get(theIdPart); 073 } 074 075 public Collection<ActiveSubscription> getAll() { 076 return myActiveSubscriptionCache.getAll(); 077 } 078 079 private Optional<CanonicalSubscription> hasSubscription(IIdType theId) { 080 Validate.notNull(theId); 081 Validate.notBlank(theId.getIdPart()); 082 Optional<ActiveSubscription> activeSubscription = Optional.ofNullable(myActiveSubscriptionCache.get(theId.getIdPart())); 083 return activeSubscription.map(ActiveSubscription::getSubscription); 084 } 085 086 @SuppressWarnings("UnusedReturnValue") 087 private CanonicalSubscription registerSubscription(IIdType theId, IBaseResource theSubscription) { 088 Validate.notNull(theId); 089 String subscriptionId = theId.getIdPart(); 090 Validate.notBlank(subscriptionId); 091 Validate.notNull(theSubscription); 092 093 CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); 094 095 String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalized); 096 097 ourLog.info("Registering active subscription {}", subscriptionId); 098 ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, channelName); 099 mySubscriptionChannelRegistry.add(activeSubscription); 100 myActiveSubscriptionCache.put(subscriptionId, activeSubscription); 101 102 // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED 103 HookParams params = new HookParams() 104 .add(CanonicalSubscription.class, canonicalized); 105 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params); 106 107 return canonicalized; 108 } 109 110 public void unregisterSubscription(String theSubscriptionId) { 111 Validate.notNull(theSubscriptionId); 112 113 ourLog.info("Unregistering active subscription {}", theSubscriptionId); 114 ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId); 115 if (activeSubscription != null) { 116 mySubscriptionChannelRegistry.remove(activeSubscription); 117 } 118 } 119 120 @PreDestroy 121 public void unregisterAllSubscriptions() { 122 // Once to set flag 123 unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); 124 // Twice to remove 125 unregisterAllSubscriptionsNotInCollection(Collections.emptyList()); 126 } 127 128 void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) { 129 130 List<String> idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds); 131 for (String id : idsToDelete) { 132 unregisterSubscription(id); 133 } 134 } 135 136 public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) { 137 Optional<CanonicalSubscription> existingSubscription = hasSubscription(theSubscription.getIdElement()); 138 CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription); 139 140 if (existingSubscription.isPresent()) { 141 if (newSubscription.equals(existingSubscription.get())) { 142 // No changes 143 return false; 144 } 145 ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue()); 146 if (channelTypeSame(existingSubscription.get(), newSubscription)) { 147 ourLog.info("Channel type is same. Updating active subscription and re-using existing channel and handlers."); 148 updateSubscription(theSubscription); 149 return true; 150 } 151 unregisterSubscription(theSubscription.getIdElement().getIdPart()); 152 } 153 if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) { 154 registerSubscription(theSubscription.getIdElement(), theSubscription); 155 return true; 156 } else { 157 return false; 158 } 159 } 160 161 private void updateSubscription(IBaseResource theSubscription) { 162 IIdType theId = theSubscription.getIdElement(); 163 Validate.notNull(theId); 164 Validate.notBlank(theId.getIdPart()); 165 ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart()); 166 Validate.notNull(activeSubscription); 167 CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription); 168 activeSubscription.setSubscription(canonicalized); 169 170 // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED 171 HookParams params = new HookParams() 172 .add(CanonicalSubscription.class, canonicalized); 173 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params); 174 } 175 176 private boolean channelTypeSame(CanonicalSubscription theExistingSubscription, CanonicalSubscription theNewSubscription) { 177 return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType()); 178 } 179 180 public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) { 181 if (hasSubscription(theSubscription.getIdElement()).isPresent()) { 182 ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue()); 183 unregisterSubscription(theSubscription.getIdElement().getIdPart()); 184 return true; 185 } 186 return false; 187 } 188 189 public int size() { 190 return myActiveSubscriptionCache.size(); 191 } 192}