001package ca.uhn.fhir.jpa.subscription.module.subscriber;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2020 University Health Network
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.context.FhirContext;
024import ca.uhn.fhir.interceptor.api.HookParams;
025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
028import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
029import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
030import com.google.common.annotations.VisibleForTesting;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.springframework.beans.factory.annotation.Autowired;
034import org.springframework.messaging.Message;
035import org.springframework.messaging.MessageHandler;
036import org.springframework.messaging.MessagingException;
037
038public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandler {
039        private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class);
040
041        @Autowired
042        protected FhirContext myFhirContext;
043        @Autowired
044        protected SubscriptionRegistry mySubscriptionRegistry;
045        @Autowired
046        private IInterceptorBroadcaster myInterceptorBroadcaster;
047
048        @Override
049        public void handleMessage(Message theMessage) throws MessagingException {
050                if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) {
051                        ourLog.warn("Unexpected payload type: {}", theMessage.getPayload());
052                        return;
053                }
054
055                ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload();
056                String subscriptionId = msg.getSubscriptionId(myFhirContext);
057                if (subscriptionId == null) {
058                        ourLog.warn("Subscription has no ID, ignoring");
059                        return;
060                }
061
062                ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart());
063                if (updatedSubscription != null) {
064                        msg.setSubscription(updatedSubscription.getSubscription());
065                }
066
067                try {
068
069                        // Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY
070                        HookParams params = new HookParams()
071                                .add(ResourceDeliveryMessage.class, msg)
072                                .add(CanonicalSubscription.class, msg.getSubscription());
073                        if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, params)) {
074                                return;
075                        }
076
077                        handleMessage(msg);
078
079                        // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY
080                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, params);
081
082                } catch (Exception e) {
083
084                        String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId;
085                        ourLog.error(errorMsg, e);
086
087                        // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY
088                        HookParams hookParams = new HookParams()
089                                .add(ResourceDeliveryMessage.class, msg)
090                                .add(Exception.class, e);
091                        if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, hookParams)) {
092                                return;
093                        }
094
095                        throw new MessagingException(theMessage, errorMsg, e);
096                }
097        }
098
099        public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception;
100
101        @VisibleForTesting
102        void setFhirContextForUnitTest(FhirContext theCtx) {
103                myFhirContext = theCtx;
104        }
105
106        @VisibleForTesting
107        void setInterceptorBroadcasterForUnitTest(IInterceptorBroadcaster theInterceptorBroadcaster) {
108                myInterceptorBroadcaster = theInterceptorBroadcaster;
109        }
110
111        @VisibleForTesting
112        void setSubscriptionRegistryForUnitTest(SubscriptionRegistry theSubscriptionRegistry) {
113                mySubscriptionRegistry = theSubscriptionRegistry;
114        }
115
116        public IInterceptorBroadcaster getInterceptorBroadcaster() {
117                return myInterceptorBroadcaster;
118        }
119}