001package ca.uhn.fhir.jpa.subscription.module.subscriber; 002 003/*- 004 * #%L 005 * HAPI FHIR Subscription Server 006 * %% 007 * Copyright (C) 2014 - 2020 University Health Network 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.context.FhirContext; 024import ca.uhn.fhir.interceptor.api.HookParams; 025import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster; 026import ca.uhn.fhir.interceptor.api.Pointcut; 027import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription; 028import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription; 029import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry; 030import com.google.common.annotations.VisibleForTesting; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.springframework.beans.factory.annotation.Autowired; 034import org.springframework.messaging.Message; 035import org.springframework.messaging.MessageHandler; 036import org.springframework.messaging.MessagingException; 037 038public abstract class BaseSubscriptionDeliverySubscriber implements MessageHandler { 039 private static final Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionDeliverySubscriber.class); 040 041 @Autowired 042 protected FhirContext myFhirContext; 043 @Autowired 044 protected SubscriptionRegistry mySubscriptionRegistry; 045 @Autowired 046 private IInterceptorBroadcaster myInterceptorBroadcaster; 047 048 @Override 049 public void handleMessage(Message theMessage) throws MessagingException { 050 if (!(theMessage.getPayload() instanceof ResourceDeliveryMessage)) { 051 ourLog.warn("Unexpected payload type: {}", theMessage.getPayload()); 052 return; 053 } 054 055 ResourceDeliveryMessage msg = (ResourceDeliveryMessage) theMessage.getPayload(); 056 String subscriptionId = msg.getSubscriptionId(myFhirContext); 057 if (subscriptionId == null) { 058 ourLog.warn("Subscription has no ID, ignoring"); 059 return; 060 } 061 062 ActiveSubscription updatedSubscription = mySubscriptionRegistry.get(msg.getSubscription().getIdElement(myFhirContext).getIdPart()); 063 if (updatedSubscription != null) { 064 msg.setSubscription(updatedSubscription.getSubscription()); 065 } 066 067 try { 068 069 // Interceptor call: SUBSCRIPTION_BEFORE_DELIVERY 070 HookParams params = new HookParams() 071 .add(ResourceDeliveryMessage.class, msg) 072 .add(CanonicalSubscription.class, msg.getSubscription()); 073 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_DELIVERY, params)) { 074 return; 075 } 076 077 handleMessage(msg); 078 079 // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY 080 myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY, params); 081 082 } catch (Exception e) { 083 084 String errorMsg = "Failure handling subscription payload for subscription: " + subscriptionId; 085 ourLog.error(errorMsg, e); 086 087 // Interceptor call: SUBSCRIPTION_AFTER_DELIVERY 088 HookParams hookParams = new HookParams() 089 .add(ResourceDeliveryMessage.class, msg) 090 .add(Exception.class, e); 091 if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_DELIVERY_FAILED, hookParams)) { 092 return; 093 } 094 095 throw new MessagingException(theMessage, errorMsg, e); 096 } 097 } 098 099 public abstract void handleMessage(ResourceDeliveryMessage theMessage) throws Exception; 100 101 @VisibleForTesting 102 void setFhirContextForUnitTest(FhirContext theCtx) { 103 myFhirContext = theCtx; 104 } 105 106 @VisibleForTesting 107 void setInterceptorBroadcasterForUnitTest(IInterceptorBroadcaster theInterceptorBroadcaster) { 108 myInterceptorBroadcaster = theInterceptorBroadcaster; 109 } 110 111 @VisibleForTesting 112 void setSubscriptionRegistryForUnitTest(SubscriptionRegistry theSubscriptionRegistry) { 113 mySubscriptionRegistry = theSubscriptionRegistry; 114 } 115 116 public IInterceptorBroadcaster getInterceptorBroadcaster() { 117 return myInterceptorBroadcaster; 118 } 119}