001package ca.uhn.fhir.jpa.subscription.module.cache;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2020 University Health Network
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.interceptor.api.HookParams;
024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
025import ca.uhn.fhir.interceptor.api.Pointcut;
026import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
027import ca.uhn.fhir.jpa.subscription.module.channel.ISubscriptionDeliveryChannelNamer;
028import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
029import org.apache.commons.lang3.Validate;
030import org.hl7.fhir.instance.model.api.IBaseResource;
031import org.hl7.fhir.instance.model.api.IIdType;
032import org.hl7.fhir.r4.model.Subscription;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import org.springframework.beans.factory.annotation.Autowired;
036import org.springframework.stereotype.Component;
037
038import javax.annotation.PreDestroy;
039import java.util.Collection;
040import java.util.Collections;
041import java.util.List;
042import java.util.Optional;
043
044/**
045 * Cache of active subscriptions.  When a new subscription is added to the cache, a new Spring Channel is created
046 * and a new MessageHandler for that subscription is subscribed to that channel.  These subscriptions, channels, and
047 * handlers are all caches in this registry so they can be removed it the subscription is deleted.
048 */
049
050// TODO KHS Does jpa need a subscription registry if matching is disabled?
051@Component
052public class SubscriptionRegistry {
053        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionRegistry.class);
054        private final ActiveSubscriptionCache myActiveSubscriptionCache = new ActiveSubscriptionCache();
055        @Autowired
056        private SubscriptionCanonicalizer mySubscriptionCanonicalizer;
057        @Autowired
058        private ISubscriptionDeliveryChannelNamer mySubscriptionDeliveryChannelNamer;
059        @Autowired
060        private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
061        @Autowired
062        private IInterceptorBroadcaster myInterceptorBroadcaster;
063
064        /**
065         * Constructor
066         */
067        public SubscriptionRegistry() {
068                super();
069        }
070
071        public ActiveSubscription get(String theIdPart) {
072                return myActiveSubscriptionCache.get(theIdPart);
073        }
074
075        public Collection<ActiveSubscription> getAll() {
076                return myActiveSubscriptionCache.getAll();
077        }
078
079        private Optional<CanonicalSubscription> hasSubscription(IIdType theId) {
080                Validate.notNull(theId);
081                Validate.notBlank(theId.getIdPart());
082                Optional<ActiveSubscription> activeSubscription = Optional.ofNullable(myActiveSubscriptionCache.get(theId.getIdPart()));
083                return activeSubscription.map(ActiveSubscription::getSubscription);
084        }
085
086        @SuppressWarnings("UnusedReturnValue")
087        private CanonicalSubscription registerSubscription(IIdType theId, IBaseResource theSubscription) {
088                Validate.notNull(theId);
089                String subscriptionId = theId.getIdPart();
090                Validate.notBlank(subscriptionId);
091                Validate.notNull(theSubscription);
092
093                CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
094
095                String channelName = mySubscriptionDeliveryChannelNamer.nameFromSubscription(canonicalized);
096
097                ourLog.info("Registering active subscription {}", subscriptionId);
098                ActiveSubscription activeSubscription = new ActiveSubscription(canonicalized, channelName);
099                mySubscriptionChannelRegistry.add(activeSubscription);
100                myActiveSubscriptionCache.put(subscriptionId, activeSubscription);
101
102                // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
103                HookParams params = new HookParams()
104                        .add(CanonicalSubscription.class, canonicalized);
105                myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
106
107                return canonicalized;
108        }
109
110        public void unregisterSubscription(String theSubscriptionId) {
111                Validate.notNull(theSubscriptionId);
112
113                ourLog.info("Unregistering active subscription {}", theSubscriptionId);
114                ActiveSubscription activeSubscription = myActiveSubscriptionCache.remove(theSubscriptionId);
115                if (activeSubscription != null) {
116                        mySubscriptionChannelRegistry.remove(activeSubscription);
117                }
118        }
119
120        @PreDestroy
121        public void unregisterAllSubscriptions() {
122                // Once to set flag
123                unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
124                // Twice to remove
125                unregisterAllSubscriptionsNotInCollection(Collections.emptyList());
126        }
127
128        void unregisterAllSubscriptionsNotInCollection(Collection<String> theAllIds) {
129
130                List<String> idsToDelete = myActiveSubscriptionCache.markAllSubscriptionsNotInCollectionForDeletionAndReturnIdsToDelete(theAllIds);
131                for (String id : idsToDelete) {
132                        unregisterSubscription(id);
133                }
134        }
135
136        public synchronized boolean registerSubscriptionUnlessAlreadyRegistered(IBaseResource theSubscription) {
137                Optional<CanonicalSubscription> existingSubscription = hasSubscription(theSubscription.getIdElement());
138                CanonicalSubscription newSubscription = mySubscriptionCanonicalizer.canonicalize(theSubscription);
139
140                if (existingSubscription.isPresent()) {
141                        if (newSubscription.equals(existingSubscription.get())) {
142                                // No changes
143                                return false;
144                        }
145                        ourLog.info("Updating already-registered active subscription {}", theSubscription.getIdElement().toUnqualified().getValue());
146                        if (channelTypeSame(existingSubscription.get(), newSubscription)) {
147                                ourLog.info("Channel type is same.  Updating active subscription and re-using existing channel and handlers.");
148                                updateSubscription(theSubscription);
149                                return true;
150                        }
151                        unregisterSubscription(theSubscription.getIdElement().getIdPart());
152                }
153                if (Subscription.SubscriptionStatus.ACTIVE.equals(newSubscription.getStatus())) {
154                        registerSubscription(theSubscription.getIdElement(), theSubscription);
155                        return true;
156                } else {
157                        return false;
158                }
159        }
160
161        private void updateSubscription(IBaseResource theSubscription) {
162                IIdType theId = theSubscription.getIdElement();
163                Validate.notNull(theId);
164                Validate.notBlank(theId.getIdPart());
165                ActiveSubscription activeSubscription = myActiveSubscriptionCache.get(theId.getIdPart());
166                Validate.notNull(activeSubscription);
167                CanonicalSubscription canonicalized = mySubscriptionCanonicalizer.canonicalize(theSubscription);
168                activeSubscription.setSubscription(canonicalized);
169
170                // Interceptor call: SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED
171                HookParams params = new HookParams()
172                        .add(CanonicalSubscription.class, canonicalized);
173                myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_ACTIVE_SUBSCRIPTION_REGISTERED, params);
174        }
175
176        private boolean channelTypeSame(CanonicalSubscription theExistingSubscription, CanonicalSubscription theNewSubscription) {
177                return theExistingSubscription.getChannelType().equals(theNewSubscription.getChannelType());
178        }
179
180        public boolean unregisterSubscriptionIfRegistered(IBaseResource theSubscription, String theStatusString) {
181                if (hasSubscription(theSubscription.getIdElement()).isPresent()) {
182                        ourLog.info("Removing {} subscription {}", theStatusString, theSubscription.getIdElement().toUnqualified().getValue());
183                        unregisterSubscription(theSubscription.getIdElement().getIdPart());
184                        return true;
185                }
186                return false;
187        }
188
189        public int size() {
190                return myActiveSubscriptionCache.size();
191        }
192}